Commit 13dec273 authored by Siddhant Tibrewal

updated add_height to accept an array of values instead of a number

parent b2836116
#!/usr/bin/env python3
import logging
from math import ceil, prod
import numpy as np
logger = logging.getLogger(__name__)
def _map_weighted(a, b, wa=None, wb=None):
"""
@@ -101,6 +104,13 @@ def distribute_work(grouped_data_vars, size):
group, gsize = max(group_sizes.items(), key=lambda x: x[1])
variables = grouped_data_vars.pop(group)
assert all(
variables[0].shape == v.shape for v in variables
), "All variables in a group need to have the same shape"
assert all(
variables[0].chunks == v.chunks for v in variables
), "All variables in a group need to have the same chunk shape"
if gsize > bytes_per_rank:
# Compute the number of ranks required for this group
nranks = gsize // bytes_per_rank
@@ -108,31 +118,26 @@ def distribute_work(grouped_data_vars, size):
cell_chunk_size = variables[0].chunks[-1]
ncells = variables[0].shape[-1]
nchunks = ceil(ncells / cell_chunk_size)
if nchunks > nranks:
logger.warning(
f"Not enough chunks to distribute work in group {group}."
" Consider using fewer processes of split up the group."
)
nranks = nchunks
num_var_groups = min(len(variables), ceil(nranks / nchunks))
var_groups = _map_weighted(
range(num_var_groups), variables, None, [_estimate_size(v) for v in variables]
)
ranks_per_group = _map_weighted(range(num_var_groups), range(nranks))
this_group = [
[(group, varidx, sl, mvars)]
for varidx, mvars in var_groups.items()
for sl in _distribute_chunks(variables[0], len(ranks_per_group[varidx]))
]
this_group = [[(group, variables, sl)] for sl in _distribute_chunks(variables[0], nranks)]
return [*this_group, *distribute_work(grouped_data_vars, size - nranks)]
else:
del group_sizes[group]
result = [(group, 0, slice(None), variables)]
result = [(group, variables, slice(None))]
# Add additional groups to this rank until it reaches the byte limit
while gsize < bytes_per_rank:
group, next_gsize = max(group_sizes.items(), key=lambda x: x[1])
if gsize + next_gsize > bytes_per_rank:
break
result.append((group, 0, slice(None), grouped_data_vars.pop(group)))
result.append((group, grouped_data_vars.pop(group), slice(None)))
del group_sizes[group]
gsize += next_gsize
...
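With this commit each work item becomes a (group, variables, chunk_slice) triple; the old varidx/vgroup field is gone, and splitting the variables of one grid is instead expressed through the hiopy::var_group attribute (see grid_id below). A minimal sketch of the new layout, mirroring the tests further down (variable names taken from those tests):

    # Two groups on two ranks: each rank gets one whole group.
    distribute_work({"g1": [var1], "g2": [var2]}, 2)
    # -> [[("g1", [var1], slice(None))], [("g2", [var2], slice(None))]]

    # One group on two ranks: the cell dimension is split along chunk borders.
    distribute_work({"g": [var]}, 2)   # var has 10 cells in chunks of 5
    # -> [[("g", [var], slice(0, 5))], [("g", [var], slice(5, 10))]]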
import numpy as np
from ._zarr_utils import get_var_group
# computes an id that identifies the grid of a variable
def grid_id(var, zgroup):
def grid_id(var):
"""
var: dtype zarr.array
"""
assert "grid_mapping" in var.attrs
zgroup = get_var_group(var)
crs = zgroup[var.attrs["grid_mapping"]]
var_group = var.attrs.get("hiopy::var_group", "")
spatial_chunk_shape = var.chunks[-1]
# healpix
if "grid_mapping_name" in crs.attrs and crs.attrs["grid_mapping_name"] == "healpix":
s = "healpix_"
s = var_group + "healpix_"
s += str(crs.attrs["healpix_nside"])
s += str(crs.attrs["healpix_order"])
s += "_" + str(spatial_chunk_shape)
# if the cell dimension has a coordinate, take it into account
if "_ARRAY_DIMENSIONS" in var.attrs and var.attrs["_ARRAY_DIMENSIONS"][-1] in zgroup:
s += str(hash(str(zgroup[var.attrs["_ARRAY_DIMENSIONS"][-1]][:])))
return s
@@ -23,7 +32,7 @@ def grid_id(var, zgroup):
if "coordinates" in crs.attrs:
try:
lon, lat = crs.attrs["coordinates"].split(" ")
s = hash(hash(str(zgroup[lat][:])) + hash(str(zgroup[lon][:])))
s = hash((str(zgroup[lat][:]), str(zgroup[lon][:]), var_group, spatial_chunk_shape))
return s
except Exception as e:
raise RuntimeError("Cannot parse coordinates") from e
@@ -31,7 +40,8 @@ def grid_id(var, zgroup):
raise RuntimeError("Unknown grid")
def def_grid(coyote, var, chunk_slice, zgroup):
def def_grid(coyote, var, chunk_slice):
zgroup = get_var_group(var)
crs = zgroup[var.attrs["grid_mapping"]]
# healpix
if "grid_mapping_name" in crs.attrs and crs.attrs["grid_mapping_name"] == "healpix":
...
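grid_id now folds the optional hiopy::var_group attribute and the spatial chunk size into the id, so two variables sharing a grid can still be separated into distinct work groups. A hedged sketch of the healpix branch (the attribute values are illustrative):

    # var_a.attrs: {"grid_mapping": "crs"}                            -> "healpix_4nest_4096"
    # var_b.attrs: {"grid_mapping": "crs", "hiopy::var_group": "b"}   -> "bhealpix_4nest_4096"
    # Different ids, so distribute_work treats them as separate groups.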
import zarr
def get_var_group(v):
root = zarr.Group(store=v.store)
last_slash_idx = v.name.rindex("/")
return root[v.name[:last_slash_idx]]
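get_var_group recovers the enclosing zarr group from the array's store and path, replacing the earlier pattern of stashing the group on the variable object (item.group). A usage sketch (the path is illustrative):

    # v.name == "/healpix_4/tas"
    group = get_var_group(v)   # -> the "healpix_4" group holding the array
    time = group["time"]       # coordinate arrays live next to the variable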
@@ -29,14 +29,14 @@ def _collect_groups(dataset):
yield from _collect_groups(g)
def add_height(dataset, name, n):
def add_height(dataset, name, positive_direction: str = "up", values: list[int] = []):
for g in _collect_groups(dataset):
height = g.create_array(name, fill_value=None, dtype=np.int64, shape=np.arange(n).shape)
height[:] = np.arange(n)
height = g.create_array(name, fill_value=None, dtype=np.int64, shape=(len(values),))
height[:] = np.asarray(values, dtype=np.int64)
height.attrs["_ARRAY_DIMENSIONS"] = [name]
height.attrs["axis"] = "Z"
height.attrs["long_name"] = "generalized_height"
height.attrs["positive"] = "down"
height.attrs["positive"] = positive_direction
height.attrs["standard_name"] = "height"
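add_height now takes the height values and the positive direction explicitly instead of a level count n. A usage sketch of the old call versus the new one (the values are illustrative):

    # before: add_height(dataset, "height", 10)   # wrote np.arange(10), positive "down"
    add_height(dataset, "height", positive_direction="up", values=[10, 50, 100, 500])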
...
@@ -61,7 +61,7 @@ class LocoServer:
with open("backend_map.conf", "w") as f:
f.write("map $request_uri $backend {\n")
for r, data_vars in enumerate(distributed_data_vars):
for _group, _vgroup, chunk_slice, dvars in data_vars:
for _group, dvars, chunk_slice in data_vars:
for v in dvars:
if chunk_slice == slice(None):
cell_regex = r"\d+"
...
@@ -4,6 +4,7 @@ add_test(
)
set_tests_properties(hiopy.test_distribute_work PROPERTIES
ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:${CMAKE_SOURCE_DIR}/apps:$ENV{PYTHONPATH}"
LABELS "hiopy"
)
configure_file(create_simple_dataset.sh.in create_simple_dataset.sh @ONLY)
@@ -15,6 +16,7 @@ set_tests_properties(hiopy.create_simple_dataset PROPERTIES
FIXTURES_SETUP simple_dataset.zarr
ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:${CMAKE_SOURCE_DIR}/apps:$ENV{PYTHONPATH}"
TIMEOUT 20
LABELS "hiopy"
)
add_test(
NAME hiopy.create_simple_dataset_4threads
@@ -22,6 +24,7 @@ add_test(
)
set_tests_properties(hiopy.create_simple_dataset_4threads PROPERTIES
ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:${CMAKE_SOURCE_DIR}/apps:$ENV{PYTHONPATH}"
LABELS "hiopy"
)
add_test(
@@ -31,6 +34,7 @@ add_test(
set_tests_properties(hiopy.check_hierarchy PROPERTIES
FIXTURES_REQUIRED simple_dataset.zarr
ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:${CMAKE_SOURCE_DIR}/apps:$ENV{PYTHONPATH}"
LABELS "hiopy"
)
# test the meteogram dataset
@@ -44,4 +48,5 @@ set_tests_properties(hiopy.create_meteogram_dataset PROPERTIES
FIXTURES_SETUP metogram_dataset.zarr
ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:${CMAKE_SOURCE_DIR}/apps:$ENV{PYTHONPATH}"
TIMEOUT 20
LABELS "hiopy"
)
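The LABELS property added to each test lets the whole suite be selected with a single label filter, e.g.:

    ctest -L hiopy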
@@ -11,57 +11,49 @@ except ImportError:
class DummyDataVar:
def __init__(self, shape, chunksize=None):
def __init__(self, name, shape, chunksize=None):
self.name = name
self.shape = shape
self.chunks = (1, chunksize or shape[1])
self.attrs = {"_ARRAY_DIMENSIONS": ["time", "cell"]}
self.dtype = np.dtype("float64")
def __repr__(self):
return self.name
def test_sequential():
print("Test sequential case")
var1 = DummyDataVar([10, 10])
var2 = DummyDataVar([10, 10])
var1 = DummyDataVar("var1", [10, 10])
var2 = DummyDataVar("var2", [10, 10])
result = distribute_work({"g1": [var1], "g2": [var2]}, 1)
pp(result)
assert len(result) == 1
assert result[0] == [("g1", 0, slice(None), [var1]), ("g2", 0, slice(None), [var2])]
assert result[0] == [("g1", [var1], slice(None)), ("g2", [var2], slice(None))]
def test_dist_groups():
print("Test distributing groups")
var1 = DummyDataVar([10, 10])
var2 = DummyDataVar([10, 10])
var1 = DummyDataVar("var1", [10, 10])
var2 = DummyDataVar("var2", [10, 10])
result = distribute_work({"g1": [var1], "g2": [var2]}, 2)
pp(result)
assert len(result) == 2
assert result[0] == [("g1", 0, slice(None), [var1])]
assert result[1] == [("g2", 0, slice(None), [var2])]
def test_dist_vars():
print("Test distributing variables")
var1 = DummyDataVar([10, 10])
var2 = DummyDataVar([10, 10])
result = distribute_work({"g": [var1, var2]}, 2)
pp(result)
assert len(result) == 2
assert result[0] == [("g", 0, slice(None), [var1])]
assert result[1] == [("g", 1, slice(None), [var2])]
assert result[0] == [("g1", [var1], slice(None))]
assert result[1] == [("g2", [var2], slice(None))]
def test_dist_chunks():
print("Test distributing chunks")
var = DummyDataVar([10, 10], 5)
var = DummyDataVar("var", [10, 10], 5)
result = distribute_work({"g": [var]}, 2)
pp(result)
assert len(result) == 2
assert result[0] == [("g", 0, slice(0, 5), [var])]
assert result[1] == [("g", 0, slice(5, 10), [var])]
assert result[0] == [("g", [var], slice(0, 5))]
assert result[1] == [("g", [var], slice(5, 10))]
if __name__ == "__main__":
test_sequential()
test_dist_groups()
test_dist_vars()
test_dist_chunks()
@@ -14,6 +14,7 @@ from coyote import (
from ._data_handler import DataHandler
from ._distribute_work import distribute_work
from ._grids import def_grid, grid_id
from ._zarr_utils import get_var_group
from .loco import LocoServer
import numpy as np
@@ -82,24 +83,20 @@ def main():
def collect_data_vars(group):
for _name, item in group.arrays():
if "hiopy::enable" in item.attrs and item.attrs["hiopy::enable"]:
item.group = group # amend the variable object with the group
yield item
for _name, item in group.groups():
yield from collect_data_vars(item)
all_data_vars = list(chain(*[collect_data_vars(z) for z in args.datasets]))
logging.info(f"Found {len(all_data_vars)} variables")
if len(all_data_vars) == 0:
data_vars = list(chain(*[collect_data_vars(z) for z in args.datasets]))
logging.info(f"Found {len(data_vars)} variables")
if len(data_vars) == 0:
raise RuntimeError("No variables found by the hiopy worker.")
# group the variables by the crs grid_mapping.
# This is used to distribute them across the processes and create the coyote instances.
grouped_data_vars = {
gid: list(variables)
for gid, variables in groupby(
sorted(all_data_vars, key=lambda v: grid_id(v, v.group)),
key=lambda v: grid_id(v, v.group),
)
for gid, variables in groupby(sorted(data_vars, key=grid_id), key=grid_id)
}
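# Note: itertools.groupby only merges adjacent equal keys, so sorting by
# grid_id first is required; the dict maps each grid id to every variable
# sharing that grid (and hiopy::var_group).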
distributed_data_vars = distribute_work(grouped_data_vars, group_comm_size())
@@ -107,8 +104,7 @@ def main():
loco_server.write_nginx_config(distributed_data_vars, group_comm_rank())
my_data_vars = distributed_data_vars[group_comm_rank()]
# my_data_vars list of tuples: (gid, vgroup, slice, list of variables)
# the vgroup is the group of variables if they need to be split (for the same grid)
# my_data_vars list of tuples: (gid, data_vars, slice)
if group_comm_rank() == 0:
logging.debug(distributed_data_vars)
@@ -116,21 +112,21 @@ def main():
frac_masks = {} # the numpy array must keep alive until the end of the program
coyote_instances = {
(gid, vgroup): Coyote(f"{args.process_group}_{gid}_{vgroup}")
for gid, vgroup, chunk_slice, data_vars in my_data_vars
gid: Coyote(f"{args.process_group}_{gid}") for gid, data_vars, chunk_slice in my_data_vars
}
for gid, vgroup, chunk_slice, data_vars in my_data_vars:
coyote = coyote_instances[(gid, vgroup)]
for gid, data_vars, chunk_slice in my_data_vars:
coyote = coyote_instances[gid]
# all vars in data_vars define the same grid
def_grid(coyote, data_vars[0], chunk_slice, data_vars[0].group)
def_grid(coyote, data_vars[0], chunk_slice)
data_handlers = []
for v in data_vars:
# compute timestep
var_group = get_var_group(v)
time_dim_name = v.attrs["_ARRAY_DIMENSIONS"][0]
time_coordinate = v.group[time_dim_name]
time_coordinate = var_group[time_dim_name]
assert (
"seconds since " in time_coordinate.attrs["units"]
), "Currently the time must be given in seconds"
@@ -138,10 +134,10 @@ def main():
# compute time start index
t0 = (
np.datetime64(start_datetime())
- np.datetime64(v.group["time"].attrs["units"][len("seconds since ") :])
- np.datetime64(var_group.time.attrs["units"][len("seconds since ") :])
) / np.timedelta64(1, "s")
t0_idx = np.searchsorted(v.group["time"], t0)
assert v.group["time"][t0_idx] == t0, "start_datetime not found in time axis"
t0_idx = np.searchsorted(var_group.time, t0)
assert var_group.time[t0_idx] == t0, "start_datetime not found in time axis"
dt = time_coordinate[t0_idx + 1] - time_coordinate[t0_idx]
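# Worked example (illustrative values): with units "seconds since
# 2020-01-01T00:00:00" and start_datetime() == "2020-01-02T00:00:00",
# t0 = 86400.0; t0_idx points at that entry of the time coordinate and
# dt is the spacing to the following output step.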
@@ -153,22 +149,18 @@ def main():
if "hiopy::yac_source" in v.attrs:
src_comp, src_grid = v.attrs["hiopy::yac_source"]
else:
# find the component with that variable
gid, src_vgroup = [
(gid, vg)
for gid, vg, _, gvars in chain(*distributed_data_vars)
for groupvar in gvars
if groupvar.name.split("/")[-2] == v.group.attrs["hiopy::parent"]
and groupvar.name.split("/")[-1] == v.basename
][0]
src_comp = src_grid = f"{args.process_group}_{gid}_{src_vgroup}"
assert "hiopy::parent" in var_group.attrs, f"No source for field {v.name} specified"
parent_var_name = var_group.attrs["hiopy::parent"] + "/" + v.name.split("/")[-1]
source_var = zarr.Group(store=v.store)[parent_var_name]
source_var_gid = grid_id(source_var)
src_comp = src_grid = f"{args.process_group}_{source_var_gid}"
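# Illustrative example: for v.name == "/healpix_3/tas" and
# hiopy::parent == "healpix_4", parent_var_name is "healpix_4/tas";
# the grid id of that source array names the source component and grid.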
time_method = v.attrs.get("hiopy::time_method", "point")
nnn = v.attrs.get("hiopy::nnn", 1)
frac_mask_name = v.attrs.get("hiopy::frac_mask", None)
frac_mask = []
if frac_mask_name is not None:
if frac_mask_name not in frac_masks:
frac_masks[frac_mask_name] = np.array(v.group[frac_mask_name][chunk_slice])
frac_masks[frac_mask_name] = np.array(var_group[frac_mask_name][chunk_slice])
frac_mask = frac_masks[frac_mask_name]
logging.info(
@@ -206,7 +198,7 @@ def main():
ensure_enddef()
if group_comm_rank() == 0:
for v in all_data_vars:
for v in data_vars:
if "hiopy::yac_source" not in v.attrs:
continue
else:
...
@@ -26,9 +26,9 @@ RUN apt-get update \
RUN pip install --no-cache-dir numpy mpi4py matplotlib cython healpy aiohttp zarr
# install yaxt
RUN curl -s -L https://swprojects.dkrz.de/redmine/attachments/download/529/yaxt-0.10.0.tar.gz | \
RUN curl -s -L https://gitlab.dkrz.de/dkrz-sw/yaxt/-/archive/release-0.11.3/yaxt-release-0.11.3.tar.gz | \
tar xvz && \
cd yaxt-0.10.0 && \
cd yaxt-release-0.11.3 && \
./configure --without-regard-for-quality --without-example-programs --without-perf-programs --with-pic && \
make -j 4 && \
make install
...
@@ -88,13 +88,14 @@ hiopy specific parameters are added in the attributes and are usually prefixed by
.. csv-table::
:header: "attribute", "entity", "description"
"hiopy::parent", "Group", "Parent group of this group (used as source if no other source is specified. E.g. healpix_3 uses healpix_4 as parent)"
"hiopy::parent", "Group", "Parent group of this group (used as source if no other source is specified. E.g. healpix_3 uses healpix_4 as parent). Given as absolute path in the zarr store"
"hiopy::yac_source", "Array", "Explicit yac component and yac grid name of the source field"
"hiopy::src_name", "Array", "yac field name of the source field. Default is the name of the array itself."
"hiopy::time_method", "Array", "Time interpolation method (point, mean). Default: point"
"hiopy::nnn", "Array", "Number of nearest neighbors to use. Default is 1."
"hiopy::frac_mask", "Array", "Array name of the fractional mask that is used when forwarding the data. (see `yac_cput_frac`)"
"hiopy::enable", "Boolean", "Enable or disable the variable to be handled by the worker"
"hiopy::var_group", "Array", "Can be used to split-up variables into multiple groups, if not enough chunks are available to distribute the work (optional)."
`hiopy.configure` can also be used as a Python
module. The following example creates a dataset with a healpix grid
...