Commit e950e6a6 authored by Georg Siemund's avatar Georg Siemund
Browse files

Upload New File

parent 12131a4d
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as dsa\n",
"import xarray\n",
"import intake\n",
"import zarr\n",
"import fsspec\n",
"import numpy as np\n",
"import pandas as pd\n",
"from contextlib import contextmanager\n",
"import time\n",
"import dask\n",
"import fsspec"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Open the intake-esm catalog described by the JSON collection file at col_url.\n",
"col_url = \"https://swift.dkrz.de/v1/dkrz_a44962e3ba914c309a7421573a6949a6/intake-esm/swift-cmip6.json\"\n",
"col = intake.open_esm_datastore(col_url)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Keep only catalog entries with variable_id 'tasmax' in table 'day', then preview them.\n",
"cat = col.search(\n",
"    variable_id=\"tasmax\",\n",
"    table_id=\"day\",\n",
")\n",
"cat.df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Load every matching dataset, reading the stores with consolidated metadata.\n",
"dset_dict = cat.to_dataset_dict(zarr_kwargs=dict(consolidated=True))\n",
"# Show the keys under which the datasets were returned.\n",
"list(dset_dict)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pick one dataset from the dictionary by its key and display it.\n",
"ds = dset_dict['ScenarioMIP.DKRZ.MPI-ESM1-2-HR.ssp370.day.gn']\n",
"ds"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Underlying array of the tasmax variable (presumably a dask array, since\n",
"# later cells read data.chunksize and pass it to dsa.store).\n",
"data = ds[\"tasmax\"].data\n",
"data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#ds.tasmax[0, 0].plot(figsize=(16, 8), center=False, robust=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class DevNullStore:\n",
"    \"\"\"Write-only sink that silently discards everything assigned to it.\n",
"\n",
"    Instances support ``store[key] = value`` but keep nothing, so they can be\n",
"    used as a store target when only the cost of producing the data matters.\n",
"    \"\"\"\n",
"\n",
"    def __setitem__(self, key, value):\n",
"        \"\"\"Accept an item assignment and drop both ``key`` and ``value``.\"\"\"\n",
"        # Intentionally a no-op: nothing is stored and nothing is returned.\n",
"        return None"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"null_store = DevNullStore()\n",
"# Smoke test: the assignment succeeds without error, but the value is thrown away.\n",
"null_store['foo'] = 'bar'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Time storing a slice of `data` into the discarding store; because nothing is\n",
"# written, the measurement covers producing the slice only.\n",
"%time dsa.store(data[0, :660], null_store, lock=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up Cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"\n",
"# Cluster settings handed to the Client below.\n",
"memory_limit = \"48GB\"\n",
"threads = 1\n",
"nworker = 1\n",
"\n",
"client = Client(\n",
"    processes=True,\n",
"    threads_per_worker=threads,\n",
"    n_workers=nworker,\n",
"    memory_limit=memory_limit,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class DiagnosticTimer:\n",
"    \"\"\"Collect elapsed-time measurements together with arbitrary metadata.\n",
"\n",
"    Every completed ``with timer.time(...)`` block appends one dict to\n",
"    ``self.diagnostics``: the keyword arguments given to ``time`` plus a\n",
"    ``\"runtime\"`` entry holding the elapsed seconds.\n",
"    \"\"\"\n",
"\n",
"    def __init__(self):\n",
"        # One dict per completed timed block, in the order they finished.\n",
"        self.diagnostics = []\n",
"\n",
"    @contextmanager\n",
"    def time(self, **kwargs):\n",
"        \"\"\"Time the enclosed ``with`` block and record the result.\n",
"\n",
"        Parameters\n",
"        ----------\n",
"        **kwargs\n",
"            Metadata stored alongside the measurement.\n",
"\n",
"        Notes\n",
"        -----\n",
"        If the block raises, the exception propagates and nothing is recorded.\n",
"        \"\"\"\n",
"        # perf_counter is monotonic, so the interval cannot be distorted by\n",
"        # system clock adjustments the way time.time() differences can.\n",
"        tic = time.perf_counter()\n",
"        yield\n",
"        toc = time.perf_counter()\n",
"        kwargs[\"runtime\"] = toc - tic\n",
"        self.diagnostics.append(kwargs)\n",
"\n",
"    def dataframe(self):\n",
"        \"\"\"Return all recorded diagnostics as a DataFrame, one row per record.\"\"\"\n",
"        return pd.DataFrame(self.diagnostics)\n",
"\n",
"\n",
"diag_timer = DiagnosticTimer()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Bytes per chunk: number of elements in one chunk times the size of one element.\n",
"chunksize = np.prod(data.chunksize) * data.dtype.itemsize\n",
"chunksize"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def total_nthreads():\n",
"    \"\"\"Return the sum of the per-worker values reported by ``client.nthreads()``.\"\"\"\n",
"    return sum(client.nthreads().values())\n",
"\n",
"def total_ncores():\n",
"    \"\"\"Return the sum of the per-worker values reported by ``client.ncores()``.\"\"\"\n",
"    return sum(client.ncores().values())\n",
"\n",
"def total_workers():\n",
"    \"\"\"Return the number of entries in the mapping from ``client.ncores()``.\"\"\"\n",
"    worker_map = client.ncores()\n",
"    return len(worker_map)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Metadata recorded with every timing taken in this notebook.\n",
"diag_kwargs = {\n",
"    'nbytes': data.nbytes,\n",
"    'chunksize': chunksize,\n",
"    'cloud': 'swift',\n",
"    'format': 'zarr',\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Time one pass over all of `data`; the cluster counts and the entries of\n",
"# diag_kwargs are stored together with the measured runtime.\n",
"with diag_timer.time(\n",
"    nthreads=total_nthreads(),\n",
"    ncores=total_ncores(),\n",
"    nworkers=total_workers(),\n",
"    **diag_kwargs,\n",
"):\n",
"    # compute=False defers the work; dask.compute then runs it, passing retries=5.\n",
"    future = dsa.store(data, null_store, lock=False, compute=False)\n",
"    dask.compute(future, retries=5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "zarr_test",
"language": "python",
"name": "zarr_test"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment