diff --git a/src/converter.py b/src/converter.py index 7c93059b3fef299763663dde72c88037f941de86..4005df0b7b689cc26a2d55f2f09f8575d5e14699 100644 --- a/src/converter.py +++ b/src/converter.py @@ -5,18 +5,20 @@ from io import StringIO from pathlib import Path from typing import Optional, Dict, List from datetime import datetime +import re +import glob -today_date = datetime.today().strftime("%d %B %Y") +today_date: str = datetime.today().strftime("%d %B %Y") -BASE_DIR = Path(__file__).resolve().parent -csv_file = BASE_DIR.parent / "Tables/original_tables/ct_ecmwf.rc" -json_output_path = BASE_DIR.parent / "Tables/era5-cmor-tables/Tables" -search_dirs = [ +BASE_DIR: Path = Path(__file__).resolve().parent +csv_file: Path = BASE_DIR.parent / "Tables/original_tables/ct_ecmwf.rc" +json_output_path: Path = BASE_DIR.parent / "Tables/era5-cmor-tables/Tables" +search_dirs: List[Path] = [ BASE_DIR.parent / "Tables/source_tables/obs4MIPs-cmor-tables/Tables", BASE_DIR.parent / "Tables/source_tables/cmip6-cmor-tables/Tables" ] -frequency_priority = { +frequency_priority: Dict[str, List[str]] = { "1hr": [ "A1hr", "A3hr", "A6hr", # Atmospheric hourly tables "E1hr", "E3hr", "E6hrZ", # Earth system / energy hourly @@ -46,29 +48,43 @@ frequency_priority = { ] } -level_categories = { +level_categories: Dict[str, List[str]] = { "sfc": ["sfc_an", "sfc_fc"], "sfc_land": ["sfc_an_land", "sfc_fc_land"], "pl": ["pl_an", "pl_fc"], "ml": ["ml_an"] } -approx_interval_map = { +approx_interval_map: Dict[str, float] = { "1hr": round(1 / 24, 5), # 0.04167 "day": 1.00000, "mon": 30.00000, "fx": 0.00000 } -pressure_level_number = 37 +realm_prefix_map: Dict[str, str] = { + "aerosol": "AER", + "atmos": "A", + "atmosChem": "AER", + "ice": "I", + "land": "L", + "landIce": "LI", + "ocean": "O", + "seaIce": "SI" +} + +level_number: Dict[str, int] = { + "pl": 37, + "ml": 137 +} -def determine_level_category(level_type: str) -> str: +def _determine_level_category(level_type: str) -> str: for category, values in level_categories.items(): if level_type in values: return category return "sfc" -def load_variable_from_table(table_path: str, variable_name: str, table_prefix: str) -> Optional[Dict]: +def _load_variable_from_table(table_path: str, variable_name: str, table_prefix: str) -> Optional[Dict]: try: with open(table_path, 'r') as f: data = json.load(f) @@ -83,22 +99,16 @@ def load_variable_from_table(table_path: str, variable_name: str, table_prefix: return None -def find_best_matching_variable(variable: str, frequency: str, level_type: str, +def _find_best_matching_variable(variable: str, frequency: str, search_dirs: List[str]) -> Optional[Dict]: - level_category = determine_level_category(level_type) - priority_tables = frequency_priority.get(frequency, []) + priority_tables: List[str] = frequency_priority.get(frequency, []) for search_dir in search_dirs: for table_prefix in priority_tables: for root, _, files in os.walk(search_dir): - matching_files = [file for file in files if file.endswith(".json") and f"_{table_prefix}" in file or file == f"{table_prefix}.json"] + matching_files: List[str] = [file for file in files if file.endswith(".json") and f"_{table_prefix}" in file or file == f"{table_prefix}.json"] for file in matching_files: - # if level_category == "pl" and "Plev" not in file: - # continue - # if level_category == "sfc" and "Plev" in file: - # continue - - table_path = os.path.join(root, file) - variable_data = load_variable_from_table(table_path, variable, table_prefix) + table_path: str = os.path.join(root, file) + variable_data: Dict | None = _load_variable_from_table(table_path, variable, table_prefix) if variable_data: variable_data["source_table"] = os.path.basename(file) return variable_data @@ -106,11 +116,11 @@ def find_best_matching_variable(variable: str, frequency: str, level_type: str, def _read_csv(csv_filepath: str) -> pd.DataFrame: with open(csv_filepath, 'r', encoding='utf-8') as f: - lines = f.readlines() - header_line_idx = next(idx for idx, line in enumerate(lines) if line.startswith("#CCC|")) - headers = lines[header_line_idx].strip().lstrip('#').split('|') - data_lines = lines[header_line_idx + 1:] - df = pd.read_csv(StringIO(''.join(data_lines)), sep='|', names=headers, engine='python') + lines: List[str] = f.readlines() + header_line_idx: int = next(idx for idx, line in enumerate(lines) if line.startswith("#CCC|")) + headers: List[str] = lines[header_line_idx].strip().lstrip('#').split('|') + data_lines: List[str] = lines[header_line_idx + 1:] + df: pd.DataFrame = pd.read_csv(StringIO(''.join(data_lines)), sep='|', names=headers, engine='python') df.columns = df.columns.str.strip() return df.dropna(subset=["CMPAR"]).fillna("") @@ -126,27 +136,27 @@ def _get_mapping_source(cmip_val, source_table: str = "") -> str: elif cmip_val == 1: return "CF" elif cmip_val == 6: - lower_table = source_table.lower() + lower_table: str = source_table.lower() if "obs4mips" in lower_table: return "obs4MIPs" elif "cmip6" in lower_table: return "CMIP6" else: - return "CMIP6" # fallback to CMIP6 if unknown - return "unknown" + return "CMIP6" + return "ECMWF" def _filter_level_grid(row: dict, level_type: str) -> dict: """ Filter orig_grid and level_type for a specific level_type (e.g., 'sfc_fc_land', 'ml_an'). """ - all_level_types = [lt.strip() for lt in row.get("LTYPE", "").split(',') if lt.strip()] - all_grids = [g.strip() for g in row.get("ECGRID", "").split(',') if g.strip()] + all_level_types: list[str] = [lt.strip() for lt in row.get("LTYPE", "").split(',') if lt.strip()] + all_grids: list[str] = [g.strip() for g in row.get("ECGRID", "").split(',') if g.strip()] if level_type not in all_level_types: return {"level_type": "", "orig_grid": ""} if level_type.startswith("sfc"): - grid = "redGG-N1280" if "land" in level_type else "redGG-N320" + grid: Literal['redGG-N1280'] | Literal['redGG-N320'] = "redGG-N1280" if "land" in level_type else "redGG-N320" elif level_type.startswith("ml"): if "specG-T639" in all_grids: @@ -167,6 +177,95 @@ def _filter_level_grid(row: dict, level_type: str) -> dict: "orig_grid": grid } +def _guess_realm_freq(realm: str, frequency: str) -> str: + """ + Build a table name from realm and frequency, like Aday, Lmon, etc. + Handles composite realms like 'seaIce ocean' or 'landIce land'. + """ + for key, prefix in realm_prefix_map.items(): + if key in realm: + return f"{prefix}{frequency}" + return frequency + +def _extract_table_name(source_table: str) -> str: + """ + Extracts the table name (e.g. 'Eday') from a source table filename like + 'CMIP6_Eday.json'. + """ + return Path(source_table).stem.split('_')[-1] if source_table else "" + +def _get_dimensions( + matched: Optional[Dict], + frequency: str, + level_type: str +) -> str: + """ + Build dimensions string, resolving appropriate vertical level and ensuring no duplicates. + Replaces time-like variants (e.g. time1, time2) with standard 'time'. + """ + default_dims: Literal['longitude latitude'] | Literal['longitude latitude time'] = "longitude latitude" if frequency == "fx" else "longitude latitude time" + dims: str = matched.get("dimensions", default_dims) if matched else default_dims + dims_parts = dims.split() + dims_parts: list[str] = ["time" if re.match(r"time\d*$", dim) else dim for dim in dims_parts] + dims_parts = ["longitude" if dim == "site" else dim for dim in dims_parts] + if "longitude" in dims_parts and "latitude" not in dims_parts: + dims_parts.insert(dims_parts.index("longitude") + 1, "latitude") + dims_parts = [dim for dim in dims_parts if not ( + dim.startswith("plev") or dim.startswith("alevel"))] + + time_indices: List[int] = [i for i, dim in enumerate(dims_parts) if re.match(r"time\d*$", dim)] + time_idx: int = time_indices[0] if time_indices else len(dims_parts) + if level_type.startswith("pl"): + dims_parts.insert(time_idx, f"plev{level_number['pl']}") + elif level_type.startswith("ml"): + dims_parts.insert(time_idx, f"alevel{level_number['ml']}") + + return " ".join(dims_parts) + +def _get_cell_info( + var_name: str, + matched: Optional[Dict], + frequency: str, + modeling_realm: str +) -> dict: + """ + Build cell_methods and cell_measures, using matched values if present, + and filling gaps with defaults based on realm/frequency/variable. + """ + realm_lc: str = modeling_realm.lower() + + time_method = "time: mean" + area_method = "area: mean" + default_measure = "area: areacella" + + if frequency in {"1hr", "day", "mon"} and any(ext in var_name.lower() for ext in ["max", "min"]): + if "max" in var_name.lower(): + extreme="maximum" + elif "min" in var_name.lower(): + extreme="minimum" + if frequency == "mon": + time_method: str = f"time: {extreme} within days time: mean over days" + else: + time_method = f"time: {extreme}" + if "ocean" in realm_lc or "seaice" in realm_lc: + area_method = "area: mean where sea" + default_measure = "area: areacello" + elif "land" in realm_lc: + area_method = "area: mean where land" + elif "landice" in realm_lc: + area_method = "area: mean where snow" + default_method: str = f"{area_method} {time_method}" + + matched_method: str = matched.get("cell_methods", "") if matched else "" + matched_measure: str = matched.get("cell_measures", "") if matched else "" + + cell_methods = matched_method.strip() or default_method + cell_measures = matched_measure.strip() or default_measure + return { + "cell_methods": cell_methods, + "cell_measures": cell_measures + } + def _build_cmor_entry( var_name: str, @@ -183,20 +282,20 @@ def _build_cmor_entry( level_grid = _filter_level_grid(row, level_type) source_table = matched.get("source_table", "") if matched else "" - mapping = _get_mapping_source(row["CMIP"], source_table) - default_dimensions = "lat lon" if frequency == "fx" else "time lat lon" - default_cell_methods = "" if frequency == "fx" else "time: mean" + mapping: str = _get_mapping_source(row["CMIP"], source_table) + modeling_realm = matched.get("modeling_realm", "") if matched else realm + cell_info = _get_cell_info(var_name, matched, frequency, modeling_realm) matched_comment = matched.get("comment", "") if matched else "" final_comment = f"{matched_comment}. {comment}".strip(". ") if comment else matched_comment return { - "cell_measures": matched.get("cell_measures", "") if matched else "", - "cell_methods": matched.get("cell_methods", default_cell_methods) if matched else default_cell_methods, + "cell_measures": cell_info["cell_measures"], + "cell_methods": cell_info["cell_methods"], "comment": final_comment, - "dimensions": matched.get("dimensions", default_dimensions) if matched else default_dimensions, + "dimensions": _get_dimensions(matched, frequency, level_type), "frequency": frequency, "long_name": long_name, - "modeling_realm": matched.get("modeling_realm", "") if matched else realm, + "modeling_realm": modeling_realm, "ok_max_mean_abs": matched.get("ok_max_mean_abs", "") if matched else "", "ok_min_mean_abs": matched.get("ok_min_mean_abs", "") if matched else "", "out_name": var_name, @@ -217,54 +316,86 @@ def _build_cmor_entry( "level_type": level_grid["level_type"], "conversion": row["CMFACT"], "source_table": matched.get("source_table", "") if matched else "", - "table": source_table if source_table else frequency, + "table": _extract_table_name(source_table) if source_table else _guess_realm_freq(realm, frequency), "mapping": mapping } -def csv_to_cmor_json(csv_filepath=csv_file, json_output_path=json_output_path): +def _ensure_list(val): + if val is None: + return None + return val if isinstance(val, list) else [val] + +def csv_to_cmor_json( + csv_filepath=csv_file, + json_output_path=json_output_path, + var=None, + freq=None, + ltype=None, + clean_output: bool = False +) -> None: + + var = _ensure_list(var) + freq = _ensure_list(freq) + ltype = _ensure_list(ltype) + + if clean_output: + for file in glob.glob(os.path.join(json_output_path, "*.json")): + os.remove(file) + print(f"🧹 Removed existing JSON files from: {json_output_path}") + return - df = _read_csv(csv_filepath) - grouped_json = {} + df: pd.DataFrame = _read_csv(csv_filepath) + if var: + df = df[df["CMPAR"].isin(var)] + + grouped_json: dict = {} for _, row in df.iterrows(): - var_name = row["CMPAR"] - time_representation = [t.strip().upper() for t in row["TREPR"].split(',') if pd.notna(row["TREPR"])] - level_types = [lt.strip() for lt in row["LTYPE"].split(',') if pd.notna(row["LTYPE"])] + var_name: str = row["CMPAR"] + time_representation: list[str] = [ + t.strip().upper() for t in row["TREPR"].split(',') if pd.notna(row["TREPR"]) + ] + level_types: list[str] = [ + lt.strip() for lt in row["LTYPE"].split(',') if pd.notna(row["LTYPE"]) + ] for level_type in level_types: - level_group = determine_level_category(level_type) - - if "INV" in time_representation: - freq = "fx" - print(f"----------------------------{freq}-----------------------------") - key = (freq, level_group) - matched = find_best_matching_variable(var_name, freq, level_type, search_dirs) - cmor_entry = _build_cmor_entry( - var_name, matched, row, freq, level_type - ) - grouped_json.setdefault(key, {})[var_name] = cmor_entry - for freq in ["1hr", "day", "mon"]: - print(f"----------------------------{freq}-----------------------------") - key = (freq, level_group) - matched = find_best_matching_variable(var_name, freq, level_type, search_dirs) - cmor_entry = _build_cmor_entry( - var_name, matched, row, freq, level_type - ) + level_group = _determine_level_category(level_type) + if ltype and level_group not in ltype: + continue + + if set(time_representation) == {"INV"}: + applicable_frequencies: List[str] = ["fx"] + else: + applicable_frequencies = ["1hr", "day", "mon"] + if "INV" in time_representation: + applicable_frequencies.append("fx") + + for frequency in applicable_frequencies: + if freq and frequency not in freq: + continue + + print(f"----------------------------{frequency}-----------------------------") + key: tuple[str, str] = (frequency, level_group) + matched: dict = _find_best_matching_variable(var_name, frequency, search_dirs) + cmor_entry: dict = _build_cmor_entry( + var_name, matched, row, frequency, level_type + ) grouped_json.setdefault(key, {})[var_name] = cmor_entry - # Write each file - for (freq, level_group), variable_entry in grouped_json.items(): + for (frequency, level_group), variable_entry in grouped_json.items(): if "land" in level_group: level_group = level_group.replace("_land", "") - filename = f"ERA5Land_{freq}_{level_group}" - else: - filename = f"ERA5_{freq}_{level_group}" - cmor_json = { + filename = f"ERA5Land_{frequency}_{level_group}" + else: + filename = f"ERA5_{frequency}_{level_group}" + + cmor_json: dict[str, Any] = { "Header": { "Conventions": "CF-1.7 ODS-2.1", - "approx_interval": f"{approx_interval_map[freq]}", + "approx_interval": f"{approx_interval_map[frequency]}", "cmor_version": "3.5", "data_specs_version": "2.1.0", "generic_levels": "", @@ -272,7 +403,9 @@ def csv_to_cmor_json(csv_filepath=csv_file, json_output_path=json_output_path): "mip_era": "CMIP6", "missing_value": "1e20", "product": "model-output", - "realm": " ".join(sorted({v.get("modeling_realm", "") for v in variable_entry.values()})).strip(), + "realm": " ".join( + sorted({v.get("modeling_realm", "") for v in variable_entry.values()}) + ).strip(), "table_date": f"{today_date}", "table_id": f"Table {filename}" }, @@ -286,6 +419,6 @@ def csv_to_cmor_json(csv_filepath=csv_file, json_output_path=json_output_path): print(f"✅ Written: {output_path}") -# Run + if __name__ == "__main__": csv_to_cmor_json() \ No newline at end of file