Compare revisions
Target project: catalog/ruby
Commits on Source (3)
%% Cell type:code id: tags:
``` python
import pandas as pd
import yaml
```
%% Cell type:code id: tags:
``` python
df = pd.read_excel("../inputs/enso_tuning.xlsx", index_col=0)
df.loc[df["Remark"].isna(), "Remark"] = ""
```
%% Cell type:code id: tags:
``` python
# Metadata shared by every run, and spreadsheet columns that are copied verbatim.
always_add = {
    "experiment": "coming decade - icon xpp enso tuning",
    "contact": "Dakuan Yu",
}
always_copy = ["Remark", "Resolution"]

# The run in this row supplies the default parameter values.
defaults = df.iloc[5]
defaults

runs = {}
for name, row in df.iloc[4:].iterrows():
    runs[name] = dict(always_add)
    if name == defaults.name:
        # The defaults run stores every non-empty column, both at the top level
        # and in its own "parameters" mapping.
        runs[name] |= {k.lower(): v for k, v in row.items() if v}
        runs[name] |= dict(
            parameters={
                k.lower(): v for k, v in row.items() if v and k not in always_copy
            }
        )
    else:
        # Other runs reference the defaults run and only record deviations from it.
        runs[name] |= dict(parameters=dict(defaults=defaults.name))
        for k, v in row.items():
            if k in always_copy:
                if v:
                    runs[name][k.lower()] = v
            else:
                if v != defaults[k]:
                    runs[name][k.lower()] = v
                    runs[name]["defaults"] = defaults.name
                    runs[name]["parameters"][k.lower()] = v
```
%% Cell type:code id: tags:
``` python
with open("../inputs/icon-xpp-enso.metadata", "w") as mdf:
    yaml.dump(runs, mdf)
```
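%% Cell type:markdown id: tags:
A quick read-back of the file just written can confirm its layout: each top-level key is a run name from the spreadsheet, carrying the shared experiment/contact fields plus a `parameters` mapping (non-default runs also record which run supplies the defaults). The cell below is only a sanity-check sketch and assumes the cells above have already run.
%% Cell type:code id: tags:
``` python
# Sanity check (assumes ../inputs/icon-xpp-enso.metadata was written above).
with open("../inputs/icon-xpp-enso.metadata") as mdf:
    runs_back = yaml.safe_load(mdf)

# Print the keys of the first few entries to verify the expected structure.
for name, entry in list(runs_back.items())[:3]:
    print(name, sorted(entry))
```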
......
%% Cell type:code id: tags:
``` python
import yaml
from pathlib import Path
import re
import logging
import xarray as xr
import warnings
import glob
import tools
from tools import BadDataset
from itertools import chain
```
%% Cell type:code id: tags:
``` python
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_yaml_file(yaml_file: Path):
    """Build one sub-catalog directory under ../public for a simulations YAML file."""
    output_dir = Path("../public") / yaml_file.stem
    output_dir.mkdir(exist_ok=True, parents=True)
    with open(yaml_file) as infile:
        list_of_simulations = yaml.safe_load(infile)
    add_metadata(yaml_file=yaml_file, simulations=list_of_simulations["simulations"])
    catalog = dict(sources=process_simulations(output_dir, list_of_simulations))
    with open(output_dir / Path("main.yaml"), "w") as outfile:
        yaml.dump(catalog, outfile)


def add_metadata(yaml_file: Path, simulations: dict):
    """Merge entries from an optional sibling .metadata file into each simulation."""
    mdf = yaml_file.with_suffix(".metadata")
    if mdf.exists():
        with open(mdf) as md_stream:
            md = yaml.safe_load(md_stream)
        for k, v in simulations.items():
            v["metadata"] |= md.get(k, dict())


def process_simulations(output_dir, list_of_simulations):
    """Create one catalog entry per simulation, collecting failures for a single report."""
    sources = {}
    bad_datasets = {}
    for id, properties in list_of_simulations["simulations"].items():
        try:
            sources[id] = create_entry(id, properties, output_dir)
        except BadDataset as bds:
            bad_datasets[id] = bds
    handle_bad_datasets(bad_datasets)
    return sources


def create_entry(id, properties, output_dir: Path):
    """Write <id>.yaml with one source per file stream and return the entry referencing it."""
    entry_filename = output_dir / Path(f"{id}.yaml")
    entry_content = {"sources": dict()}
    streams = analyze_dataset(id, properties["path"])
    logger.info(f"Processing {id} with file streams {sorted(streams.keys())}")
    for filegroup, files in streams.items():
        entry_content["sources"][filegroup] = create_stream(
            files,
            dict(properties.get("metadata", dict())),
        )
    with open(entry_filename, "w") as outfile:
        yaml.dump(entry_content, outfile)
    entry = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{id}.yaml"),
        metadata=dict(properties.get("metadata", dict())),
    )
    if properties.get("description", False):
        entry["description"] = properties["description"]
    return entry


def handle_bad_datasets(bad_datasets):
    if len(bad_datasets):
        bad_list = "\n" + "\n".join(f"{k} : {v}" for k, v in bad_datasets.items())
        raise BadDataset(
            f"Encountered bad datasets in processing of simulations: {bad_list}"
        )


def analyze_dataset(id, input_dir: Path):
    """Group the dataset's netCDF files into streams keyed by output-name pattern."""
    files = gen_files(id, input_dir)
    id, parts = split_filenamens(id, files)
    patterns = get_patterns(parts)
    logger.debug(f"{id=} {patterns=}")
    filelist = gen_filelist(input_dir, id, patterns)
    return filelist


def gen_files(id, input_dir):
    """Return all non-restart netCDF files belonging to the given run id."""
    search_pattern = str(input_dir / Path(f"{id}*.nc"))
    files = glob.glob(search_pattern)
    if len(files) == 0:
        raise BadDataset(f"Could not find any files for {search_pattern}")
    files = [x for x in files if "restart" not in x]
    return [Path(x) for x in files]


def split_filenamens(id, files):
    """Strip the run id from each file stem, keeping the remaining name parts."""
    stems = list(f.stem for f in files)
    parts = [x[len(id):] for x in stems]
    return id, parts


def gen_filelist(input_dir, id, patterns):
    """For each stream pattern, collect its files (dated or undated), sorted by name."""
    digit = "[0-9]"
    date_patterns = [f"{digit*4}-{digit*2}-{digit*2}", f"{digit*8}T{digit*6}Z"]
    filelist = {}
    for pattern in patterns:
        search_patterns = [
            str(input_dir / Path(f"{id}*{pattern}*{date_pattern}.nc"))
            for date_pattern in date_patterns
        ]
        search_patterns.extend(
            [
                str(input_dir / Path(f"{id}*{date_pattern}*{pattern}*.nc"))
                for date_pattern in date_patterns
            ]
        )
        matches = [glob.glob(search_pattern) for search_pattern in search_patterns]
        filelist[pattern] = sorted(chain.from_iterable(matches))
        if len(filelist[pattern]) == 0:
            filelist[pattern] = glob.glob(str(input_dir / Path(f"{id}_{pattern}.nc")))
        if len(filelist[pattern]) == 0:
            raise BadDataset(
                f"No files found for {id=}, {input_dir=} {search_patterns=}"
            )
    return filelist


def get_patterns(parts):
    """Reduce file-name parts to stream patterns by removing dates and stray underscores."""
    patterns = {re.sub(r"\d{4}-\d{2}-\d{2}_", "", x) for x in parts}
    patterns = {re.sub(r"\d{8}T\d{6}Z", "", x) for x in patterns}
    patterns = {re.sub(r"^_", "", x) for x in patterns}
    patterns = {re.sub(r"_$", "", x) for x in patterns}
    return patterns


def create_stream(files, metadata):
    """Build a netcdf source entry for one file stream, enriched with variable and time metadata."""
    stream = dict(driver="netcdf")
    stream["args"] = dict(
        chunks=dict(time=1),
        xarray_kwargs=dict(use_cftime=True),
        urlpath=[str(x) for x in files],
    )
    stream["metadata"] = metadata
    stream["metadata"] |= get_variable_metadata(files)
    # Drop any inherited time range; it is recomputed from the files below.
    for k in ["date_start", "date_end"]:
        if k in stream["metadata"]:
            del stream["metadata"][k]
    try:
        stream["metadata"] |= tools.get_start_end(files)
    except Exception as e:
        raise BadDataset(
            f"Failed to get time from ({files[0]}, {files[-1]}): {e}"
        ) from e
    return stream


def get_variable_metadata(files):
    """List variable names and long names from the first file of a stream."""
    try:
        ds = xr.open_dataset(files[0])
    except RuntimeError as e:
        raise BadDataset(f"Loading metadata from {files[0]} yielded {e}") from e
    variables = sorted(x for x in ds)
    long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
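%% Cell type:markdown id: tags:
The functions above imply a particular layout for the input files: each `../inputs/*.yaml` file holds a top-level `simulations` mapping from run id to a `path` (a directory containing `<id>*.nc` output), with optional `metadata` and `description` fields, and an optional sibling `*.metadata` file that `add_metadata` merges in. The sketch below builds one such entry for illustration; the run id, path, and metadata values are hypothetical placeholders, not taken from the repository.
%% Cell type:code id: tags:
``` python
# Hypothetical input entry in the shape process_yaml_file() expects.
# "some_run_id" and the path are placeholders for illustration only.
example_simulations_file = {
    "simulations": {
        "some_run_id": {
            "path": "/work/experiments/some_run_id/outdata",  # directory with some_run_id*.nc files
            "description": "free-text description shown in the catalog entry",
            "metadata": {"project": "example-project"},
        }
    }
}
print(yaml.dump(example_simulations_file))
```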
%% Cell type:code id: tags:
``` python
yaml_files = sorted(Path("../inputs").glob("*.yaml"))
main_cat = dict(sources=dict())
for yaml_file in yaml_files:
    process_yaml_file(yaml_file)
    stem = yaml_file.stem
    main_cat["sources"][stem] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{stem}/main.yaml"),
    )
with open(Path("../public/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
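%% Cell type:markdown id: tags:
The `yaml_file_cat` and `netcdf` driver names suggest the generated tree is meant to be read with intake and intake-xarray. A minimal consumption sketch, assuming those packages are installed and using placeholder source names, could look like this.
%% Cell type:code id: tags:
``` python
import intake

# Open the top-level catalog written above; each input YAML file becomes a sub-catalog.
cat = intake.open_catalog("../public/main.yaml")
print(list(cat))

# Placeholder names -- the real ones depend on the input files and run ids:
# sub_cat = cat["<input-stem>"]
# ds = sub_cat["<run-id>"]["<stream>"].to_dask()  # lazily open one file stream as an xarray Dataset
```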
......