Commit 12d7b4be authored by Florian Ziemen

add metadata to experiment entries.

remove date entries from streams before processing.
parent 5bce4ca0
1 merge request: !5 Update ensoexperiments
Pipeline #95625 passed
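The metadata mentioned in the commit message is attached to the per-experiment entries written by `create_entry` in the notebook below. A minimal sketch of the resulting entry dict; the experiment id and the metadata/description values are invented for illustration:

``` python
# Hypothetical shape of the entry returned by create_entry() further below;
# "my_experiment" and the metadata/description values are made up.
example_entry = dict(
    driver="yaml_file_cat",
    args=dict(path="{{CATALOG_DIR}}/" + "my_experiment.yaml"),
    metadata=dict(project="enso"),  # copied from the experiment's metadata block
    description="optional free-text description",
)
```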
%% Cell type:code id: tags:
``` python
import yaml
from pathlib import Path
import re
import logging
import xarray as xr
import warnings
import glob
import tools
from tools import BadDataset
from itertools import chain
```
%% Cell type:code id: tags:
``` python
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_yaml_file(yaml_file: Path):
    output_dir = Path("../public") / yaml_file.stem
    output_dir.mkdir(exist_ok=True, parents=True)
    with open(yaml_file) as infile:
        list_of_simulations = yaml.safe_load(infile)
    catalog = dict(sources=process_simulations(output_dir, list_of_simulations))
    with open(output_dir / Path("main.yaml"), "w") as outfile:
        yaml.dump(catalog, outfile)


def process_simulations(output_dir, list_of_simulations):
    sources = {}
    bad_datasets = {}
    for id, properties in list_of_simulations["simulations"].items():
        try:
            sources[id] = create_entry(id, properties, output_dir)
        except BadDataset as bds:
            bad_datasets[id] = bds
    handle_bad_datasets(bad_datasets)
    return sources


def create_entry(id, properties, output_dir: Path):
    entry_filename = output_dir / Path(f"{id}.yaml")
    entry_content = {"sources": dict()}
    streams = analyze_dataset(id, properties["path"])
    logger.info(f" Processing {id} with file streams {sorted(streams.keys())}")
    for filegroup, files in streams.items():
        entry_content["sources"][filegroup] = create_stream(
            files,
            dict(properties.get("metadata", dict())),
        )
    with open(entry_filename, "w") as outfile:
        yaml.dump(entry_content, outfile)
    entry = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{id}.yaml"),
        metadata=dict(properties.get("metadata", dict())),
    )
    if properties.get("description", False):
        entry["description"] = properties["description"]
    return entry


def handle_bad_datasets(bad_datasets):
    if len(bad_datasets):
        bad_list = "\n" + "\n".join(f"{k} : {v}" for k, v in bad_datasets.items())
        raise BadDataset(
            f"Encountered bad Datasets in processing of simulations: {bad_list}"
        )


def analyze_dataset(id, input_dir: Path):
    files = gen_files(id, input_dir)
    id, parts = split_filenamens(id, files)
    patterns = get_patterns(parts)
    logger.debug(f"{id=} {patterns=}")
    filelist = gen_filelist(input_dir, id, patterns)
    return filelist


def gen_files(id, input_dir):
    search_pattern = str(input_dir / Path(f"{id}*.nc"))
    files = glob.glob(search_pattern)
    if len(files) == 0:
        raise BadDataset(f"Could not find any files for {search_pattern}")
    files = [x for x in files if "restart" not in x]
    return [Path(x) for x in files]


def split_filenamens(id, files):
    stems = list(f.stem for f in files)
    parts = [x[len(id) :] for x in stems]
    return id, parts


def gen_filelist(input_dir, id, patterns):
    digit = "[0-9]"
    date_patterns = [f"{digit*4}-{digit*2}-{digit*2}", f"{digit*8}T{digit*6}Z"]
    filelist = {}
    for pattern in patterns:
        search_patterns = [
            str(input_dir / Path(f"{id}*{pattern}*{date_pattern}.nc"))
            for date_pattern in date_patterns
        ]
        search_patterns.extend(
            [
                str(input_dir / Path(f"{id}*{date_pattern}*{pattern}*.nc"))
                for date_pattern in date_patterns
            ]
        )
        matches = [glob.glob(search_pattern) for search_pattern in search_patterns]
        filelist[pattern] = sorted(chain.from_iterable(matches))
        if len(filelist[pattern]) == 0:
            filelist[pattern] = glob.glob(str(input_dir / Path(f"{id}_{pattern}.nc")))
        if len(filelist[pattern]) == 0:
            raise BadDataset(
                f"No files found for {id=}, {input_dir=} {search_patterns=}"
            )
    return filelist


def get_patterns(parts):
    patterns = {
        re.sub(r"\d{4}-\d{2}-\d{2}_", "", x) for x in parts
    }  # r'\d{4}-\d{2}-\d{2}'
    patterns = {
        re.sub(r"\d{8}T\d{6}Z", "", x) for x in patterns
    }  # r'\d{8}T\d{6}Z'
    patterns = {re.sub(r"^_", "", x) for x in patterns}
    patterns = {re.sub(r"_$", "", x) for x in patterns}
    return patterns


def create_stream(files, metadata):
    stream = dict(driver="netcdf")
    stream["args"] = dict(
        chunks=dict(time=1),
        xarray_kwargs=dict(use_cftime=True),
        urlpath=[str(x) for x in files],
    )
    stream["metadata"] = metadata
    stream["metadata"] |= get_variable_metadata(files)
    for k in ["date_start", "date_end"]:
        if k in stream["metadata"]:
            del stream["metadata"][k]
    try:
        stream["metadata"] |= tools.get_start_end(files)
    except Exception as e:
        raise BadDataset(
            f"failed to get time from ({files[0]}, {files[-1]}), {e}"
        ) from e
    return stream


def get_variable_metadata(files):
    try:
        ds = xr.open_dataset(files[0])
    except RuntimeError as e:
        raise BadDataset(f"Loading metadata from {files[0]} yielded {e}") from e
    variables = sorted(x for x in ds)
    long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
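The helpers above expect each file in `../inputs/*.yaml` to map experiment ids to a `path` (the directory scanned for `<id>*.nc` files) plus optional `metadata` and `description` fields. A minimal sketch of that layout; the id, path, and metadata values are invented for illustration:

``` python
# Hypothetical sketch of one input file as consumed by process_simulations();
# the experiment id, path, and metadata values below are made up.
example_input = yaml.safe_load(
    """
simulations:
  my_experiment:
    path: /work/example/my_experiment   # directory scanned for <id>*.nc files
    metadata:
      project: enso
    description: optional free-text description
"""
)
assert list(example_input["simulations"]) == ["my_experiment"]
```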
%% Cell type:code id: tags:
``` python
yaml_files = sorted(Path("../inputs").glob("*.yaml"))
main_cat = dict(sources=dict())
for yaml_file in yaml_files:
    process_yaml_file(yaml_file)
    stem = yaml_file.stem
    main_cat["sources"][stem] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{stem}/main.yaml"),
    )
with open(Path("../public/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
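The `yaml_file_cat` and `netcdf` driver names match the intake catalog format, so the generated `../public/main.yaml` can presumably be browsed with intake; this is not part of the notebook. A hedged usage sketch, assuming intake and intake-xarray are installed:

``` python
# Hypothetical usage sketch, not part of this notebook: open the generated
# catalog with intake and inspect its entries.
import intake

cat = intake.open_catalog("../public/main.yaml")
print(list(cat))  # one sub-catalog per file in ../inputs
# Drilling down (input file -> experiment -> stream) and loading lazily is
# assumed to look roughly like this; the names are placeholders:
# ds = cat["some_input"]["some_experiment"]["some_stream"].to_dask()
```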