Commit 1af5cf80 authored by Aaron Spring

demo: fast netcdf reading and cdo_post MPI std output

parent 04871c1b
%% Cell type:code id: tags:
``` python
%load_ext lab_black
```
%% Cell type:markdown id: tags:
# Challenge
MPI-ESM standard output is usually organized in one file per year per output stream, with many variables in each file.
Open variable `v` from the files matching `mistral_ds_str`.
%% Cell type:code id: tags:
``` python
# alternative experiments:
# mistral_ds_str = "/work/mh0727/m300524/experiments/asp_esmControl_ens3014_m001/outdata/mpiom/asp_esmControl_ens3014_m001_mpiom_data_2d_mm_*.nc"
# mistral_ds_str = "/work/mh0727/m300524/experiments/vga0214/outdata/mpiom/vga0214_mpiom_data_2d_mm_*.nc"
mistral_ds_str = "/work/mh1007/MPI-ESM-GE/pictrl/pictrl0001/outdata/mpiom/pictrl0001_mpiom_data_2d_mm_1*.nc"
```
%% Cell type:code id: tags:
``` python
import glob

print(f"number of files to open: {len(glob.glob(mistral_ds_str))}")
```
%% Output
number of files to open: 150
%% Cell type:code id: tags:
``` python
v = "sst"
```
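%% Cell type:markdown id: tags:
As a quick sanity check, a single file can be opened lazily to see the many variables per output stream (an untimed sketch; it assumes `xarray` is importable here, which is imported again below):
%% Cell type:code id: tags:
``` python
# Sketch: peek at the first file of the stream to list its variables.
import glob

import xarray as xr

first_file = sorted(glob.glob(mistral_ds_str))[0]
with xr.open_dataset(first_file) as single_ds:
    print(list(single_ds.data_vars))
```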
%% Cell type:markdown id: tags:
# cdo
%% Cell type:code id: tags:
``` python
from cdo import Cdo

cdo = Cdo()
```
%% Cell type:code id: tags:
``` python
%time da_cdo = cdo.copy(input=mistral_ds_str, returnXArray=v)
```
%% Output
CPU times: user 34 ms, sys: 57 ms, total: 91 ms
Wall time: 1min 52s
%% Cell type:code id: tags:
``` python
da_cdo.dims
```
%% Output
('time', 'depth', 'y', 'x')
%% Cell type:code id: tags:
``` python
import dask
dask.utils.format_bytes(da_cdo.nbytes)
```
%% Output
'405.50 MB'
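%% Cell type:markdown id: tags:
A possible variant (an untimed sketch): select the variable on the CDO side with the standard `selname` operator, which python-cdo exposes as a method like every CDO operator, so CDO does not have to copy the remaining variables of the stream.
%% Cell type:code id: tags:
``` python
# Sketch: `selname` keeps only variable `v`; timing not verified in this demo.
da_cdo_sel = cdo.selname(v, input=mistral_ds_str, returnXArray=v)
da_cdo_sel.dims
```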
%% Cell type:markdown id: tags:
# xarray and dask
%% Cell type:code id: tags:
``` python
import xarray as xr
import dask
from dask.distributed import Client
import multiprocessing
```
%% Cell type:code id: tags:
``` python
ncpu = multiprocessing.cpu_count()
nworker = 8
nthreads = ncpu // nworker
print(
    f"Number of CPUs: {ncpu}, number of threads: {nthreads}, number of workers: {nworker}"
)
client = Client(
    processes=False, threads_per_worker=nthreads, n_workers=nworker, memory_limit="64GB"
)
client
```
%% Output
Number of CPUs: 72, number of threads: 9, number of workers: 8
<Client: 'inproc://10.50.47.157/25912/1' processes=8 threads=72, memory=512.00 GB>
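%% Cell type:markdown id: tags:
The scheduler dashboard can be handy for watching the reads below (optional; `dashboard_link` is a standard attribute of `dask.distributed.Client`):
%% Cell type:code id: tags:
``` python
# URL of the dask dashboard for monitoring task progress
client.dashboard_link
```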
%% Cell type:code id: tags:
``` python
kwargs = dict()
kwargs["chunks"] = {"time": 120}  # 10 years of monthly output per chunk
kwargs["combine"] = "nested"  # files are already ordered, just stack them
kwargs["concat_dim"] = "time"
```
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(mistral_ds_str, **kwargs)
```
%% Output
CPU times: user 38.7 s, sys: 10.8 s, total: 49.6 s
Wall time: 57.9 s
%% Cell type:code id: tags:
``` python
%time da = ds[v].compute()
```
%% Output
CPU times: user 8.54 s, sys: 3.04 s, total: 11.6 s
Wall time: 9.43 s
%% Cell type:markdown id: tags:
### speedup of the concatenation in `xr.open_mfdataset`
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(
    mistral_ds_str, compat="override", data_vars="minimal", coords="minimal", **kwargs
)
```
%% Output
CPU times: user 7.38 s, sys: 579 ms, total: 7.96 s
Wall time: 7.63 s
%% Cell type:code id: tags:
``` python
%time da = ds[v].compute()
```
%% Output
CPU times: user 4.82 s, sys: 2.14 s, total: 6.96 s
Wall time: 5.34 s
%% Cell type:markdown id: tags:
That's a massive speedup! 🚀 With `coords="minimal"`, `data_vars="minimal"` and `compat="override"`, xarray takes coordinates from the first file instead of comparing them across all 150 files.
%% Cell type:code id: tags:
``` python
faster_kwargs = kwargs.copy()
faster_kwargs["coords"] = "minimal"
faster_kwargs["data_vars"] = "minimal"
faster_kwargs["compat"] = "override"
```
%% Cell type:markdown id: tags:
### speedup in decode_times
#### before compute
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(mistral_ds_str, decode_times=False, **kwargs)
ds = xr.decode_cf(ds)
```
%% Output
CPU times: user 39.8 s, sys: 11.2 s, total: 51 s
Wall time: 58.4 s
%% Cell type:code id: tags:
``` python
%time da = ds[v].compute()
```
%% Output
CPU times: user 8.75 s, sys: 2.96 s, total: 11.7 s
Wall time: 9.69 s
%% Cell type:markdown id: tags:
This does not seem to help! 🐢 Decoding times only touches the small one-dimensional time coordinate, so deferring it saves little.
%% Cell type:markdown id: tags:
#### after compute
%% Cell type:code id: tags:
``` python
def preprocess_var(ds, v=v):
    # keep only the variable of interest and drop singleton dimensions
    return ds[v].to_dataset(name=v).squeeze()
```
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(mistral_ds_str, decode_times=False, **kwargs)
```
%% Output
CPU times: user 39.9 s, sys: 10.5 s, total: 50.4 s
Wall time: 57.8 s
%% Cell type:code id: tags:
``` python
%%time
ds = preprocess_var(ds).compute()
da = xr.decode_cf(ds)[v]
```
%% Output
CPU times: user 8.42 s, sys: 3.05 s, total: 11.5 s
Wall time: 9.47 s
%% Cell type:markdown id: tags:
This does not seem to help either! 🐢 `decode_times=False` alone leaves the rest of the CF decoding in place.
%% Cell type:markdown id: tags:
### speedup in decode_cf
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(mistral_ds_str, decode_cf=False, **kwargs)
```
%% Output
CPU times: user 6.24 s, sys: 664 ms, total: 6.9 s
Wall time: 6.64 s
%% Cell type:code id: tags:
``` python
%%time
ds = preprocess_var(ds).compute()
da = xr.decode_cf(ds)[v]
```
%% Output
CPU times: user 4.01 s, sys: 1.93 s, total: 5.94 s
Wall time: 5.39 s
%% Cell type:markdown id: tags:
That's a massive speedup! 🚀 With `decode_cf=False`, the per-file CF decoding (masking, scaling, time units) is skipped at open time and done once on the computed result instead.
%% Cell type:code id: tags:
``` python
faster_kwargs["decode_cf"] = False
```
%% Cell type:markdown id: tags:
### speedup for engine and parallel
%% Cell type:code id: tags:
``` python
for parallel in [True, False]:
    for engine in ["netcdf4", "scipy", "pynio"]:
        print("\n", parallel, engine)
        %time ds = xr.open_mfdataset(mistral_ds_str, parallel=parallel, engine=engine, **kwargs)
        %time da = ds[v].compute()
```
%% Output
True netcdf4
CPU times: user 47.4 s, sys: 20 s, total: 1min 7s
Wall time: 1min 5s
CPU times: user 8.74 s, sys: 3.14 s, total: 11.9 s
Wall time: 9.86 s
True scipy
CPU times: user 52.4 s, sys: 8.82 s, total: 1min 1s
Wall time: 1min 7s
CPU times: user 5.68 s, sys: 3.55 s, total: 9.23 s
Wall time: 4.72 s
True pynio
CPU times: user 53.5 s, sys: 17.1 s, total: 1min 10s
Wall time: 1min 23s
CPU times: user 7 s, sys: 4.22 s, total: 11.2 s
Wall time: 10 s
False netcdf4
CPU times: user 34.7 s, sys: 9.96 s, total: 44.7 s
Wall time: 53.7 s
CPU times: user 8.95 s, sys: 2.96 s, total: 11.9 s
Wall time: 10.1 s
False scipy
CPU times: user 44 s, sys: 7.58 s, total: 51.6 s
Wall time: 58.5 s
CPU times: user 5.65 s, sys: 2.55 s, total: 8.19 s
Wall time: 4.43 s
False pynio
CPU times: user 41.8 s, sys: 15.5 s, total: 57.3 s
Wall time: 1min 7s
CPU times: user 7.33 s, sys: 4.01 s, total: 11.3 s
Wall time: 10.3 s
%% Cell type:markdown id: tags:
`parallel=False` (default) and `engine="netcdf4"` (default) seem fastest! Since both are already the defaults, `faster_kwargs` needs no change here.
%% Cell type:markdown id: tags:
### speedup preprocessing
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(mistral_ds_str, preprocess=preprocess_var, **kwargs)
```
%% Output
CPU times: user 15.5 s, sys: 3.29 s, total: 18.8 s
Wall time: 20.4 s
%% Cell type:code id: tags:
``` python
%%time
da = ds[v].compute()
```
%% Output
CPU times: user 9.69 s, sys: 3.19 s, total: 12.9 s
Wall time: 10.8 s
%% Cell type:markdown id: tags:
That's a decent speedup! 🏎️ The preprocess function drops all other variables from each file, so far fewer variables need to be aligned during concatenation.
%% Cell type:code id: tags:
``` python
faster_kwargs["preprocess"] = preprocess_var
```
%% Cell type:markdown id: tags:
#### all combined
%% Cell type:code id: tags:
``` python
%%time
ds = xr.open_mfdataset(mistral_ds_str, **faster_kwargs)
```
%% Output
CPU times: user 5.32 s, sys: 769 ms, total: 6.09 s
Wall time: 5.85 s
%% Cell type:code id: tags:
``` python
%%time
ds = preprocess_var(ds).compute()
da = xr.decode_cf(ds)[v]
```
%% Output
CPU times: user 3.79 s, sys: 1.84 s, total: 5.64 s
Wall time: 5.07 s
%% Cell type:markdown id: tags:
🚀🚀🚀 From ~67 s down to ~11 s! 🚀🚀🚀
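%% Cell type:markdown id: tags:
For reference, the complete fast-reading recipe in one cell (a convenience summary of the settings found above, not an additional benchmark):
%% Cell type:code id: tags:
``` python
# Consolidated recipe from the experiments above:
# - chunk along time and stack the pre-sorted files with a nested combine
# - coords/data_vars="minimal" + compat="override": take coords from the first file
# - decode_cf=False: defer CF decoding (masking, scaling, time units)
# - preprocess: keep only the variable of interest per file
recipe_kwargs = dict(
    chunks={"time": 120},
    combine="nested",
    concat_dim="time",
    coords="minimal",
    data_vars="minimal",
    compat="override",
    decode_cf=False,
    preprocess=preprocess_var,
)
ds = xr.open_mfdataset(mistral_ds_str, **recipe_kwargs)
da = xr.decode_cf(ds.compute())[v]
```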
%% Cell type:code id: tags:
``` python
dask.utils.format_bytes(da.nbytes)
```
%% Output
'405.50 MB'
%% Cell type:code id: tags:
``` python
# verify the xarray and cdo paths produce identical data
xr.testing.assert_equal(da.squeeze(), da_cdo.squeeze())
```
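%% Cell type:markdown id: tags:
Finally, the dask client started above can be shut down to release its workers (optional housekeeping; `close()` is a standard `Client` method):
%% Cell type:code id: tags:
``` python
# release the in-process dask workers
client.close()
```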