Skip to content
Snippets Groups Projects
Commit a11c6cea authored by Florian Ziemen's avatar Florian Ziemen
Browse files

processing script

parent 072c62b9
No related branches found
No related tags found
No related merge requests found
%% Cell type:code id: tags:
``` python
import yaml
import pandas as pd
from pathlib import Path
import re
import logging
from typing import Union
import xarray as xr
import warnings
```
%% Cell type:code id: tags:
``` python
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_table_file(table_file: Path):
    """Build one sub-catalog directory from a markdown table file.

    Reads the table, writes one YAML entry per simulation into
    ``../catalog/<table stem>/`` and a ``main.yaml`` referencing them.
    """
    table = read_table(table_file=table_file)
    out_dir = Path("../catalog") / table_file.stem
    out_dir.mkdir(exist_ok=True)  # parent ../catalog is assumed to exist
    entries = process_table(table, out_dir)
    with open(out_dir / "main.yaml", "w") as fh:
        yaml.dump(entries, fh)
def read_table(table_file: Path) -> pd.DataFrame:
    """Parse a pipe-delimited markdown table of simulations.

    Expects rows like
    ``| id | experiment | resolution | start | end | path | contact |``;
    the leading and trailing pipes yield two throw-away columns.

    Parameters
    ----------
    table_file : Path
        Markdown file to read.

    Returns
    -------
    pd.DataFrame
        Columns ``simulation_id``, ``experiment``, ``resolution``,
        ``start_date``, ``end_date``, ``path`` (as ``pathlib.Path``),
        ``contact``.
    """
    names = ['garbage1', 'simulation_id', "experiment", "resolution",
             'start_date', 'end_date', 'path', 'contact', 'garbage2']
    usecols = [x for x in names if 'garbage' not in x]
    # BUG FIX: the filter used to be the always-true string literal
    # "date not in x"; the intent is to strip whitespace from every
    # non-date column while leaving the date columns untouched.
    converters = {x: lambda s: s.strip() for x in usecols if "date" not in x}
    # header=1 discards the markdown title row and the |---|---| separator.
    df = pd.read_csv(table_file, delimiter='|', names=names,
                     usecols=usecols, header=1, converters=converters)
    # Un-escape "\_" sequences left over from the markdown source.
    df.iloc[:, 0] = df.iloc[:, 0].str.replace("\\_", "_").str.strip()
    # NOTE(review): after dropping the garbage columns, column -2 is 'path'
    # (not 'contact') — confirm this is the intended target.
    df.iloc[:, -2] = df.iloc[:, -2].str.replace("\\_", "_").str.strip()
    df["path"] = [Path(x) for x in df["path"]]
    # Same named logger the notebook configures at module level.
    logging.getLogger("catalog_netcdf").debug(df)
    return df
def process_table(df: pd.DataFrame, table_dir: Path) -> dict:
    """Build the catalog dict: one entry per simulation row of *df*."""
    sources = {
        row['simulation_id']: create_entry(row, table_dir=table_dir)
        for _, row in df.iterrows()
    }
    return {'sources': sources}
```
%% Cell type:code id: tags:
``` python
def create_entry(experiment, table_dir: Path):
    """Write ``<simulation_id>.yaml`` for one experiment row.

    Groups the experiment's NetCDF files into streams, dumps one source
    per stream into the per-simulation YAML file, and returns the intake
    entry that points at that file.
    """
    sim_id = experiment['simulation_id']
    groups = analyze_dataset(sim_id, experiment['path'])
    content = {'sources': {
        group: create_stream(experiment, group, members)
        for group, members in groups.items()
    }}
    with open(table_dir / f"{sim_id}.yaml", 'w') as fh:
        yaml.dump(content, fh)
    return dict(
        driver="yaml_file_cat",
        description=experiment["experiment"],
        args=dict(path="{{CATALOG_DIR}}/" + f'{sim_id}.yaml'),
    )
def analyze_dataset(id, input_dir: Path):
    """Group the simulation's NetCDF files by output-stream pattern.

    Returns a dict mapping each stream pattern to the list of files
    matching it in *input_dir*.
    """
    # Collect candidate files, strip the id prefix from their stems,
    # and reduce the remainders to date-free stream patterns.
    files = gen_files(id, input_dir)
    id, parts = split_filenamens(id, files)
    patterns = get_patterns(parts)
    logger.debug(f"{id=} {patterns=}")
    return gen_filelist(input_dir, id, patterns)
def gen_files(id, input_dir):
    """All NetCDF files of simulation *id*, excluding restart files."""
    # "restart" is tested against the full path string, as before.
    return [
        match for match in input_dir.glob(f"{id}*.nc")
        if "restart" not in str(match)
    ]
def split_filenamens(id, files):
    """Return *id* and the file-stem remainders after the id prefix."""
    prefix_len = len(id)
    remainders = [f.stem[prefix_len:] for f in files]
    return id, remainders
def gen_filelist(input_dir, id, patterns):
    """Map each stream pattern to its matching NetCDF files."""
    filelist = {}
    for pattern in patterns:
        filelist[pattern] = list(input_dir.glob(f"{id}*{pattern}*.nc"))
    return filelist
def get_patterns(parts):
    """Collapse stem remainders into date-free stream patterns.

    Strips ``YYYY-MM-DD_`` dates and ``YYYYMMDDThhmmssZ`` timestamps,
    then trims a single leading/trailing underscore; deduplicates.
    """
    cleaned = set()
    for part in parts:
        part = re.sub(r'\d{4}-\d{2}-\d{2}_', "", part)  # calendar date
        part = re.sub(r'\d{8}T\d{6}Z', "", part)        # ISO timestamp
        part = re.sub(r'^_', '', part)                  # leading underscore
        part = re.sub(r'_$', '', part)                  # trailing underscore
        cleaned.add(part)
    return cleaned
def create_stream(experiment, filegroup, files):
    """Intake ``netcdf``-driver entry for one group of files.

    Row metadata (minus the local path) plus the variable inventory of
    the files is attached under ``metadata``.
    """
    metadata = {key: value.strip() for key, value in experiment.items() if key != "path"}
    metadata |= get_variable_metadata(files)
    return dict(
        driver="netcdf",
        args=dict(
            chunks=dict(time=1),
            xarray_kwargs=dict(use_cftime=True),
            urlpath=[str(f) for f in files],
        ),
        metadata=metadata,
    )
def get_variable_metadata(files):
    """Read the variable inventory from the first file of a stream.

    Only the first file is inspected, on the assumption that all files
    of a group share the same variables — TODO confirm.

    Parameters
    ----------
    files : list of Path
        NetCDF files of one stream; must be non-empty.

    Returns
    -------
    dict
        ``variables``: sorted data-variable names;
        ``variable_long_names``: matching ``long_name`` attributes,
        falling back to the variable name when the attribute is absent.
    """
    # BUG FIX: the dataset was opened but never closed, leaking one file
    # handle per stream; the context manager closes it deterministically.
    with xr.open_dataset(files[0]) as ds:
        variables = sorted(x for x in ds)
        long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
%% Cell type:code id: tags:
``` python
# Build ../catalog/main.yaml with one yaml_file_cat source per input table,
# generating each table's sub-catalog along the way.
main_cat = {"sources": {}}
for table_file in sorted(Path("../inputs").glob("*.md")):
    table = table_file.stem
    process_table_file(table_file)
    main_cat["sources"][table] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{table}/main.yaml"),
    )
with open(Path("../catalog/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment