Commit d4e67769 authored by Fabian Wachsmann

Updated intake3

parent 35ed8b7c
%% Cell type:markdown id: tags:
# Intake III - work with two catalogs and merge them
%% Cell type:markdown id: tags:
```{admonition} Overview
:class: dropdown
![Level](https://img.shields.io/badge/Level-Intermediate-orange.svg)
🎯 **objectives**: Learn how to integrate `intake-esm` in your workflow
⌛ **time_estimation**: "60min"
☑️ **requirements**:
- 20GB memory
- intake I
- [pandas](https://pandas.pydata.org/)
- [xarray](http://xarray.pydata.org/en/stable/)
© **contributors**: k204210
⚖ **license**:
```
%% Cell type:markdown id: tags:
```{admonition} Agenda
:class: tip full-width
Based on DKRZ's CMIP6 and CORDEX catalogs and Pangeo's CMIP6 catalog, in this part you learn
> how to **merge catalogs** by combining their data bases with *differently formatted assets*.
This includes
1. [Loading catalogs with a user-defined set of attributes](#load)
1. [Comparing meta data for checking for compatibility](#compare)
1. Merging the data bases via [merge](#merge) or [concat](#concat)
1. [Configure a catalog description for accessing datasets across projects](#across)
1. [Make ALL data accessible and consolidate aggregation](#access)
1. [Save the new catalog](#save)
```
%% Cell type:markdown id: tags:
```{admonition} Questions
:class: questions
- How can you find out how **compatible** the catalogs are? Would you have to sanitize the column names?
- What is the overlap? Which of the 1000 datasets of Pangeo are included in DKRZ's?
```
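%% Cell type:markdown id: tags:
One way to make the overlap question measurable later on is pandas' `merge` with `indicator=True`, which flags each row as `left_only`, `right_only` or `both`. A toy sketch with made-up catalog rows (not the real data bases):
%% Cell type:code id: tags:
``` python
import pandas as pd

# hypothetical mini-catalogs sharing two rows
a = pd.DataFrame({"source_id": ["M1", "M2", "M3"], "member_id": ["r1", "r1", "r1"]})
b = pd.DataFrame({"source_id": ["M2", "M3", "M4"], "member_id": ["r1", "r1", "r1"]})

merged = pd.merge(a, b, on=["source_id", "member_id"], how="outer", indicator=True)
# rows flagged "both" are contained in both catalogs
overlap = (merged["_merge"] == "both").sum()
print(overlap)  # 2
```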
%% Cell type:markdown id: tags:
This tutorial highlights two use cases:
1. Merging two projects (CMIP6 and CORDEX, both from DKRZ)
1. Merging two data bases for the same project (CMIP6 DKRZ and CMIP6 Pangeo)
For each use case, the ultimate **goal** of this tutorial is to create a merged catalog which also enables data access to the data sets of both source catalogs.
%% Cell type:markdown id: tags:
## Case 1: Merge two projects CMIP6 and CORDEX in one catalog
%% Cell type:code id: tags:
``` python
import intake
import pandas as pd
#dkrz_catalog=intake.open_catalog(["https://dkrz.de/s/intake"])
# only for generating the web page we need to use the original link:
dkrz_catalog=intake.open_catalog(["https://gitlab.dkrz.de/data-infrastructure-services/intake-esm/-/raw/master/esm-collections/cloud-access/dkrz_catalog.yaml"])
```
%% Cell type:code id: tags:
``` python
print([entry for entry in list(dkrz_catalog) if "disk" in entry and ("cordex" in entry or "cmip6" in entry)])
```
%% Cell type:markdown id: tags:
<a class="anchor" id="load"></a>
### Load catalogs with default + common columns
Most DKRZ catalogs include [cataloonies](https://tutorials.dkrz.de/tutorial_intake-1-2-dkrz-catalogs.html) attributes. This simplifies the merging as you could already merge the catalogs over these columns. Usable columns of the catalogs are stored in the main catalog's metadata and can be displayed and retrieved:
%% Cell type:code id: tags:
``` python
dkrz_catalog.metadata
```
%% Cell type:code id: tags:
``` python
overall_columns=dkrz_catalog.metadata["parameters"]["cataloonie_columns"]["default"]
print(overall_columns)
```
%% Cell type:markdown id: tags:
However, these attributes are not sufficient for finding individual assets in CORDEX and CMIP6. We need *additional* columns:
%% Cell type:code id: tags:
``` python
cordex_columns=dkrz_catalog._entries["dkrz_cordex_disk"]._open_args["csv_kwargs"]["usecols"]
print(cordex_columns)
cmip6_columns=dkrz_catalog._entries["dkrz_cmip6_disk"]._open_args["csv_kwargs"]["usecols"]
print(cmip6_columns)
```
%% Cell type:markdown id: tags:
We open both catalogs with the columns that we have found:
%% Cell type:code id: tags:
``` python
cmip6_cat=dkrz_catalog.dkrz_cmip6_disk(csv_kwargs=dict(usecols=cmip6_columns+overall_columns))
```
%% Cell type:code id: tags:
``` python
cordex_cat=dkrz_catalog.dkrz_cordex_disk(csv_kwargs=dict(usecols=cordex_columns+overall_columns))
```
%% Cell type:markdown id: tags:
We assume that we are interested in the variable **tas** which is the *Near-Surface Temperature*:
%% Cell type:code id: tags:
``` python
cmip6_cat=cmip6_cat.search(variable_id="tas")
cordex_cat=cordex_cat.search(variable_id="tas")
```
%% Cell type:markdown id: tags:
<a class="anchor" id="merge"></a>
### Merge both catalogs
The underlying *DataFrames* have different columns. We add CMIP6 columns to the CORDEX catalog and vice versa so that we can merge:
%% Cell type:code id: tags:
``` python
for cordex_col in list(set(cordex_columns)-set(overall_columns)):
cmip6_cat._df.loc[:,cordex_col]="None"
for cmip6_col in list(set(cmip6_columns)-set(overall_columns)):
cordex_cat._df.loc[:,cmip6_col]="None"
```
%% Cell type:code id: tags:
``` python
for column in overall_columns+cmip6_columns+cordex_columns:
cmip6_cat._df[column]=cmip6_cat._df[column].astype(str)
cordex_cat._df[column]=cordex_cat._df[column].astype(str)
```
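%% Cell type:markdown id: tags:
The `astype(str)` cast above matters because recent pandas versions refuse to merge on key columns with mismatched dtypes. A minimal sketch of the failure mode (toy data, not the real catalogs):
%% Cell type:code id: tags:
``` python
import pandas as pd

left = pd.DataFrame({"version": [1, 2], "x": ["a", "b"]})       # int64 keys
right = pd.DataFrame({"version": ["1", "3"], "y": ["c", "d"]})  # str keys

try:
    pd.merge(left, right, on="version", how="outer")
except ValueError as err:
    print("merge failed:", err)

# casting both key columns to str makes the join keys comparable
left["version"] = left["version"].astype(str)
merged = pd.merge(left, right, on="version", how="outer")
print(merged)  # three rows: versions '1', '2', '3'
```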
%% Cell type:code id: tags:
``` python
overall_df=pd.merge(cmip6_cat._df, cordex_cat._df, on=overall_columns+cmip6_columns+cordex_columns, how="outer")
```
%% Cell type:code id: tags:
``` python
overall_df
```
%% Cell type:markdown id: tags:
We use the CMIP6 catalog as the merged catalog and overwrite its underlying dataframe via:
%% Cell type:code id: tags:
``` python
cmip6_cat._df=overall_df
```
%% Cell type:markdown id: tags:
<a class="anchor" id="across"></a>
### Redefine catalog description
We copy the entire `.json` description file so that we can edit it.
%% Cell type:code id: tags:
``` python
mixed_esmcol_data=cmip6_cat.esmcol_data.copy()
```
%% Cell type:code id: tags:
``` python
mixed_esmcol_data
```
%% Cell type:markdown id: tags:
Let's have a look at these entries. We can subdivide these into two groups:
1. Required to be manually changed:
- **groupby_attrs**: We will change it such that both CMIP6 and CORDEX datasets can be created.
- **attributes** and **default_columns**: The CORDEX ones need to be added
- **aggregation_control**: Must be revised but we can do that afterwards. For now, we will just delete all entries but the one for `variable_id`
2. Other attributes
- **assets**: Will stay the same as there is no difference between the original catalogs
- **catalog_file**: Will be automatically overwritten by Intake when the final catalog is written.
- **Description**, **esmcat_version**, **id**: Are arbitrary
We will start with adding missing **attributes**:
%% Cell type:code id: tags:
``` python
columns_already=[k["column_name"] for k in mixed_esmcol_data["attributes"]]
```
%% Cell type:code id: tags:
``` python
columns_already
```
%% Cell type:code id: tags:
``` python
for k in cordex_cat.esmcol_data["attributes"] :
if k["column_name"] not in columns_already:
mixed_esmcol_data["attributes"].append(k)
```
%% Cell type:markdown id: tags:
**groupby_attrs**:
The attributes used to build an index for a dataset are defined by the order of attributes in the list **groupby_attrs**. The aggregation methods for CMIP6 datasets and CORDEX datasets *differ*.
We have to redefine this list. Think about the perfect order and arrangement of attributes.
%% Cell type:code id: tags:
``` python
mixed_esmcol_data["aggregation_control"]["groupby_attrs"]=[
"CORDEX_domain",
"driving_model_id",
"activity_id",
"institute_id",
"model_id",
"experiment_id",
"frequency",
"table_id",
"grid_label"
]
```
%% Cell type:markdown id: tags:
**aggregation_control**
For now, we drop all aggregation attributes besides `variable_id` to enable a quick save of the catalog. Note that the grouping only works if there is at least one entry left in the `mixed_esmcol_data["aggregation_control"]["aggregations"]` list.
%% Cell type:code id: tags:
``` python
mixed_esmcol_data["aggregation_control"]["aggregations"]=[
    entry for entry in mixed_esmcol_data["aggregation_control"]["aggregations"]
    if entry["attribute_name"] == "variable_id"
]
```
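%% Cell type:markdown id: tags:
As an aside, removing entries from a list while iterating over that same list skips elements, because each removal shifts the remaining items under the running index. A minimal illustration with made-up attribute names:
%% Cell type:code id: tags:
``` python
# buggy: the element right after each removed one is never inspected
entries = ["time_range", "start_time", "variable_id"]
for e in entries:
    if e != "variable_id":
        entries.remove(e)
print(entries)  # ['start_time', 'variable_id'] -- 'start_time' slipped through

# robust: rebuild the list with a comprehension instead
entries = ["time_range", "start_time", "variable_id"]
entries = [e for e in entries if e == "variable_id"]
print(entries)  # ['variable_id']
```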
%% Cell type:code id: tags:
``` python
mixed_esmcol_data
```
%% Cell type:markdown id: tags:
Let's set the redefined *dictionary* as the `esmcol_data`:
%% Cell type:code id: tags:
``` python
cmip6_cat.esmcol_data=mixed_esmcol_data
```
%% Cell type:markdown id: tags:
We write the new catalog to disk via:
%% Cell type:code id: tags:
``` python
cmip6_cat.serialize("test", catalog_type="file")
```
%% Cell type:markdown id: tags:
We can test if our configuration works by directly opening it:
%% Cell type:code id: tags:
``` python
intake.open_esm_datastore("test.json").search(experiment_id="historical",
source_id="MPI*",
simulation_id="r1i1*")#.to_dataset_dict(cdf_kwargs=dict(chunks=dict(time=1)))
```
%% Cell type:markdown id: tags:
## Case 2: Merge two data bases for CMIP6
Assume you are interested in variable `tas` from table `Amon` from both catalogs.
You would start like this:
%% Cell type:code id: tags:
``` python
esm_dkrz=dkrz_catalog.dkrz_cmip6_disk
pangeo=intake.open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/master.yaml")
#
esm_pangeo=pangeo.climate.cmip6_gcs
esm_dkrz_tas=esm_dkrz.search(
variable_id="tas",
table_id="Amon"
)
esm_pangeo_tas=esm_pangeo.search(
variable_id="tas",
table_id="Amon"
)
```
%% Cell type:code id: tags:
``` python
print(esm_dkrz_tas)
```
%% Cell type:code id: tags:
``` python
print(esm_pangeo_tas)
```
%% Cell type:markdown id: tags:
<a class="anchor" id="compare"></a>
### Compare the Metadata
1. Both catalogs follow the [esmcat-specs](https://github.com/NCAR/esm-collection-spec/blob/master/collection-spec/collection-spec.md) which can be seen from the following entry:
%% Cell type:code id: tags:
``` python
print(esm_dkrz_tas.esmcol_data["esmcat_version"])
print(esm_pangeo_tas.esmcol_data["esmcat_version"])
```
%% Cell type:markdown id: tags:
2. As both catalogs follow the esmcat standard, they have a list of `attributes` which we can compare. Indeed, they contain exactly the same attributes/columns. In the following, we use pandas DataFrames for a better display:
%% Cell type:code id: tags:
``` python
import pandas as pd
esm_dkrz_atts_df=pd.DataFrame(esm_dkrz_tas.esmcol_data["attributes"])
esm_pangeo_atts_df=pd.DataFrame(esm_pangeo_tas.esmcol_data["attributes"])
esm_dkrz_atts_df
```
%% Cell type:code id: tags:
``` python
esm_pangeo_atts_df
```
%% Cell type:code id: tags:
``` python
esm_pangeo_atts_df.equals(esm_dkrz_atts_df)
```
%% Cell type:markdown id: tags:
When working with both catalogs, you would notice that Pangeo's catalog does not use the prefix character 'v' for the values of *version* whereas DKRZ's does. We fix that with:
%% Cell type:code id: tags:
``` python
esm_pangeo_tas.df["version"]= "v" + esm_pangeo_tas.df["version"].astype(str)
```
%% Cell type:markdown id: tags:
3. The data format: The Pangeo catalog contains zarr datasets stored in the Google Cloud Storage while DKRZ's catalog allows different formats by providing a column named *format*. When we combine these catalogs, we have to consider the different formats.
%% Cell type:code id: tags:
``` python
print(esm_dkrz_tas.data_format,esm_dkrz_tas.format_column_name)
```
%% Cell type:code id: tags:
``` python
print(esm_pangeo_tas.data_format,esm_pangeo_tas.format_column_name)
```
%% Cell type:markdown id: tags:
<a class="anchor" id="concat"></a>
### Combine the databases with the underlying DataFrames
This is a workflow for creating a merged data base:
1. Find all common column names/keys that are in both data bases.
2. Create a filtered catalog:
    - Set the common columns as index in both catalogs.
    - Throw out rows of one catalog whose index also appears in the other.
3. Concat the filtered catalog with the reference catalog.
Let us start with 1.:
%% Cell type:code id: tags:
``` python
keys = [key
for key in esm_dkrz_tas.df.columns.values
if key in esm_pangeo_tas.df.columns.values
]
keys
```
%% Cell type:markdown id: tags:
We continue with 2.:
> 2. Create a filtered Catalog
We create a multi-index with all common keys with `set_index` and save these in new variables `i1` and `i2`. These can be used as a filter. The `~` sign reverses the condition in the filter:
%% Cell type:code id: tags:
``` python
i1 = esm_pangeo_tas.df.set_index(keys).index
i2 = esm_dkrz_tas.df.set_index(keys).index
esm_pangeo_tas_filtered=esm_pangeo_tas.df[~i1.isin(i2)]
```
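%% Cell type:markdown id: tags:
The same anti-join pattern on toy frames, to make the `~i1.isin(i2)` step concrete (all values made up):
%% Cell type:code id: tags:
``` python
import pandas as pd

ref = pd.DataFrame({"source_id": ["M1", "M2"], "uri": ["/a.nc", "/b.nc"]})
other = pd.DataFrame({"source_id": ["M2", "M3"], "zstore": ["gs://b", "gs://c"]})

keys = ["source_id"]
i_other = other.set_index(keys).index
i_ref = ref.set_index(keys).index

# keep only rows of `other` whose key is NOT already in `ref`
other_filtered = other[~i_other.isin(i_ref)]
print(other_filtered)  # only the M3 row survives
```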
%% Cell type:markdown id: tags:
And finally, 3.
> 3. Concat the filtered catalog with the reference catalog.
We use pandas `concat` function and *ignore the indices* of both catalogs.
%% Cell type:code id: tags:
``` python
esm_merged=pd.concat([esm_dkrz_tas.df, esm_pangeo_tas_filtered],
ignore_index=True)
esm_merged
```
%% Cell type:markdown id: tags:
<a class="anchor" id="access"></a>
### Make ALL data accessible and consolidate aggregation
Intake can load assets of different formats. For that,
1. the data base must have a column which describes the **format** of each asset.
1. the access information must be contained in a single column. In our example, the `zstore` column and the `uri` column need to be merged into one common column, which we name `uri`.
1. the **assets** entry in the catalog description needs to be adapted to the new configuration.
We start with
1. creating a 'format' column which is *zarr* if there is no entry in *uri* and *netcdf* in all other cases.
%% Cell type:code id: tags:
``` python
esm_merged["format"]="netcdf"
esm_merged.loc[pd.isna(esm_merged["uri"]),"format"]="zarr"
```
%% Cell type:markdown id: tags:
2. We merge the *zstore* and *uri* columns into a new column *uri*. Instead of looping over the rows, we can do this with a vectorized assignment.
```{note}
Whenever you can, avoid `iterrows` for such operations because it is rather slow.
```
%% Cell type:code id: tags:
``` python
esm_merged.loc[pd.isna(esm_merged["uri"]),"uri"]=esm_merged["zstore"]
```
%% Cell type:code id: tags:
``` python
del esm_merged["zstore"]
```
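%% Cell type:markdown id: tags:
The two cells above implement a vectorized "coalesce": wherever *uri* is missing, the value is taken from *zstore*, then *zstore* is dropped. On toy data (made-up paths):
%% Cell type:code id: tags:
``` python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "uri": ["/work/a.nc", np.nan],
    "zstore": [np.nan, "gs://bucket/b"],
})
# an equivalent one-liner would be: df["uri"] = df["uri"].fillna(df["zstore"])
df.loc[pd.isna(df["uri"]), "uri"] = df["zstore"]  # row-aligned assignment
del df["zstore"]
print(df["uri"].tolist())  # ['/work/a.nc', 'gs://bucket/b']
```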
%% Cell type:markdown id: tags:
3. We now create a new description.
This will be based on the DKRZ catalog. We use it because it has the aggregation over time which we want to maintain.
The *assets* entry now does not have a direct description of the **format** but instead a specification of a **format_column_name**. Also, the *column_name* is *uri* instead of *path*:
%% Cell type:code id: tags:
``` python
new_cat_json=esm_dkrz_tas.esmcol_data.copy()
```
%% Cell type:code id: tags:
``` python
new_cat_json["assets"]={
"column_name":"uri",
"format_column_name":"format"
}
```
%% Cell type:code id: tags:
``` python
new_cat_json["id"]="Merged dkrz-pangeo cmip6 subset catalog"
```
%% Cell type:markdown id: tags:
In order to make *zarr stores* compatible with the aggregation over time, we have to fill in a dummy value in *time_range*:
%% Cell type:code id: tags:
``` python
esm_merged.loc[pd.isna(esm_merged["time_range"]),"time_range"]="*"
```
%% Cell type:markdown id: tags:
<a class="anchor" id="save"></a>
### Save the new catalog
Let us test the new catalog first. We can open the new catalog by providing two arguments to `open_esm_datastore`:
- the data base **esm_merged**
- the catalog description **new_cat_json**
Afterwards, we search for a subset which is in both catalogs:
%% Cell type:code id: tags:
``` python
esm_merged_cat=intake.open_esm_datastore(esm_merged, new_cat_json)
```
%% Cell type:code id: tags:
``` python
esm_merged_cat_test=esm_merged_cat.search(activity_id="ScenarioMIP",
member_id="r1i1p1f1",
grid_label="gn",
source_id=["MPI-ESM1-2-HR","CAS-ESM2-0"])
```
%% Cell type:markdown id: tags:
Since we have two different formats in the catalog, we have to provide keyword arguments for both formats within the `to_dataset_dict` function.
- `zarr_kwargs={"consolidated":True}` is needed because Pangeo's zarr assets have *consolidated* metadata
- `cdf_kwargs={"chunks":{"time":1}}` configures dask so that no very large arrays are created when opening the netCDF files
%% Cell type:code id: tags:
``` python
test_dsets=esm_merged_cat_test.to_dataset_dict(
zarr_kwargs={"consolidated":True},
cdf_kwargs={"chunks":{"time":1}}
)
```
%% Cell type:markdown id: tags:
That worked fine. Now we save the catalog with `serialize`. We will separate the catalog into two files, the database `.csv.gz` file and the descriptor `.json` file. We can do that by passing the `catalog_type` keyword argument:
%% Cell type:code id: tags:
``` python
esm_merged_cat.serialize(name="our_catalog", catalog_type="file")
```
%% Cell type:markdown id: tags:
```{seealso}
This tutorial is part of a series on `intake`:
* [Part 1: Introduction](https://data-infrastructure-services.gitlab-pages.dkrz.de/tutorials-and-use-cases/tutorial_intake-1-introduction.html)
* [Part 2: Modifying and subsetting catalogs](https://data-infrastructure-services.gitlab-pages.dkrz.de/tutorials-and-use-cases/tutorial_intake-2-subset-catalogs.html)
* [Part 3: Merging catalogs](https://data-infrastructure-services.gitlab-pages.dkrz.de/tutorials-and-use-cases/tutorial_intake-3-merge-catalogs.html)
* [Part 4: Use preprocessing and create derived variables](https://data-infrastructure-services.gitlab-pages.dkrz.de/tutorials-and-use-cases/tutorial_intake-4-preprocessing-derived-variables.html)
* [Part 5: How to create an intake catalog](https://data-infrastructure-services.gitlab-pages.dkrz.de/tutorials-and-use-cases/tutorial_intake-5-create-esm-collection.html)
- You can also do another [CMIP6 tutorial](https://intake-esm.readthedocs.io/en/latest/user-guide/cmip6-tutorial.html) from the official intake page.
```
%% Cell type:code id: tags:
``` python
```