From 7ad1de0eb0a1d2dee008a77a76e7e9b2f10da40d Mon Sep 17 00:00:00 2001 From: Fabian Wachsmann <k204210@l40075.lvt.dkrz.de> Date: Tue, 22 Oct 2024 10:31:05 +0200 Subject: [PATCH] Debugged --- scripts/cataloghost.py | 17 +++++++---------- scripts/cloudify_nextgems.py | 18 +++++++++++------- scripts/datasethelper.py | 15 +++++++++++---- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/scripts/cataloghost.py b/scripts/cataloghost.py index e2ac57e..672f782 100755 --- a/scripts/cataloghost.py +++ b/scripts/cataloghost.py @@ -26,13 +26,6 @@ nest_asyncio.apply() from intake.config import conf conf["cache_disabled"] = True -STORAGE_OPTIONS = dict( - # remote_protocol="file", - lazy=True, - cache_size=0 - # skip_instance_cache=True, - # listings_expiry_time=0 -) L_DASK = True L_NEXTGEMS = True L_ERA5 = False @@ -124,7 +117,7 @@ if __name__ == "__main__": # This avoids infinite subprocess creation hostids += find_data_sources(cat["observations.ERA5.era5-dkrz"]) dsdict = {} if L_NEXTGEMS: - dsdict = add_nextgems(dsdict) + mapper_dict, dsdict = add_nextgems(mapper_dict, dsdict) for dsid in hostids: desc = None listref = False @@ -217,7 +210,9 @@ if __name__ == "__main__": # This avoids infinite subprocess creation compress_data, ds, dask="parallelized", keep_attrs="drop_conflicts" ) else: - ds = reset_encoding_get_mapper(dsid, ds, desc=desc) + mapper_dict, ds = reset_encoding_get_mapper( + mapper_dict, dsid, ds, desc=desc + ) for var in ds.data_vars: ds[var].encoding["filters"] = numcodecs.BitRound(keepbits=12) if not "grid" in dsid: @@ -231,7 +226,9 @@ if __name__ == "__main__": # This avoids infinite subprocess creation dsdict[newid] = dss else: if L_DASK: - ds = reset_encoding_get_mapper(dsid, ds, desc=desc) + mapper_dict, ds = reset_encoding_get_mapper( + mapper_dict, dsid, ds, desc=desc + ) ds = adapt_for_zarr_plugin_and_stac(dsid, ds) dsdict[dsid] = ds kp = KerchunkPass() diff --git a/scripts/cloudify_nextgems.py b/scripts/cloudify_nextgems.py index eaf381c..2b17e22 100644 --- a/scripts/cloudify_nextgems.py +++ b/scripts/cloudify_nextgems.py @@ -13,8 +13,8 @@ DS_ADD = [ ] -def refine_nextgems(iakey, ds, md, desc): - ds = reset_encoding_get_mapper(iakey, ds, desc=desc) +def refine_nextgems(mapper_dict, iakey, ds, md, desc): + mapper_dict, ds = reset_encoding_get_mapper(mapper_dict, iakey, ds, desc=desc) ds = adapt_for_zarr_plugin_and_stac(iakey, ds) ds = set_compression(ds) for mdk, mdv in md.items(): @@ -23,10 +23,10 @@ def refine_nextgems(iakey, ds, md, desc): for mdk, mdv in desc["metadata"].items(): if not mdk in ds.attrs: ds.attrs[mdk] = mdv - return ds + return mapper_dict, ds -def add_nextgems(dsdict): +def add_nextgems(mapper_dict, dsdict): ngccat = intake.open_catalog(NGC_PROD_CAT) md = yaml.safe_load(ngccat.yaml())["sources"]["nextGEMS_prod"]["metadata"] for ia in DS_ADD: @@ -51,13 +51,17 @@ def add_nextgems(dsdict): ) try: dsdict[iakey] = ngccat[ia](**comb, chunks="auto").to_dask() - dsdict[iakey] = refine_nextgems(iakey, dsdict[iakey], md, desc) + mapper_dict, dsdict[iakey] = refine_nextgems( + mapper_dict, iakey, dsdict[iakey], md, desc + ) except Exception as e: print(e) pass else: iakey = "nextgems." + ".".join(ia.split(".")[1:]) dsdict[iakey] = ngccat[ia](chunks="auto").to_dask() - dsdict[iakey] = refine_nextgems(iakey, dsdict[iakey], md, desc) + mapper_dict, dsdict[iakey] = refine_nextgems( + mapper_dict, iakey, dsdict[iakey], md, desc + ) print(dsdict.keys()) - return dsdict + return mapper_dict, dsdict diff --git a/scripts/datasethelper.py b/scripts/datasethelper.py index 4285580..8a36bf7 100644 --- a/scripts/datasethelper.py +++ b/scripts/datasethelper.py @@ -2,9 +2,17 @@ import numcodecs import intake import fsspec import xarray as xr -import datetime +from datetime import datetime from copy import deepcopy as copy +STORAGE_OPTIONS = dict( + # remote_protocol="file", + lazy=True, + cache_size=0 + # skip_instance_cache=True, + # listings_expiry_time=0 +) + INSTITUTE_KEYS = [ "institution_id", "institute_id", @@ -46,8 +54,7 @@ def find_data_sources(catalog, name=None): return data_sources -def reset_encoding_get_mapper(dsid, ds, desc=None): - global mapper_dict +def reset_encoding_get_mapper(mapper_dict, dsid, ds, desc=None): sp = None if "source" in ds.encoding: sp = ds.encoding["source"] @@ -64,7 +71,7 @@ def reset_encoding_get_mapper(dsid, ds, desc=None): use_options.update(desc["args"].get("storage_options", {})) mapper_dict[sp] = fsspec.get_mapper(sp, **use_options) ds.encoding["source"] = sp - return ds + return mapper_dict, ds def adapt_for_zarr_plugin_and_stac(dsid, ds): -- GitLab