#!/usr/bin/env python
# coding: utf-8

import math
import os

import xarray
import zarr
from tqdm import tqdm
from zarrswift import SwiftStore


class Tzis(SwiftStore):
    def __init__(self,
                 os_url,
                 os_token,
                 os_container,
                 mf_dset=None,
                 varname=None,
                 verbose=False):
        auth = {
            "preauthurl": os_url,
            "preauthtoken": os_token,
        }
        SwiftStore.__init__(self, os_container, "", auth)
        self.auth = auth
        self.os_container = os_container
        self.verbose = verbose
        self.mf_dset = self.open_mf_dataset(mf_dset)
        self.varname = self._get_varname(varname)

    def open_mf_dataset(self, mf):
        # if not isinstance(mf, (str, list)):
        #     raise ValueError("Dataset '{0}' must either be a string or a list of strings".format(mf))
        if mf:
            return xarray.open_mfdataset(mf,
                                         decode_cf=True,
                                         use_cftime=True,
                                         combine="nested",
                                         concat_dim="time",
                                         data_vars="minimal",
                                         coords="minimal",
                                         compat="override")
        return None

    def _get_varname(self, varname=None):
        if not varname and self.mf_dset is not None and self.mf_dset.data_vars:
            # Default to the first data variable of the dataset.
            varname = list(self.mf_dset.data_vars)[0]
        if varname and self.verbose:
            print("We use variable {0} in case we need to rechunk.".format(varname))
        return varname

    def open_store(self, os_name):
        self._SwiftStore = SwiftStore(container=self.os_container,
                                      prefix=os_name,
                                      storage_options=self.auth)

# `_get_size_of_var_timeseries`
#
# returns the size in bytes of the variable `varname` of the entire dataset `ds`
# used for chunking. The dataset can span multiple files.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset` and internally passed.
# - varname
#   - is a string and an unchanged input from the user.

# In[3]:

    def _get_size_of_var_timeseries(self, ds, varname):
        return ds[varname].nbytes

# `_calc_chunk_bytes`
#
# estimates the length of one chunk of `varname` of dataset `ds` along the chunk
# dimension `chunkdim` such that one chunk matches the targeted chunk size
# `target_mb` in the cloud.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset` and internally passed.
# - varname
#   - is a string and an unchanged input from the user.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is `time`.
# - target_mb
#   - is an integer and the targeted size of one chunk in the cloud in megabytes.
#     The default is 1000.

# In[4]:

    def _calc_chunk_bytes(self, ds, varname, chunkdim, target_mb):
        n_bytes = self._get_size_of_var_timeseries(ds, varname)
        # Number of chunks needed so that no chunk exceeds `target_mb`,
        # then the chunk length along `chunkdim` that yields that many chunks.
        return math.ceil(len(ds[chunkdim]) / math.ceil(n_bytes / (target_mb * (2 ** 20))))

# `_rechunk`
#
# rechunks the original dataset in order to obtain optimized chunk sizes.
# It sets the desired configuration with the `xarray` function `chunk` and
# afterwards unifies the chunks of all variables in the dataset with
# `unify_chunks`.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - varname
#   - is a string and an input from the user.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.
# - target_mb
#   - is an integer and the targeted size of one chunk in the cloud in megabytes.

# In[5]:

    def _rechunk(self, ds, varname, chunkdim, target_mb):
        chunk_length = self._calc_chunk_bytes(ds, varname, chunkdim, target_mb)
        chunk_rule = {chunkdim: chunk_length}
        if self.verbose:
            print("Chunking into chunks of {0} {1} steps".format(chunk_length, chunkdim))
        chunked_ds = ds.chunk(chunk_rule)
        # `unify_chunks` is not in-place; it returns the unified dataset.
        return chunked_ds.unify_chunks()
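# As a worked example of the arithmetic above (a sketch with made-up numbers,
# not part of the class): assume `varname` has 100 time steps and 150 MB in
# total, and `target_mb` is 50. `_calc_chunk_bytes` first computes the number
# of chunks, ceil(150 / 50) = 3, and from that the chunk length along
# `chunkdim`, ceil(100 / 3) = 34, i.e. chunks of 34, 34 and 32 time steps.
#
# import math
# n_bytes = 150 * 2 ** 20                          # hypothetical variable size
# n_chunks = math.ceil(n_bytes / (50 * 2 ** 20))   # -> 3
# chunk_length = math.ceil(100 / n_chunks)         # -> 34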
# `_drop_vars_without_chunkdim`
#
# drops all variables which do not depend on the chunk dimension. This is
# required because those variables cannot be written chunkwise with
# `write_by_region`. The variable `time_bnds` also has to be dropped because
# `xarray` already writes it when the dataset is initialized in the swift
# storage. **Returns** the dataset without the dropped variables.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.

# In[6]:

    def _drop_vars_without_chunkdim(self, ds, chunkdim):
        droplist = [var for var in ds.variables if chunkdim not in ds[var].coords]
        if "time_bnds" in ds.variables:
            droplist += ["time_bnds"]
        return ds.drop_vars(droplist)

# `_sel_range_for_chunk_by_time`
#
# selects a time interval spanning from `starttimeindex` to `endtimeindex` from
# dataset `ds`. The indices are controlled in a loop and chosen such that
# exactly one chunk is matched.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - starttimeindex
#   - is an integer and the dimension index of the interval start.
# - endtimeindex
#   - is an integer and the dimension index of the interval end.

# In[7]:

    def _sel_range_for_chunk_by_time(self, ds, starttimeindex, endtimeindex):
        return ds.isel(time=slice(starttimeindex, endtimeindex))

# `_sel_range_for_chunk`
#
# selects an interval spanning from `startindex` to `endindex` within the chunk
# dimension `chunkdim` from dataset `ds`. The indices are controlled in a loop
# and chosen such that exactly one chunk is matched.
# **Returns** the dataset slice or raises a ValueError.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - startindex
#   - is an integer and the dimension index of the interval start.
# - endindex
#   - is an integer and the dimension index of the interval end.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.

# In[8]:

    def _sel_range_for_chunk(self, ds, startindex, endindex, chunkdim):
        if chunkdim == "time":
            return self._sel_range_for_chunk_by_time(ds, startindex, endindex)
        else:
            raise ValueError('Other chunk dimensions than "time" are not supported yet.')

# `_write_chunk_by_region`
#
# writes the slice `towrite` into the region spanned from `startindex` to
# `endindex` within the chunk dimension `chunkdim` of the zarr dataset in
# `store`. **Returns** 0 on success and the number `chunk_no` of the failed
# chunk otherwise.
#
# - towrite
#   - is an `xarray` dataset object and the slice to be written to the cloud.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.
# - chunk_no
#   - is an integer and the number of the chunk to be written.
# - startindex
#   - is an integer and the dimension index of the interval start.
# - endindex
#   - is an integer and the dimension index of the interval end.

# In[9]:

    def _write_chunk_by_region(self, towrite, store, chunkdim, chunk_no, startindex, endindex):
        try:
            towrite.to_zarr(store=store, region={chunkdim: slice(startindex, endindex)})
            towrite.close()
            return 0
        except Exception:
            # Report the failed chunk number so that the caller can retry from there.
            return chunk_no

    def _open_or_initialize_swift_dset(self, store, ds, chunkdim):
        try:
            return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
        except Exception:
            try:
                # Write coordinates and metadata only; the variable data is
                # filled in chunkwise afterwards by `write_by_region`.
                ds.to_zarr(store, compute=False, consolidated=True, mode='w')
                return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
            except Exception:
                print("Could not initialize dataset.")
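# The initialize-then-fill pattern used above can be sketched independently of
# swift with a local zarr store (a minimal sketch, not part of the class; the
# path "example.zarr" and the toy dataset are made up, and a recent `xarray`
# with `region` support for `to_zarr` is assumed):
#
# import numpy
# import xarray
#
# ds = xarray.Dataset({"tas": ("time", numpy.arange(10.0))},
#                     coords={"time": numpy.arange(10)}).chunk({"time": 5})
# # First pass: write coordinates and metadata only.
# ds.to_zarr("example.zarr", compute=False, consolidated=True, mode="w")
# # Second pass: fill exactly one chunk per call.
# ds.isel(time=slice(0, 5)).to_zarr("example.zarr", region={"time": slice(0, 5)})
# ds.isel(time=slice(5, 10)).to_zarr("example.zarr", region={"time": slice(5, 10)})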
# `write_by_region`
#
# writes data chunkwise into an initialized dataset in swift.
# 'Initialized' means that a first `to_zarr` call has been executed which
# writes all coordinates and metadata for the dataset into the store. The
# subsequent `to_zarr` calls performed by `write_by_region` use this
# information so that the chunks are named correctly. If a region has already
# been written, it is overwritten by `write_by_region`.

    def write_by_region(self, chunked_ds, store, startchunk, validity_check, chunkdim, varname):
        already = self._open_or_initialize_swift_dset(store, chunked_ds, chunkdim)
        try:
            already = self._drop_vars_without_chunkdim(already, chunkdim)
        except Exception:
            print("Could not drop vars without chunkdim.",
                  "This is not an issue if you initialized the dataset in the cloud.")
        chunked_ds = self._drop_vars_without_chunkdim(chunked_ds, chunkdim)

        all_chunks = chunked_ds.chunks[chunkdim]
        chunksum = 0
        for chunk_no in tqdm(range(0, len(all_chunks))):
            chunksum += all_chunks[chunk_no]
            if chunk_no < startchunk:
                continue
            towrite = self._sel_range_for_chunk(chunked_ds, chunksum - all_chunks[chunk_no], chunksum, chunkdim)  # .load()
            incloud = self._sel_range_for_chunk(already, chunksum - all_chunks[chunk_no], chunksum, chunkdim)  # .load()
            # if towrite.broadcast_equals(incloud):
            # if towrite[varname].size == incloud[varname].size:
            if towrite[varname].identical(incloud[varname]):
                if self.verbose:
                    print("datasets for chunk {0} are equal".format(chunk_no + 1))
                continue
            elif validity_check:
                print("Datasets at chunk {0} of {1} are different!".format(chunk_no, len(all_chunks)))
                return chunk_no
            incloud.close()
            write_status = self._write_chunk_by_region(towrite, store, chunkdim, chunk_no,
                                                       chunksum - all_chunks[chunk_no], chunksum)
            if write_status > 0:
                return write_status
        return 0

    def write_directly(self, dset=None, store=None):
        dset.to_zarr(store=store, mode='w', consolidated=True)

    def write_with_validation_and_retries(self, ds, varname, store, chunkdim, target_mb,
                                          startchunk, validity_check, maxretries):
        chunked_ds = self._rechunk(ds, varname, chunkdim, target_mb)

        retries = 0
        success = -1
        if startchunk != 0:
            success = startchunk
        while success != 0 and retries < maxretries:
            success = self.write_by_region(chunked_ds, store, success, validity_check, chunkdim, varname)
            retries += 1
            if self.verbose and success != 0:
                print("Write by region failed. Now retry number {}.".format(retries))
        if success != 0:
            raise RuntimeError("Max retries {0} all failed at chunk no {1}".format(maxretries, success))
        if self.verbose:
            print("Start validation of write process")
        if self.write_by_region(chunked_ds, store, startchunk, True, chunkdim, varname) != 0:
            raise RuntimeError("Validation failed.")

    def write_to_swift(self, chunkdim="time", target_mb=1000, startchunk=0,
                       validity_check=False, maxretries=3):
        timevars = [var for var in self.mf_dset.variables if "time" in self.mf_dset[var].coords]
        if len(timevars) > 0:
            self.write_with_validation_and_retries(self.mf_dset, self.varname, self._SwiftStore,
                                                   chunkdim, target_mb, startchunk,
                                                   validity_check, maxretries)
        else:
            self.write_directly(self.mf_dset, self._SwiftStore)
        self.mf_dset.close()


# In[17]:


# cordex_path = "/mnt/lustre01/work/kd0956/CORDEX/data//cordex/output/EUR-11/GERICS//NCC-NorESM1-M/rcp26/r1i1p1/GERICS-REMO2015/v1/day/pr/v20190925/"
# mfs_towrite = [cordex_path + filename for filename in os.listdir(cordex_path)]


# In[18]:


# The credentials `os_url`, `os_token` and `os_container` have to be set beforehand.
# tzis = Tzis(os_url, os_token, os_container,
#             mf_dset=mfs_towrite, varname="pr", verbose=True)
# tzis.open_store("CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr")
# tzis.write_to_swift()
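# Once written, the zarr dataset can be opened directly from swift, e.g. for a
# quick check of the result (a minimal sketch with the same placeholder
# credentials as above):
#
# auth = {"preauthurl": os_url, "preauthtoken": os_token}
# store = SwiftStore(container=os_container,
#                    prefix="CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr",
#                    storage_options=auth)
# ds = xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
# print(ds["pr"])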