Compare revisions

Project: data-infrastructure-services/cloudify
Commits on Source (6)
from typing import Sequence
from dask.array.core import Array as DASKARRAY
from fastapi import APIRouter, Depends, HTTPException, Request
from xpublish.utils.api import JSONResponse, DATASET_ID_ATTR_KEY
import xarray as xr
......@@ -28,9 +29,9 @@ PROVIDER_DEFAULT = dict(
url="https://dkrz.de",
)
ALTERNATE_KERCHUNK = dict(
kerchunk={
"name": "kerchunk",
"description": "Accessed binary data is as stored on disk via kerchunk",
processed={
"name": "Processed",
"description": "Server-side on-the-fly rechunked and unifromly encoded data",
}
)
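For orientation, once the href and name are filled in further below in xarray_to_stac_item, the "processed" alternate entry lands in an asset's extra_fields roughly as sketched here. The href value is a placeholder; the real one is built from HOSTURL and the dataset id.

# Illustrative only: how the alternate layout above renders inside an
# asset's extra_fields (href and dataset id are placeholders).
{
    "xarray:open_kwargs": {"consolidated": "true"},
    "alternate": {
        "processed": {
            "name": "Rechunked and uniformly compressed data",
            "description": "Server-side on-the-fly rechunked and uniformly encoded data",
            "href": "https://eerie.cloud.dkrz.de/datasets/<dataset-id>/zarr",
        }
    },
}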
STAC_EXTENSIONS = [
......@@ -41,19 +42,9 @@ STAC_EXTENSIONS = [
XARRAY_ITEM = {"xarray:open_kwargs": dict(consolidated="true")}
EERIE_DESC = """
Earth System Model Simulation output of the [EERIE project](https://eerie-project.eu/).
# Items of the eerie.cloud
This project, running from January 2023 to December 2026,
will reveal and quantify the role of ocean mesoscale processes
in shaping the climate trajectory over seasonal to centennial time scales.
To this end EERIE will develop a new generation of Earth System Models (ESMs)
that are capable of explicitly representing a crucially important,
yet unexplored regime of the Earth system – the ocean mesoscale.
Leveraging the latest advances in science and technology,
EERIE will substantially improve the ability of such ESMs to faithfully
represent the centennial-scale evolution of the global climate,
especially its variability, extremes and
how tipping points may unfold under the influence of the ocean mesoscale.
DKRZ hosts a data server named ‘eerie.cloud’ for global high-resolution Earth System Model simulation output stored at the German Climate Computing Center (DKRZ). It was developed within the EU project EERIE. Eerie.cloud makes use of the Python package xpublish. Xpublish is a plugin for xarray (Hoyer, 2023), which is widely used in the Earth System Science community. It serves ESM output formatted as zarr (Miles, 2020) via a REST API based on FastAPI. Served in this way, the data imitates cloud-native data (Abernathey, 2021) and features many capabilities of cloud-optimized data.
[Imprint](https://www.dkrz.de/en/about-en/contact/impressum) and
[Privacy Policy](https://www.dkrz.de/en/about-en/contact/en-datenschutzhinweise).
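Since the description advertises zarr access via xpublish, a minimal client-side sketch may help. It assumes datasets are exposed under https://eerie.cloud.dkrz.de/datasets/<dataset-id>/zarr, matching the HOSTURL pattern used below; the dataset id here is hypothetical.

# Sketch: open a dataset served by eerie.cloud with xarray's zarr backend.
# "example-dataset" is a placeholder; real ids are listed by the server's
# dataset index endpoint.
import xarray as xr

ds = xr.open_dataset(
    "https://eerie.cloud.dkrz.de/datasets/example-dataset/zarr",
    engine="zarr",
    consolidated=True,
    chunks={},  # keep variables lazy; bytes are only fetched on access
)
print(ds)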
......@@ -67,20 +58,30 @@ TECHDOC = pystac.Asset(
)
def create_stac_eerie_collection():
def create_stac_collection():
start = datetime(1850, 1, 1)
end = datetime(2101, 1, 1)
col = pystac.Collection(
id="eerie",
title="EERIE data in Zarr format",
id="eerie-cloud-all",
title="ESM data from DKRZ in Zarr format",
description=EERIE_DESC,
stac_extensions=STAC_EXTENSIONS,
href=HOST + "stac-collection-eerie.json",
href=HOST + "stac-collection-all.json",
extent=pystac.Extent(
spatial=pystac.SpatialExtent([-180, -90, 180, 90]),
temporal=pystac.TemporalExtent(intervals=[start, end]),
),
keywords=["EERIE", "cloud", "ICON", "IFS", "FESOM", "NEMO", "HadGEM"],
keywords=[
"EERIE",
"cloud",
"ICON",
"NextGEMs",
"ERA5",
"IFS",
"FESOM",
"NEMO",
"HadGEM",
],
providers=[pystac.Provider(PROVIDER_DEFAULT)],
assets=dict(doc=copy(TECHDOC)),
)
......@@ -126,10 +127,11 @@ def xarray_to_stac_item(ds):
)
if not "bbox" in ds.attrs and all(a in ds.variables for a in ["lon", "lat"]):
try:
lonmin = ds["lon"].min().values[()]
latmin = ds["lat"].min().values[()]
lonmax = ds["lon"].max().values[()]
latmax = ds["lat"].max().values[()]
if type(ds["lon"].data) != DASKARRAY and type(ds["lat"].data) != DASKARRAY:
lonmin = ds["lon"].min().values[()]
latmin = ds["lat"].min().values[()]
lonmax = ds["lon"].max().values[()]
latmax = ds["lat"].max().values[()]
except:
pass
ds.attrs["bbox"] = [lonmin, latmin, lonmax, latmax]
......@@ -221,24 +223,36 @@ def xarray_to_stac_item(ds):
"No of data variables": str(len(ds.data_vars)),
}
source_enc = ds.encoding.get("source", None)
extra_fields.update(copy(XARRAY_ITEM))
if source_enc:
extra_fields["alternate"] = copy(ALTERNATE_KERCHUNK)
extra_fields["alternate"]["kerchunk"]["href"] = (
HOSTURL + "/" + xid + "/kerchunk"
extra_fields["alternate"]["processed"]["href"] = HOSTURL + "/" + xid + "/zarr"
extra_fields["alternate"]["processed"][
"name"
] = "Rechunked and uniformly compressed data"
item.add_asset(
"data",
Asset(
href=HOSTURL + "/" + xid + "/kerchunk",
media_type=MediaType.ZARR,
roles=["data"],
title="Data",
description="Raw-encoded source data",
extra_fields=extra_fields,
),
)
else:
item.add_asset(
"data",
Asset(
href=HOSTURL + "/" + xid + "/zarr",
media_type=MediaType.ZARR,
roles=["data"],
title="Data",
description="Accessed binary data is processed on server-side",
extra_fields=extra_fields,
),
)
extra_fields["alternate"]["kerchunk"]["name"] = "Raw data"
extra_fields.update(copy(XARRAY_ITEM))
item.add_asset(
"data",
Asset(
href=HOSTURL + "/" + xid + "/zarr",
media_type=MediaType.ZARR,
roles=["data"],
title="Data",
description="Accessed binary data is processed on server-side",
extra_fields=extra_fields,
),
)
item.add_asset(
"xarray_view",
Asset(
......@@ -279,14 +293,13 @@ def xarray_to_stac_item(ds):
title="Usage of the eerie.cloud",
)
]
if not any(xid.startswith(a) for a in ["nextgems", "era"]):
itemdict["links"].append(
dict(
rel="parent",
href=HOSTURL + "/stac-collection-eerie.json",
type="application/json",
)
itemdict["links"].append(
dict(
rel="collection",
href=HOST + "/stac-collection-all.json",
type="application/json",
)
)
#
# gridlook
#
......@@ -353,19 +366,17 @@ class Stac(Plugin):
def get_request(request: Request) -> str:
return request
@router.get("-collection-eerie.json", summary="Root stac catalog")
@router.get("-collection-all.json", summary="Root stac collection")
def get_eerie_collection(
request=Depends(get_request), dataset_ids=Depends(deps.dataset_ids)
):
coldict = create_stac_eerie_collection()
coldict = create_stac_collection()
coldict["links"].append(
dict(rel="parent", href=PARENT_CATALOG, type="application/json")
)
coldict["providers"][0] = coldict["providers"][0]["name"]
dslist = json.load(fsspec.open(HOSTURL).open())
for item in dslist:
if any(item.startswith(a) for a in ["nextgems", "era"]):
continue
coldict["links"].append(
dict(
rel="child",
......@@ -394,14 +405,12 @@ class Stac(Plugin):
# if not all(a in ds.attrs for a in STACKEYS):
# raise HTTPException(status_code=404, detail=f"Dataset does not contain all keys needed for a STAC Item")
if resp is None:
try:
item_dict = xarray_to_stac_item(ds)
resp = JSONResponse(item_dict)
except:
raise HTTPException(
status_code=404, detail="Could not create a STAC Item"
)
cache.put(cache_key, resp, 99999)
# try:
item_dict = xarray_to_stac_item(ds)
resp = JSONResponse(item_dict)
# except:
# raise HTTPException(status_code=404, detail="Could not create a STAC Item")
cache.put(cache_key, resp, 99999)
return resp
return router
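To sanity-check the routes above from the client side, a hedged sketch: assuming the plugin is mounted so that the collection is served at https://eerie.cloud.dkrz.de/stac-collection-all.json, as the href fields above suggest, pystac can read it directly.

# Sketch: fetch the root STAC collection and list its child links.
# The URL follows the hrefs built above; adjust it if the router
# prefix differs in the actual deployment.
import pystac

col = pystac.Collection.from_file(
    "https://eerie.cloud.dkrz.de/stac-collection-all.json"
)
print(col.id, col.title)
for link in col.get_links(rel="child"):
    print(link.href)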
#limit_conn_zone $binary_remote_addr zone=addr:10m;
limit_req_zone $binary_remote_addr zone=two:10m rate=128r/s;
limit_req_zone $binary_remote_addr zone=one:10m rate=8r/s;
#limit_req_zone $binary_remote_addr zone=two:10m rate=128r/s;
limit_req_zone $binary_remote_addr zone=one:10m rate=6r/s;
server {
listen 80 default_server;
......@@ -23,7 +23,9 @@ server {
# Load configuration files for the default server block.
# include /etc/nginx/default.d/*.conf;
location = / {
return 301 https://swift.dkrz.de/v1/dkrz_7fa6baba-db43-4d12-a295-8e3ebb1a01ed/apps/stac-browser/index.html;
}
location / {
proxy_no_cache 1;
proxy_cache_bypass 1;
......@@ -32,11 +34,18 @@ server {
proxy_read_timeout 3600;
proxy_pass http://eerie.cloud.dkrz.de:9000;
# limit_conn addr 4;
limit_req zone=one burst=1000;
}
location ~* /cmor/ {
proxy_no_cache 1;
proxy_cache_bypass 1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
location ~* /kerchunk/ {
proxy_read_timeout 3600;
rewrite ^/cmor/(.*)$ /$1 break;
proxy_pass http://136.172.32.38;
}
location ~* /zarr/ {
proxy_no_cache 1;
proxy_cache_bypass 1;
proxy_set_header Host $host;
......@@ -45,7 +54,7 @@ server {
proxy_read_timeout 3600;
proxy_pass http://eerie.cloud.dkrz.de:9000;
# limit_conn addr 4;
limit_req zone=two burst=1000;
limit_req zone=one burst=1000;
}
error_page 404 /404.html;
......
import logging
from copy import deepcopy as copy
from typing import Sequence
import yaml
from fastapi import APIRouter, Depends, Request, Response
from starlette.routing import NoMatchFound
from xpublish.plugins import Dependencies, Plugin, hookimpl
from xpublish.utils.api import DATASET_ID_ATTR_KEY
logger = logging.getLogger('intake_catalog')
def get_dataset_id(ds,url):
xpublish_id = ds.attrs.get(DATASET_ID_ATTR_KEY)
cf_dataset_id = ".".join(
[
x for x in [
ds.attrs.get('naming_authority'),
ds.attrs.get('id')
] if x
]
)
dataset_id_by_url = url.split('/')[-2]
dataset_id_options = [
xpublish_id,
dataset_id_by_url,
cf_dataset_id,
'dataset'
]
return next(x for x in dataset_id_options if x)
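A quick illustration of the fallback order above, with hypothetical values: when neither the xpublish id attribute nor the CF naming attributes are set, the id is taken from the second-to-last URL segment, ahead of the final 'dataset' fallback.

# ds carries neither DATASET_ID_ATTR_KEY nor naming_authority/id,
# so the URL-derived id wins.
import xarray as xr

ds = xr.Dataset()
get_dataset_id(ds, "https://example.org/datasets/my-dataset/catalog.yaml")
# -> "my-dataset"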
def get_zarr_source(xpublish_id, dataset, request):
url = ''
try:
from xpublish.plugins.included.zarr import ZarrPlugin # noqa
url = request.url_for("get_zarr_metadata")
except NoMatchFound:
# On multi-dataset servers add the dataset_id to the route
url = request.url_for("get_zarr_metadata", dataset_id=xpublish_id)
# Convert url object from <class 'starlette.datastructures.URL'> to a string
url = str(url)
# Remove .zmetadata from the URL to get the root zarr URL
url = url.replace("/.zmetadata", "")
dataset_id_by_url = url.split('/')[-2]
l_consolidated=True
#if "native" in url:
# l_consolidated=False
# if "remap" in url:
# url = "reference::"+'/'.join(url.split('/')[0:-3])+"/static/kerchunks2/"+dataset_id_by_url+".json"
# l_consolidated=False
if not url:
return {}
return {
'driver': 'zarr',
'description': dataset.attrs.get('summary', ''),
'args': {
'consolidated': l_consolidated,
'urlpath': url.replace("http://","https://")
}
}
class IntakePlugin(Plugin):
"""Adds an Intake catalog endpoint"""
name : str = 'intake_catalog'
dataset_metadata : dict = dict()
app_router_prefix: str = '/intake'
app_router_tags: Sequence[str] = ['intake']
dataset_router_prefix: str = ''
dataset_router_tags: Sequence[str] = ['intake']
@hookimpl
def app_router(self, deps: Dependencies):
"""Register an application level router for app level intake catalog"""
router = APIRouter(prefix=self.app_router_prefix, tags=self.app_router_tags)
def get_request(request: Request) -> str:
return request
@router.get(".yaml", summary="Root intake catalog")
def get_root_catalog(
request=Depends(get_request),
dataset_ids = Depends(deps.dataset_ids)
):
data = {
'metadata': {
'source': 'Served via `xpublish-intake`',
'access_url': str(request.url).replace("http://","https://"),
}
}
if dataset_ids:
# data['sources'] = {
# d: {
# 'description': self.dataset_metadata.get(d, {}).get('description', ''),
# 'driver': 'intake.catalog.local.YAMLFileCatalog',
# 'metadata': self.dataset_metadata.get(d, {}),
# 'args': {
# 'path': str(request.url_for('get_dataset_catalog', dataset_id=d)).replace("http://","https://")
# }
# }
# for d in dataset_ids
data['sources'] = {
d: {
'description': self.dataset_metadata.get(d, {}).get('description', ''),
'driver': 'zarr',
'metadata': self.dataset_metadata.get(d, {}),
'args': {
'urlpath': str(request.url_for('get_dataset_catalog', dataset_id=d)).replace("http://","https://").replace("catalog.yaml","{{ method }}"),
'consolidated':True
},
'parameters':dict(
method=dict(
allowed=["kerchunk","zarr"],
default="kerchunk",
type="str",
description="server-side loading method"
)
)
}
for d in dataset_ids
}
else:
data['sources'] = {
'dataset': {
'description': self.dataset_metadata.get('default', {}).get('description', ''),
'driver': 'intake.catalog.local.YAMLFileCatalog',
'metadata': self.dataset_metadata.get('default', {}),
'args': {
'path': str(request.url_for('get_dataset_catalog')).replace("http://","https://")
}
}
}
return Response(yaml.dump(data), media_type="text/yaml")
return router
@hookimpl
def dataset_router(self, deps: Dependencies):
router = APIRouter(prefix=self.dataset_router_prefix, tags=list(self.dataset_router_tags))
def get_request(request: Request) -> str:
return request
@router.get('/catalog.yaml', summary="Dataset intake catalog")
def get_dataset_catalog(
request=Depends(get_request),
dataset=Depends(deps.dataset),
):
xpublish_id = get_dataset_id(dataset,str(request.url))
sources = {
'zarr': get_zarr_source(xpublish_id, dataset, request),
}
if "source" in dataset.encoding:
sources.update(
{
'kerchunk':copy(sources['zarr'])
}
)
sources['kerchunk']['args']['urlpath']=sources['kerchunk']['args']['urlpath'].replace('zarr','kerchunk')
data = {
'name': xpublish_id,
'metadata': {
'source': 'Served via `xpublish-intake`',
'access_url': str(request.url).replace("http://","https://"),
},
'sources': {
f'{xpublish_id}-{k}': v for k, v in sources.items() if v
}
}
return Response(yaml.dump(data), media_type="text/yaml")
return router
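Putting the catalog side together, a hedged usage sketch: assuming the app router keeps the '/intake' prefix above, the root catalog lives at https://eerie.cloud.dkrz.de/intake.yaml, and each source accepts the 'method' parameter declared above. Requires intake plus intake-xarray for the 'zarr' driver; the dataset id is a placeholder.

# Sketch: consume the served catalog with intake.
import intake

cat = intake.open_catalog("https://eerie.cloud.dkrz.de/intake.yaml")
print(list(cat))
ds = cat["example-dataset"](method="zarr").to_dask()  # or method="kerchunk"
print(ds)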