Skip to content
Snippets Groups Projects

first attempt at jacamar runner

Merged Florian Ziemen requested to merge jacamar into main
Files
2
%% Cell type:code id: tags:
``` python
import yaml
import pandas as pd
from pathlib import Path
import re
import logging
import xarray as xr
import warnings
```
%% Cell type:code id: tags:
``` python
# Configure root logging once, then use a dedicated logger for this notebook.
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
# Suppress xarray SerializationWarning noise emitted when opening datasets below.
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_table_file(table_file: Path):
    """Read one markdown table and write its intake catalog files.

    Produces ../public/<table stem>/main.yaml plus one yaml file per
    simulation (written by create_entry via process_table).
    """
    df = read_table(table_file=table_file)
    # NOTE(review): the original assigned Path("../catalog")/stem and then
    # immediately overwrote it with Path("../public")/stem — the dead
    # assignment (a merge leftover) is removed here.
    table_dir = Path("../public") / table_file.stem
    table_dir.mkdir(exist_ok=True)
    catalog = process_table(df, table_dir)
    with open(table_dir / "main.yaml", "w") as outfile:
        yaml.dump(catalog, outfile)
def read_table(table_file: Path) -> pd.DataFrame:
    """Parse a markdown pipe table into a DataFrame of simulation records.

    The leading/trailing pipe delimiters produce two junk columns
    ("garbage1"/"garbage2") which are dropped via usecols.
    """
    names = [
        "garbage1",
        "simulation_id",
        "experiment",
        "resolution",
        "start_date",
        "end_date",
        "path",
        "contact",
        "garbage2",
    ]
    usecols = [x for x in names if "garbage" not in x]
    # BUG FIX: the original comprehension filter was the string literal
    # "date not in x" (always truthy), so every column was stripped anyway.
    # Strip all retained columns explicitly to keep the effective behavior
    # without the dead, misleading condition.
    converters = {x: str.strip for x in usecols}
    df = pd.read_csv(
        table_file,
        delimiter="|",
        names=names,
        usecols=usecols,
        header=1,  # row 0 is the markdown |---|---| separator
        converters=converters,
    )
    # Undo markdown escaping of underscores ("\_") in simulation_id and path.
    df.iloc[:, 0] = df.iloc[:, 0].str.replace("\\_", "_").str.strip()
    df.iloc[:, -2] = df.iloc[:, -2].str.replace("\\_", "_").str.strip()
    df["path"] = [Path(x) for x in df["path"]]
    logger.debug(df)
    return df
def process_table(df: pd.DataFrame, table_dir: Path) -> dict:
    """Build the per-table catalog dict: one source entry per simulation row."""
    sources = {
        row["simulation_id"]: create_entry(row, table_dir=table_dir)
        for _, row in df.iterrows()
    }
    return dict(sources=sources)
```
%% Cell type:code id: tags:
``` python
def create_entry(experiment, table_dir: Path):
    """Write a per-simulation yaml catalog and return its main-catalog entry.

    One source is created per file group found under the experiment's path;
    the returned dict points the parent catalog at the written yaml file.
    """
    sim_id = experiment["simulation_id"]
    groups = analyze_dataset(sim_id, experiment["path"])
    entry_content = {
        "sources": {
            group_name: create_stream(experiment, group_name, group_files)
            for group_name, group_files in groups.items()
        }
    }
    entry_filename = table_dir / Path(f"{sim_id}.yaml")
    with open(entry_filename, "w") as outfile:
        yaml.dump(entry_content, outfile)
    return dict(
        driver="yaml_file_cat",
        description=experiment["experiment"],
        args=dict(path="{{CATALOG_DIR}}/" + f"{sim_id}.yaml"),
    )
def analyze_dataset(id, input_dir: Path):
    """Group a simulation's netCDF files into streams keyed by filename pattern."""
    candidates = gen_files(id, input_dir)
    id, suffixes = split_filenamens(id, candidates)
    patterns = get_patterns(suffixes)
    logger.debug(f"{id=} {patterns=}")
    return gen_filelist(input_dir, id, patterns)
def gen_files(id, input_dir):
    """List netCDF files in input_dir starting with id, skipping restart files."""
    return [
        candidate
        for candidate in input_dir.glob(f"{id}*.nc")
        if "restart" not in str(candidate)
    ]
def split_filenamens(id, files):
    """Return the id unchanged plus each file stem with the id prefix cut off."""
    suffixes = [path.stem[len(id):] for path in files]
    return id, suffixes
def gen_filelist(input_dir, id, patterns):
    """Map each pattern to the list of matching netCDF files for this id."""
    filelist = {}
    for pattern in patterns:
        filelist[pattern] = list(input_dir.glob(f"{id}*{pattern}*.nc"))
    return filelist
def get_patterns(parts):
    """Collapse filename suffixes into stream patterns.

    Removes YYYY-MM-DD_ date stamps and YYYYMMDDTHHMMSSZ timestamps, then
    trims a single leading/trailing underscore; duplicates collapse via the
    returned set.
    """
    dated = re.compile(r"\d{4}-\d{2}-\d{2}_")
    stamped = re.compile(r"\d{8}T\d{6}Z")
    patterns = set()
    for part in parts:
        cleaned = dated.sub("", part)
        cleaned = stamped.sub("", cleaned)
        cleaned = re.sub(r"^_", "", cleaned)
        cleaned = re.sub(r"_$", "", cleaned)
        patterns.add(cleaned)
    return patterns
def create_stream(experiment, filegroup, files):
    """Build one intake netcdf source entry for a group of files.

    Metadata carries the table row (minus the path column, whose value is a
    Path, not a string) plus the variable summary from the first file.
    """
    metadata = {
        key: value.strip() for key, value in experiment.items() if key != "path"
    }
    metadata |= get_variable_metadata(files)
    return {
        "driver": "netcdf",
        "args": {
            "chunks": {"time": 1},
            "xarray_kwargs": {"use_cftime": True},
            "urlpath": [str(f) for f in files],
        },
        "metadata": metadata,
    }
def get_variable_metadata(files):
    """Collect variable names and long names from the first file of a group.

    Returns a dict with sorted variable names and their long_name attributes
    (falling back to the variable name when long_name is absent). Returns
    empty lists when no files are given.
    """
    # Guard the empty case instead of raising IndexError.
    if not files:
        return dict(variables=[], variable_long_names=[])
    # Context manager so the dataset handle is closed promptly (the original
    # leaked the open file handle).
    with xr.open_dataset(files[0]) as ds:
        variables = sorted(x for x in ds)
        long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
%% Cell type:code id: tags:
``` python
# Build the top-level catalog: one sub-catalog per markdown table in ../inputs.
table_files = sorted(Path("../inputs").glob("*.md"))
main_cat = dict(sources=dict())
for table_file in table_files:
    table = table_file.stem
    process_table_file(table_file)
    main_cat["sources"][table] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{table}/main.yaml"),
    )
# BUG FIX: a stale duplicate `with open(Path("../catalog/main.yaml"), ...)`
# (leftover from the old output location) opened and truncated a file that
# was never written; only ../public/main.yaml is produced now.
with open(Path("../public/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
Loading