In [2]:
import pandas as pd
import fnmatch
import dask.dataframe as dd
import dask
from intake.source.utils import reverse_format
import os
import re
import subprocess
from tqdm.auto import tqdm
from pathlib import Path
import shutil
import numpy as np
import datetime
import xarray
from functools import lru_cache
import itertools
import cfgrib

# ERA5 Restructured 2023


## Extract attributes of a file using information from ERA5 "DRS".


The filename is consructed by:
- `<era_id><level_type><type>_<frequency>_<validation_date>_<virt_code>`
- `<era_id><level_type><type>_<frequency>_<initialization_date>_<virt_code>`

E.g.: _"E5pl00_1H_1979-10-02_157"_

The directory root is constructed by:
- `/work/bk1099/ERA5/<era_id>/<level_type>/<type>/<frequency>/<virt_code>`

E.g.: _"/pool/data/ERA5/E5/sf/an/1H/027/"_ 

`era_id`: E5 <br>
`virt_code`: %03d <br>
`initialization_date`: YYYY[-MM][[-DD]] <br>
`validation_date`: YYYY[-MM][[-DD]] <br>
`frequency`: hourly=01=1H, monthly=1M <br>
`type`: an=00, fc=12 <br>
`level_type`: ml, pl, sf <br>
`path` : "STRING" <br>
`step`: 01 <br>

From inside the file (`cdo sinfo`): <br>
`startdate`: "YYYY-MM-DD HH:MM:SS" <br>
`enddate`: "YYYY-MM-DD HH:MM:SS" <br>
`cell_methods`:
- "time: mean within days time: mean over days", #synoptical
- "time: maximum within days time: mean over days"
- "time: minimum within days time: mean over days"
- "time: mean"
- "time: point"
- "time: maximum"
- "time: minimum"

In [4]:
era_cv={}
era_cv["era_id"]=["E5", "E1"]
era_cv["virt_code"]="[0-9][0-9][0-9]"
era_cv["initialization_date"]="%Y-%m-%d"
era_cv["validation_date"]="%Y-%m-%d" #or INVARIANT
era_cv["frequency"]={"1H":"hourly","1D":"daily", "IV":"invariant"} #, "1M":"monthly"
era_cv["dataType"]={"00":"an", "12":"fc"}
era_cv["level_type"]={"ml":"model_level", "pl":"pressure_level","sf":"surface"}
era_cv["step"]=["01"]
era_cv["startdate"]="%Y-%m-%d"
era_cv["enddate"]="%Y-%m-%d"
era_cv["cell_methods"]=[]

In [3]:
@lru_cache(maxsize=None)
def get_file_list(root_path): #, depth=0, extension='*.nc'): 
 depth=1
 from dask.diagnostics import ProgressBar

 #dirs=[p for p in Path(root_path).glob("*/*/") if p.is_dir()]
 
 cmd = ['find', "/work/bk1099/data/E5", '-mindepth', '4', '-maxdepth', '4','-type', "d"]
 proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 output = proc.stdout.read().decode('utf-8').split()
 dirs=[Path(entry) for entry in output]
 cmd = ['find', "/work/bk1099/data/E1", '-mindepth', '4', '-maxdepth', '4','-type', "d"]
 proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 output = proc.stdout.read().decode('utf-8').split()
 dirs+=[Path(entry) for entry in output]
 
 @dask.delayed
 def _file_dir_files(directory):
 try:
 cmd = ['find', '-L', directory.as_posix(), '-name', '*.grb','-type', "f", "-perm", "-444"]
 proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
 output = proc.stdout.read().decode('utf-8').split()
 except Exception:
 output = []
 return output

 print('Getting list of assets...\n')
 filelist = [_file_dir_files(directory) for directory in dirs]
 # watch progress
 with ProgressBar():
 filelist = dask.compute(*filelist)

 filelist = list(itertools.chain(*filelist))
 return filelist

In [5]:
activity_ids = ['E5','E1']

In [6]:
def parse_filename(path):
 fileparts={}
 fileparts["path"] = path
 filename=path.split(os.sep)[-1]
 if len(filename.split("_")) < 4 or len(filename.split("_")[0])<6:
 return fileparts
 
 fileparts["era_id"]=filename[0:2]
 fileparts["level_type"]=filename[2:4]
 fileparts["dataType"]=filename[4:6]
 fileparts["frequency"]=filename.split("_")[1]
 for key in fileparts :
 if key != "path" :
 if not fileparts[key] in era_cv[key]:
 return fileparts
 
 fileparts["validation_date"]=filename.split("_")[2]
 if not fileparts["validation_date"]=="INVARIANT":
 if "-" in fileparts["validation_date"] and len(fileparts["validation_date"].split("-")[1]) == 4:
 fileparts["validation_date"]=fileparts["validation_date"].split("-")[0]
 teststring = era_cv["validation_date"]
 while teststring:
 try :
 datetime.datetime.strptime(fileparts["validation_date"],teststring)
 break
 except:
 teststring = '-'.join(teststring.split("-")[0:-1])
 if not teststring:
 return fileparts

 fileparts["initialization_date"] = np.nan
 if fileparts["dataType"] == "12":
 fileparts["initialization_date"] = fileparts["validation_date"]
 fileparts["validation_date"] = np.nan
 fileparts["virt_code"]=filename.split("_")[3].split('.')[0]
 if '000' == fileparts["virt_code"] :
 return fileparts
 if not re.search(era_cv["virt_code"],fileparts["virt_code"]):
 return fileparts
 
 fileparts["code"]=fileparts["virt_code"]
 fileparts["table_id"]=128
 if int(fileparts["virt_code"]) > 256 :
 fileparts["code"]=f'{(int(fileparts["code"])-256):02}'
 fileparts["table_id"]=256

 fileparts["step"]=np.nan
 if fileparts["frequency"] in era_cv["step"]:
 fileparts["step"] = fileparts["frequency"]
 
 for key in era_cv :
 if type(era_cv[key]) == dict :
 fileparts[key] = era_cv[key][fileparts[key]]
 #For cross-checking against root dir
 fileparts["checkpath"]=f"/work/bk1099/data/{fileparts['era_id']}/{filename[2:4]}/{fileparts['dataType']}/{filename.split('_')[1]}/{fileparts['virt_code']}/{filename}"

 return fileparts 

In [7]:
entries = list(map(parse_filename, get_file_list("/work/bk1099/data")))

Getting list of assets...

[########################################] | 100% Completed | 2min 30.7s


In [17]:
df1 = pd.DataFrame(entries)
df1.head()

Unnamed: 0,path,era_id,level_type,dataType,frequency,validation_date,initialization_date,virt_code,code,table_id,step,checkpath
0,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_20...,E5,model_level,an,hourly,2004-02-28,,248,248,128,,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_20...
1,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_19...,E5,model_level,an,hourly,1999-03-16,,248,248,128,,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_19...
2,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_19...,E5,model_level,an,hourly,1999-06-20,,248,248,128,,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_19...
3,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_19...,E5,model_level,an,hourly,1999-06-18,,248,248,128,,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_19...
4,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_20...,E5,model_level,an,hourly,2004-04-21,,248,248,128,,/work/bk1099/data/E5/ml/an/1H/248/E5ml00_1H_20...


In [18]:
len(df1)

5004281

In [19]:
# Some entries are invalid
invalids = df1[~df1.checkpath.isin(df1["path"])]
invalids

Unnamed: 0,path,era_id,level_type,dataType,frequency,validation_date,initialization_date,virt_code,code,table_id,step,checkpath


In [None]:
#with open('/home/k/k204210/intake-esm/invalids-era5.txt', 'w') as f :
# for file in invalids.path.values :
# if type(file) == str :
# f.write(file+"\n")

In [None]:
invalids["path"].values[0]

In [20]:
df1["validation_date"]=df1["validation_date"].fillna(False)

In [21]:
df1=df1[~((df1["dataType"]=="an") &
 (df1["level_type"]=="model_level") &
 (df1["frequency"]=="hourly") &
 (df1["validation_date"].str.contains('|'.join([str(a) for a in range(1940,1959)]))))
 ]

In [22]:
df1=df1[~((df1["frequency"]=="daily" ) &
 (df1["level_type"]!="surface"))
 ]

In [25]:
len(df1)

4601595

In [None]:
#dfnew = df1[df1.era_id.isin(activity_ids)]

In [26]:
dfnew=df1[~df1[list(set(list(df1.keys()))-{"path"})].duplicated()]

In [29]:
dfnew.loc[:,["short_name", "long_name", "units", "stepType"]] = pd.DataFrame([["", "", "", ""]], index=dfnew.index)
groups = dfnew.groupby(["level_type", "dataType", "frequency", "code", "table_id"])
len(groups)

206

In [30]:
for group_name, df_group in tqdm(groups):
 groupxr = xarray.open_dataset(df_group["path"].values[0], engine="cfgrib", chunks="auto")
 varname = list(groupxr.data_vars)[0]
 varxr = groupxr[varname]
 group_indices = df_group.index[:].values.tolist()
 dfnew.loc[group_indices, "short_name"] = varname
 dfnew.loc[group_indices, "long_name"] = varxr.attrs["long_name"]
 dfnew.loc[group_indices, "units"] = varxr.attrs["units"]
 dfnew.loc[group_indices, "stepType"] = varxr.attrs["GRIB_stepType"]

 0%| | 0/206 [00:00<?, ?it/s]

Can't create file '/work/bk1099/data/E5/ml/an/1H/075/E5ml00_1H_1999-09-19_075.grb.90c91.idx'
Traceback (most recent call last):
 File "/sw/spack-levante/mambaforge-4.11.0-0-Linux-x86_64-sobz6z/lib/python3.9/site-packages/cfgrib/messages.py", line 342, in from_indexpath_or_filestream
 with compat_create_exclusive(indexpath) as new_index_file:
 File "/sw/spack-levante/mambaforge-4.11.0-0-Linux-x86_64-sobz6z/lib/python3.9/contextlib.py", line 119, in __enter__
 return next(self.gen)
 File "/sw/spack-levante/mambaforge-4.11.0-0-Linux-x86_64-sobz6z/lib/python3.9/site-packages/cfgrib/messages.py", line 274, in compat_create_exclusive
 fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
PermissionError: [Errno 13] Permission denied: '/work/bk1099/data/E5/ml/an/1H/075/E5ml00_1H_1999-09-19_075.grb.90c91.idx'
Can't read index file '/work/bk1099/data/E5/ml/an/1H/075/E5ml00_1H_1999-09-19_075.grb.90c91.idx'
Traceback (most recent call last):
 File "/sw/spack-levante/mambaforge-4.11.0-0-Linux-x

In [31]:
dfnew.loc[:,"project"]="era5"
dfnew.loc[:,"institution_id"]="ecmwf"
dfnew.loc[:,"source_id"]="IFS"
dfnew.loc[:,"format"]="grib"
dfnew.loc[:,"uri"]=dfnew["path"]

In [None]:
columns = ["project", "institution_id", "source_id", "era_id", "dataType", "level_type", "frequency", "stepType", "table_id", "code", "short_name", "long_name", "units", "validation_date", "initialization_date", "step", "uri", "path", "format"]
dfnew = dfnew[columns]
dfnew = dfnew.sort_values(columns, ascending = True).reset_index(drop=True)

In [None]:
#df=pd.concat([df,dfnew], ignore_index=True)

In [None]:
dfnew

In [32]:
dfnew.to_csv("/home/k/k204210/intake-esm/catalogs/dkrz_era5_disk.csv.gz", compression="gzip", index=False)

In [33]:
dfnew["validation_date"].unique()

array(['2004-02-28', '1999-03-16', '1999-06-20', ..., '1956-11',
 '1947-03', '1957-04'], dtype=object)

In [35]:
len(dfnew["validation_date"].unique())

31314

In [37]:
list(dfnew.groupby(["dataType", "level_type", "frequency"]).groups.keys())

[('an', 'model_level', 'hourly'),
 ('an', 'pressure_level', 'hourly'),
 ('an', 'surface', 'daily'),
 ('an', 'surface', 'hourly'),
 ('an', 'surface', 'invariant'),
 ('fc', 'surface', 'hourly'),
 ('fc', 'surface', 'invariant')]