Skip to content
Snippets Groups Projects
Commit 65f1b8fb authored by Siddhant Tibrewal's avatar Siddhant Tibrewal
Browse files

Merge remote-tracking branch 'origin/main' into 6-zarr3-sharding

parents f6f32c33 415fc4c7
No related branches found
No related tags found
1 merge request!43Draft: Resolve "Migrate to Zarr3 and add example for a dataset using sharding"
......@@ -104,13 +104,26 @@ hiopy-levante:
- |
DATASET=$(pwd -P)/dataset.zarr
hiopy-configure ${DATASET} create-healpix-hierarchic-dataset 1979-01-01T00:00:00 1979-01-01T06:00:00 900 4
hiopy-configure ${DATASET} add-height full_level 47
hiopy-configure ${DATASET} add-variable temp atmo_output icon_atmos_grid --zaxis full_level --chunk-shape 8 16 2000
hiopy-configure ${DATASET} add-variable pres atmo_output icon_atmos_grid --zaxis full_level --chunk-shape 4 16 2000
hiopy-configure ${DATASET} add-variable pres_sfc atmo_output icon_atmos_grid --chunk-shape 1 20
hiopy-configure ${DATASET} consolidate
hiopy-configure ${DATASET} info --tree
python << EOF
import zarr
import hiopy.configure as hc
z = zarr.open("${DATASET}")
hc.add_healpix_hierarchy(z, order=4)
hc.add_time(z, "1979-01-01T00:00:00", "1979-01-01T06:00:00", 900)
hc.add_height(z, "full_level", 47)
hc.add_variable(z, "temp", "atmo_output", "icon_atmos_grid",
zaxis="full_level", chunk_shape=(8, 16, 2000))
hc.add_variable(z, "pres", "atmo_output", "icon_atmos_grid",
zaxis="full_level", chunk_shape=(8, 16, 2000))
hc.add_variable(z, "pres_sfc", "atmo_output", "icon_atmos_grid",
chunk_shape=(1, 20))
zarr.consolidate_metadata(z.store)
print(z.info)
print(z.tree())
EOF
- |
MPMD=`pwd -P`/mpmd.conf
cd ${ICON_DIR}/build
......
......@@ -17,6 +17,7 @@ set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE CACHE BOOL
set(CMAKE_INSTALL_RPATH $ORIGIN)
option(BUILD_SHARED_LIBS "Build shared libraries" ON)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
find_package(MPI REQUIRED COMPONENTS CXX)
......
from .configure import * # noqa: F403
from .create_dataset import * # noqa: F403
from .configure import main
main()
#!/usr/bin/env python3
from argparse import ArgumentParser
import healpy
import numpy as np
import zarr
from .create_dataset import (
create_dataset_healpix,
create_dataset_healpix_hierarchic,
create_dataset_icon,
)
def add_time(dataset, startdate, enddate, dt, name="time"):
if type(startdate) is str:
startdate = np.datetime64(startdate)
if type(enddate) is str:
enddate = np.datetime64(enddate)
def add_coordinates(dataset, coordinate_list):
lat_list, lon_list = zip(*coordinate_list)
time_data = (
np.arange(startdate, enddate, np.timedelta64(dt, "s")) - startdate
) // np.timedelta64(1, "s")
for g in _collect_groups(dataset):
assert "coordinates" in g.crs.attrs
clon = g.create_dataset("clon", data=np.array(lon_list))
clon.attrs["_ARRAY_DIMENSIONS"] = ["clon"]
clon.attrs["long_name"] = "center longitude"
clon.attrs["units"] = "degree"
clon.attrs["standard_name"] = "grid_longitude"
clat = g.create_dataset("clat", data=np.array(lat_list))
clat.attrs["_ARRAY_DIMENSIONS"] = ["clat"]
clat.attrs["long_name"] = "center latitude"
clat.attrs["units"] = "degree"
clat.attrs["standard_name"] = "grid_latitude"
time = g.create_dataset(name, data=time_data, fill_value=None, shape=time_data.shape)
time.attrs["_ARRAY_DIMENSIONS"] = (name,)
time.attrs["axis"] = "T"
time.attrs["calendar"] = "proleptic_gregorian"
time.attrs["units"] = f"seconds since {startdate}"
def _collect_groups(dataset):
......@@ -55,6 +46,7 @@ def add_variable(
yac_source_comp,
yac_source_grid,
zaxis=None,
taxis="time",
time_method=None,
chunk_shape=None,
frac_mask=None,
......@@ -64,42 +56,32 @@ def add_variable(
**kwargs,
):
for g in _collect_groups(dataset):
ntime = g.get("time").shape[0]
taxis_tuple = tuple() if taxis is None else (taxis,)
ntime = tuple() if taxis is None else (g.get(taxis).shape[0],)
grid_mapping_name = g.get("crs").attrs["grid_mapping_name"]
spatial_attr = "point" if (grid_mapping_name == "point_cloud") else "cell"
crs_len = 0
if grid_mapping_name == "healpix":
crs_len = healpy.nside2npix(g.get("crs").attrs["healpix_nside"])
elif grid_mapping_name == "point_cloud":
lon_coord, lat_coord = g.get("crs").attrs["coordinates"].split(" ")
assert lon_coord in g and lat_coord in g
assert g.get(lon_coord).shape[0] == g.get(lat_coord).shape[0]
crs_len = g.get(lat_coord).shape[0]
else:
if "clon" not in g or "clat" not in g:
raise Exception("Coordinates not defined appropriately in the dataset.")
assert g.get("clon").shape[0] == g.get("clat").shape[0]
crs_len = g.get("clon").shape[0]
raise Exception("Unknown crs.")
_attributes = attributes or {}
if zaxis is None:
shape = (ntime, crs_len)
_chunk_shape = (
(np.minimum(chunk_shape, shape).item(0), np.minimum(chunk_shape, shape).item(1))
if chunk_shape is not None
else ()
)
_attributes["_ARRAY_DIMENSIONS"] = ("time", spatial_attr)
shape = (*ntime, crs_len)
_chunk_shape = (np.min(chunk_shape, shape),) if chunk_shape is not None else None
_attributes["_ARRAY_DIMENSIONS"] = (*taxis_tuple, spatial_attr)
else:
nheight = g[zaxis].shape[0]
shape = (ntime, nheight, crs_len)
_chunk_shape = (
(
np.minimum(chunk_shape, shape).item(0),
nheight,
np.minimum(chunk_shape, shape).item(2),
)
if chunk_shape is not None
else None
)
_attributes["_ARRAY_DIMENSIONS"] = ("time", zaxis, spatial_attr)
nheight = g.get(zaxis).shape[0]
shape = (*ntime, nheight, crs_len)
_chunk_shape = (np.min(chunk_shape, shape),) if chunk_shape is not None else None
_attributes["_ARRAY_DIMENSIONS"] = (*taxis_tuple, zaxis, spatial_attr)
_attributes["grid_mapping"] = "crs"
_shard_shape = None
......@@ -140,103 +122,3 @@ def extend_time(dataset, enddate):
enddate_s = (enddate - refdate) // np.timedelta64(1, "s")
g.time.append(np.arange(g.time[-1] + dt, enddate_s + dt, dt))
print(g)
def consolidate(dataset):
zarr.consolidate_metadata(dataset.store)
def info(dataset, tree):
print(dataset.info)
if tree:
print(dataset.tree())
def main():
parser = ArgumentParser()
parser.add_argument("dataset", type=zarr.open)
subparsers = parser.add_subparsers()
create_hierahchy_parser = subparsers.add_parser("create-healpix-hierarchic-dataset")
create_hierahchy_parser.set_defaults(func=create_dataset_healpix_hierarchic)
create_hierahchy_parser.add_argument("startdate", type=np.datetime64)
create_hierahchy_parser.add_argument("enddate", type=np.datetime64)
create_hierahchy_parser.add_argument("dt", type=int)
create_hierahchy_parser.add_argument("order", type=int)
create_hierahchy_parser.add_argument("--prefix", type=str, default="healpix_")
create_dataset_parser = subparsers.add_parser("create-healpix-dataset")
create_dataset_parser.set_defaults(func=create_dataset_healpix)
create_dataset_parser.add_argument("startdate", type=np.datetime64)
create_dataset_parser.add_argument("enddate", type=np.datetime64)
create_dataset_parser.add_argument("dt", type=int)
create_dataset_parser.add_argument("order", type=int)
create_dataset_parser = subparsers.add_parser("create-dataset")
create_dataset_parser.set_defaults(func=create_dataset_icon)
create_dataset_parser.add_argument("startdate", type=np.datetime64)
create_dataset_parser.add_argument("enddate", type=np.datetime64)
create_dataset_parser.add_argument("dt", type=int)
create_dataset_parser.add_argument("grid_name", type=str)
add_height_parser = subparsers.add_parser("add-height")
add_height_parser.set_defaults(func=add_height)
add_height_parser.add_argument("name", type=str)
add_height_parser.add_argument("n", type=int)
def parse_coordinate_list(s):
lon, lat = s.split(",")
return float(lon), float(lat)
add_height_parser = subparsers.add_parser("add-coordinates")
add_height_parser.set_defaults(func=add_coordinates)
add_height_parser.add_argument(
"coordinate_list",
nargs="+",
type=parse_coordinate_list,
help="Coordinates: tuples Lat,Lon in degrees",
)
add_variable_parser = subparsers.add_parser("add-variable")
add_variable_parser.set_defaults(func=add_variable)
add_variable_parser.add_argument("name", type=str)
add_variable_parser.add_argument("yac_source_comp", type=str)
add_variable_parser.add_argument("yac_source_grid", type=str)
add_variable_parser.add_argument("--zaxis", type=str)
add_variable_parser.add_argument(
"--time-method", type=str, default="point", choices=["mean", "point", "sum"]
)
add_variable_parser.add_argument(
"--chunk-shape",
type=int,
default=None,
nargs="*",
help="shape of the data chunks (time, zaxis(if applicable), cell)",
)
add_variable_parser.add_argument(
"--frac-mask", type=str, default=None, help="Name of the frac_mask array in the same group"
)
add_variable_parser.add_argument("--yac-name", type=str, help="name of the yac field")
add_variable_parser.add_argument("--chunks-per-shard", type=int)
extend_time_parser = subparsers.add_parser("extend-time")
extend_time_parser.set_defaults(func=extend_time)
extend_time_parser.add_argument("enddate", type=np.datetime64)
consolidate_parser = subparsers.add_parser("consolidate")
consolidate_parser.set_defaults(func=consolidate)
info_parser = subparsers.add_parser("info")
info_parser.set_defaults(func=info)
info_parser.add_argument("--tree", action="store_true")
args = parser.parse_args()
def _main(func, **kwargs):
return func(**kwargs)
_main(**vars(args))
if __name__ == "__main__":
main()
......@@ -2,45 +2,42 @@
import numpy as np
def add_timedata(dataset, startdate, enddate, dt):
time_data = (
np.arange(startdate, enddate, np.timedelta64(dt, "s")) - startdate
) // np.timedelta64(1, "s")
time = dataset.create_array(
name="time", fill_value=None, shape=time_data.shape, dtype=np.longlong
)
time[:] = time_data
time.attrs["_ARRAY_DIMENSIONS"] = ("time",)
time.attrs["axis"] = "T"
time.attrs["calendar"] = "proleptic_gregorian"
time.attrs["units"] = f"seconds since {startdate}"
def create_dataset_icon(dataset, startdate, enddate, dt, grid_name="icon_atmo"):
assert grid_name in ["icon_atmo", "icon_ocean", "point_cloud"]
def add_coordinates(dataset, coordinates, coord_names=("lon", "lat")):
crs = dataset.create_array(name="crs", dtype=np.float32, shape=(1,))
crs.attrs["_ARRAY_DIMENSIONS"] = ("crs",)
crs.attrs["grid_mapping_name"] = grid_name
crs.attrs["coordinates"] = "clon clat"
crs.attrs["grid_mapping_name"] = "point_cloud"
crs.attrs["coordinates"] = f"{coord_names[0]} {coord_names[1]}"
add_timedata(dataset, startdate, enddate, dt)
lat_list, lon_list = zip(*coordinates)
lon = dataset.create_array(
name=coord_names[0], fill_value=None, shape=lon_list.shape, dtype=np.float32
)
lon[:] = np.array(lon_list)
lon.attrs["_ARRAY_DIMENSIONS"] = [coord_names[0]]
lon.attrs["long_name"] = "longitude"
lon.attrs["units"] = "degree"
lon.attrs["standard_name"] = "grid_longitude"
lat = dataset.create_array(
name=coord_names[1], fill_value=None, shape=lat_list.shape, dtype=np.float32
)
lat[:] = np.array(lat_list)
lat.attrs["_ARRAY_DIMENSIONS"] = [coord_names[1]]
lat.attrs["long_name"] = "latitude"
lat.attrs["units"] = "degree"
lat.attrs["standard_name"] = "grid_latitude"
def create_dataset_healpix(dataset, startdate, enddate, dt, order):
def add_healpix_grid(dataset, order):
crs = dataset.create_array(name="crs", dtype=np.float32, shape=(1,))
crs.attrs["_ARRAY_DIMENSIONS"] = ("crs",)
crs.attrs["grid_mapping_name"] = "healpix"
crs.attrs["healpix_nside"] = 2**order
crs.attrs["healpix_order"] = "nest"
add_timedata(dataset, startdate, enddate, dt)
# TODO: For the future, allow hierarchy also in other dataset types
def create_dataset_healpix_hierarchic(dataset, startdate, enddate, dt, order, prefix="healpix_"):
def add_healpix_hierarchy(dataset, order, prefix="healpix_"):
for o in range(order + 1):
zg = dataset.create_group(name=f"{prefix}{o}")
create_dataset_healpix(zg, startdate, enddate, dt, o)
add_healpix_grid(zg, o)
if o < order:
zg.attrs["hiopy::parent"] = f"{prefix}{o+1}"
......@@ -6,7 +6,7 @@ set_tests_properties(hiopy.test_distribute_work PROPERTIES
ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:${CMAKE_SOURCE_DIR}/apps:$ENV{PYTHONPATH}"
)
configure_file(create_simple_dataset.sh.in create_simple_dataset.sh)
configure_file(create_simple_dataset.sh.in create_simple_dataset.sh @ONLY)
add_test(
NAME hiopy.create_simple_dataset
COMMAND create_simple_dataset.sh 1
......
#!/usr/bin/bash
set -e
dataset="metogram_dataset.zarr"
dataset="meteogram_dataset.zarr"
rm -rf ${dataset}
python -m hiopy.configure ${dataset} create-dataset 2000-01-01 2000-01-01T00:01:00 3 point_cloud
python -m hiopy.configure ${dataset} add-coordinates \
10.5,12.1 \
13.4,11.7 \
12.3,8.4 \
9.6,8.9 \
14.5,12.7 \
51.3,52.4 \
47.6,48.8 \
53.5,49.9 \
50.7,48.2 \
54.1,45.0
python -m hiopy.configure ${dataset} add-variable x simple_source simple_grid
python -m hiopy.configure ${dataset} add-variable y simple_source simple_grid
python -m hiopy.configure ${dataset} add-variable z simple_source simple_grid
python -m hiopy.configure ${dataset} add-variable clock simple_source simple_grid
python -m hiopy.configure ${dataset} consolidate
python -m hiopy.configure ${dataset} info --tree
python <<EOF
import hiopy.configure as hc
import zarr
z = zarr.open("${dataset}")
hc.add_coordinates(z, [
(10.5,12.1),
(13.4,11.7),
(12.3,8.4),
(9.6,8.9),
(14.5,12),
(51.3,52.4),
(47.6,48.8),
(53.5,49.9),
(50.7,48.2),
(54.1,45.0)
])
hc.add_time(z, "2000-01-01", "2000-01-01T00:01:00", 3)
hc.add_variable(z, "x", "simple_source", "simple_grid")
hc.add_variable(z, "y", "simple_source", "simple_grid")
hc.add_variable(z, "z", "simple_source", "simple_grid")
hc.add_variable(z, "clock", "simple_source", "simple_grid")
zarr.consolidate_metadata(z.store)
print(z.info)
print(z.tree())
EOF
@MPIEXEC_EXECUTABLE@ @MPIEXEC_NUMPROC_FLAG@ 1 python -m hiopy.worker ${dataset} : @MPIEXEC_NUMPROC_FLAG@ 1 @CMAKE_BINARY_DIR@/tests/simple_source -e 2000-01-01T00:01:00
......@@ -2,12 +2,25 @@
set -e
rm -rf simple_dataset.zarr
python -m hiopy.configure simple_dataset.zarr create-healpix-hierarchic-dataset 2000-01-01 2000-01-01T00:01:00 3 3
python -m hiopy.configure simple_dataset.zarr add-variable x simple_source simple_grid
python -m hiopy.configure simple_dataset.zarr add-variable y simple_source simple_grid
python -m hiopy.configure simple_dataset.zarr add-variable z simple_source simple_grid
python -m hiopy.configure simple_dataset.zarr add-variable clock simple_source simple_grid
python -m hiopy.configure simple_dataset.zarr consolidate
python -m hiopy.configure simple_dataset.zarr info --tree
${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 1 python -m hiopy.worker simple_dataset.zarr --nthreads $1 : ${MPIEXEC_NUMPROC_FLAG} 1 ${CMAKE_BINARY_DIR}/tests/simple_source -e 2000-01-01T00:01:00
dataset="simple_dataset.zarr"
rm -rf ${dataset}
python <<EOF
import zarr
import hiopy.configure as hc
z = zarr.open("${dataset}")
hc.add_healpix_hierarchy(z, order=3)
hc.add_time(z, "2000-01-01", "2000-01-01T00:01:00", 3)
hc.add_variable(z, "x", "simple_source", "simple_grid")
hc.add_variable(z, "y", "simple_source", "simple_grid")
hc.add_variable(z, "z", "simple_source", "simple_grid")
hc.add_variable(z, "clock", "simple_source", "simple_grid")
zarr.consolidate_metadata(z.store)
print(z.info)
print(z.tree())
EOF
@MPIEXEC_EXECUTABLE@ @MPIEXEC_NUMPROC_FLAG@ 1 python -m hiopy.worker ${dataset} --nthreads $1 : @MPIEXEC_NUMPROC_FLAG@ 1 @CMAKE_BINARY_DIR@/tests/simple_source -e 2000-01-01T00:01:00
find_package(PkgConfig REQUIRED)
set(PKG_CONFIG_USE_CMAKE_PREFIX_PATH ON)
pkg_check_modules(YAC_PKGCONFIG REQUIRED yac)
pkg_check_modules(YAC_PKGCONFIG REQUIRED yac-mci)
if(YAC_PKGCONFIG_FOUND)
add_library(YAC::YAC INTERFACE IMPORTED)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment