Commit 9e8b6b48 authored by Fabian Wachsmann

Updated cloudify

parent 9c844499
Pipeline #84007 passed
@@ -5,8 +5,10 @@ from fastapi.responses import HTMLResponse
# from Zarr:
# from starlette.responses import StreamingResponse, Response # type: ignore
from fastapi.responses import StreamingResponse, Response
import cachey
import xarray as xr
import fsspec
import asyncio
import gc
import json
import os
@@ -15,12 +17,18 @@ from xpublish.dependencies import (
get_dataset,
) # Assuming 'dependencies' is the module where get_dataset is defined
from xpublish import Plugin, hookimpl, Dependencies
from xpublish.utils.api import DATASET_ID_ATTR_KEY
from datetime import datetime
todaystring = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
gctrigger = 0
GCLIMIT = 500
def kerchunk_stream_content(data):
async def kerchunk_stream_content(data):
await asyncio.sleep(0)
yield data
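The rewrite above turns kerchunk_stream_content into an async generator: `await asyncio.sleep(0)` hands control back to the event loop before the payload is yielded, which is the form Starlette's StreamingResponse can consume without blocking. A minimal, self-contained sketch of the same pattern (route and payload are illustrative, not part of this commit):

```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def stream_bytes(data: bytes):
    # Async generator: yield to the event loop once, then emit the payload.
    await asyncio.sleep(0)
    yield data


@app.get("/demo")
async def demo():
    # StreamingResponse accepts an async generator as its body iterator.
    return StreamingResponse(stream_bytes(b"chunk-bytes"), media_type="application/octet-stream")
```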
@@ -37,9 +45,12 @@ class KerchunkPass(Plugin):
prefix=self.dataset_router_prefix, tags=list(self.dataset_router_tags)
)
@router.get("/{key:path}")
@router.api_route("/{key:path}", methods=["GET", "HEAD"])
async def get_chunk(
key: str, dataset: xr.Dataset = Depends(deps.dataset), use_cache=False
key: str,
dataset: xr.Dataset = Depends(deps.dataset),
cache: cachey.Cache = Depends(deps.cache),
# use_cache=False
):
global gctrigger, mapper_dict
if "source" in dataset.encoding:
@@ -54,24 +65,43 @@
)
# if key in fsmap:
if True:
if ".zmetadata" == key:
kv = json.loads(fsmap[key].decode("utf-8"))
kv["zarr_consolidated_format"] = 1
resp = Response(
json.dumps(kv).encode("utf-8"),
media_type="application/octet-stream",
if any(a in key for a in [".zmetadata", ".zarray", ".zgroup"]):
cache_key = (
dataset.attrs.get(DATASET_ID_ATTR_KEY, "")
+ "/kerchunk/"
+ f"{key}"
)
fsmap.fs.dircache.clear()
del fsmap, kv
return resp
resp = cache.get(cache_key)
if resp is None:
zmetadata = json.loads(fsmap[".zmetadata"].decode("utf-8"))
if ".zmetadata" == key:
zmetadata["zarr_consolidated_format"] = 1
if key == ".zgroup":
jsondump = json.dumps({"zarr_format": 2}).encode("utf-8")
elif ".zarray" in key or ".zgroup" in key or ".zattrs" in key:
jsondump = json.dumps(zmetadata["metadata"][key]).encode(
"utf-8"
)
else:
jsondump = json.dumps(zmetadata).encode("utf-8")
resp = Response(
jsondump,
media_type="application/octet-stream",
)
cache.put(cache_key, resp, 99999)
# return StreamingResponse(
# kerchunk_stream_content(fsmap[key]),
# media_type='application/octet-stream',
# )
resp = StreamingResponse(
kerchunk_stream_content(fsmap[key]),
media_type="application/octet-stream",
)
else:
resp = StreamingResponse(
kerchunk_stream_content(fsmap[key]),
media_type="application/octet-stream",
)
resp.headers["Cache-control"] = "max-age=604800"
resp.headers["X-EERIE-Request-Id"] = "True"
resp.headers["Last-Modified"] = todaystring
resp.headers["Access-Control-Allow-Origin"] = "https://swift.dkrz.de"
fsmap.fs.dircache.clear()
del sp, dataset, key
if gctrigger > GCLIMIT:
......
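Because the handler above answers .zmetadata, .zgroup, and .zarray requests with consolidated Zarr metadata and streams raw chunk bytes for everything else, a client can treat the route as a consolidated Zarr store over plain HTTP. A hedged client-side sketch (the dataset id is a hypothetical placeholder; fsspec and aiohttp must be installed):

```python
import xarray as xr

# Hypothetical dataset id; the live ids are listed under https://eerie.cloud.dkrz.de/datasets
dataset_id = "some.dataset.id"
url = f"https://eerie.cloud.dkrz.de/datasets/{dataset_id}/kerchunk"

# consolidated=True matches the xarray:open_kwargs advertised in the STAC items below.
ds = xr.open_zarr(url, consolidated=True)
print(ds)
```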
from typing import Sequence
from fastapi import APIRouter, Depends, HTTPException, Request
from xpublish.utils.api import JSONResponse
from xpublish.utils.api import JSONResponse, DATASET_ID_ATTR_KEY
import xarray as xr
import json
from xpublish.dependencies import (
@@ -8,13 +8,18 @@ from xpublish.dependencies import (
) # Assuming 'dependencies' is the module where get_dataset is defined
from xpublish import Plugin, hookimpl, Dependencies
from copy import deepcopy as copy
import cachey
from pystac import Item, Asset, MediaType
from datetime import datetime
import pystac
import json
import fsspec
PARENT_CATALOG = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/catalogs/stac-catalog-eeriecloud.json"
GRIDLOOK = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/apps/gridlook/index.html"
HOST = "https://eerie.cloud.dkrz.de/"
INTAKE_SOURCE = "https://raw.githubusercontent.com/eerie-project/intake_catalogues/refs/heads/main/dkrz/disk/model-output/main.yaml"
JUPYTERLITE = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/apps/jupyterlite/index.html"
HOSTURL = HOST + "datasets"
PROVIDER_DEFAULT = dict(
name="DKRZ",
@@ -31,40 +36,53 @@ ALTERNATE_KERCHUNK = dict(
STAC_EXTENSIONS = [
"https://stac-extensions.github.io/datacube/v2.2.0/schema.json",
"https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json",
"https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json",
]
DESC = """
DKRZ hosts a data server named ‘eerie.cloud’ for global
high-resolution Earth System Model simulation output
from experiments produced within the EU project EERIE
and stored at the German Climate Computing Center (DKRZ).
Eerie.cloud makes use of the Python package xpublish.
Xpublish is a plugin for xarray (Hoyer, 2023), which is widely used in the Earth System Science community.
It serves ESM output formatted as Zarr (Miles, 2020) via a REST API based on FastAPI.
Served in this way, the data imitates cloud-native data (Abernathey, 2021)
and features many capabilities of cloud-optimized data. """
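The description above summarizes the design: xpublish exposes each xarray dataset under HOSTURL as a Zarr-like REST endpoint, and HOSTURL itself returns a JSON list of dataset ids (the collection route further below relies on exactly that). A small hedged sketch of the listing call:

```python
import json

import fsspec

# HOSTURL ("https://eerie.cloud.dkrz.de/datasets") returns a JSON array of dataset ids.
with fsspec.open("https://eerie.cloud.dkrz.de/datasets") as f:
    dataset_ids = json.load(f)

print(len(dataset_ids), dataset_ids[:3])
```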
XARRAY_ITEM = {"xarray:open_kwargs": dict(consolidated="true")}
EERIE_DESC = """
Earth System Model Simulation output of the [EERIE project](https://eerie-project.eu/).
This project, running from January 2023 to December 2026,
will reveal and quantify the role of ocean mesoscale processes
in shaping the climate trajectory over seasonal to centennial time scales.
To this end EERIE will develop a new generation of Earth System Models (ESMs)
that are capable of explicitly representing a crucially important,
yet unexplored regime of the Earth system – the ocean mesoscale.
Leveraging the latest advances in science and technology,
EERIE will substantially improve the ability of such ESMs to faithfully
represent the centennial-scale evolution of the global climate,
especially its variability, extremes and
how tipping points may unfold under the influence of the ocean mesoscale.
def create_stac_collection():
techdoc = pystac.Asset(
href="https://pad.gwdg.de/OZo5HMC4R6iljvZHlo-BzQ#",
title="Technical documentation",
media_type=pystac.MediaType.HTML,
roles=["OVERVIEW"],
)
[Imprint](https://www.dkrz.de/en/about-en/contact/impressum) and
[Privacy Policy](https://www.dkrz.de/en/about-en/contact/en-datenschutzhinweise).
"""
TECHDOC = pystac.Asset(
href="https://pad.gwdg.de/OZo5HMC4R6iljvZHlo-BzQ#",
title="Technical documentation",
media_type=pystac.MediaType.HTML,
roles=["OVERVIEW"],
)
def create_stac_eerie_collection():
start = datetime(1850, 1, 1)
end = datetime(2101, 1, 1)
col = pystac.Collection(
id="eerie.cloud",
title="The EERIE cloud data server",
description=DESC,
id="eerie",
title="EERIE data in Zarr format",
description=EERIE_DESC,
stac_extensions=STAC_EXTENSIONS,
href=HOST + "stac.json",
href=HOST + "stac-collection-eerie.json",
extent=pystac.Extent(
spatial=pystac.SpatialExtent([-180, -90, 180, 90]),
temporal=pystac.TemporalExtent(intervals=[None, None]),
temporal=pystac.TemporalExtent(intervals=[start, end]),
),
keywords=["EERIE", "cloud", "ICON", "IFS", "FESOM", "NEMO", "HadGEM"],
providers=[pystac.Provider(PROVIDER_DEFAULT)],
assets=dict(doc=techdoc),
assets=dict(doc=copy(TECHDOC)),
)
return col.to_dict()
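create_stac_eerie_collection() returns a plain dict; the href set above suggests the document is published as stac-collection-eerie.json under HOST. A hedged sketch of loading it back into pystac on the client side (the URL is assumed from that href):

```python
import json

import fsspec
import pystac

# Assumed from the href above: HOST + "stac-collection-eerie.json"
url = "https://eerie.cloud.dkrz.de/stac-collection-eerie.json"

with fsspec.open(url) as f:
    collection = pystac.Collection.from_dict(json.load(f))

print(collection.id, collection.extent.temporal.intervals)
```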
@@ -88,7 +106,7 @@ def xarray_to_stac_item(ds):
description = ds.attrs.get("description", "No description")
xid = ds.attrs.get("_xpublish_id", "No id")
keywords = xid.split(".")
datetimeattr = datetime.now()#.isoformat()
datetimeattr = datetime.now().isoformat()
start_datetime = ds.attrs.get("time_min", None)
if start_datetime:
start_datetime = (
@@ -128,7 +146,7 @@ def xarray_to_stac_item(ds):
]
],
}
cdate = ds.attrs.get("creation_date", datetimeattr.isoformat())
cdate = ds.attrs.get("creation_date", datetimeattr)
providers = [copy(PROVIDER_DEFAULT)]
creator_inst_id = ds.attrs.get("institution_id", None)
if not creator_inst_id:
@@ -143,9 +161,36 @@ def xarray_to_stac_item(ds):
creator["url"] = HOSTURL + "/" + xid + "/"
creator["roles"] = ["producer"]
providers.append(creator)
description += (
"\n"
+ "You can use this dataset in xarray from intake.\n\n"
+ "On Levante:\n"
+ "```python\n"
+ "import intake\n"
+ "intake.open_catalog(\n"
+ ' "'
+ INTAKE_SOURCE
+ '"\n'
+ ')["'
+ xid
+ '"].to_dask()\n'
+ "```\n"
+ "Everywhere else:\n"
+ "```python\n"
+ "import intake\n"
+ "intake.open_stac_item(\n"
+ ' "'
+ HOSTURL
+ "/"
+ xid
+ '/stac"\n'
+ ").data.to_dask()\n"
+ "```\n"
)
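The snippet assembled above advertises two access paths; the remote one resolves the item's "data" asset through intake-stac. A hedged sketch of that client call (requires the intake-stac plugin; the dataset id is a hypothetical placeholder):

```python
import intake  # intake-stac must be installed for open_stac_item

# Hypothetical dataset id; real ids come from https://eerie.cloud.dkrz.de/datasets
xid = "some.dataset.id"

item = intake.open_stac_item(f"https://eerie.cloud.dkrz.de/datasets/{xid}/stac")
ds = item.data.to_dask()  # "data" is the Zarr asset added to the STAC item below
print(ds)
```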
properties = {
"description": description,
"title": title,
"title": " ".join(title.split(".")),
"created": cdate,
"keywords": keywords,
"providers": providers,
@@ -182,6 +227,7 @@ def xarray_to_stac_item(ds):
HOSTURL + "/" + xid + "/kerchunk"
)
extra_fields["alternate"]["kerchunk"]["name"] = "Raw data"
extra_fields.update(copy(XARRAY_ITEM))
item.add_asset(
"data",
Asset(
@@ -203,9 +249,28 @@ def xarray_to_stac_item(ds):
description="HTML representation of the xarray dataset",
),
)
item.add_asset(
"jupyterlite",
Asset(
href=JUPYTERLITE,
media_type=MediaType.HTML,
title="Jupyterlite access",
roles=["analysis"],
description="Web-assembly based analysis platform with access to this item",
),
)
item.add_asset(
"gridlook",
Asset(
href=GRIDLOOK + "#" + HOSTURL + "/" + xid + "/stac",
media_type=MediaType.HTML,
title="Visualization with gridlook",
roles=["Visualization"],
description="Visualization with gridlook",
),
)
itemdict = item.to_dict()
itemdict["properties"]["datetime"]=datetimeattr.isoformat()
# for asset in itemdict["assets"].keys():
itemdict["links"] = [
dict(
@@ -214,21 +279,39 @@ def xarray_to_stac_item(ds):
title="Usage of the eerie.cloud",
)
]
if not any(xid.startswith(a) for a in ["nextgems", "era"]):
itemdict["links"].append(
dict(
rel="parent",
href=HOSTURL + "/stac-collection-eerie.json",
type="application/json",
)
)
#
# gridlook
#
griddict = copy(store_dataset_dict)
if "icon-esm-er" in xid and "native" in xid:
griddict[
"store"
] = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/grids/"
if "atmos" in xid or "land" in xid:
griddict[
"store"
] = "https://swift.dkrz.de/v1/dkrz_948e7d4bbfbb445fbff5315fc433e36a/nextGEMS/"
griddict["dataset"] = "grids/ICON_R02B08.zarr"
griddict["dataset"] = "icon_grid_0033_R02B08_G.zarr"
elif "ocean" in xid:
griddict["store"] = "https://eerie.cloud.dkrz.de/datasets/"
griddict[
"dataset"
] = "icon-esm-er.eerie-control-1950.v20231106.ocean.native.2d_grid/zarr"
griddict["dataset"] = "icon_grid_0016_R02B09_O.zarr"
if "gr025" in xid:
griddict[
"store"
] = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/grids/"
if "ifs-amip" in xid:
griddict["dataset"] = "gr025_descending.zarr"
else:
griddict["dataset"] = "gr025.zarr"
if "era5" in xid:
griddict[
"store"
] = "https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/grids/"
griddict["dataset"] = "era5.zarr"
itemdict["default_var"] = list(var_store_dataset_dict.keys())[0]
itemdict["name"] = title
@@ -270,13 +353,19 @@ class Stac(Plugin):
def get_request(request: Request) -> Request:
return request
@router.get("catalog.json", summary="Root stac catalog")
def get_root_catalog(
@router.get("-collection-eerie.json", summary="Root stac catalog")
def get_eerie_collection(
request=Depends(get_request), dataset_ids=Depends(deps.dataset_ids)
):
coldict = create_stac_collection()
coldict = create_stac_eerie_collection()
coldict["links"].append(
dict(rel="parent", href=PARENT_CATALOG, type="application/json")
)
coldict["providers"][0] = coldict["providers"][0]["name"]
dslist = json.load(fsspec.open(HOSTURL).open())
for item in dslist:
if any(item.startswith(a) for a in ["nextgems", "era"]):
continue
coldict["links"].append(
dict(
rel="child",
@@ -296,10 +385,23 @@
@router.get("/")
@router.get("")
async def get_stacitem(ds: xr.Dataset = Depends(deps.dataset)):
async def get_stacitem(
ds: xr.Dataset = Depends(deps.dataset),
cache: cachey.Cache = Depends(deps.cache),
):
cache_key = ds.attrs.get(DATASET_ID_ATTR_KEY, "") + "/" + "stac"
resp = cache.get(cache_key)
# if not all(a in ds.attrs for a in STACKEYS):
# raise HTTPException(status_code=404, detail=f"Dataset does not contain all keys needed for a STAC Item")
item_dict = xarray_to_stac_item(ds)
return JSONResponse(item_dict)
if resp is None:
try:
item_dict = xarray_to_stac_item(ds)
resp = JSONResponse(item_dict)
except Exception:
raise HTTPException(
status_code=404, detail="Could not create a STAC Item"
)
cache.put(cache_key, resp, 99999)
return resp
return router
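return router completes the Stac plugin. For context, a hedged sketch of how KerchunkPass and Stac would typically be attached to an xpublish server via its plugins registry (the import path, the plugins' default name/prefix fields, and the demo dataset are assumptions not shown in this diff):

```python
import xarray as xr
import xpublish

# Hypothetical import path for the two plugin classes defined in the files above.
from cloudify.plugins import KerchunkPass, Stac  # assumption

# Toy dataset keyed by id; the real server registers the EERIE stores instead.
datasets = {"demo": xr.Dataset({"t": ("x", [1.0, 2.0, 3.0])})}

rest = xpublish.Rest(
    datasets,
    plugins={
        "kerchunk": KerchunkPass(),  # chunk and consolidated-metadata routes shown above
        "stac": Stac(),              # STAC item and collection routes shown above
    },
)
rest.serve(host="0.0.0.0", port=9000)
```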