Commit 6f41aa0c authored by Florian Ziemen

first attempt at jacamar runner

parent d043f4a9
Merge request !1: first attempt at jacamar runner

.gitlab-ci.yml
include:
  - project: 'anw_dienste/ci-templates'
    file: '.slurm-ci.yml'

variables:
  GIT_DEPTH: 50
  QUARTO_VERSION: 1.4.553
@@ -38,12 +43,36 @@ run_pre_commit_hooks:
  variables:
    COMPARE_SOURCE: HEAD^1

create_cat:
  extends: .default
  stage: build
  needs: []
  variables:
    ACCOUNT: "mh0287"
    SCHEDULER_PARAMETERS: "--account=$ACCOUNT --partition=interactive --ntasks=4 --mem=20G"
  script:
    - |
      module load python3/unstable
      pwd
      ls
      cd processing
      pwd
      ls
      ipython create_yaml.ipynb
  rules:
  artifacts:
    paths:
      - public

.build: &build
  stage: build
  needs: [create_cat]
  tags:
    - conda
  script:
    - cp -a catalog/ public
    - ls public
  artifacts:
    paths:
      - public
processing/create_yaml.ipynb
%% Cell type:code id: tags:
``` python
import yaml
import pandas as pd
from pathlib import Path
import re
import logging
import xarray as xr
import warnings
```
%% Cell type:code id: tags:
``` python
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_table_file(table_file: Path):
    df = read_table(
        table_file=table_file,
    )
    table_dir = Path("../public") / table_file.stem
    table_dir.mkdir(exist_ok=True, parents=True)
    catalog = process_table(df, table_dir)
    with open(table_dir / Path("main.yaml"), "w") as outfile:
        yaml.dump(catalog, outfile)


def read_table(table_file: Path) -> pd.DataFrame:
    names = [
        "garbage1",
        "simulation_id",
        "experiment",
        "resolution",
        "start_date",
        "end_date",
        "path",
        "contact",
        "garbage2",
    ]
    usecols = [x for x in names if "garbage" not in x]
    # Strip surrounding whitespace from all used columns except the dates.
    converters = {x: lambda s: s.strip() for x in usecols if "date" not in x}
    df = pd.read_csv(
        table_file,
        delimiter="|",
        names=names,
        usecols=usecols,
        header=1,
        converters=converters,
    )
    # Undo markdown escaping of underscores in simulation_id and path.
    df.iloc[:, 0] = df.iloc[:, 0].str.replace("\\_", "_").str.strip()
    df.iloc[:, -2] = df.iloc[:, -2].str.replace("\\_", "_").str.strip()
    df["path"] = [Path(x) for x in df["path"]]
    logger.debug(df)
    return df


def process_table(df: pd.DataFrame, table_dir: Path) -> dict:
    catalog = dict(sources=dict())
    for _, row in df.iterrows():
        catalog["sources"][row["simulation_id"]] = create_entry(
            row, table_dir=table_dir
        )
    return catalog
```
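%% Cell type:markdown id: tags:

For illustration, `read_table` parses pipe-delimited markdown tables: the leading and trailing `|` of each row produce the two `garbage` columns, and `header=1` discards the header and `|---|` separator rows. A minimal sketch with invented values (the real tables live in `../inputs/*.md`):

%% Cell type:code id: tags:

``` python
import io

import pandas as pd

# Hypothetical input table; all values are invented for illustration.
sample = """\
| simulation_id | experiment | resolution | start_date | end_date | path | contact |
| --- | --- | --- | --- | --- | --- | --- |
| exp001 | control run | R2B8 | 2020-01-01 | 2020-12-31 | /work/exp001 | someone@example.com |
"""

names = ["garbage1", "simulation_id", "experiment", "resolution",
         "start_date", "end_date", "path", "contact", "garbage2"]
usecols = [x for x in names if "garbage" not in x]
df = pd.read_csv(io.StringIO(sample), delimiter="|", names=names,
                 usecols=usecols, header=1)
print(df["simulation_id"].str.strip().tolist())  # ['exp001']
```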
%% Cell type:code id: tags:
``` python
def create_entry(experiment, table_dir: Path):
    entry_filename = table_dir / Path(f"{experiment['simulation_id']}.yaml")
    entry_content = {"sources": dict()}
    filegroups = analyze_dataset(experiment["simulation_id"], experiment["path"])
    for filegroup, files in filegroups.items():
        entry_content["sources"][filegroup] = create_stream(
            experiment, filegroup, files
        )
    with open(entry_filename, "w") as outfile:
        yaml.dump(entry_content, outfile)
    return dict(
        driver="yaml_file_cat",
        description=experiment["experiment"],
        args=dict(path="{{CATALOG_DIR}}/" + f'{experiment["simulation_id"]}.yaml'),
    )


def analyze_dataset(id, input_dir: Path):
    files = gen_files(id, input_dir)
    id, parts = split_filenames(id, files)
    patterns = get_patterns(parts)
    logger.debug(f"{id=} {patterns=}")
    filelist = gen_filelist(input_dir, id, patterns)
    return filelist


def gen_files(id, input_dir):
    files = [str(x) for x in input_dir.glob(f"{id}*.nc")]
    files = [x for x in files if "restart" not in x]
    return [Path(x) for x in files]


def split_filenames(id, files):
    stems = list(f.stem for f in files)
    parts = [x[len(id):] for x in stems]
    return id, parts


def gen_filelist(input_dir, id, patterns):
    return {
        pattern: list(input_dir.glob(f"{id}*{pattern}*.nc")) for pattern in patterns
    }


def get_patterns(parts):
    # Drop date stamps (2020-01-01_ or 20200101T000000Z style) and any
    # leading/trailing underscores left behind.
    patterns = {re.sub(r"\d{4}-\d{2}-\d{2}_", "", x) for x in parts}
    patterns = {re.sub(r"\d{8}T\d{6}Z", "", x) for x in patterns}
    patterns = {re.sub(r"^_", "", x) for x in patterns}
    patterns = {re.sub(r"_$", "", x) for x in patterns}
    return patterns


def create_stream(experiment, filegroup, files):
    stream = dict(driver="netcdf")
    stream["args"] = dict(
        chunks=dict(time=1),
        xarray_kwargs=dict(use_cftime=True),
        urlpath=[str(x) for x in files],
    )
    stream["metadata"] = {k: v.strip() for k, v in experiment.items() if k != "path"}
    stream["metadata"] |= get_variable_metadata(files)
    return stream


def get_variable_metadata(files):
    ds = xr.open_dataset(files[0])
    variables = sorted(x for x in ds)
    long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
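%% Cell type:markdown id: tags:

To make the grouping step concrete: `analyze_dataset` strips date stamps from the file-name stems and uses whatever remains as a glob pattern per file group. A small sketch with invented file names (the real ones come from `input_dir.glob(f"{id}*.nc")`):

%% Cell type:code id: tags:

``` python
import re

# Hypothetical output file stems for illustration.
id = "exp001"
stems = [
    "exp001_atm_2d_ml_20200101T000000Z",
    "exp001_atm_2d_ml_20200102T000000Z",
    "exp001_oce_3d_ml_20200101T000000Z",
]
parts = [s[len(id):] for s in stems]  # as in split_filenames
patterns = {re.sub(r"\d{8}T\d{6}Z", "", x) for x in parts}  # as in get_patterns
patterns = {re.sub(r"^_", "", x) for x in patterns}
patterns = {re.sub(r"_$", "", x) for x in patterns}
print(sorted(patterns))  # ['atm_2d_ml', 'oce_3d_ml']
```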
%% Cell type:code id: tags:
``` python
table_files = sorted(Path("../inputs").glob("*.md"))
main_cat = dict(sources=dict())
for table_file in table_files:
    table = table_file.stem
    process_table_file(table_file)
    main_cat["sources"][table] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{table}/main.yaml"),
    )
with open(Path("../public/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
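%% Cell type:markdown id: tags:

The generated YAML uses the `yaml_file_cat` and `netcdf` drivers, which match the intake catalog format, so the published tree can presumably be browsed with intake. A sketch, assuming the `intake` and `intake-xarray` packages are installed:

%% Cell type:code id: tags:

``` python
import intake  # assumption: intake + intake-xarray provide the drivers used above

# Open the top-level catalog written by the loop above.
cat = intake.open_catalog("../public/main.yaml")
print(list(cat))  # one sub-catalog per input table
# Drilling down table -> simulation_id -> filegroup yields an xarray dataset:
# ds = cat["<table>"]["<simulation_id>"]["<filegroup>"].to_dask()
```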