Skip to content
Snippets Groups Projects
Commit a63a5974 authored by Nils-Arne Dreier's avatar Nils-Arne Dreier
Browse files

feat: provide next-timestep endpoint in loco

parent 2bc38538
No related branches found
No related tags found
1 merge request!19feat: provide next-timestep endpoint in loco
Pipeline #90648 passed
......@@ -6,14 +6,18 @@ from aiohttp import web
class LocoServer:
def __init__(self, store, host="0.0.0.0", port=8080):
self.timestep = dict()
self.timestep_per_var = dict()
self.current_timestep = -1
self.port = port
def _serve_current_timestep(request):
if len(self.timestep) > 0:
t_idx = min(self.timestep.values())
if self.current_timestep >= 0:
return web.json_response({"timestep": int(self.current_timestep)})
else:
return web.Response(status=204)
async def _serve_next_timestep(request):
t_idx = await self.timestep_future
return web.json_response({"timestep": int(t_idx)})
def _serve(request):
......@@ -25,22 +29,28 @@ class LocoServer:
return web.Response(body=data)
def _run_server(runner, host, port):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(runner.setup())
self.loop = asyncio.new_event_loop()
self.timestep_future = self.loop.create_future()
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, host, port)
loop.run_until_complete(site.start())
loop.run_forever()
self.loop.run_until_complete(site.start())
self.loop.run_forever()
app = web.Application()
app.add_routes([web.get("/current-timestep", _serve_current_timestep)])
app.add_routes([web.get("/next-timestep", _serve_next_timestep)])
app.add_routes([web.get("/{key:.*}", _serve)])
runner = web.AppRunner(app)
t = Thread(target=_run_server, args=(runner, host, port), daemon=True)
t.start()
def set_timestep(self, varname, timestep):
self.timestep[varname] = timestep
self.timestep_per_var[varname] = timestep
if min(self.timestep_per_var.values()) > self.current_timestep:
self.current_timestep = min(self.timestep_per_var.values())
self.loop.call_soon_threadsafe(self.timestep_future.set_result, self.current_timestep)
self.timestep_future = self.loop.create_future()
def write_nginx_config(self, distributed_data_vars, rank):
import platform
......@@ -73,6 +83,7 @@ class LocoServer:
# the healpix order 0 grid where the information
# arrives last
f.write(f' "/current-timestep" loco_backend{r};\n')
f.write(f' "/next-timestep" loco_backend{r};\n')
f.write("}\n\n")
for i in range(len(distributed_data_vars)):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment