Commit 0bd73ec0 authored by Nils-Arne Dreier

feat: export timestep index over http

parent d1095591
1 merge request: !18 feat(loco): add http endpoint for requesting the current timestep index
Pipeline #90563 passed
......@@ -7,6 +7,7 @@ class DataHandler:
def __init__(self, var, t0, cell_slice, prune_offset=None):
self.var = var
self.cell_slice = cell_slice
self.loco_server = None
shape = {
dim: chunk_size for dim, chunk_size in zip(var.attrs["_ARRAY_DIMENSIONS"], var.shape)
}
......@@ -23,22 +24,27 @@ class DataHandler:
def flush(self):
if self.t == self.t_flushed:
return  # nothing new to write (e.g. a final flush right after a chunk boundary)
if self.prune_offset is not None:
left = max(0, self.t_flushed - self.prune_offset)
right = max(0, self.t - self.prune_offset)
logging.info(f"pruning {self.var.name} for timesteps {left}:{right}")
self.var[left:right, ..., self.cell_slice] = self.var.fill_value
fro = max(0, self.t_flushed - self.prune_offset)
to = max(0, self.t - self.prune_offset)
logging.info(f"pruning {self.var.name} for timesteps {fro}:{to}")
self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
logging.info(f"writing {self.var.name} for timesteps {self.t_flushed}:{self.t}")
self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[
-(self.t - self.t_flushed) :,
...,
]
fro = self.t_flushed % self.time_chunk_size
# self.t % time_chunk_size is 0 at a chunk boundary; treat that as a full chunk
to = self.t % self.time_chunk_size or self.time_chunk_size
self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...]
self.t_flushed = self.t
if self.loco_server is not None:
self.loco_server.set_timestep(self.t)
def __call__(self, event):
self.buf[self.t % self.time_chunk_size, ...] = event.data
self.buf[self.t % self.time_chunk_size, ...] = np.transpose(event.data)
logging.info(
f"received {self.var.name} at {self.t % self.time_chunk_size+1}/{self.time_chunk_size}",
f"received {self.var.name} at {self.t % self.time_chunk_size+1}/{self.time_chunk_size}"
)
self.t += 1
if self.t % self.time_chunk_size == 0: # chunk complete
self.flush()
def set_loco_server(self, loco_server):
self.loco_server = loco_server
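For context, the flush logic above implements a ring buffer over the time dimension: incoming timesteps are staged in a buffer of time_chunk_size slots and written to the zarr variable one full chunk at a time. A minimal standalone sketch of the pattern, assuming a plain in-memory output array (class and attribute names here are illustrative, not the DataHandler above):

import numpy as np

class RingBufferWriter:
    """Stage incoming timesteps and write one full time chunk at a time."""

    def __init__(self, out, time_chunk_size):
        self.out = out  # array-like of shape (ntime, ...)
        self.time_chunk_size = time_chunk_size
        self.buf = np.empty((time_chunk_size,) + out.shape[1:], dtype=out.dtype)
        self.t = 0          # next timestep index to receive
        self.t_flushed = 0  # first timestep not yet written to `out`

    def __call__(self, data):
        self.buf[self.t % self.time_chunk_size, ...] = data
        self.t += 1
        if self.t % self.time_chunk_size == 0:  # chunk complete
            self.flush()

    def flush(self):
        if self.t == self.t_flushed:
            return  # nothing staged since the last flush
        fro = self.t_flushed % self.time_chunk_size
        # t % time_chunk_size is 0 at a chunk boundary; treat that as a full chunk
        to = self.t % self.time_chunk_size or self.time_chunk_size
        self.out[self.t_flushed : self.t, ...] = self.buf[fro:to, ...]
        self.t_flushed = self.t

# usage: writer = RingBufferWriter(np.zeros((10, 4)), time_chunk_size=5)
# for step in range(7): writer(np.full(4, step))
# writer.flush()  # writes the trailing partial chunk (steps 5 and 6)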
......@@ -114,10 +114,7 @@ def distribute_work(grouped_data_vars, size):
num_var_groups = min(len(variables), ceil(nranks / nchunks))
var_groups = _map_weighted(
range(num_var_groups),
variables,
None,
[_estimate_size(v) for v in variables],
range(num_var_groups), variables, None, [_estimate_size(v) for v in variables]
)
ranks_per_group = _map_weighted(range(num_var_groups), range(nranks))
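_map_weighted itself is not part of this diff. As a hypothetical sketch under that caveat, a weighted mapping like the calls above could assign items to groups greedily so the per-group weight stays balanced; the signature and behavior below are assumptions, not the project's implementation:

def map_weighted(groups, items, weights=None):
    """Hypothetical: balance items across groups by total weight."""
    groups = list(groups)
    items = list(items)
    weights = list(weights) if weights is not None else [1] * len(items)
    totals = {g: 0 for g in groups}
    assignment = {g: [] for g in groups}
    # place heaviest items first, each into the currently lightest group
    for item, w in sorted(zip(items, weights), key=lambda p: p[1], reverse=True):
        g = min(totals, key=totals.get)
        assignment[g].append(item)
        totals[g] += w
    return assignment

# e.g. map_weighted(range(2), "abcd", weights=[5, 3, 2, 1])
# -> {0: ['a', 'd'], 1: ['b', 'c']}  (total weights 6 and 5)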
......
......@@ -35,7 +35,7 @@ def create_dataset(dataset, order, startdate, enddate, dt):
def _collect_groups(dataset):
if "crs" in dataset.array_keys():
yield dataset
for g in dataset.groups().values():
for _name, g in dataset.groups():
yield from _collect_groups(g)
......@@ -67,11 +67,7 @@ def add_variable(
shape = (ntime, ncells)
_chunk_shape = np.minimum(chunk_shape, shape) if chunk_shape is not None else None
v = g.create_dataset(
name,
shape=shape,
dtype=np.float32,
fill_value=np.nan,
chunks=_chunk_shape,
name, shape=shape, dtype=np.float32, fill_value=np.nan, chunks=_chunk_shape
)
v.attrs["_ARRAY_DIMENSIONS"] = ("time", "cell")
else:
......@@ -79,11 +75,7 @@ def add_variable(
shape = (ntime, nheight, ncells)
_chunk_shape = np.minimum(chunk_shape, shape) if chunk_shape is not None else None
v = g.create_dataset(
name,
shape=shape,
dtype=np.float32,
fill_value=np.nan,
chunks=_chunk_shape,
name, shape=shape, dtype=np.float32, fill_value=np.nan, chunks=_chunk_shape
)
v.attrs["_ARRAY_DIMENSIONS"] = ("time", zaxis, "cell")
if time_method is not None:
......@@ -160,10 +152,7 @@ def main():
help="shape of the data chunks (time, zaxis(if applicable), cell)",
)
add_variable_parser.add_argument(
"--frac-mask",
type=str,
default=None,
help="Name of the frac_mask array in the same group",
"--frac-mask", type=str, default=None, help="Name of the frac_mask array in the same group"
)
add_variable_parser.add_argument("--yac-name", type=str, help="name of the yac field")
......
......@@ -4,59 +4,79 @@ from threading import Thread
from aiohttp import web
def expose_store_over_http(store, host, port):
def _serve(request):
try:
key = request.match_info["key"]
data = store[key]
except KeyError:
return web.Response(status=404, reason="Data not found!")
return web.Response(body=data)
def _run_server(runner, host, port):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, host, port)
loop.run_until_complete(site.start())
loop.run_forever()
app = web.Application()
app.add_routes([web.get("/{key:.*}", _serve)])
runner = web.AppRunner(app)
t = Thread(target=_run_server, args=(runner, host, port), daemon=True)
t.start()
def write_nginx_config(distributed_data_vars, port, rank):
import platform
from regex_engine import generator
with open("backend_map.conf", "w") as f:
f.write("map $request_uri $backend {\n")
for r, data_vars in enumerate(distributed_data_vars):
for group, _vgroup, chunk_slice, dvars in data_vars:
for v in dvars:
if chunk_slice == slice(None):
cell_regex = r"\d+"
else:
cell_chunk_size = dict(zip(v.attrs["_ARRAY_DIMENSIONS"], v.chunks))["cell"]
chunk_range = (
chunk_slice.start // cell_chunk_size,
chunk_slice.stop // cell_chunk_size,
)
cell_regex = generator().numerical_range(*chunk_range)[1:-1]
f.write(
f' "~^/{group}/{v.name}/\\d+\\.\\d+\\.{cell_regex}" loco_backend{r};\n'
)
f.write("}\n\n")
for i in range(len(distributed_data_vars)):
f.write(f'include "loco_backend{i}.conf";\n')
with open(f"loco_backend{rank}.conf", "w") as f:
f.write(f"""
upstream loco_backend{rank} {{
server {platform.node()}:{port};
}}
""")
class LocoServer:
def __init__(self, store, host="0.0.0.0", port=8080):
self.timestep = None
self.port = port
def _serve_timestep(request):
return web.Response(body=str(self.timestep))
def _serve(request):
try:
key = request.match_info["key"]
data = store[key]
except KeyError:
return web.Response(status=404, reason="Data not found!")
return web.Response(body=data)
def _run_server(runner, host, port):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, host, port)
loop.run_until_complete(site.start())
loop.run_forever()
app = web.Application()
app.add_routes([web.get("/timestep", _serve_timestep)])
app.add_routes([web.get("/{key:.*}", _serve)])
runner = web.AppRunner(app)
t = Thread(target=_run_server, args=(runner, host, port), daemon=True)
t.start()
def set_timestep(self, timestep):
self.timestep = timestep
def write_nginx_config(self, distributed_data_vars, rank):
import platform
from regex_engine import generator
if rank == 0:
with open("backend_map.conf", "w") as f:
f.write("map $request_uri $backend {\n")
for r, data_vars in enumerate(distributed_data_vars):
for group, _vgroup, chunk_slice, dvars in data_vars:
for v in dvars:
if chunk_slice == slice(None):
cell_regex = r"\d+"
else:
cell_chunk_size = dict(zip(v.attrs["_ARRAY_DIMENSIONS"], v.chunks))[
"cell"
]
chunk_range = (
chunk_slice.start // cell_chunk_size,
chunk_slice.stop // cell_chunk_size,
)
cell_regex = generator().numerical_range(*chunk_range)[1:-1]
f.write(
f' "~^/{group}/{v.name}/\\d+\\.\\d+\\.{cell_regex}"'
f"loco_backend{r};\n"
)
# use the last rank for the timestep endpoint; assumption: this rank
# has the healpix order 0 grid, where the information arrives last
f.write(f' "/timestep" loco_backend{r};\n')
f.write("}\n\n")
for i in range(len(distributed_data_vars)):
f.write(f'include "loco_backend{i}.conf";\n')
with open(f"loco_backend{rank}.conf", "w") as f:
f.write(f"""
upstream loco_backend{rank} {{
server {platform.node()}:{self.port};
}}
""")
......@@ -6,181 +6,183 @@ from itertools import chain, groupby
import numpy as np
import zarr
from _data_handler import DataHandler
from _distribute_work import distribute_work
from coyote import Coyote, group_comm_rank, group_comm_size, init, run, start_datetime
from loco import expose_store_over_http, write_nginx_config
parser = ArgumentParser()
# zarr.copy_store (used with loco) does not work with zarr.open_consolidated
parser.add_argument(
"datasets",
type=zarr.open,
nargs="+",
help="URL of the dataset. To be passed to zarr.open_consolidated.",
)
parser.add_argument(
"--log-level",
default=logging.INFO,
type=lambda x: getattr(logging, x),
help="Configure the logging level.",
)
parser.add_argument(
"--process-group",
default="hiopy",
help="A string determining the process group for distributing the work",
)
parser.add_argument("--loco", action="store_true", help="Run in LOCO mode")
parser.add_argument(
"--loco-host",
type=str,
default="0.0.0.0",
help="Host address of the LOCO webserver",
)
parser.add_argument("--loco-port", type=int, default=8080, help="Port of the LOCO webserver")
parser.add_argument(
"--loco-capacity",
type=int,
default=5,
help="Number of timesteps to be stored per variable when in LOCO mode",
)
args = parser.parse_args()
if args.loco:
assert len(args.datasets) == 1, "Loco only supports reading from 1 dataset"
loco_store = zarr.MemoryStore()
zarr.copy_store(args.datasets[0].store, loco_store)
zarr.convenience.consolidate_metadata(loco_store)
expose_store_over_http(loco_store, args.loco_host, args.loco_port)
args.datasets = [zarr.open(store=loco_store)]
init(group_name=args.process_group)
rank, size = group_comm_rank(), group_comm_size()
logging.basicConfig(
level=args.log_level, format=f"{args.process_group}({rank}/{size}): %(message)s"
)
def collect_data_vars(group):
for item in group.arrays().values():
if "_ARRAY_DIMENSIONS" in item.attrs and "cell" in item.attrs["_ARRAY_DIMENSIONS"]:
item.group = group # amend the variable object with the group
yield item
for item in group.groups().values():
yield from collect_data_vars(item)
def grid_selector(v):
return v.group.path.split("/")[-1]
data_vars = sorted(chain(*[collect_data_vars(z) for z in args.datasets]), key=grid_selector)
logging.info(f"Found {len(data_vars)} variables")
grouped_data_vars = {
grid: list(variables) for grid, variables in groupby(data_vars, key=grid_selector)
}
distributed_data_vars = distribute_work(grouped_data_vars, group_comm_size())
if args.loco:
write_nginx_config(distributed_data_vars, args.loco_port, group_comm_rank())
my_data_vars = distributed_data_vars[group_comm_rank()]
# my_data_vars: list of (group, vgroup, chunk_slice, variables) tuples
if group_comm_rank() == 0:
logging.debug(distributed_data_vars)
frac_masks = {}  # the numpy arrays must be kept alive until the end of the program
coyote_instances = {
(group, vgroup): Coyote(f"{args.process_group}_{group}_{vgroup}")
for group, vgroup, chunk_slice, data_vars in my_data_vars
}
for group, vgroup, chunk_slice, data_vars in my_data_vars:
coyote = coyote_instances[(group, vgroup)]
nside = data_vars[0].group["crs"].attrs["healpix_nside"]
ncells_global = 12 * (nside**2)
mi, ma, st = chunk_slice.indices(ncells_global)
assert st == 1, "Invalid chunk slice"
ncells = ma - mi
healpix_order = nside.bit_length() - 1
coyote.def_healpix_grid(healpix_order, (mi, ma))
data_handlers = []
for v in data_vars:
# compute timestep
assert (
"seconds since " in v.group.time.attrs["units"]
), "Currently the time must be given in seconds"
dt = v.group.time[1] - v.group.time[0]
# compute time start index
t0 = (
np.datetime64(start_datetime())
- np.datetime64(v.group.time.attrs["units"][len("seconds since ") :])
) / np.timedelta64(1, "s")
t0_idx = np.searchsorted(v.group.time, t0)
assert v.group.time[t0_idx] == t0, "start_datetime not found in time axis"
# see YAC_REDUCTION_TIME_NONE etc. (TODO: pass constants through coyote)
time_methods2yac = {"point": 0, "sum": 1, "mean": 2, "min": 3, "max": 4}
collection_size = v.shape[1] if len(v.shape) == 3 else 1
src_name = v.attrs.get("hiopy::src_name", default=v.name.split("/")[-1])
if "hiopy::yac_source" in v.attrs:
src_comp, src_grid = v.attrs["hiopy::yac_source"]
else:
# find the component that provides that variable
src_group, src_vgroup = [
(g, vg)
for g, vg, _, gvars in chain(*distributed_data_vars)
if g == v.group.attrs["hiopy::parent"]
and len([_ for v in gvars if v.name.split("/")[-1] == src_name]) > 0
][0]
src_comp = src_grid = f"{args.process_group}_{src_group}_{src_vgroup}"
time_method = v.attrs.get("hiopy::time_method", "point")
nnn = v.attrs.get("hiopy::nnn", 1)
frac_mask_name = v.attrs.get("hiopy::frac_mask", None)
frac_mask = []
if frac_mask_name is not None:
if frac_mask_name not in frac_masks[nside]:
frac_masks[nside][frac_mask_name] = np.array(v.group[frac_mask_name][chunk_slice])
frac_mask = frac_masks[nside][frac_mask_name]
logging.info(
f"registering {v.name} with "
f"{collection_size=}, "
f"{src_comp=}, "
f"{src_grid=}, "
f"{src_name=}, "
f"{time_method=}",
)
data_handlers.append(
DataHandler(
v,
t0_idx,
chunk_slice,
prune_offset=args.loco_capacity if args.loco else None,
),
)
coyote.add_field(
src_comp,
src_grid,
src_name,
data_handlers[-1],
f"PT{dt}S",
collection_size,
yac_time_reduction=time_methods2yac[time_method],
interpolation_stack_yaml=f"- nnn:\n n: {nnn}",
frac_mask=frac_mask,
)
run()
for dh in data_handlers:
dh.flush()
from ._data_handler import DataHandler
from ._distribute_work import distribute_work
from .loco import LocoServer
def main():
parser = ArgumentParser()
# zarr.copy_store (used with loco) does not work with zarr.open_consolidated
parser.add_argument(
"datasets",
type=zarr.open,
nargs="+",
help="URL of the dataset. To be passed to zarr.open_consolidated.",
)
parser.add_argument(
"--log-level",
default=logging.INFO,
type=lambda x: getattr(logging, x),
help="Configure the logging level.",
)
parser.add_argument(
"--process-group",
default="hiopy",
help="A string determining the process group for distributing the work",
)
parser.add_argument("--loco", action="store_true", help="Run in LOCO mode")
parser.add_argument(
"--loco-host", type=str, default="0.0.0.0", help="Host address of the LOCO webserver"
)
parser.add_argument("--loco-port", type=int, default=8080, help="Port of the LOCO webserver")
parser.add_argument(
"--loco-capacity",
type=int,
default=5,
help="Number of timesteps to be stored per variable when in LOCO mode",
)
args = parser.parse_args()
if args.loco:
assert len(args.datasets) == 1, "Loco only supports reading from 1 dataset"
loco_store = zarr.MemoryStore()
zarr.copy_store(args.datasets[0].store, loco_store)
zarr.convenience.consolidate_metadata(loco_store)
loco_server = LocoServer(loco_store, args.loco_host, args.loco_port)
args.datasets = [zarr.open(store=loco_store)]
init(group_name=args.process_group)
rank, size = group_comm_rank(), group_comm_size()
logging.basicConfig(
level=args.log_level, format=f"{args.process_group}({rank}/{size}): %(message)s"
)
def collect_data_vars(group):
for _name, item in group.arrays():
if "_ARRAY_DIMENSIONS" in item.attrs and "cell" in item.attrs["_ARRAY_DIMENSIONS"]:
item.group = group # amend the variable object with the group
yield item
for _name, item in group.groups():
yield from collect_data_vars(item)
def grid_selector(v):
return v.group.path.split("/")[-1]
data_vars = sorted(chain(*[collect_data_vars(z) for z in args.datasets]), key=grid_selector)
logging.info(f"Found {len(data_vars)} variables")
grouped_data_vars = {
grid: list(variables) for grid, variables in groupby(data_vars, key=grid_selector)
}
distributed_data_vars = distribute_work(grouped_data_vars, group_comm_size())
if args.loco:
loco_server.write_nginx_config(distributed_data_vars, group_comm_rank())
my_data_vars = distributed_data_vars[group_comm_rank()]
# my_data_vars: list of (group, vgroup, chunk_slice, variables) tuples
if group_comm_rank() == 0:
logging.debug(distributed_data_vars)
frac_masks = {}  # the numpy arrays must be kept alive until the end of the program
coyote_instances = {
(group, vgroup): Coyote(f"{args.process_group}_{group}_{vgroup}")
for group, vgroup, chunk_slice, data_vars in my_data_vars
}
for group, vgroup, chunk_slice, data_vars in my_data_vars:
coyote = coyote_instances[(group, vgroup)]
nside = data_vars[0].group["crs"].attrs["healpix_nside"]
ncells_global = 12 * (nside**2)
mi, ma, st = chunk_slice.indices(ncells_global)
assert st == 1, "Invalid chunk slice"
healpix_order = nside.bit_length() - 1
coyote.def_healpix_grid(healpix_order, (mi, ma))
data_handlers = []
for v in data_vars:
# compute timestep
assert (
"seconds since " in v.group.time.attrs["units"]
), "Currently the time must be given in seconds"
dt = v.group.time[1] - v.group.time[0]
# compute time start index
t0 = (
np.datetime64(start_datetime())
- np.datetime64(v.group.time.attrs["units"][len("seconds since ") :])
) / np.timedelta64(1, "s")
t0_idx = np.searchsorted(v.group.time, t0)
assert v.group.time[t0_idx] == t0, "start_datetime not found in time axis"
# see YAC_REDUCTION_TIME_NONE etc. (TODO: pass constants through coyote)
time_methods2yac = {"point": 0, "sum": 1, "mean": 2, "min": 3, "max": 4}
collection_size = v.shape[1] if len(v.shape) == 3 else 1
src_name = v.attrs.get("hiopy::src_name", default=v.name.split("/")[-1])
if "hiopy::yac_source" in v.attrs:
src_comp, src_grid = v.attrs["hiopy::yac_source"]
else:
# find the component that provides that variable
src_group, src_vgroup = [
(g, vg)
for g, vg, _, gvars in chain(*distributed_data_vars)
if g == v.group.attrs["hiopy::parent"]
and len([_ for v in gvars if v.name.split("/")[-1] == src_name]) > 0
][0]
src_comp = src_grid = f"{args.process_group}_{src_group}_{src_vgroup}"
time_method = v.attrs.get("hiopy::time_method", "point")
nnn = v.attrs.get("hiopy::nnn", 1)
frac_mask_name = v.attrs.get("hiopy::frac_mask", None)
frac_mask = []
if frac_mask_name is not None:
frac_masks.setdefault(nside, {})  # ensure the per-grid dict exists before indexing
if frac_mask_name not in frac_masks[nside]:
frac_masks[nside][frac_mask_name] = np.array(
v.group[frac_mask_name][chunk_slice]
)
frac_mask = frac_masks[nside][frac_mask_name]
logging.info(
f"registering {v.name} with "
f"{collection_size=}, "
f"{src_comp=}, "
f"{src_grid=}, "
f"{src_name=}, "
f"{time_method=}"
)
data_handlers.append(
DataHandler(
v, t0_idx, chunk_slice, prune_offset=args.loco_capacity if args.loco else None
)
)
if args.loco:
data_handlers[-1].set_loco_server(loco_server)
coyote.add_field(
src_comp,
src_grid,
src_name,
data_handlers[-1],
f"PT{dt}S",
collection_size,
yac_time_reduction=time_methods2yac[time_method],
interpolation_stack_yaml=f"- nnn:\n n: {nnn}",
frac_mask=frac_mask,
)
run()
for dh in data_handlers:
dh.flush()
if __name__ == "__main__":
main()
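For illustration, the start-index computation in the worker above turns the model start datetime into an offset on the dataset's time axis via the "seconds since ..." units string. A small worked example with made-up values (the units string, time axis, and start date are assumptions):

import numpy as np

units = "seconds since 2020-01-01T00:00:00"  # v.group.time.attrs["units"]
time_axis = np.arange(0, 10 * 3600, 3600)    # hourly steps, in seconds

start = np.datetime64("2020-01-01T03:00:00")  # stand-in for start_datetime()
t0 = (start - np.datetime64(units[len("seconds since "):])) / np.timedelta64(1, "s")
t0_idx = np.searchsorted(time_axis, t0)

assert time_axis[t0_idx] == t0  # the start must lie exactly on the time axis
print(t0, t0_idx)  # 10800.0 3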
......@@ -16,6 +16,8 @@ editable.rebuild = true
# Specify a build directory (required for editable.rebuild)
build-dir = "build_{cache_tag}"
cmake.build-type = "RelWithDebInfo"
[project.scripts]
hiopy-configure = "hiopy.configure:main"
hiopy-worker = "hiopy.worker:main"
......
......@@ -91,8 +91,8 @@ PYBIND11_MODULE(coyote, m) {
py::class_<Event>(m, "Event").def_property_readonly("data", [](const Event& self) {
return py::memoryview::from_buffer(
self.data.data(), // buffer pointer
{(py::ssize_t)self.collection_size, (py::ssize_t)self.point_size}, // shape (rows, cols)
{sizeof(double), sizeof(double) * self.collection_size} // strides
{(py::ssize_t)self.point_size, (py::ssize_t)self.collection_size}, // shape (rows, cols)
{sizeof(double), sizeof(double) * self.point_size} // strides
);
});
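The corrected memoryview describes a Fortran-ordered buffer: consecutive doubles run down the point dimension, so the stride is one element along axis 0 and point_size elements along axis 1. This is also why DataHandler.__call__ now applies np.transpose before staging the data. A NumPy sketch of the same layout (sizes are illustrative assumptions):

import numpy as np

point_size, collection_size = 4, 3
flat = np.arange(point_size * collection_size, dtype=np.float64)

# shape (point_size, collection_size) with strides (itemsize, itemsize * point_size)
# is exactly a column-major (Fortran-order) view of the flat buffer
event_data = np.lib.stride_tricks.as_strided(
    flat,
    shape=(point_size, collection_size),
    strides=(flat.itemsize, flat.itemsize * point_size),
)
assert np.array_equal(
    event_data, flat.reshape((point_size, collection_size), order="F")
)

# the handler transposes this to (collection_size, point_size) before buffering
buf_slice = np.transpose(event_data)
assert buf_slice.shape == (collection_size, point_size)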
......
......@@ -15,10 +15,7 @@ parser.add_argument("timestep", type=str, help="Output timestep width in iso for
parser.add_argument("src_comp", type=str, help="Component name of the yac source component")
parser.add_argument("src_grid", type=str, help="Grid name of the yac source grid")
parser.add_argument(
"vars",
type=str,
nargs="+",
help="variables names of variables that should be received",
"vars", type=str, nargs="+", help="variables names of variables that should be received"
)
args = parser.parse_args()
......