From 6ea147801afed731190c1afa52d959646c73b2c3 Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Thu, 3 Apr 2025 16:20:00 +0200
Subject: [PATCH] add methods to infer dimensions, cell_info, realm, grid; add
 granularity in csv_to_cmor_json() for var, freq, level
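
The new filters make it possible to regenerate only a subset of the tables.
A minimal usage sketch (assuming src/ is importable; the variable names and
filter values below are illustrative, not taken from ct_ecmwf.rc):

    from converter import csv_to_cmor_json

    # Rebuild only the daily and monthly surface tables for two variables.
    csv_to_cmor_json(var=["2t", "tp"], freq=["day", "mon"], ltype=["sfc"])

    # Remove previously written table JSONs without generating new ones.
    csv_to_cmor_json(clean_output=True)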

---
 src/converter.py | 281 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 207 insertions(+), 74 deletions(-)

diff --git a/src/converter.py b/src/converter.py
index 7c93059..4005df0 100644
--- a/src/converter.py
+++ b/src/converter.py
@@ -5,18 +5,20 @@ from io import StringIO
 from pathlib import Path
 from typing import Optional, Dict, List
 from datetime import datetime
+import re
+import glob
 
-today_date = datetime.today().strftime("%d %B %Y")
+today_date: str = datetime.today().strftime("%d %B %Y")
 
-BASE_DIR = Path(__file__).resolve().parent 
-csv_file = BASE_DIR.parent / "Tables/original_tables/ct_ecmwf.rc"
-json_output_path = BASE_DIR.parent / "Tables/era5-cmor-tables/Tables"
-search_dirs = [
+BASE_DIR: Path = Path(__file__).resolve().parent
+csv_file: Path = BASE_DIR.parent / "Tables/original_tables/ct_ecmwf.rc"
+json_output_path: Path = BASE_DIR.parent / "Tables/era5-cmor-tables/Tables"
+search_dirs: List[Path] = [
     BASE_DIR.parent / "Tables/source_tables/obs4MIPs-cmor-tables/Tables",
     BASE_DIR.parent / "Tables/source_tables/cmip6-cmor-tables/Tables"
 ]
 
-frequency_priority = {
+frequency_priority: Dict[str, List[str]] = {
     "1hr": [
         "A1hr", "A3hr", "A6hr",         # Atmospheric hourly tables
         "E1hr", "E3hr", "E6hrZ",        # Earth system / energy hourly
@@ -46,29 +48,43 @@ frequency_priority = {
     ]
 }
 
-level_categories = {
+level_categories: Dict[str, List[str]] = {
     "sfc": ["sfc_an", "sfc_fc"],
     "sfc_land": ["sfc_an_land", "sfc_fc_land"],
     "pl": ["pl_an", "pl_fc"],
     "ml": ["ml_an"]
 }
 
-approx_interval_map = {
+approx_interval_map: Dict[str, float] = {
     "1hr": round(1 / 24, 5),    # 0.04167
     "day": 1.00000,
     "mon": 30.00000,
     "fx": 0.00000
 }
 
-pressure_level_number = 37
+realm_prefix_map: Dict[str, str] = {
+    "aerosol": "AER",
+    "atmos": "A",
+    "atmosChem": "AER",
+    "ice": "I",
+    "land": "L",
+    "landIce": "LI",
+    "ocean": "O",
+    "seaIce": "SI"
+}
+
+level_number: Dict[str, int] = {
+    "pl": 37,
+    "ml": 137
+}
 
-def determine_level_category(level_type: str) -> str:
+def _determine_level_category(level_type: str) -> str:
     for category, values in level_categories.items():
         if level_type in values:
             return category
     return "sfc"
 
-def load_variable_from_table(table_path: str, variable_name: str, table_prefix: str) -> Optional[Dict]: 
+def _load_variable_from_table(table_path: str, variable_name: str, table_prefix: str) -> Optional[Dict]: 
     try:
         with open(table_path, 'r') as f:
             data = json.load(f)
@@ -83,22 +99,16 @@ def load_variable_from_table(table_path: str, variable_name: str, table_prefix:
         return None
 
 
-def find_best_matching_variable(variable: str, frequency: str, level_type: str,
+def _find_best_matching_variable(variable: str, frequency: str,
                                  search_dirs: List[str]) -> Optional[Dict]:
-    level_category = determine_level_category(level_type)
-    priority_tables = frequency_priority.get(frequency, [])
+    priority_tables: List[str] = frequency_priority.get(frequency, [])
     for search_dir in search_dirs:
         for table_prefix in priority_tables:
             for root, _, files in os.walk(search_dir):
-                matching_files = [file for file in files if file.endswith(".json") and f"_{table_prefix}" in file or file == f"{table_prefix}.json"]
+                matching_files: List[str] = [file for file in files if file.endswith(".json") and (f"_{table_prefix}" in file or file == f"{table_prefix}.json")]
                 for file in matching_files:
-                    # if level_category == "pl" and "Plev" not in file:
-                    #     continue
-                    # if level_category == "sfc" and "Plev" in file:
-                    #     continue
-
-                    table_path = os.path.join(root, file)
-                    variable_data = load_variable_from_table(table_path, variable, table_prefix)
+                    table_path: str = os.path.join(root, file)
+                    variable_data: Optional[Dict] = _load_variable_from_table(table_path, variable, table_prefix)
                     if variable_data:
                         variable_data["source_table"] = os.path.basename(file)
                         return variable_data
@@ -106,11 +116,11 @@ def find_best_matching_variable(variable: str, frequency: str, level_type: str,
 
 def _read_csv(csv_filepath: str) -> pd.DataFrame:
     with open(csv_filepath, 'r', encoding='utf-8') as f:
-        lines = f.readlines()
-    header_line_idx = next(idx for idx, line in enumerate(lines) if line.startswith("#CCC|"))
-    headers = lines[header_line_idx].strip().lstrip('#').split('|')
-    data_lines = lines[header_line_idx + 1:]
-    df = pd.read_csv(StringIO(''.join(data_lines)), sep='|', names=headers, engine='python')
+        lines: List[str] = f.readlines()
+    header_line_idx: int = next(idx for idx, line in enumerate(lines) if line.startswith("#CCC|"))
+    headers: List[str] = lines[header_line_idx].strip().lstrip('#').split('|')
+    data_lines: List[str] = lines[header_line_idx + 1:]
+    df: pd.DataFrame = pd.read_csv(StringIO(''.join(data_lines)), sep='|', names=headers, engine='python')
     df.columns = df.columns.str.strip()
     return df.dropna(subset=["CMPAR"]).fillna("")
 
@@ -126,27 +136,27 @@ def _get_mapping_source(cmip_val, source_table: str = "") -> str:
     elif cmip_val == 1:
         return "CF"
     elif cmip_val == 6:
-        lower_table = source_table.lower()
+        lower_table: str = source_table.lower()
         if "obs4mips" in lower_table:
             return "obs4MIPs"
         elif "cmip6" in lower_table:
             return "CMIP6"
         else:
-            return "CMIP6"  # fallback to CMIP6 if unknown
-    return "unknown"
+            return "CMIP6"
+    return "ECMWF"
 
 def _filter_level_grid(row: dict, level_type: str) -> dict:
     """
     Filter orig_grid and level_type for a specific level_type (e.g., 'sfc_fc_land', 'ml_an').
     """
-    all_level_types = [lt.strip() for lt in row.get("LTYPE", "").split(',') if lt.strip()]
-    all_grids = [g.strip() for g in row.get("ECGRID", "").split(',') if g.strip()]
+    all_level_types: list[str] = [lt.strip() for lt in row.get("LTYPE", "").split(',') if lt.strip()]
+    all_grids: list[str] = [g.strip() for g in row.get("ECGRID", "").split(',') if g.strip()]
 
     if level_type not in all_level_types:
         return {"level_type": "", "orig_grid": ""}
 
     if level_type.startswith("sfc"):
-        grid = "redGG-N1280" if "land" in level_type else "redGG-N320"
+        grid: str = "redGG-N1280" if "land" in level_type else "redGG-N320"
 
     elif level_type.startswith("ml"):
         if "specG-T639" in all_grids:
@@ -167,6 +177,95 @@ def _filter_level_grid(row: dict, level_type: str) -> dict:
         "orig_grid": grid
     }
 
+def _guess_realm_freq(realm: str, frequency: str) -> str:
+    """
+    Build a table name from realm and frequency, like Aday, Lmon, etc.
+    Handles composite realms like 'seaIce ocean' or 'landIce land'.
+    """
+    for key, prefix in sorted(realm_prefix_map.items(), key=lambda kv: -len(kv[0])):  # longest keys first so 'land' cannot shadow 'landIce'
+        if key in realm:
+            return f"{prefix}{frequency}"
+    return frequency
+
+def _extract_table_name(source_table: str) -> str:
+    """
+    Extracts the table name (e.g. 'Eday') from a source table filename like
+    'CMIP6_Eday.json'.
+    """
+    return Path(source_table).stem.split('_')[-1] if source_table else ""
+
+def _get_dimensions(
+    matched: Optional[Dict],
+    frequency: str,
+    level_type: str
+) -> str:
+    """
+    Build dimensions string, resolving appropriate vertical level and ensuring no duplicates.
+    Replaces time-like variants (e.g. time1, time2) with standard 'time'.
+    """
+    default_dims: str = "longitude latitude" if frequency == "fx" else "longitude latitude time"
+    dims: str = matched.get("dimensions", default_dims) if matched else default_dims
+    dims_parts: list[str] = dims.split()
+    dims_parts = ["time" if re.match(r"time\d*$", dim) else dim for dim in dims_parts]
+    dims_parts = ["longitude" if dim == "site" else dim for dim in dims_parts]
+    if "longitude" in dims_parts and "latitude" not in dims_parts:
+        dims_parts.insert(dims_parts.index("longitude") + 1, "latitude")
+    dims_parts = [dim for dim in dims_parts if not (
+        dim.startswith("plev") or dim.startswith("alevel"))]
+    
+    time_indices: List[int] = [i for i, dim in enumerate(dims_parts) if re.match(r"time\d*$", dim)]
+    time_idx: int = time_indices[0] if time_indices else len(dims_parts)
+    if level_type.startswith("pl"):
+        dims_parts.insert(time_idx, f"plev{level_number['pl']}")
+    elif level_type.startswith("ml"):
+        dims_parts.insert(time_idx, f"alevel{level_number['ml']}")
+
+    return " ".join(dims_parts)
+
+def _get_cell_info(
+    var_name: str,
+    matched: Optional[Dict],
+    frequency: str,
+    modeling_realm: str
+) -> dict:
+    """
+    Build cell_methods and cell_measures, using matched values if present,
+    and filling gaps with defaults based on realm/frequency/variable.
+    """
+    realm_lc: str = modeling_realm.lower()
+
+    time_method = "time: mean"
+    area_method = "area: mean"
+    default_measure = "area: areacella"
+
+    if frequency in {"1hr", "day", "mon"} and any(ext in var_name.lower() for ext in ["max", "min"]):
+        if "max" in var_name.lower():
+            extreme = "maximum"
+        else:
+            extreme = "minimum"
+        if frequency == "mon":
+            time_method = f"time: {extreme} within days time: mean over days"
+        else:
+            time_method = f"time: {extreme}"
+    if "ocean" in realm_lc or "seaice" in realm_lc:
+        area_method = "area: mean where sea"
+        default_measure = "area: areacello"
+    elif "landice" in realm_lc:  # checked before the broader "land" match
+        area_method = "area: mean where snow"
+    elif "land" in realm_lc:
+        area_method = "area: mean where land"
+    default_method: str = f"{area_method} {time_method}"
+
+    matched_method: str = matched.get("cell_methods", "") if matched else ""
+    matched_measure: str = matched.get("cell_measures", "") if matched else ""
+
+    cell_methods = matched_method.strip() or default_method
+    cell_measures = matched_measure.strip() or default_measure
+    return {
+        "cell_methods": cell_methods,
+        "cell_measures": cell_measures
+    }
+
 
 def _build_cmor_entry(
     var_name: str,
@@ -183,20 +282,20 @@ def _build_cmor_entry(
 
     level_grid = _filter_level_grid(row, level_type)
     source_table = matched.get("source_table", "") if matched else ""
-    mapping = _get_mapping_source(row["CMIP"], source_table)
-    default_dimensions = "lat lon" if frequency == "fx" else "time lat lon"
-    default_cell_methods = "" if frequency == "fx" else "time: mean"
+    mapping: str = _get_mapping_source(row["CMIP"], source_table)
+    modeling_realm = matched.get("modeling_realm", "") if matched else realm
+    cell_info = _get_cell_info(var_name, matched, frequency, modeling_realm)
     matched_comment = matched.get("comment", "") if matched else ""
     final_comment = f"{matched_comment}. {comment}".strip(". ") if comment else matched_comment
 
     return {
-        "cell_measures": matched.get("cell_measures", "") if matched else "",
-        "cell_methods": matched.get("cell_methods", default_cell_methods) if matched else default_cell_methods,
+        "cell_measures": cell_info["cell_measures"],
+        "cell_methods": cell_info["cell_methods"],
         "comment": final_comment,
-        "dimensions": matched.get("dimensions", default_dimensions) if matched else default_dimensions,
+        "dimensions": _get_dimensions(matched, frequency, level_type),
         "frequency": frequency,
         "long_name": long_name,
-        "modeling_realm": matched.get("modeling_realm", "") if matched else realm,
+        "modeling_realm": modeling_realm,
         "ok_max_mean_abs": matched.get("ok_max_mean_abs", "") if matched else "",
         "ok_min_mean_abs": matched.get("ok_min_mean_abs", "") if matched else "",
         "out_name": var_name,
@@ -217,54 +316,86 @@ def _build_cmor_entry(
         "level_type": level_grid["level_type"],
         "conversion": row["CMFACT"],
         "source_table": matched.get("source_table", "") if matched else "",
-        "table": source_table if source_table else frequency,
+        "table":  _extract_table_name(source_table) if source_table else _guess_realm_freq(realm, frequency),
         "mapping": mapping
     }
     
 
 
-def csv_to_cmor_json(csv_filepath=csv_file, json_output_path=json_output_path):
+def _ensure_list(val):
+    if val is None:
+        return None
+    return val if isinstance(val, list) else [val]
+
+def csv_to_cmor_json(
+    csv_filepath=csv_file,
+    json_output_path=json_output_path,
+    var=None,
+    freq=None,
+    ltype=None,
+    clean_output: bool = False
+) -> None:
+
+    var = _ensure_list(var)
+    freq = _ensure_list(freq)
+    ltype = _ensure_list(ltype)
+
+    if clean_output:
+        for file in glob.glob(os.path.join(json_output_path, "*.json")):
+            os.remove(file)
+        print(f"🧹 Removed existing JSON files from: {json_output_path}")
+        return
 
-    df = _read_csv(csv_filepath)
-    grouped_json = {}
+    df: pd.DataFrame = _read_csv(csv_filepath)
+    if var:
+        df = df[df["CMPAR"].isin(var)]
+
+    grouped_json: dict = {}
 
     for _, row in df.iterrows():
-        var_name = row["CMPAR"]
-        time_representation = [t.strip().upper() for t in row["TREPR"].split(',') if pd.notna(row["TREPR"])]
-        level_types = [lt.strip() for lt in row["LTYPE"].split(',') if pd.notna(row["LTYPE"])]
+        var_name: str = row["CMPAR"]
+        time_representation: list[str] = [
+            t.strip().upper() for t in row["TREPR"].split(',') if pd.notna(row["TREPR"])
+        ]
+        level_types: list[str] = [
+            lt.strip() for lt in row["LTYPE"].split(',') if pd.notna(row["LTYPE"])
+        ]
 
         for level_type in level_types:
-            level_group = determine_level_category(level_type)
-
-            if "INV" in time_representation:
-                freq = "fx"
-                print(f"----------------------------{freq}-----------------------------")                
-                key = (freq, level_group)
-                matched = find_best_matching_variable(var_name, freq, level_type, search_dirs)
-                cmor_entry = _build_cmor_entry(
-                    var_name, matched, row, freq, level_type
-                )                
-                grouped_json.setdefault(key, {})[var_name] = cmor_entry
-            for freq in ["1hr", "day", "mon"]:
-                print(f"----------------------------{freq}-----------------------------")
-                key = (freq, level_group)
-                matched = find_best_matching_variable(var_name, freq, level_type, search_dirs)
-                cmor_entry = _build_cmor_entry(
-                    var_name, matched, row, freq, level_type
-                )   
+            level_group = _determine_level_category(level_type)
+            if ltype and level_group not in ltype:
+                continue
+
+            if set(time_representation) == {"INV"}:
+                applicable_frequencies: List[str] = ["fx"]
+            else:
+                applicable_frequencies = ["1hr", "day", "mon"]
+                if "INV" in time_representation:
+                    applicable_frequencies.append("fx")
+
+            for frequency in applicable_frequencies:
+                if freq and frequency not in freq:
+                    continue
+
+                print(f"----------------------------{frequency}-----------------------------")
+                key: tuple[str, str] = (frequency, level_group)
+                matched: Optional[Dict] = _find_best_matching_variable(var_name, frequency, search_dirs)
+                cmor_entry: dict = _build_cmor_entry(
+                    var_name, matched, row, frequency, level_type
+                )
                 grouped_json.setdefault(key, {})[var_name] = cmor_entry
 
-    # Write each file
-    for (freq, level_group), variable_entry in grouped_json.items():
+    for (frequency, level_group), variable_entry in grouped_json.items():
         if "land" in level_group:
             level_group = level_group.replace("_land", "")
-            filename = f"ERA5Land_{freq}_{level_group}"
-        else:        
-            filename = f"ERA5_{freq}_{level_group}"
-        cmor_json = {
+            filename = f"ERA5Land_{frequency}_{level_group}"
+        else:
+            filename = f"ERA5_{frequency}_{level_group}"
+
+        cmor_json: dict = {
             "Header": {
                 "Conventions": "CF-1.7 ODS-2.1",
-                "approx_interval": f"{approx_interval_map[freq]}",
+                "approx_interval": f"{approx_interval_map[frequency]}",
                 "cmor_version": "3.5",
                 "data_specs_version": "2.1.0",
                 "generic_levels": "",
@@ -272,7 +403,9 @@ def csv_to_cmor_json(csv_filepath=csv_file, json_output_path=json_output_path):
                 "mip_era": "CMIP6",
                 "missing_value": "1e20",
                 "product": "model-output",
-                "realm": " ".join(sorted({v.get("modeling_realm", "") for v in variable_entry.values()})).strip(),
+                "realm": " ".join(
+                    sorted({v.get("modeling_realm", "") for v in variable_entry.values()})
+                ).strip(),
                 "table_date": f"{today_date}",
                 "table_id": f"Table {filename}"
             },
@@ -286,6 +419,6 @@ def csv_to_cmor_json(csv_filepath=csv_file, json_output_path=json_output_path):
 
         print(f"✅ Written: {output_path}")
 
-# Run
+
 if __name__ == "__main__":
     csv_to_cmor_json()
\ No newline at end of file
-- 
GitLab