Commit 1afda9c2 authored by Florian Ziemen

cleanup by ruff

parent e5f20a36
Pipeline #94690 failed
%% Cell type:code id: tags:
``` python
import yaml
import pandas as pd
from pathlib import Path
import re
import logging
import xarray as xr
import warnings
```
%% Cell type:code id: tags:
``` python
logging.basicConfig()
logger = logging.getLogger("catalog_netcdf")
logger.setLevel(logging.INFO)
warnings.filterwarnings("ignore", category=xr.SerializationWarning)
```
%% Cell type:code id: tags:
``` python
def process_table_file(table_file: Path):
    # Parse one overview table and write its sub-catalog to ../catalog/<table>/.
    df = read_table(
        table_file=table_file,
    )
    table_dir = Path("../catalog") / table_file.stem
    table_dir.mkdir(exist_ok=True)
    catalog = process_table(df, table_dir)
    with open(table_dir / Path("main.yaml"), "w") as outfile:
        yaml.dump(catalog, outfile)


def read_table(table_file: Path) -> pd.DataFrame:
    # Column names for the pipe-delimited table; the leading and trailing
    # pipes of each row show up as empty "garbage" columns.
    names = [
        "garbage1",
        "simulation_id",
        "experiment",
        "resolution",
        "start_date",
        "end_date",
        "path",
        "contact",
        "garbage2",
    ]
    usecols = [x for x in names if "garbage" not in x]
    # Strip surrounding whitespace from every non-date column while reading.
    converters = {x: lambda s: s.strip() for x in usecols if "date" not in x}
    df = pd.read_csv(
        table_file,
        delimiter="|",
        names=names,
        usecols=usecols,
        header=1,
        converters=converters,
    )
    # Undo markdown escaping of underscores in simulation_id and path.
    df.iloc[:, 0] = df.iloc[:, 0].str.replace("\\_", "_").str.strip()
    df.iloc[:, -2] = df.iloc[:, -2].str.replace("\\_", "_").str.strip()
    df["path"] = [Path(x) for x in df["path"]]
    logger.debug(df)
    return df


def process_table(df: pd.DataFrame, table_dir: Path) -> dict:
    # One catalog entry per simulation row.
    catalog = dict(sources=dict())
    for _, row in df.iterrows():
        catalog["sources"][row["simulation_id"]] = create_entry(
            row, table_dir=table_dir
        )
    return catalog
```
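%% Cell type:markdown id: tags:

A quick way to see what `read_table` expects: the inputs are pipe-delimited markdown tables, where `header=1` consumes the `|---|`-style separator row and the leading/trailing pipes become the discarded `garbage` columns. The sketch below writes a tiny made-up table to a temporary file and parses it; the column values and the `/tmp` location are assumptions for illustration only.

%% Cell type:code id: tags:
``` python
# Hypothetical smoke test for read_table with made-up values. The escaped
# underscore in exp\_001 is undone by the str.replace calls in read_table.
sample = Path("/tmp/sample_table.md")
sample.write_text(
    "| simulation_id | experiment | resolution | start_date | end_date | path | contact |\n"
    "|---|---|---|---|---|---|---|\n"
    "| exp\\_001 | control | R2B4 | 1850-01-01 | 1851-01-01 | /tmp | nobody |\n"
)
read_table(sample)
```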
%% Cell type:code id: tags:
``` python
def create_entry(experiment, table_dir: Path):
    # Write one YAML file per simulation and return the catalog entry
    # pointing to it.
    entry_filename = table_dir / Path(f"{experiment['simulation_id']}.yaml")
    entry_content = {"sources": dict()}
    filegroups = analyze_dataset(experiment["simulation_id"], experiment["path"])
    for filegroup, files in filegroups.items():
        entry_content["sources"][filegroup] = create_stream(
            experiment, filegroup, files
        )
    with open(entry_filename, "w") as outfile:
        yaml.dump(entry_content, outfile)
    return dict(
        driver="yaml_file_cat",
        description=experiment["experiment"],
        args=dict(path="{{CATALOG_DIR}}/" + f'{experiment["simulation_id"]}.yaml'),
    )


def analyze_dataset(id, input_dir: Path):
    # Group the netCDF files of one simulation by output stream.
    files = gen_files(id, input_dir)
    id, parts = split_filenames(id, files)
    patterns = get_patterns(parts)
    logger.debug(f"{id=} {patterns=}")
    filelist = gen_filelist(input_dir, id, patterns)
    return filelist


def gen_files(id, input_dir):
    # All netCDF files of the simulation, excluding restart files.
    files = [str(x) for x in input_dir.glob(f"{id}*.nc")]
    files = [x for x in files if "restart" not in x]
    return [Path(x) for x in files]


def split_filenames(id, files):
    # Cut the simulation id off the file stems.
    stems = list(f.stem for f in files)
    parts = [x[len(id) :] for x in stems]
    return id, parts


def gen_filelist(input_dir, id, patterns):
    return {
        pattern: list(input_dir.glob(f"{id}*{pattern}*.nc")) for pattern in patterns
    }


def get_patterns(parts):
    # Remove date stamps (1979-01-01_ and 19790101T000000Z style) so that
    # only the output stream name remains.
    patterns = {re.sub(r"\d{4}-\d{2}-\d{2}_", "", x) for x in parts}
    patterns = {re.sub(r"\d{8}T\d{6}Z", "", x) for x in patterns}
    patterns = {re.sub(r"^_", "", x) for x in patterns}
    patterns = {re.sub(r"_$", "", x) for x in patterns}
    return patterns


def create_stream(experiment, filegroup, files):
    # One intake netcdf source per output stream.
    stream = dict(driver="netcdf")
    stream["args"] = dict(
        chunks=dict(time=1),
        xarray_kwargs=dict(use_cftime=True),
        urlpath=[str(x) for x in files],
    )
    stream["metadata"] = {k: v.strip() for k, v in experiment.items() if k != "path"}
    stream["metadata"] |= get_variable_metadata(files)
    return stream


def get_variable_metadata(files):
    # Read variable names and long names from the first file of the group.
    ds = xr.open_dataset(files[0])
    variables = sorted(x for x in ds)
    long_names = [ds[x].attrs.get("long_name", x) for x in variables]
    return dict(variables=variables, variable_long_names=long_names)
```
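%% Cell type:markdown id: tags:

To illustrate the grouping step: once `split_filenames` has cut the simulation id off the file stems, `get_patterns` strips the date stamps so that each output stream collapses to a single pattern. The stems below are invented for demonstration.

%% Cell type:code id: tags:
``` python
# Made-up filename remainders as split_filenames would produce them.
demo_parts = [
    "_atm_2d_ml_19790101T000000Z",
    "_atm_2d_ml_19800101T000000Z",
    "_oce_3d_ml_19790101T000000Z",
]
get_patterns(demo_parts)  # -> {'atm_2d_ml', 'oce_3d_ml'}
```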
%% Cell type:code id: tags:
``` python
table_files = sorted(Path("../inputs").glob("*.md"))
main_cat = dict(sources=dict())
for table_file in table_files:
    table = table_file.stem
    process_table_file(table_file)
    main_cat["sources"][table] = dict(
        driver="yaml_file_cat",
        args=dict(path="{{CATALOG_DIR}}/" + f"{table}/main.yaml"),
    )
with open(Path("../catalog/main.yaml"), "w") as outfile:
    yaml.dump(main_cat, outfile)
```
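%% Cell type:markdown id: tags:

The resulting `../catalog/main.yaml` is a nested intake catalog: one `yaml_file_cat` sub-catalog per table, and one per simulation below that. Assuming intake is installed, it can be opened as sketched below; the entry names depend on the tables actually processed above.

%% Cell type:code id: tags:
``` python
import intake

cat = intake.open_catalog("../catalog/main.yaml")
list(cat)  # one sub-catalog per input table
```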