Skip to content
Snippets Groups Projects
Commit e543ce66 authored by Fabian Wachsmann's avatar Fabian Wachsmann
Browse files

Updated for era5 catalog

parent dbdbee5e
No related branches found
No related tags found
No related merge requests found
%% Cell type:code id:3e5313a4-8e12-4ba3-8517-7ac34896bc6c tags:
``` python
import intake
import os
import pandas as pd
import filecmp
import tqdm
import json
```
%% Cell type:code id:6b814955-2ca4-40c1-b4cb-0b70cf58c078 tags:
``` python
# Directory layout of the catalog publication workflow:
TRUNK="/pool/data/Catalogs"  # published catalogs (symlinks into TARGETTRUNK)
SOURCETRUNK=TRUNK+"/"+"Candidates"  # candidate catalogs waiting to be validated
TEMPLATETRUNK=TRUNK+"/"+"Templates"  # intake-esm json templates per project
TARGETTRUNK="/work/ik1017/Catalogs"  # where validated catalogs are materialized
```
%% Cell type:code id:e6591a2f-bcb1-43b0-8809-e1ad4e2827f1 tags:
``` python
# Validation configuration for catalog file names (institute_project_store[_aux]):
allowed_auxiliaries=["grid"]  # permitted 4th name element
institutes=["dkrz"]  # permitted 1st name element
stores=["disk","cloud","archive"]  # permitted 3rd name element
VERBOSE=True  # print diagnostics during validation
MIN_NO_OF_LINES_FOR_LOAD=3  # only the first rows are test-loaded, for speed
```
%% Cell type:code id:ec66e472-a123-4ab0-a1c4-0d480c5c07a6 tags:
``` python
def check_naming_convention(candidate):
    """Validate a catalog name against the ``institute_project_store[_aux]`` convention.

    Parameters
    ----------
    candidate : str
        Catalog name without file extension, e.g. ``dkrz_era5_disk``.

    Returns
    -------
    bool
        True if the name is valid, False otherwise (diagnostics are printed
        when ``VERBOSE`` is set). Returns booleans instead of the original
        0/1 ints; truthiness for callers is unchanged.
    """
    if '_' not in candidate:
        if VERBOSE:
            print("candidate should contain '_'")
        return False
    name_parts = candidate.split('_')
    no_of_parts = len(name_parts)
    if no_of_parts < 3 or no_of_parts > 4:
        if VERBOSE:
            print("No of catalog name elements separated with '_' must be 3 or 4")
        return False
    elif no_of_parts == 4 and name_parts[3] not in allowed_auxiliaries:
        if VERBOSE:
            print("No of catalog name elements separated with '_' is 4"
                  " but the last element is not in the list of allowed auxiliary names")
        return False
    if name_parts[0] not in institutes:
        if VERBOSE:
            print(f"Candidate's institute {name_parts[0]} not known")
        return False
    if name_parts[2] not in stores:
        if VERBOSE:
            print(f"Candidate's store {name_parts[2]} not known")
        return False
    return True
```
%% Cell type:code id:ac66ad8f-03de-4173-bc82-71be4a37f011 tags:
``` python
def open_catalog(template_or_catalog, df=None):
    """Open an intake-esm datastore.

    Parameters
    ----------
    template_or_catalog : str or dict
        Path to a catalog json (when ``df`` is None) or esm collection
        metadata to combine with ``df``.
    df : pandas.DataFrame, optional
        Pre-loaded catalog content.

    Returns
    -------
    intake_esm datastore object.
    """
    # BUG FIX: `if df:` on a DataFrame raises
    # "ValueError: The truth value of a DataFrame is ambiguous";
    # an explicit None-check is required.
    if df is not None:
        return intake.open_esm_datastore(df, esmcol_data=template_or_catalog)
    return intake.open_esm_datastore(template_or_catalog)
```
%% Cell type:code id:ad223ff4-3ec8-4e6f-8d4e-6b3f2b24b306 tags:
``` python
def search_catalog(col, candidate):
    """Subset *col* to the project encoded in the candidate's name.

    The project is the second ``_``-separated element of *candidate* and is
    matched case-insensitively against the projects present in the catalog;
    the catalog's original spelling is used for the actual search.
    """
    project = candidate.split('_')[1]
    catalog_projects = col.unique("project")["project"]["values"]
    if len(catalog_projects) > 1:
        print("Found more than one project")
    lower_projects = [p.lower() for p in catalog_projects]
    if project not in lower_projects:
        print(f"Did not find {project} in catalog's projects")
        print(lower_projects)
    else:
        # Recover the catalog's original capitalization of the project name.
        project = catalog_projects[lower_projects.index(project)]
    return col.search(project=project)
```
%% Cell type:code id:306b6e70-e14c-4838-812f-7268ed6d7451 tags:
``` python
def load_catalog(col, l_hastime=True, l_isgrib=False):
    """Test-load a single asset of the catalog into xarray datasets.

    Parameters
    ----------
    col : intake-esm datastore
        Catalog to load from; its dataframe is truncated in place to the
        first ``MIN_NO_OF_LINES_FOR_LOAD`` rows to keep the check fast.
    l_hastime : bool
        Whether the data has a time axis (enables time chunking/decoding).
    l_isgrib : bool
        Whether the assets are grib files (use the cfgrib engine).

    Returns
    -------
    dict of xarray datasets for the first asset.

    Notes
    -----
    The original body built ``kwargs`` twice (the first build was discarded),
    crashed with a KeyError on the grib branch, and returned early with
    ``col.to_dataset_dict(kwargs)`` (kwargs passed positionally), making the
    intended ``cat.to_dataset_dict(**kwargs)`` unreachable. This version
    keeps only the intended path.
    """
    # Keep only the first few rows — this is a smoke test, not a full load.
    col.df = col.df.loc[0:MIN_NO_OF_LINES_FOR_LOAD, :]
    # Narrow down to a single asset so only one file is actually opened.
    cat = col.search(uri=col.df.loc[0, "uri"])
    kwargs = {}
    if l_isgrib:
        kwargs["cdf_kwargs"] = {"engine": "cfgrib"}
    else:
        kwargs["zarr_kwargs"] = {"consolidated": True}
        if l_hastime:
            kwargs["cdf_kwargs"] = {"chunks": {"time": 1}}
            kwargs["zarr_kwargs"]["decode_times"] = True
            kwargs["zarr_kwargs"]["use_cftime"] = True
    return cat.to_dataset_dict(**kwargs)
```
%% Cell type:code id:ecccdf7a-2ba7-4fb3-8e03-ab1f26929bc1 tags:
``` python
def open_search_load_catalog(candidate, template_or_catalog, df=None):
    """Open, project-search and (for non-archive stores) test-load a candidate.

    Parameters
    ----------
    candidate : str
        Catalog file name, e.g. ``dkrz_era5_disk.json``.
    template_or_catalog : str or dict
        Passed through to :func:`open_catalog`.
    df : pandas.DataFrame, optional
        Pre-loaded catalog content.

    Returns
    -------
    bool
        True if every step succeeded, False otherwise (the exception is
        printed).
    """
    try:
        col = open_catalog(template_or_catalog, df)
        if VERBOSE:
            print("Successfully opened catalog")
        # Detect grib assets so the loader can pick the cfgrib engine;
        # a missing "format" column simply means "not grib".
        l_isgrib = False
        try:
            if "grib" in col.df["format"].unique():
                l_isgrib = True
        except Exception:
            pass
        cat = search_catalog(col, candidate)
        if not cat:
            raise ValueError("Could not find project of catalog in catalog")
        if VERBOSE:
            print("Successfully searched catalog")
        # Archive stores cannot be opened directly — skip existence and load checks.
        if "archive" not in candidate:
            print(candidate)
            # Cloud URIs are not local paths, so only disk stores are stat'ed.
            if "cloud" not in candidate:
                for path in tqdm.tqdm(col.df["uri"].values):
                    if not os.path.exists(path):
                        if VERBOSE:
                            print(f"path {path} does not exist. Check permissions.")
                        # BUG FIX: was `return FAlse` (NameError, silently
                        # caught by the except below).
                        return False
            # BUG FIX: the catalog was loaded twice, the first time without
            # the grib flag; load exactly once with the detected engine.
            load_catalog(col, l_isgrib=l_isgrib)
            if VERBOSE:
                print("Successfully loaded catalog")
        return True
    except Exception as e:
        print(e)
        return False
```
%% Cell type:code id:4605a81c-962c-4597-97fa-60db8a75c847 tags:
``` python
def find_template(candidate):
    """Return the path of the intake-esm template for the candidate's project.

    The project is the second ``_``-separated element of *candidate*; the
    template is expected at ``TEMPLATETRUNK/intake-esm_template_<project>``.
    Returns None (with a diagnostic when ``VERBOSE``) if no template exists.
    """
    project = candidate.split('_')[1]
    wanted = "intake-esm_template_" + project
    if wanted in os.listdir(TEMPLATETRUNK):
        return TEMPLATETRUNK + "/" + wanted
    if VERBOSE:
        print(f"Could not find any template for project {project}")
    return None
```
%% Cell type:code id:a6310f65-e80d-4180-aa0e-74b9b22db15f tags:
``` python
def find_existing_project_catalog(candidate):
    """Return the path of an already-published catalog with the same name.

    Compares the extension-less candidate name against the extension-less
    entries of ``TRUNK`` and returns ``TRUNK/<name>.json`` on a match,
    None otherwise.
    """
    candidatename = candidate.split('.')[0]
    existing = [cat.split('.')[0] for cat in os.listdir(TRUNK)]
    if candidatename in existing:
        if VERBOSE:
            # BUG FIX: the message was missing a space before the path
            # ("...catalog/pool/data/..." in the recorded output).
            print("Found existing project catalog "+TRUNK+"/"+candidatename+".json")
        return TRUNK+"/"+candidatename+".json"
    if VERBOSE:
        print(f"Will be a new project catalog in pool {candidate}")
    return None
```
%% Cell type:code id:b1b8ec2a-5629-45cf-849f-b83ea5dc3642 tags:
``` python
def check_catalog_format(candidate):
    """Classify a candidate file name as "csv", "json" or None.

    Note: ".csv" anywhere in the name wins over a ".json" suffix, matching
    the original precedence.
    """
    if ".csv" in candidate:
        return "csv"
    if candidate.endswith(".json"):
        return "json"
    return None
```
%% Cell type:code id:5307f97f-b81d-4695-be65-0bd9274f550e tags:
``` python
def open_candidate_dataframe(candidate):
    """Read the candidate csv into a DataFrame; print the error and return None on failure."""
    try:
        frame = pd.read_csv(candidate)
    except Exception as error:
        print(error)
        return None
    return frame
```
%% Cell type:code id:1fd8a674-ad4e-400b-a493-099b2e4f482b tags:
``` python
def test_catalog(candidate, idx):
    """Validate a candidate catalog and find the metadata needed to publish it.

    Parameters
    ----------
    candidate : str
        Catalog file name (``.json`` or ``.csv``).
    idx : int
        Index into the module-level ``candidatepathes`` list.

    Returns
    -------
    tuple
        ``(esmcol_data, df_candidate_name)``: for json candidates the
        candidate path itself and None; for csv candidates the template or
        existing-json path plus the candidate name; ``(None, None)`` on any
        failure.
    """
    candidatepath = candidatepathes[idx]
    catalog_format = check_catalog_format(candidate)
    # BUG FIX: was `if not catalog_format and VERBOSE:` — with VERBOSE=False an
    # unknown format fell through into the csv branch instead of aborting.
    if not catalog_format:
        if VERBOSE:
            print(f"Could not detect fileformat of {candidate}. "
                  "Choose one of csv or json.")
        return None, None
    print(f"Found catalog format {catalog_format}")
    if catalog_format == "json":
        if open_search_load_catalog(candidate, candidatepath):
            print(f"Can use {candidate}")
            return candidatepath, None
        if VERBOSE:
            print(f"Open, search or load failed")
        return None, None
    # csv candidate: needs a template or an existing project catalog as metadata.
    candidate_df = open_candidate_dataframe(candidatepath)
    # BUG FIX: `if not candidate_df:` raises ValueError on a DataFrame.
    if candidate_df is None:
        print(f"Candidate {candidate} could not be opened with pandas as a dataframe")
        return None, None
    template = find_template(candidate)
    existing_json = find_existing_project_catalog(candidate)
    if not template:
        if not existing_json:
            print(f"Do not have a template or an existing catalog for {candidate}")
            return None, None
        template = existing_json
    if open_search_load_catalog(candidate, template, candidate_df):
        print(f"Can use {candidate} with template {template}")
        return template, candidate
    if VERBOSE:
        print(f"Open, search or load failed with template {template}")
    # Fall back to the existing json only if it exists and was not already tried.
    if existing_json and template != existing_json:
        if open_search_load_catalog(candidate, existing_json, candidate_df):
            print(f"Can use {candidate} with existing json {existing_json}")
            return existing_json, candidate
        if VERBOSE:
            print(f"Open, search or load failed with existing json {existing_json}")
        print(f"Catalog candidate {candidate} does neither work with template {template} nor with {existing_json}")
    return None, None
```
%% Cell type:code id:efae06d3-866b-4ba8-b7d4-8497c52d8bc3 tags:
``` python
def append_catalog(cat, existing_json, candidatename, catalog_type):
    """Append new entries of *cat* to an existing project catalog and serialize it.

    Parameters
    ----------
    cat : intake-esm datastore
        Candidate catalog whose rows may be appended.
    existing_json : str
        Path of the already published catalog json.
    candidatename : str
        Catalog name without extension; used for the serialized output name.
    catalog_type : str
        ``"file"`` or ``"dict"``, forwarded to ``serialize``.

    Returns
    -------
    The result of ``serialize`` when something was appended, None when the
    catalogs are identical.
    """
    parent_catalog = open_catalog(existing_json)
    if cat.df.equals(parent_catalog.df):
        if VERBOSE:
            print("Catalogs are equal. Nothing to append.")
        return None
    new_df = parent_catalog.df.copy()
    columns = list(new_df.columns.values)
    # Keep only candidate rows not already present in the parent catalog.
    i1 = cat.df.set_index(columns).index
    i2 = new_df.set_index(columns).index
    cat_filtered_df = cat.df[~i1.isin(i2)]
    # Two rows pointing at different uris/paths but identical otherwise are
    # duplicates of the same dataset — keep only the first occurrence.
    duplicated_columns = list(set(columns) - {"uri"})
    if "path" in duplicated_columns:
        duplicated_columns = list(set(duplicated_columns) - {"path"})
    cat_filtered_duplicated_df = cat_filtered_df[~cat_filtered_df[duplicated_columns].duplicated(keep="first")]
    if len(cat_filtered_duplicated_df) < len(cat_filtered_df) and VERBOSE:
        print("Catalog to append contains duplications. Those are filtered.")
    new_df = pd.concat([new_df, cat_filtered_duplicated_df],
                       ignore_index=True)
    parent_catalog.df = new_df
    # CONSISTENCY FIX: use the TARGETTRUNK constant instead of the
    # hard-coded "/work/ik1017/Catalogs" path.
    return parent_catalog.serialize(name=TARGETTRUNK+"/"+candidatename, catalog_type=catalog_type)
```
%% Cell type:code id:00631fde-6506-4245-a301-26e92a8d7ff9 tags:
``` python
def rewrite_json(catalog):
    """Re-serialize a json file in place, pretty-printed with sorted keys."""
    with open(catalog) as handle:
        contents = json.load(handle)
    with open(catalog, "w") as handle:
        json.dump(contents, handle, indent=4, sort_keys=True)
```
%% Cell type:code id:484f8e3c-d89e-4244-b623-dd074377724a tags:
``` python
def append_or_write_catalog(esmcol_data, df, idx, candidate):
    """Write a validated candidate to TARGETTRUNK, or append it to an existing catalog.

    Parameters
    ----------
    esmcol_data : str or dict
        Metadata (template/catalog path) returned by ``test_catalog``.
    df : pandas.DataFrame or None
        Catalog content for csv candidates.
    idx : int
        Index into the module-level ``candidatepathes`` list.
    candidate : str
        Catalog file name.

    Returns
    -------
    str or None
        Path of the written/updated catalog (to be linked from TRUNK),
        or None when the existing target cannot be written to.
    """
    candidatepath = candidatepathes[idx]
    targetpath = TARGETTRUNK+"/"+candidate
    candidatename = candidate.split('.')[0]
    cat = open_catalog(esmcol_data, df=df)
    existing_json_link = find_existing_project_catalog(candidate)
    # "file" keeps a separate csv next to the json; "dict" inlines everything.
    catalog_type = "file"
    if not cat.catalog_file:
        if VERBOSE:
            print("The catalog will be saved entirely as json")
        catalog_type = "dict"
    if os.path.isfile(targetpath):
        print(f"Target {targetpath} will be overwritten")
    if targetpath == candidatepath:
        print("Candidate "+candidatepath+" is the same as target "+targetpath)
    if not existing_json_link:
        # New project catalog: write it and normalize the json formatting.
        print("Create catalog "+targetpath)
        cat.serialize(name=TARGETTRUNK+"/"+candidatename, catalog_type=catalog_type)
        rewrite_json(TARGETTRUNK+"/"+candidatename+".json")
        return targetpath
    if not os.path.islink(existing_json_link):
        print(f"The existing file {existing_json_link} is not a link! Can neither write nor append to.")
        return None
    existing_json = os.readlink(existing_json_link)
    l_appended = append_catalog(cat, existing_json, candidatename, catalog_type)
    if l_appended:
        # CONSISTENCY FIX: use the TARGETTRUNK constant instead of the
        # hard-coded "/work/ik1017/Catalogs" path.
        rewrite_json(TARGETTRUNK+"/"+candidatename+".json")
    return existing_json_link
```
%% Cell type:code id:132b3dbf-2784-465e-a3d8-1229d25e51a7 tags:
``` python
candidates=os.listdir(SOURCETRUNK)
candidatepathes=[]
for candidate in candidates:
if not os.path.islink(SOURCETRUNK+"/"+candidate):
print(SOURCETRUNK+"/"+candidate+" is not a link!")
candidates.remove(candidate)
candidatepathes.append(os.readlink(SOURCETRUNK+"/"+candidate))
print(f"Will check {candidates}")
print(f"Linked from {candidatepathes}")
#
candidatenames=[c.split('.')[0] for c in candidates]
for idx, candidate in enumerate(candidates):
if VERBOSE:
print(f"Checking candidate {candidate}...")
candidatename=candidate.split('.')[0]
if candidatenames.count(candidatename) > 1 and candidatename+".json" in candidates and candidatename+".json" != candidate:
if VERBOSE:
print(f"There is another {candidatename}.json catalog which is checked instead...")
continue
if not check_naming_convention(candidatename):
continue
esmcol_data,df=test_catalog(candidate, idx)
if esmcol_data:
newcatalog=append_or_write_catalog(esmcol_data,df, idx, candidate)
if not os.path.islink(newcatalog):
if VERBOSE:
print("Create link "+TRUNK+"/"+candidate+" to catalog")
!ln -s {newcatalog} {TRUNK+"/"+candidate}
```
%% Output
Will check ['dkrz_nextgems_disk.json', 'dkrz_dyamond-winter_disk.json']
Linked from ['/work/ik1017/Catalogs/dkrz_nextgems_disk.json', '/work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json']
Checking candidate dkrz_nextgems_disk.json...
Found catalog format json
Successfully opened catalog
Successfully searched catalog
dkrz_nextgems_disk.json
100%|██████████| 43054/43054 [00:10<00:00, 4009.38it/s]
--> The keys in the returned dictionary of datasets are constructed as follows:
'project.institution_id.source_id.experiment_id.simulation_id.realm.frequency.time_reduction.grid_label.level_type'
Successfully loaded catalog
Can use dkrz_nextgems_disk.json
Found existing project catalog/pool/data/Catalogs/dkrz_nextgems_disk.json
The catalog will be saved entirely as json
Target /work/ik1017/Catalogs/dkrz_nextgems_disk.json will be overwritten
Candidate /work/ik1017/Catalogs/dkrz_nextgems_disk.json is the same as target /work/ik1017/Catalogs/dkrz_nextgems_disk.json
Catalogs are equal. Nothing to append.
Checking candidate dkrz_dyamond-winter_disk.json...
Found catalog format json
Successfully opened catalog
Successfully searched catalog
dkrz_dyamond-winter_disk.json
100%|██████████| 40640/40640 [00:10<00:00, 3997.74it/s]
--> The keys in the returned dictionary of datasets are constructed as follows:
'project.institution_id.source_id.experiment_id.simulation_id.realm.frequency.time_reduction.grid_label.level_type'
%% Cell type:code id:49f39d3f-383d-4ca4-9387-f718bc0ec3a9 tags:
Successfully loaded catalog
Can use dkrz_dyamond-winter_disk.json
Will be a new project catalog in pool dkrz_dyamond-winter_disk.json
The catalog will be saved entirely as json
Target /work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json will be overwritten
Candidate /work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json is the same as target /work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json
Create catalog /work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json
Writing catalog with 40640 entries into: /work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json
Writing ESM collection json file to: /work/ik1017/Catalogs/dkrz_dyamond-winter_disk.json
Create link /pool/data/Catalogs/dkrz_dyamond-winter_disk.json to catalog
``` python
!ls /work/bk1099/data/ml00_1H/1979/E5ml00_1H_1979-01-04_075
```
%% Cell type:code id:49f39d3f-383d-4ca4-9387-f718bc0ec3a9 tags:
%% Cell type:code id:578c4f59-fd80-4058-a0af-6231b977de67 tags:
``` python
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment