From 9e8b6b482713b6ec81648a9d424f42fe217102f6 Mon Sep 17 00:00:00 2001 From: Fabian Wachsmann <k204210@l40040.lvt.dkrz.de> Date: Fri, 18 Oct 2024 09:49:40 +0200 Subject: [PATCH] Updated cloudify --- cloudify/plugins/kerchunk.py | 62 ++++-- cloudify/plugins/stacer.py | 186 ++++++++++++---- scripts/cataloghost.py | 415 ++++++++++++++++++----------------- 3 files changed, 408 insertions(+), 255 deletions(-) diff --git a/cloudify/plugins/kerchunk.py b/cloudify/plugins/kerchunk.py index d424f93..aa7d4db 100755 --- a/cloudify/plugins/kerchunk.py +++ b/cloudify/plugins/kerchunk.py @@ -5,8 +5,10 @@ from fastapi.responses import HTMLResponse # fro mZarr: # from starlette.responses import StreamingResponse, Response # type: ignore from fastapi.responses import StreamingResponse, Response +import cachey import xarray as xr import fsspec +import asyncio import gc import json import os @@ -15,12 +17,18 @@ from xpublish.dependencies import ( get_dataset, ) # Assuming 'dependencies' is the module where get_dataset is defined from xpublish import Plugin, hookimpl, Dependencies +from xpublish.utils.api import DATASET_ID_ATTR_KEY + +from datetime import datetime + +todaystring = datetime.today().strftime("%a, %d %b %Y %H:%M:%S GMT") gctrigger = 0 GCLIMIT = 500 -def kerchunk_stream_content(data): +async def kerchunk_stream_content(data): + await asyncio.sleep(0) yield data @@ -37,9 +45,12 @@ class KerchunkPass(Plugin): prefix=self.dataset_router_prefix, tags=list(self.dataset_router_tags) ) - @router.get("/{key:path}") + @router.api_route("/{key:path}", methods=["GET", "HEAD"]) async def get_chunk( - key: str, dataset: xr.Dataset = Depends(deps.dataset), use_cache=False + key: str, + dataset: xr.Dataset = Depends(deps.dataset), + cache: cachey.Cache = Depends(deps.cache), + # use_cache=False ): global gctrigger, mapper_dict if "source" in dataset.encoding: @@ -54,24 +65,43 @@ class KerchunkPass(Plugin): ) # if key in fsmap: if True: - if ".zmetadata" == key: - kv = json.loads(fsmap[key].decode("utf-8")) - kv["zarr_consolidated_format"] = 1 - resp = Response( - json.dumps(kv).encode("utf-8"), - media_type="application/octet-stream", + if any(a in key for a in [".zmetadata", ".zarray", ".zgroup"]): + cache_key = ( + dataset.attrs.get(DATASET_ID_ATTR_KEY, "") + + "/kerchunk/" + + f"{key}" ) - fsmap.fs.dircache.clear() - del fsmap, kv - return resp + resp = cache.get(cache_key) + if resp is None: + zmetadata = json.loads(fsmap[".zmetadata"].decode("utf-8")) + if ".zmetadata" == key: + zmetadata["zarr_consolidated_format"] = 1 + if key == ".zgroup": + jsondump = json.dumps({"zarr_format": 2}).encode("utf-8") + elif ".zarray" in key or ".zgroup" in key or ".zattrs" in key: + jsondump = json.dumps(zmetadata["metadata"][key]).encode( + "utf-8" + ) + else: + jsondump = json.dumps(zmetadata).encode("utf-8") + resp = Response( + jsondump, + media_type="application/octet-stream", + ) + cache.put(cache_key, resp, 99999) # return StreamingResponse( # kerchunk_stream_content(fsmap[key]), # media_type='application/octet-stream', # ) - resp = StreamingResponse( - kerchunk_stream_content(fsmap[key]), - media_type="application/octet-stream", - ) + else: + resp = StreamingResponse( + kerchunk_stream_content(fsmap[key]), + media_type="application/octet-stream", + ) + resp.headers["Cache-control"] = "max-age=604800" + resp.headers["X-EERIE-Request-Id"] = "True" + resp.headers["Last-Modified"] = todaystring + resp.headers["Access-Control-Allow-Origin"] = "https://swift.dkrz.de" fsmap.fs.dircache.clear() del sp, dataset, key if 
gctrigger > GCLIMIT: diff --git a/cloudify/plugins/stacer.py b/cloudify/plugins/stacer.py index 2783e65..bf4b7aa 100755 --- a/cloudify/plugins/stacer.py +++ b/cloudify/plugins/stacer.py @@ -1,6 +1,6 @@ from typing import Sequence from fastapi import APIRouter, Depends, HTTPException, Request -from xpublish.utils.api import JSONResponse +from xpublish.utils.api import JSONResponse, DATASET_ID_ATTR_KEY import xarray as xr import json from xpublish.dependencies import ( @@ -8,13 +8,18 @@ from xpublish.dependencies import ( ) # Assuming 'dependencies' is the module where get_dataset is defined from xpublish import Plugin, hookimpl, Dependencies from copy import deepcopy as copy +import cachey from pystac import Item, Asset, MediaType from datetime import datetime import pystac import json import fsspec +PARENT_CATALOG = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/catalogs/stac-catalog-eeriecloud.json" +GRIDLOOK = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/apps/gridlook/index.html" HOST = "https://eerie.cloud.dkrz.de/" +INTAKE_SOURCE = "https://raw.githubusercontent.com/eerie-project/intake_catalogues/refs/heads/main/dkrz/disk/model-output/main.yaml" +JUPYTERLITE = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/apps/jupyterlite/index.html" HOSTURL = HOST + "datasets" PROVIDER_DEFAULT = dict( name="DKRZ", @@ -31,40 +36,53 @@ ALTERNATE_KERCHUNK = dict( STAC_EXTENSIONS = [ "https://stac-extensions.github.io/datacube/v2.2.0/schema.json", "https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json", + "https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json", ] -DESC = """ -DKRZ hosts a data server named ‘eerie.cloud’ for global -high resolution Earth System Model simulation output -for experiments produced within the EU project EERIE -and stored at the German Climate Computing Center (DKRZ). -Eerie.cloud makes use of the python package xpublish. -Xpublish is a plugin for xarray (Hoyer, 2023) which is widely used in the Earth System Science community. -It serves ESM output formatted as zarr (Miles, 2020) via a RestAPI based on FastAPI. -Served in this way, the data imitates cloud-native data (Abernathey, 2021) -and features many capabilities of cloud-optimized data. """ +XARRAY_ITEM = {"xarray:open_kwargs": dict(consolidated="true")} +EERIE_DESC = """ +Earth System Model Simulation output of the [EERIE project](https://eerie-project.eu/). +This project, running from January 2023 to December 2026, +will reveal and quantify the role of ocean mesoscale processes +in shaping the climate trajectory over seasonal to centennial time scales. +To this end EERIE will develop a new generation of Earth System Models (ESMs) +that are capable of explicitly representing a crucially important, +yet unexplored regime of the Earth system – the ocean mesoscale. +Leveraging the latest advances in science and technology, +EERIE will substantially improve the ability of such ESMs to faithfully +represent the centennial-scale evolution of the global climate, +especially its variability, extremes and +how tipping points may unfold under the influence of the ocean mesoscale. -def create_stac_collection(): - techdoc = pystac.Asset( - href="https://pad.gwdg.de/OZo5HMC4R6iljvZHlo-BzQ#", - title="Technical documentation", - media_type=pystac.MediaType.HTML, - roles=["OVERVIEW"], - ) +[Imprint](https://www.dkrz.de/en/about-en/contact/impressum) and +[Privacy Policy](https://www.dkrz.de/en/about-en/contact/en-datenschutzhinweise). 
+""" + +TECHDOC = pystac.Asset( + href="https://pad.gwdg.de/OZo5HMC4R6iljvZHlo-BzQ#", + title="Technical documentation", + media_type=pystac.MediaType.HTML, + roles=["OVERVIEW"], +) + + +def create_stac_eerie_collection(): + start = datetime(1850, 1, 1) + end = datetime(2101, 1, 1) col = pystac.Collection( - id="eerie.cloud", - title="The EERIE cloud data server", - description=DESC, + id="eerie", + title="EERIE data in Zarr format", + description=EERIE_DESC, stac_extensions=STAC_EXTENSIONS, - href=HOST + "stac.json", + href=HOST + "stac-collection-eerie.json", extent=pystac.Extent( spatial=pystac.SpatialExtent([-180, -90, 180, 90]), - temporal=pystac.TemporalExtent(intervals=[None, None]), + temporal=pystac.TemporalExtent(intervals=[start, end]), ), keywords=["EERIE", "cloud", "ICON", "IFS", "FESOM", "NEMO", "HadGEM"], providers=[pystac.Provider(PROVIDER_DEFAULT)], - assets=dict(doc=techdoc), + assets=dict(doc=copy(TECHDOC)), ) return col.to_dict() @@ -88,7 +106,7 @@ def xarray_to_stac_item(ds): description = ds.attrs.get("description", "No description") xid = ds.attrs.get("_xpublish_id", "No id") keywords = xid.split(".") - datetimeattr = datetime.now()#.isoformat() + datetimeattr = datetime.now().isoformat() start_datetime = ds.attrs.get("time_min", None) if start_datetime: start_datetime = ( @@ -128,7 +146,7 @@ def xarray_to_stac_item(ds): ] ], } - cdate = ds.attrs.get("creation_date", datetimeattr.isoformat()) + cdate = ds.attrs.get("creation_date", datetimeattr) providers = [copy(PROVIDER_DEFAULT)] creator_inst_id = ds.attrs.get("institution_id", None) if not creator_inst_id: @@ -143,9 +161,36 @@ def xarray_to_stac_item(ds): creator["url"] = HOSTURL + "/" + xid + "/" creator["roles"] = ["producer"] providers.append(creator) + description += ( + "\n" + + "You can use this dataset in xarray from intake.\n\n" + + "On Levante:\n" + + "```python\n" + + "import intake\n" + + "intake.open_catalog(\n" + + ' "' + + INTAKE_SOURCE + + '"\n' + + ')["' + + xid + + '"].to_dask()\n' + + "```\n" + + "Everywhere else:\n" + + "```python\n" + + "import intake\n" + + "intake.open_stac_item(\n" + + ' "' + + HOSTURL + + "/" + + xid + + '/stac"\n' + + ").data.to_dask()\n" + + "```\n" + ) + properties = { "description": description, - "title": title, + "title": " ".join(title.split(".")), "created": cdate, "keywords": keywords, "providers": providers, @@ -182,6 +227,7 @@ def xarray_to_stac_item(ds): HOSTURL + "/" + xid + "/kerchunk" ) extra_fields["alternate"]["kerchunk"]["name"] = "Raw data" + extra_fields.update(copy(XARRAY_ITEM)) item.add_asset( "data", Asset( @@ -203,9 +249,28 @@ def xarray_to_stac_item(ds): description="HTML representation of the xarray dataset", ), ) + item.add_asset( + "jupyterlite", + Asset( + href=JUPYTERLITE, + media_type=MediaType.HTML, + title="Jupyterlite access", + roles=["analysis"], + description="Web-assembly based analysis platform with access to this item", + ), + ) + item.add_asset( + "gridlook", + Asset( + href=GRIDLOOK + "#" + HOSTURL + "/" + xid + "/stac", + media_type=MediaType.HTML, + title="Visualization with gridlook", + roles=["Visualization"], + description="Visualization with gridlook", + ), + ) itemdict = item.to_dict() - itemdict["properties"]["datetime"]=datetimeattr.isoformat() # for asset in itemdict["assets"].keys(): itemdict["links"] = [ dict( @@ -214,21 +279,39 @@ def xarray_to_stac_item(ds): title="Usage of the eerie.cloud", ) ] + if not any(xid.startswith(a) for a in ["nextgems", "era"]): + itemdict["links"].append( + dict( + rel="parent", + 
href=HOSTURL + "/stac-collection-eerie.json", + type="application/json", + ) + ) # # gridlook # griddict = copy(store_dataset_dict) if "icon-esm-er" in xid and "native" in xid: + griddict[ + "store" + ] = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/grids/" if "atmos" in xid or "land" in xid: - griddict[ - "store" - ] = "https://swift.dkrz.de/v1/dkrz_948e7d4bbfbb445fbff5315fc433e36a/nextGEMS/" - griddict["dataset"] = "grids/ICON_R02B08.zarr" + griddict["dataset"] = "icon_grid_0033_R02B08_G.zarr" elif "ocean" in xid: - griddict["store"] = "https://eerie.cloud.dkrz.de/datasets/" - griddict[ - "dataset" - ] = "icon-esm-er.eerie-control-1950.v20231106.ocean.native.2d_grid/zarr" + griddict["dataset"] = "icon_grid_0016_R02B09_O.zarr" + if "gr025" in xid: + griddict[ + "store" + ] = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/grids/" + if "ifs-amip" in xid: + griddict["dataset"] = "gr025_descending.zarr" + else: + griddict["dataset"] = "gr025.zarr" + if "era5" in xid: + griddict[ + "store" + ] = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/grids/" + griddict["dataset"] = "era5.zarr" itemdict["default_var"] = list(var_store_dataset_dict.keys())[0] itemdict["name"] = title @@ -270,13 +353,19 @@ class Stac(Plugin): def get_request(request: Request) -> str: return request - @router.get("catalog.json", summary="Root stac catalog") - def get_root_catalog( + @router.get("-collection-eerie.json", summary="Root stac catalog") + def get_eerie_collection( request=Depends(get_request), dataset_ids=Depends(deps.dataset_ids) ): - coldict = create_stac_collection() + coldict = create_stac_eerie_collection() + coldict["links"].append( + dict(rel="parent", href=PARENT_CATALOG, type="application/json") + ) + coldict["providers"][0] = coldict["providers"][0]["name"] dslist = json.load(fsspec.open(HOSTURL).open()) for item in dslist: + if any(item.startswith(a) for a in ["nextgems", "era"]): + continue coldict["links"].append( dict( rel="child", @@ -296,10 +385,23 @@ class Stac(Plugin): @router.get("/") @router.get("") - async def get_stacitem(ds: xr.Dataset = Depends(deps.dataset)): + async def get_stacitem( + ds: xr.Dataset = Depends(deps.dataset), + cache: cachey.Cache = Depends(deps.cache), + ): + cache_key = ds.attrs.get(DATASET_ID_ATTR_KEY, "") + "/" + "stac" + resp = cache.get(cache_key) # if not all(a in ds.attrs for a in STACKEYS): # raise HTTPException(status_code=404, detail=f"Dataset does not contain all keys needed for a STAC Item") - item_dict = xarray_to_stac_item(ds) - return JSONResponse(item_dict) + if resp is None: + try: + item_dict = xarray_to_stac_item(ds) + resp = JSONResponse(item_dict) + except: + raise HTTPException( + status_code=404, detail="Could not create a STAC Item" + ) + cache.put(cache_key, resp, 99999) + return resp return router diff --git a/scripts/cataloghost.py b/scripts/cataloghost.py index e7054d7..01b4f83 100755 --- a/scripts/cataloghost.py +++ b/scripts/cataloghost.py @@ -1,12 +1,11 @@ -from cloudify.plugins.exposer import * -from cloudify.plugins.geoanimation import * -from cloudify.utils.daskhelper import * -from cloudify.plugins.dynamic_datasets import * -from cloudify.plugins.kerchunk import * -from cloudify.plugins.dynamic_variables import * -from cloudify.plugins.stacer import * +from exposer import * from copy import deepcopy as copy from stacer import * +from geoanimation import * +from daskhelper import * +from dynamic_datasets import * +from kerchunk import * +from dynamic_variables import * 
from datetime import datetime import os import numcodecs @@ -19,101 +18,121 @@ import asyncio import fastapi import itertools import nest_asyncio - -os.environ["FORWARDED_ALLOW_IPS"] = "127.0.0.1" +os.environ["FORWARDED_ALLOW_IPS"]="127.0.0.1" nest_asyncio.apply() from intake.config import conf - -conf["cache_disabled"] = True -STORAGE_OPTIONS = dict( - # remote_protocol="file", - lazy=True, - cache_size=0, - # skip_instance_cache=True, - # listings_expiry_time=0 -) -L_DASK = True -L_NEXTGEMS = True -mapper_dict = {} - -# CATALOG_FILE="/work/bm1344/DKRZ/intake/dkrz_eerie_esm.yaml" -CATALOG_FILE = "/work/bm1344/DKRZ/intake_catalogues/dkrz/disk/main.yaml" -# ADDRESS="tcp://127.0.0.1:42577" - - +conf['cache_disabled'] = True +STORAGE_OPTIONS=dict( +# remote_protocol="file", + lazy=True, + cache_size=0 +# skip_instance_cache=True, +# listings_expiry_time=0 + ) +L_DASK=True +L_NEXTGEMS=True +L_ERA5=False +mapper_dict={} + +#CATALOG_FILE="/work/bm1344/DKRZ/intake/dkrz_eerie_esm.yaml" +CATALOG_FILE="/work/bm1344/DKRZ/intake_catalogues/dkrz/disk/main.yaml" +#ADDRESS="tcp://127.0.0.1:42577" +INSTITUTE_KEYS=[ + "institution_id", + "institute_id", + "institution", + "institute", + "centre" + ] +SOURCE_KEYS=[ + "source_id", + "model_id", + "source", + "model" + ] +EXPERIMENT_KEYS=[ + "experiment_id", + "experiment" + ] +PROJECT_KEYS=[ + "project_id", + "project", + "activity_id", + "activity" + ] def set_custom_header(response: fastapi.Response) -> None: - response.headers["Cache-control"] = "max-age=604800" + response.headers["Cache-control"] = "max-age=3600" response.headers["X-EERIE-Request-Id"] = "True" - response.headers["Last-Modified"] = datetime.today().strftime( - "%a, %d %b %Y %H:%M:%S GMT" - ) - + response.headers["Last-Modified"] = datetime.today().strftime("%a, %d %b %Y %H:%M:%S GMT") -def refine_nextgems(iakey, ds, md, desc): - ds = reset_encoding_get_mapper(iakey, ds, desc=desc) - ds = adapt_for_zarr_plugin_and_stac(iakey, ds) +def refine_nextgems(iakey, ds,md,desc): + ds=reset_encoding_get_mapper(iakey, ds,desc=desc) + ds=adapt_for_zarr_plugin_and_stac(iakey,ds) ds = set_compression(ds) - for mdk, mdv in md.items(): + for mdk,mdv in md.items(): if not mdk in ds.attrs: - ds.attrs[mdk] = mdv - for mdk, mdv in desc["metadata"].items(): + ds.attrs[mdk]=mdv + for mdk,mdv in desc["metadata"].items(): if not mdk in ds.attrs: - ds.attrs[mdk] = mdv + ds.attrs[mdk]=mdv return ds - def add_nextgems(dsdict): - NGC_PROD_CAT = "https://www.wdc-climate.de/ui/cerarest/addinfoDownload/nextGEMS_prod_addinfov1/nextGEMS_prod.yaml" - # NGC_PROD_CAT="https://data.nextgems-h2020.eu/catalog.yaml" - ngccat = intake.open_catalog(NGC_PROD_CAT) - md = yaml.safe_load(ngccat.yaml())["sources"]["nextGEMS_prod"]["metadata"] - dsadd = [ - "ICON.ngc4008", - "IFS.IFS_9-FESOM_5-production.2D_hourly_healpix512", - "IFS.IFS_9-FESOM_5-production.3D_hourly_healpix512", - ] + NGC_PROD_CAT="https://www.wdc-climate.de/ui/cerarest/addinfoDownload/nextGEMS_prod_addinfov1/nextGEMS_prod.yaml" +# NGC_PROD_CAT="https://data.nextgems-h2020.eu/catalog.yaml" + ngccat=intake.open_catalog(NGC_PROD_CAT) + md=yaml.safe_load(ngccat.yaml())["sources"]["nextGEMS_prod"]["metadata"] + dsadd=["ICON.ngc4008", "IFS.IFS_9-FESOM_5-production.2D_hourly_healpix512", + "IFS.IFS_9-FESOM_5-production.3D_hourly_healpix512"] for ia in dsadd: - desc = ngccat[ia].describe() - if type(desc["args"]["urlpath"]) == str and desc["args"]["urlpath"].endswith( - "zarr" - ): + desc=ngccat[ia].describe() + if type(desc["args"]["urlpath"])==str and 
desc["args"]["urlpath"].endswith("zarr"): if "user_parameters" in desc: - ups = desc["user_parameters"] - uplists = [up["allowed"] for up in ups] - combinations = list(itertools.product(*uplists)) - combdict = [ - {ups[i]["name"]: comb[i] for i in range(len(ups))} - for comb in combinations - ] + ups=desc["user_parameters"] + uplists=[up["allowed"] for up in ups] + combinations = list( + itertools.product( + *uplists + ) + ) + combdict=[ + { + ups[i]["name"]:comb[i] + for i in range(len(ups)) + } + for comb in combinations + ] for comb in combdict: - iakey = ia + "." + "_".join([str(a) for a in list(comb.values())]) + iakey='nextgems.'+ia+"."+'_'.join([str(a) for a in list(comb.values())]) try: - dsdict[iakey] = ngccat[ia](**comb, chunks="auto").to_dask() - dsdict[iakey] = refine_nextgems(iakey, dsdict[iakey], md, desc) + dsdict[iakey]=ngccat[ia](**comb,chunks="auto").to_dask() + dsdict[iakey]=refine_nextgems(iakey,dsdict[iakey],md,desc) except Exception as e: print(e) pass else: - iakey = ".".join(ia.split(".")[1:]) - dsdict[iakey] = ngccat[ia](chunks="auto").to_dask() - dsdict[iakey] = refine_nextgems(iakey, dsdict[iakey], md, desc) + iakey='nextgems.'+'.'.join(ia.split('.')[1:]) + dsdict[iakey]=ngccat[ia](chunks="auto").to_dask() + dsdict[iakey]=refine_nextgems(iakey,dsdict[iakey],md,desc) print(dsdict.keys()) return dsdict - def compress_data(partds): import numcodecs - rounding = numcodecs.BitRound(keepbits=12) return rounding.decode(rounding.encode(partds)) - -def find_data_sources(catalog, name=None): - newname = ".".join([a for a in [name, catalog.name] if a]) +def find_data_sources(catalog,name=None): + newname='.'.join( + [ a + for a in [name, catalog.name] + if a + ] + ) data_sources = [] for key, entry in catalog.items(): - if isinstance(entry, intake.source.csv.CSVSource): + if isinstance(entry,intake.source.csv.CSVSource): continue if isinstance(entry, intake.catalog.Catalog): if newname == "main": @@ -121,7 +140,7 @@ def find_data_sources(catalog, name=None): # If the entry is a subcatalog, recursively search it data_sources.extend(find_data_sources(entry, newname)) elif isinstance(entry, intake.source.base.DataSource): - data_sources.append(newname + "." 
+ key) + data_sources.append(newname+"."+key) return data_sources @@ -136,172 +155,179 @@ def list_pickles(path): return file_list - def split_ds(ds): - fv = None + fv=None for v in ds.data_vars: if "time" in ds[v].dims: - fv = v + fv=v break if not fv: return [ds] - dimno = 0 - for dimno, dim in enumerate(ds[v].dims): + dimno=0 + for dimno,dim in enumerate(ds[v].dims): if dim == "time": break - first = ds[fv].chunks[dimno][0] - subdss = [] - sumchunks = 0 - lastsum = 0 + first=ds[fv].chunks[dimno][0] + subdss=[] + sumchunks=0 + lastsum=0 for i in range(len(ds[fv].chunks[0])): - f = ds[fv].chunks[dimno][i] - sumchunks += f + f=ds[fv].chunks[dimno][i] + sumchunks+=f if f != first: - dss = ds.isel(time=slice(lastsum, sumchunks)) + dss=ds.isel(time=slice(lastsum,sumchunks)) subdss.append(dss) - lastsum = sumchunks + lastsum=sumchunks if not subdss: return [ds] if len(subdss) == 1: return [ds] return subdss - -def reset_encoding_get_mapper(dsid, ds, desc=None): +def reset_encoding_get_mapper(dsid,ds,desc=None): global mapper_dict sp = None if "source" in ds.encoding: sp = ds.encoding["source"] elif desc: - updesc = desc["args"]["urlpath"] - if type(updesc) == str or (type(updesc) == list and len(updesc) == 1): + updesc=desc["args"]["urlpath"] + if type(updesc) == str or (type(updesc) == list and len(updesc) == 1 ): if type(updesc) == list: - updesc = updesc[0] - sp = updesc + updesc=updesc[0] + sp=updesc ds = ds.reset_encoding() if sp: - use_options = copy(STORAGE_OPTIONS) + use_options=copy(STORAGE_OPTIONS) if desc: - use_options.update(desc["args"].get("storage_options", {})) - mapper_dict[sp] = fsspec.get_mapper(sp, **use_options) + use_options.update(desc["args"].get("storage_options",{})) + mapper_dict[sp]=fsspec.get_mapper( + sp,**use_options + ) ds.encoding["source"] = sp return ds - def get_options(desc): - options = dict(storage_options=desc["args"].get("storage_options", {})) + options=dict(storage_options=desc["args"].get("storage_options",{})) if L_DASK: options["storage_options"].update(STORAGE_OPTIONS) else: - options["chunks"] = None + options["chunks"]=None return options - def set_compression(ds): for var in ds.data_vars: # ds[var].encoding["compressor"]=None ds[var].encoding = { - "compressor": numcodecs.Blosc( - cname="lz4", clevel=5, shuffle=1, blocksize=0 - ), - } + 'compressor': numcodecs.Blosc(cname='lz4', clevel=5, shuffle=1, blocksize=0), + } return ds -def adapt_for_zarr_plugin_and_stac(dsid, ds): - stac_attrs = ["title", "description"] - for att in stac_attrs: - if att not in ds.attrs: - ds.attrs[att] = dsid +def adapt_for_zarr_plugin_and_stac(dsid,ds): + title = ds.attrs.get("title","default") + if title in ["default","ICON simulation"] : + ds.attrs["title"]=dsid + desc = ds.attrs.get("description",None) + if not desc: + source = next((ds.attrs.get(default) for default in SOURCE_KEYS if ds.attrs.get(default) is not None), "not Set") + exp = next((ds.attrs.get(default) for default in EXPERIMENT_KEYS if ds.attrs.get(default) is not None), "not Set") + project = next((ds.attrs.get(default) for default in PROJECT_KEYS if ds.attrs.get(default) is not None), "not Set") + institute = next((ds.attrs.get(default) for default in INSTITUTE_KEYS if ds.attrs.get(default) is not None), "not Set") + + ds.attrs["description"]="Simulation data from project '"+project+ "' produced by Earth System Model '"+ source+"' and run by institution '"+institute+"' for the experiment '"+exp+"'" if "time" in ds.variables: - ds["time"].encoding["dtype"] = "float64" + 
ds["time"].encoding["dtype"]="float64" ds["time"].encoding["compressor"] = None - ds.attrs["time_min"] = str(ds["time"].values[0]) - ds.attrs["time_max"] = str(ds["time"].values[-1]) - - ds.attrs["creation_date"] = datetime.today().strftime("%Y-%m-%dT%H:%M:%SZ") + ds.attrs["time_min"]=str(ds["time"].values[0]) + ds.attrs["time_max"]=str(ds["time"].values[-1]) + for att in ["units","calendar"]: + if ( ds["time"].attrs.get(att,None) and not ds["time"].encoding.get(att,None)): + ds["time"].encoding[att]=ds["time"].attrs[att] + del ds["time"].attrs[att] + ds.attrs["creation_date"]=datetime.today().strftime('%Y-%m-%dT%H:%M:%SZ') return ds - if __name__ == "__main__": # This avoids infinite subprocess creation client = asyncio.get_event_loop().run_until_complete(get_dask_client()) if L_DASK: import dask - dask.config.set({"array.slicing.split_large_chunks": False}) dask.config.set({"array.chunk-size": "100 MB"}) zarrcluster = asyncio.get_event_loop().run_until_complete(get_dask_cluster()) - # cluster.adapt( - # target_duration="0.1s", - # minimum=2, - - # maximum=6, - # minimum_cores=2, - # maximum_cores=2, - # minimum_memory="16GB", - # maximum_memory="48GB" - # ) - # client=Client(cluster) - os.environ["ZARR_ADDRESS"] = zarrcluster.scheduler._address - cat = intake.open_catalog(CATALOG_FILE) - hostids = [] + #cluster.adapt( + # target_duration="0.1s", + # minimum=2, + + # maximum=6, + # minimum_cores=2, + # maximum_cores=2, + # minimum_memory="16GB", + # maximum_memory="48GB" + # ) + #client=Client(cluster) + os.environ["ZARR_ADDRESS"]=zarrcluster.scheduler._address + cat=intake.open_catalog(CATALOG_FILE) + hostids=[] for source in list(cat["model-output"]): if source != "csv" and source != "esm-json": - hostids += find_data_sources(cat["model-output"][source]) + hostids+=find_data_sources(cat["model-output"][source]) print(hostids) - hostids += find_data_sources(cat["observations.ERA5.era5-dkrz"]) - dsdict = {} + hostids+=find_data_sources(cat["observations.ERA5.era5-dkrz"]) + dsdict={} if L_NEXTGEMS: - dsdict = add_nextgems(dsdict) + dsdict=add_nextgems(dsdict) for dsid in hostids: - desc = None - listref = False + desc=None + listref=False if not dsid.startswith("era5-dkrz"): - desc = cat["model-output"][dsid].describe() - testurl = desc["args"]["urlpath"] - if type(testurl) == list: - testurl = testurl[0] - listref = True + desc=cat["model-output"][dsid].describe() + testurl=desc["args"]["urlpath"] + if type(testurl)==list: + testurl=testurl[0] + listref=True if "bm1344" not in testurl and "bk1377" not in testurl: continue if "3d_grid" in dsid: continue - if not ( - testurl.startswith("reference::") or testurl.startswith('"reference::') - ): + if not (testurl.startswith("reference::") or testurl.startswith('"reference::') ): print(testurl) continue else: - desc = cat["observations.ERA5"][dsid].describe() - testurl = desc["args"]["urlpath"] + if L_ERA5: + desc=cat["observations.ERA5"][dsid].describe() + testurl=desc["args"]["urlpath"] - if type(testurl) == list: - testurl = testurl[0] + if type(testurl)==list: + testurl=testurl[0] + else: + continue try: - # if True: + #if True: print(dsid) - options = get_options(desc) + options=get_options(desc) if dsid.startswith("era5-dkrz"): + if not L_ERA5: + continue if "hourly" in dsid and not "surface" in dsid: continue - ds = cat["observations.ERA5"][dsid](**options).to_dask() + ds=cat["observations.ERA5"][dsid](**options).to_dask() else: - ds = cat["model-output"][dsid](**options).to_dask() + if "icon" in dsid and "native" in dsid and 
"2d_monthly_mean" in dsid: + options["chunks"]={} + ds=cat["model-output"][dsid](**options).to_dask() except Exception as e: - # else: + #else: print("Could not load:") print(e) continue - ds.attrs["_catalog_id"] = dsid + ds.attrs["_catalog_id"]=dsid if "d" in ds.data_vars: - ds = ds.rename(d="testd") + ds=ds.rename(d="testd") if "fesom" in dsid: if not L_DASK: continue - chunk_dict = dict( - nod2=1000000, nz1_upper=6, nz1=6, nz_upper=6, nz=6, time=4 - ) + chunk_dict = dict(nod2=1000000, nz1_upper=6, nz1=6, nz_upper=6, nz=6, time=4) keylist = [k for k in chunk_dict.keys() if k not in ds.dims] if "heightAboveGround" in ds.variables: ds = ds.drop("heightAboveGround") @@ -313,63 +339,58 @@ if __name__ == "__main__": # This avoids infinite subprocess creation droplist = [a for a in ["height"] if a in ds.variables] if droplist: ds = ds.drop(droplist) - if L_DASK: - ds = reset_encoding_get_mapper(dsid, ds, desc=desc) - to_coords = [ - a - for a in ds.data_vars - if a - in [ - "cell_sea_land_mask", - "lat", - "lon", - "coast", - "time_bnds", - "vertices_latitude", - "vertices_longitude", - ] - ] + to_coords=[a for a in ds.data_vars if a in ["cell_sea_land_mask","lat","lon","coast","time_bnds","vertices_latitude","vertices_longitude"]] if to_coords: ds = ds.set_coords(to_coords) - # if dsid.startswith("ifs-amip"): + #if dsid.startswith("ifs-amip"): # ds = ds.rename({'value':'latlon'}).set_index(latlon=("lat","lon")).unstack("latlon") if "native" in dsid and not "grid" in dsid: print("lossy") if L_DASK: - ds = xr.apply_ufunc( - compress_data, ds, dask="parallelized", keep_attrs="drop_conflicts" - ) + ds=xr.apply_ufunc( + compress_data, + ds, + dask="parallelized", + keep_attrs="drop_conflicts" + ) else: + ds = reset_encoding_get_mapper(dsid,ds,desc=desc) for var in ds.data_vars: - ds[var].encoding["filters"] = numcodecs.BitRound(keepbits=12) + ds[var].encoding["filters"]=numcodecs.BitRound(keepbits=12) if not "grid" in dsid: ds = set_compression(ds) - ds = adapt_for_zarr_plugin_and_stac(dsid, ds) if listref: - splitted_ds = split_ds(ds) - for idx, dss in enumerate(splitted_ds): - dsdict[dsid + f".{idx}"] = dss + splitted_ds=split_ds(ds) + for idx,dss in enumerate(splitted_ds): + newid=dsid+f".{idx}" + dss = adapt_for_zarr_plugin_and_stac(dsid,dss) + dsdict[newid]=dss else: - dsdict[dsid] = ds - kp = KerchunkPass() - kp.mapper_dict = mapper_dict - # collection = xp.Rest([], cache_kws=dict(available_bytes=0)) - # collection.register_plugin(DynamicKerchunk()) + if L_DASK: + ds = reset_encoding_get_mapper(dsid,ds,desc=desc) + ds = adapt_for_zarr_plugin_and_stac(dsid,ds) + dsdict[dsid]=ds + kp=KerchunkPass() + kp.mapper_dict=mapper_dict + #collection = xp.Rest([], cache_kws=dict(available_bytes=0)) + #collection.register_plugin(DynamicKerchunk()) collection = xp.Rest( - dsdict, - cache_kws=dict(available_bytes=100000000), - app_kws=dict( - redirect_slashes=False, - dependencies=[fastapi.Depends(set_custom_header)], - # middleware=middleware - ), - ) - # collection.register_plugin(DynamicKerchunk()) + dsdict, + cache_kws=dict( + available_bytes=100000000 + ), + app_kws=dict( + redirect_slashes=False, + dependencies=[fastapi.Depends(set_custom_header)] + #middleware=middleware + ) + ) + #collection.register_plugin(DynamicKerchunk()) collection.register_plugin(DynamicAdd()) collection.register_plugin(kp) collection.register_plugin(Stac()) - # collection.register_plugin(FileServe()) + #collection.register_plugin(FileServe()) collection.register_plugin(PlotPlugin()) collection.serve(host="0.0.0.0", 
port=9000)
--
GitLab
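
A minimal client-side sketch of how the endpoints touched by this patch can be consumed, assuming a deployment at https://eerie.cloud.dkrz.de (HOST in stacer.py) and fsspec/zarr with HTTP support on the client; the dataset id is illustrative, real ids are listed under /datasets:

# Option 1: open the consolidated Zarr view served by the KerchunkPass plugin
import xarray as xr
ds = xr.open_dataset(
    "https://eerie.cloud.dkrz.de/datasets/<dataset-id>/kerchunk",
    engine="zarr",
    consolidated=True,  # .zmetadata/.zgroup/.zarray responses are built and cached by the plugin
    chunks={},
)

# Option 2: go through the per-dataset STAC item, as advertised in the item description
import intake  # requires intake-stac
item = intake.open_stac_item("https://eerie.cloud.dkrz.de/datasets/<dataset-id>/stac")
ds = item.data.to_dask()  # "data" is the zarr asset added in xarray_to_stac_item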