Skip to content
Snippets Groups Projects

Migration to plugin template and add climpact functionality

Merged Mostafa Hadizadeh requested to merge migration into main
4 unresolved threads
wrapper.py 0 → 100644
+ 369
0
# SPDX-FileCopyrightText: 2024 Climate Service Center Germany, Technische Universität Dresden
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Heat2UrbanImpact plugin API wrapper.
"""
######################################################
# import commonly used libraries. You can add/remove
# libraries as needed *test*
######################################################
import json
import logging
import os
from getpass import getuser
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List, Optional
import freva
from assets.climpact_assets import MUNICIPALITIES
from evaluation_system.api import plugin
from evaluation_system.api.parameters import Bool, File, Integer
from evaluation_system.api.parameters import ParameterDictionary as ParamDict
from evaluation_system.api.parameters import SelectField, SolrField, String
from evaluation_system.misc import config, logger
from evaluation_system.model.solr import SolrFindFiles
class Heat2UrbanImpact_DO_NOT_USE(plugin.PluginAbstract):
######################################################
# Define the details for the plugin
######################################################
# Version of the Heat2UrbanImpact plugin (major, minor, patch) * Mandatory
__version__ = (2024, 11, 0)
# Short Description of the Heat2UrbanImpact plugin * Mandatory
__short_description__: str = "ATTENTION: THIS PLUGIN IS LOCATED HERE ONLY FOR DEVELOPMENT PURPOSES. PLEASE DON'T USE IT FOR THE RESEARCH AIMS. \n\n Preprocessed time series of heat periods from RCM data for urban impact models."
# Optional category this plugin belongs to
__category__: Optional[str] = "Time Series"
# Optional tags, that are the plugin can be described with and found by
__tags__: Optional[List[str]] = ["heat events", "urban impact model"]
# Optional author of the plugin
tool_developer = {
"name": "Bente Tiedje, Astrid Ziemann",
"email": "bente.tiedje@hereon.de, astrid.ziemann@tu-dresden.de",
}
# Optional long description of the author of the plugin
__long_description__: Optional[
str
] = "ATTENTION: THIS PLUGIN IS LOCATED HERE ONLY FOR DEVELOPMENT PURPOSES. PLEASE DON'T USE IT FOR THE RESEARCH AIMS. \n\n This plugin extracts the development of the atmospheric temperature stratification during a specified heat/warm period from regional climate model data and tailors the data for the usage in urban impact models. For a selectable region of interest and period of time this plugin returns hourly temperature data at different layers/hights in a directly usable format (format specification). This plugin is based on the NUKLEUS ensemble of 3km horizontal grid."
######################################################
# Define the parameters for the plugin * Mandatory
######################################################
__parameters__ = ParamDict(
File(
name="shape_file",
file_extension="geojson",
help=(
"Select a geo reference file defining your region of interest. If None (default), "
"the whole region defined in the climate data will be taken. Note: Either select a "
"path on the HPC system or a web URL."
),
default=None,
),
SelectField(
name="region",
options={mun: mun for mun in [""] + sorted(MUNICIPALITIES)},
help=(
"Select a pre-defined German municipality. Only applies if no shape file chosen."
),
),
String(
name="split_by",
help=(
"Key to split multiple geometries in geo reference file into sub-regions. Values "
"will be used to distinguish between different sub-regions."
),
),
SolrField(
name="experiment",
mandatory=True,
facet="experiment",
help="Select the model simulation of interest: historical or projection (global warming level +2K or +3K)",
max_items=1,
predefined_facets={
"project": ["nukleus"],
"product": ["ceu-3"],
"time_frequency": ["1hr"],
"variable": ["tas", "tasmin", "tasmax"],
},
),
SolrField(
name="models",
#mandatory=True,
facet="model",
multiple=True,
max_items=100,
predefined_facets={
"project": ["nukleus"],
"product": ["ceu-3"],
"time_frequency": ["1hr"],
"variable": ["tas", "tasmin", "tasmax"],
},
help=(
"Select climate model(s) from the NUKLEUS ensemble. Multiple models can be chosen for simultaneous output."
),
),
SelectField(
name="event",
default="95",
options={"75th percentil": "75", "85th percentile": "85", "95th percentile": "95"},
help="Select the exceptionality of the heat/warm period by selecting percentiles of temperature probability distribution. Default is the 95th percentile. IT'S NOT IMPLEMENTED YET.",
),
Integer(
name="length_of_event",
default=3,
help="Enter the number of days of the heat/warm period you would like to receive as output. An integer between 1 and 14 days would be plausible. Default is 3 days. IT'S NOT IMPLEMENTED YET.",
),
String(
name="months_of_event",
default="July",
#options={
# "JJA": "Summer (Jun-Aug)",
# "DJF": "Winter (Dec-Feb)",
# "MAM": "Spring (Mar-May)",
# "SON": "Autumn (Sep-Nov)",
#},
help="Enter the month(s) when your heat event is supposed to happen for your studies. E.g. for single month 'June' or for multiple months 'May:August'. Default is 'July'. IT'S NOT IMPLEMENTED YET.",
),
SelectField(
name="impact_model",
default=None,
options={
"ENVI-met": "ENVI-met",
"PALM": "PALM",
},
help="The format of the output will be tailored for the selected impact model. IT'S NOT IMPLEMENTED YET.",
),
String(
name="start",
help=(
"First timestep to process. If empty (default), uses first available timestep."
),
default=None,
),
String(
name="end",
help=(
"Last timestep to process. If empty (default), uses last available timestep."
),
default=None,
),
String(
name="output_units",
default="",
help=(
"Unit conversion specification. Format: variable:unit/multiplier. Examples: "
"'pr:mm/h' or 'pr:3600' for precipitation. Multiple: 'pr:3600,tas:degC'."
),
),
SelectField(
name="output_file_type",
default="nc",
options={"nc": "nc", "csv": "csv", "h5": "hdf5", "zarr": "zarr"},
help="Select output file format.",
),
SelectField(
name="mask_type",
default="none",
options={"none": "none", "land": "land", "sea": "sea"},
help=(
"Surface type to mask. none: no mask, land: mask land, sea: mask water areas."
),
),
SelectField(
name="mask_method",
default="centres",
options={"centres": "centres", "corners": "corners"},
help=(
"How to determine masked region. centres: only grid-cell centres within polygon. "
"corners: any grid-cell corner within polygon (less restrictive)."
),
),
Bool(
name="plot_map",
default=True,
help=(
"Generate maps for each variable/model to verify projection and masking."
),
),
File(
name="pre_process_module",
file_extension="py",
help=(
"Python file with custom processing routines. If not set, calculates time series "
"of selected variables."
),
default=None,
),
String(
name="pre_process_function",
help=(
"Name of pre-processing function in pre_process_module. Only used if module set."
),
default="pre_proc",
),
)
######################################################
# a couple of functions to collect the files
# !! THERE IS NO NEED TO CHANGE ANYTHING HERE !!
######################################################
@staticmethod
def _drs_search(**search_kw):
kw = search_kw.copy()
files = sorted(freva.databrowser(**kw))
if files:
return files
def _get_files(
self, variable, experiment, model, product, project, freq, start, end
):
files = {}
time = {}
if start and end:
time = {"time": f"{start}to{end}"}
elif start:
time = {"time": f"{start}to9999"}
elif end:
time = {"time": f"0to{end}"}
search_kw = dict(
variable=variable,
experiment=experiment,
model=model,
time_frequency=freq,
)
search_kw.update(time)
if product:
search_kw["product"] = product
if project:
search_kw["project"] = project
experiment_facets = freva.facet_search(facet=["ensemble"], **search_kw)
for ens in experiment_facets["ensemble"]:
search_kw["ensemble"] = ens
files_found = self._drs_search(**search_kw)
if not files_found:
logger.warning("No files found for %s", ens)
else:
files[ens] = files_found
return files
def get_mask(self, mask_type, **kwargs):
"""Get the land-sea mask for a setup."""
setup = kwargs.copy()
if mask_type is None or mask_type == "none":
return
setup["variable"] = "sftlf"
files = self._drs_search(**setup)
if files:
return str(files[0]), mask_type
logger.warning(
"No land-sea mask found for setup, searching for alternatives"
)
# Get rid of the model contraint
setup.pop("model")
files = self._drs_search(**setup)
if files:
return str(files[0]), mask_type
raise ValueError("No land-sea mask found for setup")
######################################################
def run_tool(self, config_dict):
# Define pre-defined facets for the plugin
config_dict["project"] = "nukleus"
config_dict["product"] = "ceu-3"
config_dict["time_frequency"] = "1hr"
config_dict["variable"] = ["tas", "ta300", "ta500", "ta700", "ta950"]
out_dir = self._special_variables.substitute({"d": "$USER_OUTPUT_DIR"})
out_dir = Path(out_dir["d"]) / str(self.rowid)
debug = logger.getEffectiveLevel() <= logging.DEBUG
config_dict["output_dir"] = str(out_dir)
out_dir.mkdir(exist_ok=True, parents=True)
models = config_dict.pop("models")
units = {}
for string in (
config_dict.pop("output_units").strip("{").strip("}").split(",")
):
var, _, unit = string.strip().partition(":")
if var:
try:
units[var.strip()] = float(unit.strip())
except ValueError:
units[var.strip()] = unit.strip()
config_dict["output_units"] = units
files = {}
search_kw = dict(
model=models,
experiment=config_dict["experiment"],
variable=config_dict["variable"],
)
for key in ("project", "product"):
if config_dict[key] is not None:
search_kw[key] = config_dict[key]
config_dict["mask"] = self.get_mask(
config_dict.pop("mask_type"), **search_kw
)
if config_dict["project"] is None:
config_dict["project"] = [
project
for project in SolrFindFiles.facets(**search_kw)["project"][
::2
]
if project
]
if isinstance(models, str):
models = [models]
for model in models:
files[model] = self._get_files(
config_dict["variable"],
config_dict["experiment"],
model,
config_dict["product"],
config_dict["project"],
config_dict["time_frequency"],
config_dict["start"],
config_dict["end"],
)
config_dict["swift_key"] = os.environ.get("LC_TELEPHONE", None)
options = config.get_section("scheduler_options")
config_dict["account"] = options.get("project") or getuser()
config_dict["files"] = files
config.reloadConfiguration()
this_dir = Path(__file__).parent.absolute()
config_dict["kernel_name"] = config.get("project_name", "freva")
python_env = this_dir / "plugin_env" / "bin" / "python"
tool_path = this_dir / "builder.py"
if not config_dict["shape_file"]:
config_dict["shape_file"] = str(this_dir / "assets" / "GER.shp")
for num, mun in enumerate(MUNICIPALITIES):
if mun.lower() == config_dict["region"].lower():
config_dict["region"] = str(num)
break
else:
config_dict["region"] = ""
config_dict["debug"] = False
for key, value in config_dict.items():
if value is None:
config_dict[key] = ""
if debug:
config_dict["abort_on_error"] = True
config_dict["debug"] = debug
with NamedTemporaryFile(suffix=".json") as tf:
with open(tf.name, "w") as f:
json.dump(config_dict, f, indent=3)
cmd = "{} -B {} {}".format(python_env, tool_path, tf.name)
_ = self.call(cmd)
for path in ("dask-worker-space", "__pycache__"):
rmpath = Path(config_dict["output_dir"]) / path
if rmpath.is_dir():
os.system("rm -r %s" % str(rmpath))
return self.prepare_output(config_dict["output_dir"])
Loading