%% Cell type:code id: tags:
 
``` python
import seaborn as sns
 
sns.set(style="ticks", context="talk")
```
 
%% Cell type:code id: tags:
 
``` python
# get formatting done automatically according to the `black` style
%load_ext lab_black
```
 
 
%% Cell type:markdown id: tags:
 
# accessing `CMIP6` data with `intake`
 
%% Cell type:code id: tags:
 
``` python
# intake facilitates file browsing and access; check out intake for reading .csv files
# and its plugins for S3, SQL, etc.
# https://intake.readthedocs.io/en/latest/plugin-directory.html
import intake
import xarray as xr
```
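
%% Cell type:markdown id: tags:

As a taste of plain `intake` beyond `intake-esm`: the builtin CSV driver opens one or many files by glob pattern. A minimal sketch, assuming a hypothetical `mydata_*.csv` pattern:

%% Cell type:code id: tags:

``` python
import intake

# hypothetical file pattern: intake's builtin CSV driver returns a source
# whose read() yields a pandas.DataFrame
source = intake.open_csv("mydata_*.csv")
df = source.read()
```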
 
%% Cell type:markdown id: tags:
 
`intake-esm` loads a `json` file, which provides all concatenation rules and the location of the catalog `csv` file.
 
%% Cell type:code id: tags:
 
``` python
# the json file contains the rules for concat and merge as well as the location of the catalog .csv file
# Fabian Wachsmann creates a new catalog of the CMIP6 data downloaded to /work/ik1017/ daily
# col_url = "/work/ik1017/Catalogs/mistral-cmip6.json"  # mistral
col_url = "/pool/data/Catalogs/dkrz_cmip6_disk.json"  # levante
```
 
%% Cell type:code id: tags:
 
``` python
# where to find the catalog_file
!cat /work/ik1017/Catalogs/mistral-cmip6.json | head -12
```
 
%% Output
 
{
  "esmcat_version": "0.1.0",
  "id": "mistral-cmip6",
  "description": "This is an ESM collection for CMIP6 data accessible on the DKRZ's MISTRAL disk storage system in /work/ik1017/CMIP6/data/CMIP6",
  "catalog_file": "/mnt/lustre02/work/ik1017/Catalogs/mistral-cmip6.csv.gz",
  "attributes": [
    {
      "column_name": "activity_id",
      "vocabulary": "https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_activity_id.json"
    },
    {
      "column_name": "source_id",
 
%% Cell type:code id: tags:
 
``` python
# which dimensions should be aggregated
!cat /work/ik1017/Catalogs/mistral-cmip6.json | tail -23
```
 
%% Output
 
"aggregations": [
{
"type": "union",
"attribute_name": "variable_id"
},
{
"type": "join_existing",
"attribute_name": "time_range",
"options": { "dim": "time", "coords": "minimal", "compat": "override" }
},
{
"type": "join_new",
"attribute_name": "member_id",
"options": { "coords": "minimal", "compat": "override" }
},
{
"type": "join_new",
"attribute_name": "dcpp_init_year",
"options": { "coords": "minimal", "compat": "override" }
}
]
}
}
 
%% Cell type:code id: tags:
 
``` python
# takes a few secs
# loads all file paths of the CMIP6 archive
%time col = intake.open_esm_datastore(col_url)
col
```
 
%% Output
 
CPU times: user 19.7 s, sys: 1.64 s, total: 21.3 s
Wall time: 21.3 s
 
 
%% Cell type:code id: tags:
 
``` python
# col.df is a pandas.DataFrame
col.df.head()
```
 
%% Output
 
activity_id institution_id source_id experiment_id member_id table_id \
0 AerChemMIP BCC BCC-ESM1 hist-piNTCF r1i1p1f1 AERmon
1 AerChemMIP BCC BCC-ESM1 hist-piNTCF r1i1p1f1 AERmon
2 AerChemMIP BCC BCC-ESM1 hist-piNTCF r1i1p1f1 AERmon
3 AerChemMIP BCC BCC-ESM1 hist-piNTCF r1i1p1f1 AERmon
4 AerChemMIP BCC BCC-ESM1 hist-piNTCF r1i1p1f1 AERmon
variable_id grid_label dcpp_init_year version time_range \
0 c2h6 gn NaN v20190718 185001-201412
1 c3h6 gn NaN v20190718 185001-201412
2 c3h8 gn NaN v20190718 185001-201412
3 cdnc gn NaN v20190718 185001-201412
4 ch3coch3 gn NaN v20190718 185001-201412
path
0 /mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/Aer...
1 /mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/Aer...
2 /mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/Aer...
3 /mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/Aer...
4 /mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/Aer...
 
%% Cell type:markdown id: tags:
 
## multi-model analysis
 
the prime use case for `intake-esm`
 
%% Cell type:code id: tags:
 
``` python
# col.search selects the experiments you specify in `query`
variable = "tas"
query = dict(
    experiment_id="historical",
    table_id="Amon",
    member_id="r1i1p1f1",
    variable_id=variable,
    # source_id="MPI-ESM1-2-LR",  # uncomment to restrict to a single model
)
cat = col.search(**query)
cat
```
 
%% Cell type:code id: tags:
 
``` python
# some data cleaning needed: wrong times/metadata found
source_ids = list(cat.df.source_id.unique())
for model in ["E3SM-1-1-ECA", "NorCPM1", "FGOALS-g3", "E3SM-1-0"]:
    source_ids.remove(model)
cat = col.search(**query, source_id=source_ids)
cat
```
 
%% Output
 
 
%% Cell type:code id: tags:
 
``` python
# cdf_kwargs are given to xarray.open_dataset
# cftime is like datetime but supports all four-digit years and many calendar types
%time dset_dict = cat.to_dataset_dict(cdf_kwargs={"chunks": {"time": -1}, "use_cftime": True})
```
 
%% Output
 
--> The keys in the returned dictionary of datasets are constructed as follows:
'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'
 
/work/mh0727/m300524/miniconda3/envs/pymistral/lib/python3.7/site-packages/xarray/conventions.py:498: SerializationWarning: variable 'tas' has multiple fill values {1e+20, 1e+20}, decoding all values to NaN.
decode_timedelta=decode_timedelta,
 
 
CPU times: user 42.4 s, sys: 17.9 s, total: 1min
Wall time: 54.9 s
 
%% Cell type:code id: tags:
 
``` python
# a dict is returned because the models have different spatial dimensions
list(dset_dict)
```
 
%% Output
 
['CMIP.CMCC.CMCC-CM2-SR5.historical.Amon.gn',
'CMIP.THU.CIESM.historical.Amon.gr',
'CMIP.MIROC.MIROC6.historical.Amon.gn',
'CMIP.NASA-GISS.GISS-E2-1-H.historical.Amon.gn',
'CMIP.CSIRO-ARCCSS.ACCESS-CM2.historical.Amon.gn',
'CMIP.BCC.BCC-CSM2-MR.historical.Amon.gn',
'CMIP.CCCma.CanESM5.historical.Amon.gn',
'CMIP.INM.INM-CM4-8.historical.Amon.gr1',
'CMIP.CAS.CAS-ESM2-0.historical.Amon.gn',
'CMIP.MPI-M.MPI-ESM1-2-HR.historical.Amon.gn',
'CMIP.CMCC.CMCC-CM2-HR4.historical.Amon.gn',
'CMIP.CSIRO.ACCESS-ESM1-5.historical.Amon.gn',
'CMIP.CAS.FGOALS-f3-L.historical.Amon.gr',
'CMIP.NASA-GISS.GISS-E2-1-G.historical.Amon.gn',
'CMIP.NIMS-KMA.KACE-1-0-G.historical.Amon.gr',
'CMIP.E3SM-Project.E3SM-1-1.historical.Amon.gr',
'CMIP.KIOST.KIOST-ESM.historical.Amon.gr1',
'CMIP.BCC.BCC-ESM1.historical.Amon.gn',
'CMIP.SNU.SAM0-UNICON.historical.Amon.gn',
'CMIP.NCAR.CESM2-WACCM-FV2.historical.Amon.gn',
'CMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.historical.Amon.gn',
'CMIP.MPI-M.MPI-ESM1-2-LR.historical.Amon.gn',
'CMIP.NUIST.NESM3.historical.Amon.gn',
'CMIP.NCC.NorESM2-LM.historical.Amon.gn',
'CMIP.INM.INM-CM5-0.historical.Amon.gr1',
'CMIP.MRI.MRI-ESM2-0.historical.Amon.gn',
'CMIP.NCAR.CESM2-FV2.historical.Amon.gn',
'CMIP.NCAR.CESM2-WACCM.historical.Amon.gn',
'CMIP.AS-RCEC.TaiESM1.historical.Amon.gn',
'CMIP.IPSL.IPSL-CM6A-LR.historical.Amon.gr',
'CMIP.FIO-QLNM.FIO-ESM-2-0.historical.Amon.gn',
'CMIP.CAMS.CAMS-CSM1-0.historical.Amon.gn',
'CMIP.NCC.NorESM2-MM.historical.Amon.gn',
'CMIP.NCAR.CESM2.historical.Amon.gn',
'CMIP.NOAA-GFDL.GFDL-ESM4.historical.Amon.gr1',
'CMIP.UA.MCM-UA-1-0.historical.Amon.gn',
'CMIP.AWI.AWI-ESM-1-1-LR.historical.Amon.gn',
'CMIP.EC-Earth-Consortium.EC-Earth3-Veg.historical.Amon.gr']
 
%% Cell type:code id: tags:
 
``` python
# dset_dict['CMIP.CAS.CAS-ESM2-0.historical.Amon.gn']#['tas'].mean(['lon','lat']).squeeze().plot()
```
 
%% Cell type:code id: tags:
 
``` python
# ultimate goal: get all data into one object, which eases the following computation
da = []
modellist = []
orig_size = 0
 
# reset time, as it is often set differently across models
expected_time = xr.cftime_range(start="1850", freq="MS", periods=165 * 12)
 
# extract all items from dset_dict
for key, value in dset_dict.items():
    model = key.split(".")[2]  # extract model name from key
    da_model = value["tas"].squeeze()  # extract variable from dataset
    # track dataset size processed
    orig_size += da_model.nbytes
    # spatial mean: gmst
    spatial_dims = [d for d in da_model.dims if d not in ["time"]]
    da_model_spatial_mean = da_model.mean(spatial_dims).sortby("time")
    # often time is differently formatted, therefore overwrite
    if da_model_spatial_mean.time.size != expected_time.size:
        print(f"omit {model}")
        continue
    da_model_spatial_mean["time"] = expected_time
    # rechunk time, see dashboard task graph
    da_model_spatial_mean = da_model_spatial_mean.chunk({"time": -1})
    da.append(da_model_spatial_mean)
    modellist.append(model)
```
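
%% Cell type:markdown id: tags:

Note that the spatial mean above weights all grid cells equally; a proper GMST would be area-weighted. A minimal sketch, assuming a regular grid whose latitude coordinate is named `lat` and given in degrees:

%% Cell type:code id: tags:

``` python
import numpy as np

# sketch: area weighting on a regular lon/lat grid (here for da_model,
# the last model from the loop above); cos(lat) is proportional to cell area
weights = np.cos(np.deg2rad(da_model.lat))
gmst_weighted = da_model.weighted(weights).mean(["lon", "lat"])
```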
 
%% Cell type:code id: tags:
 
``` python
da = xr.concat(da, dim="model", coords="minimal")
da["model"] = modellist
 
da
```
 
%% Output
 
<xarray.DataArray 'tas' (model: 38, time: 1980)>
dask.array<concatenate, shape=(38, 1980), dtype=float32, chunksize=(1, 1980), chunktype=numpy.ndarray>
Coordinates:
height float64 2.0
member_id <U8 'r1i1p1f1'
* time (time) object 1850-01-01 00:00:00 ... 2014-12-01 00:00:00
* model (model) <U15 'CMCC-CM2-SR5' 'CIESM' ... 'EC-Earth3-Veg'
 
%% Cell type:code id: tags:
 
``` python
from dask.utils import format_bytes
 
print(f"Collapsing {format_bytes(orig_size)} into {format_bytes(da.nbytes)}")
```
 
%% Output
 
Collapsing 11.25 GB into 300.96 kB
 
%% Cell type:markdown id: tags:
 
### computation on notebook resources
 
%% Cell type:code id: tags:
 
``` python
# on one compute node; on the shared partition this won't work
# to see the dashboard and use the distributed scheduler
from dask.distributed import Client
import multiprocessing
 
ncpu = multiprocessing.cpu_count()
threads = 6
nworker = ncpu // threads
print(
    f"Number of CPUs: {ncpu}, number of threads: {threads}, number of workers: {nworker}"
)
 
# client also starts dask dashboard
client = Client(
    processes=False, threads_per_worker=threads, n_workers=nworker, memory_limit="64GB"
)
client
```
 
%% Output
 
Number of CPUs: 48, number of threads: 6, number of workers: 8
 
<Client: 'inproc://10.50.40.73/18319/1' processes=8 threads=48, memory=512.00 GB>
 
%% Cell type:code id: tags:
 
``` python
# trigger execution; the remaining code is data-light and better suited to eager data
%time dac = da.compute()
```
 
%% Output
 
CPU times: user 3min 1s, sys: 27.7 s, total: 3min 28s
Wall time: 2min 14s
 
%% Cell type:code id: tags:
 
``` python
client.close()
```
 
%% Cell type:markdown id: tags:
 
### computation on many compute nodes
starts SLURM jobs via `dask_jobqueue`
 
%% Cell type:code id: tags:
 
``` python
# request 12 workers
```
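
%% Cell type:markdown id: tags:

The cluster used below was started beforehand. A minimal sketch of how such a cluster could be requested with `dask_jobqueue`; queue, cores, memory and walltime are placeholder values to adapt to your HPC system:

%% Cell type:code id: tags:

``` python
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# placeholder resources; each SLURM job hosts one dask worker
cluster = SLURMCluster(queue="compute", cores=48, memory="64GB", walltime="01:00:00")
cluster.scale(jobs=12)  # request 12 workers
client = Client(cluster)
```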
 
%% Cell type:code id: tags:
 
``` python
from dask.distributed import Client
 
client = Client("tcp://10.50.34.182:39368")
```
 
%% Cell type:code id: tags:
 
``` python
!squeue -o "%.9i %.50j %.8u %.8T %.11M %.11l %.7D %.S" -u m300524
```
 
%% Output
 
JOBID NAME USER STATE TIME TIME_LIMIT NODES START_TIME
23856334 Jupyter m300524 RUNNING 3:19:02 4:00:00 1 2020-09-23T13:45:46
 
%% Cell type:code id: tags:
 
``` python
client
```
 
%% Output
 
<Client: 'tcp://10.50.34.182:39368' processes=0 threads=0, memory=0 B>
 
%% Cell type:code id: tags:
 
``` python
%time dac = da.compute()
```
 
%% Output
 
CPU times: user 710 ms, sys: 16 ms, total: 726 ms
Wall time: 32.5 s
 
%% Cell type:code id: tags:
 
``` python
# shut down the cluster as well
client.close()
```
 
%% Cell type:code id: tags:
 
``` python
# usually I save important intermediate data to disk (large data preferably in zarr format)
# the dashboard shows that I/O is the bottleneck
```
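
%% Cell type:markdown id: tags:

A minimal sketch of such a save; the filename is hypothetical:

%% Cell type:code id: tags:

``` python
# zarr stores chunks as separate objects, which suits dask-parallel I/O
dac.to_dataset(name="tas").to_zarr("cmip6_gmst.zarr", mode="w")
```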
 
%% Cell type:markdown id: tags:
 
### Multi-model plot
 
%% Cell type:code id: tags:
 
``` python
def yearmean(ds, dim="time"):
    return ds.groupby(f"{dim}.year").mean().rename({"year": dim})
```
 
%% Cell type:code id: tags:
 
``` python
# magically get observations, more details later
# remote_cat = intake.open_catalog('https://raw.githubusercontent.com/aaronspring/remote_climate_data/master/master.yaml')
remote_cat = intake.open_catalog("/home/mpim/m300524/remote_climate_data/master.yaml")
hadcrut = remote_cat.atmosphere.HadCRUT4.to_dask()
print(hadcrut.coords)
obs = hadcrut.mean(["longitude", "latitude"]).temperature_anomaly
```
 
%% Output
 
Coordinates:
* latitude (latitude) float32 -87.5 -82.5 -77.5 -72.5 ... 77.5 82.5 87.5
* longitude (longitude) float32 -177.5 -172.5 -167.5 ... 167.5 172.5 177.5
* time (time) object 1850-01-16 12:00:00 ... 2020-07-16 12:00:00
 
%% Cell type:code id: tags:
 
``` python
# best practice: get all data into one xr object, then do the math afterwards with automatic broadcasting
# add observations to da
obs = obs.expand_dims("model")
obs["model"] = ["HadCRUT4 obs"]
```
 
%% Cell type:code id: tags:
 
``` python
obs["time"] = xr.cftime_range(start="1850", freq="MS", periods=obs.time.size)
obs = obs.sel(time=slice("1850", "2019"))
 
dac = xr.concat([obs, dac], "model")
```
 
%% Cell type:code id: tags:
 
``` python
# anomaly plot
import matplotlib.pyplot as plt
 
# this is where the science starts
anom = yearmean(dac - dac.sel(time=slice("1960", "1990")).mean("time"))
# and now already ends
 
# plot all models
anom.drop_sel(model="HadCRUT4 obs").plot(hue="model", figsize=(12, 3), alpha=0.3)
# plot model mean in black
anom.drop_sel(model="HadCRUT4 obs").mean("model").plot(c="k")
# plot observations in red
anom.sel(model="HadCRUT4 obs").plot(c="red", lw=2)
plt.title("GMST anomaly since 1960-1990")
plt.xlim([1850, 2110])
plt.show()
```
 
%% Output
 
/work/mh0727/m300524/miniconda3/envs/pymistral/lib/python3.7/site-packages/dask/array/numpy_compat.py:40: RuntimeWarning: invalid value encountered in true_divide
x = np.divide(x1, x2, out)
/work/mh0727/m300524/miniconda3/envs/pymistral/lib/python3.7/site-packages/dask/array/numpy_compat.py:40: RuntimeWarning: invalid value encountered in true_divide
x = np.divide(x1, x2, out)
 
 
 
%% Cell type:markdown id: tags:
 
# observations with `intake-xarray`
 
%% Cell type:markdown id: tags:
 
- avoid copy-pasting code for accessing data
- distinction between data curator and analyst roles
- access data without learning every backend API
- version control data sources
- cache remote data sources
- see https://intake.readthedocs.io/en/latest/use_cases.html
 
%% Cell type:markdown id: tags:
 
## ICDC catalog
 
%% Cell type:code id: tags:
 
``` python
import intake
 
icdc_cat = intake.open_catalog("/pool/data/ICDC/ocean/ocean_mistral.yml")
```
 
%% Cell type:code id: tags:
 
``` python
list(icdc_cat)
```
 
%% Output
 
['levitus_tanom',
'levitus_sanom',
'levitus_ohc',
'modis_chlorophyll_sst',
'aviso_ssh',
'hadisst',
'amsre_sst',
'reynolds_sst',
'oscar_surface_current_velocity',
'smos_sss',
'en4',
'bnsc',
'waghc',
'nsbc',
'pathfinder_sst_v5.2',
'jtp_ocean_currents',
'woce_climatology']
 
%% Cell type:code id: tags:
 
``` python
!cat /pool/data/ICDC/ocean/ocean_mistral.yml | head -21
```
 
%% Output
 
plugins:
source:
- module: intake_xarray
sources:
levitus_tanom:
description: Levitus T anom
driver: netcdf
metadata:
fields:
t_an:
label: temperature anomaly
unit: °C
args:
urlpath: /pool/data/ICDC/ocean/levitus/DATA/tanom/yearly/tanom_*.nc
xarray_kwargs:
decode_times: False
concat_dim: time
combine: nested
drop_variables: [lat_bnds,lon_bnds,depth_bnds,climatology_bounds,crs]
levitus_sanom:
 
%% Cell type:code id: tags:
 
``` python
ds = icdc_cat.levitus_tanom.to_dask()
print(ds.coords)
```
 
%% Output
 
Coordinates:
* lon (lon) float32 -179.5 -178.5 -177.5 -176.5 ... 177.5 178.5 179.5
* lat (lat) float32 -89.5 -88.5 -87.5 -86.5 -85.5 ... 86.5 87.5 88.5 89.5
* depth (depth) float64 0.0 10.0 20.0 30.0 ... 1.5e+03 1.75e+03 2e+03
* time (time) float64 6.0 18.0 30.0 42.0 54.0 ... 738.0 750.0 762.0 774.0
 
%% Cell type:code id: tags:
 
``` python
ds.t_an.sel(depth=[0, 100, 500, 2000]).mean(["lon", "lat"]).plot(hue="depth")
```
 
%% Output
 
/work/mh0727/m300524/miniconda3/envs/pymistral/lib/python3.7/site-packages/dask/array/numpy_compat.py:40: RuntimeWarning: invalid value encountered in true_divide
x = np.divide(x1, x2, out)
 
[<matplotlib.lines.Line2D at 0x2b48518aba10>,
<matplotlib.lines.Line2D at 0x2b4815f44710>,
<matplotlib.lines.Line2D at 0x2b4815f44850>,
<matplotlib.lines.Line2D at 0x2b4815f44810>]
 
 
 
%% Cell type:markdown id: tags:
 
## remote
 
%% Cell type:code id: tags:
 
``` python
import intake
 
# remote_cat = intake.open_catalog('https://raw.githubusercontent.com/aaronspring/remote_climate_data/master/master.yaml')
# no internet connection except for GPU nodes
remote_cat = intake.open_catalog("/home/mpim/m300524/remote_climate_data/master.yaml")
```
 
%% Cell type:code id: tags:
 
``` python
remote_cat.atmosphere.HadCRUT4
```
 
%% Output
 
 
%% Cell type:code id: tags:
 
``` python
hadcrut = remote_cat.atmosphere.HadCRUT4.to_dask()
print(hadcrut.coords)
gmst = hadcrut.mean(["longitude", "latitude"]).temperature_anomaly
```
 
%% Output
 
Coordinates:
* latitude (latitude) float32 -87.5 -82.5 -77.5 -72.5 ... 77.5 82.5 87.5
* longitude (longitude) float32 -177.5 -172.5 -167.5 ... 167.5 172.5 177.5
* time (time) object 1850-01-16 12:00:00 ... 2020-07-16 12:00:00
 
%% Cell type:code id: tags:
 
``` python
yearmean(gmst).plot()
```
 
%% Output
 
[<matplotlib.lines.Line2D at 0x2b485b013b50>]
 
 
%% Cell type:markdown id: tags:
 
### pre-defined plots
 
%% Cell type:code id: tags:
 
``` python
import hvplot.xarray
 
remote_cat.atmosphere.HadCRUT4.plots
```
 
%% Output
 
 
 
 
['temperature_anomaly_over_time', 'GMSTa']
 
%% Cell type:code id: tags:
 
``` python
remote_cat.atmosphere.HadCRUT4.plot.GMSTa()
```
 
%% Output
 
 
:DynamicMap [longitude,latitude]
:Curve [time] (temperature_anomaly)
 
%% Cell type:code id: tags:
 
``` python
remote_cat.atmosphere.HadCRUT4.plot.temperature_anomaly_over_time().opts(clim=(-5, 5))
```
 
%% Output
 
 
:DynamicMap [time]
:Polygons [longitude,latitude] (temperature_anomaly)
 
%% Cell type:markdown id: tags:
 
### caching possible
 
%% Cell type:code id: tags:
 
``` python
# remote datasets get downloaded/cached if `simplecache::` is added to the URL, and are then opened locally
print(remote_cat.atmosphere.HadCRUT4.urlpath)
remote_cat.atmosphere.HadCRUT4.storage_options
```
 
%% Output
 
simplecache::https://crudata.uea.ac.uk/cru/data/temperature/HadCRUT.4.6.0.0.median.nc
 
{'simplecache': {'cache_storage': 'HadCRUT4', 'same_names': True}}
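
%% Cell type:markdown id: tags:

Under the hood this is the `simplecache` protocol of `fsspec`. A sketch of using it directly, without intake, with the same URL and options as above:

%% Cell type:code id: tags:

``` python
import fsspec
import xarray as xr

# download once into ./HadCRUT4, then open the local copy
url = "simplecache::https://crudata.uea.ac.uk/cru/data/temperature/HadCRUT.4.6.0.0.median.nc"
local_file = fsspec.open_local(
    url, simplecache={"cache_storage": "HadCRUT4", "same_names": True}
)
ds = xr.open_dataset(local_file)
```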
 
%% Cell type:code id: tags:
 
``` python
!ls HadCRUT4
```
 
%% Output
 
HadCRUT.4.6.0.0.median.nc
 
%% Cell type:markdown id: tags:
 
=> you can share the remote data description (the catalog) and simple plots via intake-xarray, without sharing the data itself
 
 
%% Cell type:markdown id: tags:
 
# backup
 
%% Cell type:markdown id: tags:
 
## intake for regionmask
 
%% Cell type:code id: tags:
 
``` python
remote_cat.regionmask.Countries
```
 
%% Output
 
 
%% Cell type:code id: tags:
 
``` python
#country_regions = remote_cat.regionmask.Countries.read()
```
 
%% Cell type:code id: tags:
 
``` python
# workaround / what intake_regionmask in intake_geopandas does
import geopandas as gpd
import regionmask

country_shp = gpd.read_file("zip://Countries/ne_10m_admin_0_countries.zip")
country_regions = regionmask.from_geopandas(
    country_shp, names="NAME_EN", abbrevs="_from_name"
)
```
 
%% Cell type:code id: tags:
 
``` python
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(25, 10))
country_regions.plot(label="abbrev", add_land=True, add_ocean=True)
```
 
%% Output
 
<cartopy.mpl.geoaxes.GeoAxesSubplot at 0x2abf6434ffd0>
 
 
%% Cell type:code id: tags:
 
``` python
# regionmask works with the coordinates, and therefore also works with curvilinear grids
mask = country_regions.mask(hadcrut, lon_name="longitude", lat_name="latitude")
mask.plot(cmap="jet")
```
 
%% Output
 
<matplotlib.collections.QuadMesh at 0x2aed71ee1710>
 
 
%% Cell type:code id: tags:
 
``` python
# aggregate mean temperature per country
hadcrut_country = hadcrut.groupby(mask).mean("stacked_latitude_longitude")
```
 
%% Cell type:code id: tags:
 
``` python
def set_regionmask_labels(ds, region):
    """Set names as region label for region dimension from regionmask regions."""
    abbrevs = region[ds.region.values].abbrevs
    names = region[ds.region.values].names
    ds.coords["abbrevs"] = ("region", abbrevs)
    ds.coords["number"] = ("region", ds.region.values)
    ds["region"] = names
    return ds
 
hadcrut_country = set_regionmask_labels(hadcrut_country, country_regions)
hadcrut_country.coords
```
 
%% Output
 
Coordinates:
* time (time) object 1850-01-16 12:00:00 ... 2020-07-16 12:00:00
* region (region) <U32 'Indonesia' 'Malaysia' ... 'Madagascar' 'Japan'
abbrevs (region) <U14 'Ind0' 'Mal0' 'Chi' 'Bol' ... 'NewZea' 'Mad' 'Jap'
number (region) float64 0.0 1.0 2.0 3.0 4.0 ... 174.0 176.0 178.0 186.0
 
%% Cell type:code id: tags:
 
``` python
# plot the yearly-mean temperature record per country until 2019
regions = [
    "Germany",
    "United States of America",
    "Niger",
    "India",
    "People's Republic of China",
    "Russia",
    "Australia",
    "Egypt",
]
hadcrut_country.sel(region=regions).sel(time=slice(None, "2019")).groupby(
    "time.year"
).mean().temperature_anomaly.plot(hue="region", figsize=(15, 3))
plt.title("HadCRUT4 temperature anomalies by country")
plt.axhline(y=0, c="gray", ls=":")
plt.axvline(x=1960, c="gray", ls="--")
plt.axvline(x=1989, c="gray", ls="--")
plt.show()
```
 
%% Output
 
/work/mh0727/m300524/miniconda3/envs/pymistral/lib/python3.7/site-packages/dask/array/numpy_compat.py:40: RuntimeWarning: invalid value encountered in true_divide
x = np.divide(x1, x2, out)
 
 
%% Cell type:markdown id: tags:
 
## warming over the last 30 years
 
%% Cell type:code id: tags:
 
``` python
# rename inconsistencies from CMIP6
from cmip6_preprocessing.preprocessing import (
    combined_preprocessing,
    correct_lon,
)
```
 
%% Cell type:code id: tags:
 
``` python
import xesmf as xe
import xarray as xr
 
deg = 5
 
 
def regrid(ds, deg=deg, **kwargs):
    ds_out = xe.util.grid_global(deg, deg)
    regridder = xe.Regridder(ds, ds_out, "bilinear", reuse_weights=True, **kwargs)
    ds_out = regridder(ds)
    return ds_out


expected_time = xr.cftime_range(start="1850", freq="MS", periods=165 * 12)
da_regrid = []
modellist = []
for i, (key, ds) in enumerate(dset_dict.items()):
    if i == 15:
        break
    da_model = correct_lon(ds)["tas"].squeeze()
    model = key.split(".")[2]
    print(f"Processing {model} ...")
    # data cleaning: determine the spatial dimensions of this model
    spatial_dims = [d for d in da_model.dims if d not in ["time"]]
    if set(spatial_dims) != set(["lon", "lat"]):
        print("reset lon lat")
        da_model = da_model.rename({"longitude": "lon", "latitude": "lat"})
    # often time is differently formatted, therefore overwrite
    da_model = da_model.sortby("time")
    da_model["time"] = expected_time
    # take 30 years
    da_model = da_model.sel(time=slice("1985", "2014"))
    da_regrid.append(regrid(da_model))
    modellist.append(model)
```
 
%% Cell type:code id: tags:
 
``` python
# embarrassingly parallel over dimensions: model, x(lon), y(lat)
# one time dimension chunk because yearmean and trend
da_regrid = xr.concat(da_regrid, dim="model", coords="minimal")
da_regrid["model"] = modellist
da_regrid.data
```
 
%% Cell type:code id: tags:
 
``` python
da_regrid_ym = yearmean(da_regrid)
```
 
%% Cell type:code id: tags:
 
``` python
# the degree-1 coefficient is the slope per year; multiplied by the
# number of years it gives the total trend over the 30-year period
da_trend = (
    da_regrid_ym.polyfit("time", 1)
    .sel(degree=1)
    .rename({"polyfit_coefficients": "trend"})["trend"]
    * da_regrid_ym.time.size
)
da_trend.data
```
 
%% Cell type:code id: tags:
 
``` python
%time da_trend = da_trend.compute()
```
 
%% Cell type:code id: tags:
 
``` python
# fix dropped coords
for c in ["lon", "lat"]:
    da_trend[c] = xe.util.grid_global(deg, deg)[c]
```
 
%% Cell type:code id: tags:
 
``` python
# xarray facet grid plot with cartopy
import cartopy.crs as ccrs

fg = da_trend.plot(
    col="model",
    col_wrap=5,
    x="lon",
    y="lat",
    transform=ccrs.PlateCarree(),
    subplot_kws={"projection": ccrs.PlateCarree()},
    robust=True,
    aspect=2.5,
    cbar_kwargs={"label": "Surface Temperature Trend [$^\circ C / 30 yrs$]"},
)

# let's add a coastline to each axis
# a great reason to use FacetGrid.map
fg.map(lambda: plt.gca().coastlines())
```
 
%% Output
 
<xarray.plot.facetgrid.FacetGrid at 0x2b866bae2410>
 
 
 
%% Cell type:markdown id: tags:
 
## read netcdf without xarray
the old netcdf way 🙄
 
%% Cell type:code id: tags:
 
``` python
from netCDF4 import Dataset
import matplotlib.pyplot as plt
 
%matplotlib inline
```
 
%% Cell type:code id: tags:
 
``` python
# urlpath points to the cached HadCRUT4 file from the caching section above,
# e.g. "HadCRUT4/HadCRUT.4.6.0.0.median.nc"
ds = Dataset(urlpath)
```
 
%% Cell type:markdown id: tags:
 
[netcdf](https://www.unidata.ucar.edu/software/netcdf/) is a self-describing data format.
 
%% Cell type:code id: tags:
 
``` python
ds
```
 
%% Output
 
<class 'netCDF4._netCDF4.Dataset'>
root group (NETCDF4 data model, file format HDF5):
title: HadCRUT4 near-surface temperature ensemble data - ensemble median
institution: Met Office Hadley Centre / Climatic Research Unit, University of East Anglia
history: Updated at 09/07/2020 14:17:54
source: CRUTEM.4.6.0.0, HadSST.3.1.1.0
comment:
reference: Morice, C. P., J. J. Kennedy, N. A. Rayner, and P. D. Jones (2012), Quantifying uncertainties in global and regional temperature change using an ensemble of observational estimates: The HadCRUT4 dataset, J. Geophys. Res., doi:10.1029/2011JD017187
version: HadCRUT.4.6.0.0
Conventions: CF-1.0
ensemble_members: 100
ensemble_member_index: 0
dimensions(sizes): latitude(36), longitude(72), field_status_string_length(1), time(2045)
variables(dimensions): float32 latitude(latitude), float32 longitude(longitude), float32 time(time), float32 temperature_anomaly(time,latitude,longitude), |S1 field_status(time,field_status_string_length)
groups:
 
%% Cell type:code id: tags:
 
``` python
ds.variables.keys()
```
 
%% Output
 
odict_keys(['latitude', 'longitude', 'time', 'temperature_anomaly', 'field_status'])
 
%% Cell type:code id: tags:
 
``` python
temp = ds.variables["temperature_anomaly"]
```
 
%% Cell type:code id: tags:
 
``` python
temp.shape
```
 
%% Output
 
(2045, 36, 72)
 
%% Cell type:code id: tags:
 
``` python
ds.variables["time"]
```
 
%% Output
 
<class 'netCDF4._netCDF4.Variable'>
float32 time(time)
standard_name: time
long_name: time
units: days since 1850-1-1 00:00:00
calendar: gregorian
start_year: 1850
end_year: 2020
start_month: 1
end_month: 5
axis: T
unlimited dimensions: time
current shape = (2045,)
filling on, default _FillValue of 9.969209968386869e+36 used
 
%% Cell type:code id: tags:
 
``` python
# remember: time is the first axis
last_timestep = temp[-1, :, :]
last_timestep
```
 
%% Output
 
masked_array(
data=[[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --],
[--, --, --, ..., 0.9777460694313049, --, --],
...,
[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --],
[--, --, --, ..., --, --, --]],
mask=[[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True],
[ True, True, True, ..., False, True, True],
...,
[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True],
[ True, True, True, ..., True, True, True]],
fill_value=-1e+30,
dtype=float32)
 
%% Cell type:code id: tags:
 
``` python
type(last_timestep)
```
 
%% Output
 
numpy.ma.core.MaskedArray
 
%% Cell type:code id: tags:
 
``` python
plt.imshow(last_timestep)
```
 
%% Output
 
<matplotlib.image.AxesImage at 0x2b2f50a8a510>
 
 
%% Cell type:markdown id: tags:
 
- only values shown
- no reference to
  - variable name
  - scale
  - time
  - unit
- How to plot November 1989? 🤔
- How to plot a timeseries for Hamburg? 🤔 (see the `xarray` answer below)
 
%% Cell type:markdown id: tags:
 
## read netcdf with xarray
 
%% Cell type:code id: tags:
 
``` python
import xarray as xr
ds = xr.open_dataset(urlpath)
ds
```
 
%% Output
 
<xarray.Dataset>
Dimensions: (latitude: 36, longitude: 72, time: 2045)
Coordinates:
* latitude (latitude) float32 -87.5 -82.5 -77.5 ... 77.5 82.5 87.5
* longitude (longitude) float32 -177.5 -172.5 ... 172.5 177.5
* time (time) datetime64[ns] 1850-01-16T12:00:00 ... 2020-0...
Data variables:
temperature_anomaly (time, latitude, longitude) float32 ...
field_status (time) |S1 ...
Attributes:
title: HadCRUT4 near-surface temperature ensemble data -...
institution: Met Office Hadley Centre / Climatic Research Unit...
history: Updated at 09/07/2020 14:17:54
source: CRUTEM.4.6.0.0, HadSST.3.1.1.0
comment:
reference: Morice, C. P., J. J. Kennedy, N. A. Rayner, and P...
version: HadCRUT.4.6.0.0
Conventions: CF-1.0
ensemble_members: 100
ensemble_member_index: 0
 
%% Cell type:code id: tags:
 
``` python
# xr.Dataset is useful when working with more than one variable
# a single variable is an xr.DataArray, accessed dict-like
da = ds["temperature_anomaly"]
# underlying numpy.ndarray
type(da.values)
```
 
%% Output
 
numpy.ndarray
 
%% Cell type:code id: tags:
 
``` python
# xr.DataArray and xr.Dataset dimensions gives axes names
da.dims
```
 
%% Output
 
('time', 'latitude', 'longitude')
 
%% Cell type:code id: tags:
 
``` python
# xr.DataArray and xr.Dataset coordinates describe dimensions
ds.coords
```
 
%% Output
 
Coordinates:
* latitude (latitude) float32 -87.5 -82.5 -77.5 -72.5 ... 77.5 82.5 87.5
* longitude (longitude) float32 -177.5 -172.5 -167.5 ... 167.5 172.5 177.5
* time (time) datetime64[ns] 1850-01-16T12:00:00 ... 2020-05-16T12:00:00
 
%% Cell type:code id: tags:
 
``` python
# the remaining xr.DataArray metadata lives in attrs
da.attrs
```
 
%% Output
 
{'long_name': 'near_surface_temperature_anomaly',
'units': 'K',
'reference_period': array([1961, 1990], dtype=int16)}
 
%% Cell type:code id: tags:
 
``` python
da.attrs["units"]
```
 
%% Output
 
'K'
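
%% Cell type:markdown id: tags:

With labeled dimensions, the two questions from the plain-netcdf section become one-liners. A sketch; the Hamburg coordinates are approximate:

%% Cell type:code id: tags:

``` python
import matplotlib.pyplot as plt

# November 1989 as a map: label-based selection along time
da.sel(time="1989-11").squeeze().plot()
plt.show()

# timeseries of the grid cell nearest to Hamburg (approx. 53.6°N, 10.0°E)
da.sel(latitude=53.6, longitude=10.0, method="nearest").plot()
plt.show()
```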
 