Commit 1ff7c7a6 authored by Florian Ziemen

Merge branch 'improve-dakuan-processing' into 'main'

Improve dakuan processing

See merge request !7
parents 0529bc9d 718ccb27
%% Cell type:code id: tags:
``` python
import pandas as pd
import yaml
```
%% Cell type:code id: tags:
``` python
df = pd.read_excel("../inputs/enso_tuning.xlsx", index_col=0)
# Empty remark cells are read as NaN; normalise them to empty strings.
df.loc[df["Remark"].isna(), "Remark"] = ""
```
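%% Cell type:markdown id: tags:
The spreadsheet itself is not part of this repository. As a rough illustration of the layout the cells below assume (run names as the index, free-text `Remark` and `Resolution` columns, plus one column per tuning parameter), a stand-in frame can be built by hand; the parameter column name used here is hypothetical.
%% Cell type:code id: tags:
``` python
# Hypothetical stand-in for ../inputs/enso_tuning.xlsx; the "gkdrag" column is
# illustrative only and does not come from the real sheet.
df_example = pd.DataFrame(
    {
        "Remark": ["", "reference run", ""],
        "Resolution": ["R2B4", "R2B4", "R2B6"],
        "gkdrag": [0.10, 0.20, 0.20],
    },
    index=["run_a", "run_b", "run_c"],
)
df_example
```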
%% Cell type:code id: tags:
``` python
always_add = {
    "experiment": "coming decade - icon xpp enso tuning",
    "contact": "Dakuan Yu",
}
always_copy = ["Remark", "Resolution"]

# The row holding the default parameter values for all runs.
defaults = df.iloc[5]
defaults

runs = {}
for name, row in df.iloc[4:].iterrows():
    runs[name] = dict(always_add)
    if name == defaults.name:
        # Defaults run: record every non-empty column at the top level and, minus
        # the always-copied descriptive columns, under "parameters".
        runs[name] |= {k.lower(): v for k, v in row.items() if v}
        runs[name] |= dict(
            parameters={
                k.lower(): v for k, v in row.items() if v and k not in always_copy
            }
        )
    else:
        # Other runs: reference the defaults run and record only the deviations.
        runs[name] |= dict(parameters=dict(defaults=defaults.name))
        for k, v in row.items():
            if k in always_copy:
                if v:
                    runs[name][k.lower()] = v
            else:
                if v != defaults[k]:
                    runs[name][k.lower()] = v
                    runs[name]["defaults"] = defaults.name
                    runs[name]["parameters"][k.lower()] = v
```
%% Cell type:code id: tags:
``` python
with open("../inputs/icon-xpp-enso.metadata", "w") as mdf:
yaml.dump(runs, mdf)
```
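%% Cell type:markdown id: tags:
As a quick sanity check (a sketch, not part of the original notebook), the metadata file can be read back to confirm it round-trips through YAML and to inspect the entry for the defaults run.
%% Cell type:code id: tags:
``` python
# Sketch: reload the just-written metadata and show the defaults-run entry.
with open("../inputs/icon-xpp-enso.metadata") as infile:
    roundtrip = yaml.safe_load(infile)
roundtrip[defaults.name]
```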
......
%% Cell type:code id: tags:
``` python
import yaml
from pathlib import Path
import re
import logging
import xarray as xr
import warnings
import glob
import tools
from tools import BadDataset
from itertools import chain
```
%% Cell type:code id: tags:
``` python
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_yaml_file(yaml_file: Path):
    """Build the catalog for one input YAML file and write it below ../public."""
    output_dir = Path("../public") / yaml_file.stem
    output_dir.mkdir(exist_ok=True, parents=True)
    with open(yaml_file) as infile:
        list_of_simulations = yaml.safe_load(infile)
    add_metadata(yaml_file=yaml_file, simulations=list_of_simulations["simulations"])
    catalog = dict(sources=process_simulations(output_dir, list_of_simulations))
    with open(output_dir / Path("main.yaml"), "w") as outfile:
        yaml.dump(catalog, outfile)


def add_metadata(yaml_file: Path, simulations: dict):
    """Merge per-simulation metadata from a sibling .metadata file, if present."""
    mdf = yaml_file.with_suffix(".metadata")
    if mdf.exists():
        with open(mdf) as md_stream:
            md = yaml.safe_load(md_stream)
        for k, v in simulations.items():
            v["metadata"] |= md.get(k, dict())


def process_simulations(output_dir, list_of_simulations):
    """Create one catalog entry per simulation, collecting failures for a combined report."""
    sources = {}
    bad_datasets = {}
    for id, properties in list_of_simulations["simulations"].items():
        try:
            sources[id] = create_entry(id, properties, output_dir)
        except BadDataset as bds:
            bad_datasets[id] = bds
    handle_bad_datasets(bad_datasets)
    return sources


def create_entry(id, properties, output_dir: Path):
    """Write a per-simulation YAML catalog and return the entry pointing to it."""
    entry_filename = output_dir / Path(f"{id}.yaml")
    entry_content = {"sources": dict()}
    streams = analyze_dataset(id, properties["path"])
    logger.info(f" Processing {id} with file streams {sorted(streams.keys())}")
    for filegroup, files in streams.items():
        entry_content["sources"][filegroup] = create_stream(
            files,
            dict(properties.get("metadata", dict())),
        )
    with open(entry_filename, "w") as outfile:
        yaml.dump(entry_content, outfile)
    entry = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{id}.yaml"),
        metadata=dict(properties.get("metadata", dict())),
    )
    if properties.get("description", False):
        entry["description"] = properties["description"]
    return entry


def handle_bad_datasets(bad_datasets):
    """Raise a single BadDataset summarising all simulations that failed."""
    if len(bad_datasets):
        bad_list = "\n" + "\n".join(f"{k} : {v}" for k, v in bad_datasets.items())
        raise BadDataset(
            f"Encountered bad Datasets in processing of simulations: {bad_list}"
        )


def analyze_dataset(id, input_dir: Path):
    """Group the files of a simulation into streams keyed by their date-free pattern."""
    files = gen_files(id, input_dir)
    id, parts = split_filenamens(id, files)
    patterns = get_patterns(parts)
    logger.debug(f"{id=} {patterns=}")
    filelist = gen_filelist(input_dir, id, patterns)
    return filelist


def gen_files(id, input_dir):
    """Collect all non-restart NetCDF files belonging to a simulation."""
    search_pattern = str(input_dir / Path(f"{id}*.nc"))
    files = glob.glob(search_pattern)
    if len(files) == 0:
        raise BadDataset(f"Could not find any files for {search_pattern}")
    files = [x for x in files if "restart" not in x]
    return [Path(x) for x in files]


def split_filenamens(id, files):
    """Strip the simulation id from the file stems, keeping only the suffixes."""
    stems = list(f.stem for f in files)
    parts = [x[len(id) :] for x in stems]
    return id, parts


def gen_filelist(input_dir, id, patterns):
    """Find the dated files for every pattern, falling back to a single undated file."""
    digit = "[0-9]"
    date_patterns = [f"{digit*4}-{digit*2}-{digit*2}", f"{digit*8}T{digit*6}Z"]
    filelist = {}
    for pattern in patterns:
        search_patterns = [
            str(input_dir / Path(f"{id}*{pattern}*{date_pattern}.nc"))
            for date_pattern in date_patterns
        ]
        search_patterns.extend(
            [
                str(input_dir / Path(f"{id}*{date_pattern}*{pattern}*.nc"))
                for date_pattern in date_patterns
            ]
        )
        matches = [glob.glob(search_pattern) for search_pattern in search_patterns]
        filelist[pattern] = sorted(chain.from_iterable(matches))
        if len(filelist[pattern]) == 0:
            filelist[pattern] = glob.glob(str(input_dir / Path(f"{id}_{pattern}.nc")))
        if len(filelist[pattern]) == 0:
            raise BadDataset(
                f"No files found for {id=}, {input_dir=} {search_patterns=}"
            )
    return filelist


def get_patterns(parts):
    """Derive date-free stream patterns from the filename suffixes."""
    patterns = {re.sub(r"\d{4}-\d{2}-\d{2}_", "", x) for x in parts}
    patterns = {re.sub(r"\d{8}T\d{6}Z", "", x) for x in patterns}
    patterns = {re.sub(r"^_", "", x) for x in patterns}
    patterns = {re.sub(r"_$", "", x) for x in patterns}
    return patterns


def create_stream(files, metadata):
    """Build a netcdf source entry for one stream, including time and variable metadata."""
    stream = dict(driver="netcdf")
    stream["args"] = dict(
        chunks=dict(time=1),
        xarray_kwargs=dict(use_cftime=True),
        urlpath=[str(x) for x in files],
    )
    stream["metadata"] = metadata
    stream["metadata"] |= get_variable_metadata(files)
    for k in ["date_start", "date_end"]:
        if k in stream["metadata"]:
            del stream["metadata"][k]
    try:
        stream["metadata"] |= tools.get_start_end(files)
    except Exception as e:
        raise BadDataset(
            f"failed to get time from ({files[0]}, {files[-1]}), {e}"
        ) from e
    return stream


def get_variable_metadata(files):
    """Read variable names and long names from the first file of a stream."""
    try:
        ds = xr.open_dataset(files[0])
    except RuntimeError as e:
        raise BadDataset(f"Loading metadata from {files[0]} yielded {e}") from e
    variables = sorted(x for x in ds)
    long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
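%% Cell type:markdown id: tags:
To illustrate how `get_patterns` turns filename suffixes into date-free stream names, a few hypothetical suffixes (what remains of the file stems once the run id is stripped) can be fed in directly; the stream names below are made up for the example.
%% Cell type:code id: tags:
``` python
# Hypothetical suffixes; the real file names depend on the simulation output.
example_parts = [
    "_1979-01-01_atm_2d_ml",
    "_1980-01-01_atm_2d_ml",
    "_oce_mon_19790101T000000Z",
]
get_patterns(example_parts)  # expected: {'atm_2d_ml', 'oce_mon'}
```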
%% Cell type:code id: tags:
``` python
# Build one sub-catalog per input YAML file and reference them all from a
# top-level main.yaml.
yaml_files = sorted(Path("../inputs").glob("*.yaml"))
main_cat = dict(sources=dict())
for yaml_file in yaml_files:
    process_yaml_file(yaml_file)
    stem = yaml_file.stem
    main_cat["sources"][stem] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{stem}/main.yaml"),
    )
with open(Path("../public/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
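%% Cell type:markdown id: tags:
The `yaml_file_cat` and `netcdf` drivers match the ones intake provides, so the generated catalog can presumably be browsed along the following lines (a sketch assuming intake is installed; the entry names are placeholders).
%% Cell type:code id: tags:
``` python
# Sketch only: requires intake (and intake-xarray for .to_dask()); the entry
# names below are placeholders, not taken from the real inputs.
import intake

cat = intake.open_catalog("../public/main.yaml")
print(list(cat))  # one sub-catalog per input YAML file
# ds = cat["icon-xpp-enso"]["<simulation-id>"]["<stream>"].to_dask()
```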
......