Skip to content
Snippets Groups Projects
Commit e11d288c authored by Fabian Wachsmann's avatar Fabian Wachsmann
Browse files

Allow more than one chunk di

parent 0261be30
No related branches found
No related tags found
No related merge requests found
import fsspec
import xarray
from .provenance import Provenance
from .rechunker import calc_chunk_length
from .rechunker import calc_chunk_length, calc_other_dim_chunks
import glob
from copy import deepcopy
OPEN_MFDATASET_KWARGS = dict(
decode_cf=True,
......@@ -53,6 +54,7 @@ class open_mfdataset_optimize:
target_fsmap,
chunkdim=None,
target_mb=None,
keep_auto_chunks=True,
xarray_kwargs=None,
verbose=False,
):
......@@ -67,6 +69,7 @@ class open_mfdataset_optimize:
self.mf_dset = None
self.original_size = None
self.provenance = None
default_kwargs=deepcopy(OPEN_MFDATASET_KWARGS)
if verbose:
print("Resetting disk size and compression ratio")
......@@ -79,10 +82,13 @@ class open_mfdataset_optimize:
print("Opening test dataset")
if xarray_kwargs is not None:
OPEN_MFDATASET_KWARGS.update(xarray_kwargs)
testds = xarray.open_mfdataset(imf, **OPEN_MFDATASET_KWARGS)
default_kwargs.update(xarray_kwargs)
testds = xarray.open_mfdataset(imf, **default_kwargs,chunks="auto")
targetchunks={}
for sourcechunkdim,chunklen in testds.chunks.items():
targetchunks[sourcechunkdim]=chunklen[0]
if not varname:
varname = get_varname(testds)
else:
......@@ -111,18 +117,29 @@ class open_mfdataset_optimize:
f"the size of the test dataset {mf} is {size}MB and therefore smaller."
)
else:
chunk_length = calc_chunk_length(
testds, varname, chunkdim, target_mb, 1
)
if verbose:
print(f"Set chunks to {chunkdim}: {chunk_length}")
OPEN_MFDATASET_KWARGS.update(dict(chunks={chunkdim: chunk_length}))
if keep_auto_chunks:
orig_chunks_dict = dict(testds[varname].chunksizes.items())
other_dim_chunks=calc_other_dim_chunks(orig_chunks_dict, chunkdim)
chunk_length = calc_chunk_length(
testds, varname, chunkdim, target_mb, other_dim_chunks
)
targetchunks.update({chunkdim:chunk_length})
if verbose:
print(f"Set chunks to {targetchunks}")
default_kwargs.update(dict(chunks=targetchunks))
else:
chunk_length = calc_chunk_length(
testds, varname, chunkdim, target_mb, 1
)
if verbose:
print(f"Set chunks to {chunkdim}: {chunk_length}")
default_kwargs.update(dict(chunks={chunkdim:chunk_length}))
l_chunksset = True
testds.close()
mf_dset = xarray.open_mfdataset(imf, **OPEN_MFDATASET_KWARGS) # ,
mf_dset = xarray.open_mfdataset(imf, **default_kwargs) # ,
if chunkdim and target_mb and l_chunksset:
mf_dset = mf_dset.chunk({chunkdim: chunk_length})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment