Commit 4c5a90c6 authored by Fabian Wachsmann

Updates

parent 92d3eb57
@@ -9,33 +9,29 @@ import shutil
import math
from tqdm import tqdm
class Tzis:
class Tzis(SwiftStore):
def __init__(self,
os_url,
os_token,
os_container,
mf_dset=None,
varname=None,
verbose=False,
os_token=None,
os_url=None,
os_container=None,
os_name=None,
verbose=False
) :
auth = {
"preauthurl": os_url,
"preauthtoken": os_token,
}
SwiftStore.__init__(self, os_container, "", auth)
self.verbose=verbose
#
self.mf_dset=self.__open_mf_dataset__(mf_dset)
self.varname=self.__get_varname__(varname)
self.store =self.open_store(os_container, os_name, os_url, os_token)
def __get_varname__(self,
varname=None):
if not varname:
varname=self.mf_dset.variables[0]
if self.verbose :
print("We use variable {0} in case we need to rechunk.".format(varname))
return varname
self.mf_dset=self.open_mf_dataset(mf_dset)
self.varname=self._get_varname(varname)
def __open_mf_dataset__(self, mf):
def open_mf_dataset(self, mf):
#if type(mf) != list and type(mf) != str :
# raise ValueError("Dataset '{0}' must either be a string or a list of strings")
if mf:
return xarray.open_mfdataset(mf,
decode_cf=True,
use_cftime=True,
@@ -43,7 +39,20 @@ class Tzis:
data_vars='minimal',
coords='minimal',
compat='override')
return None
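# Usage sketch (paths below are placeholders): `mf` may be a glob pattern or an
# explicit list of files, both of which xarray.open_mfdataset accepts, e.g.
#   self.open_mf_dataset("/path/to/exp/tas_*.nc")
#   self.open_mf_dataset(["tas_2000.nc", "tas_2001.nc"])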
def _get_varname(self,
varname=None):
if not varname and self.mf_dset.variables :
varname=self.mf_dset.variables[0]
if varname and self.verbose :
print("We use variable {0} in case we need to rechunk.".format(varname))
return varname
def open_store(self, os_name):
self._SwiftStore = SwiftStore(container=self.os_container,
prefix=self.os_name,
storage_options=self.auth)
# `getSizeOfVarTimeseries`
# returns the size in bytes of the variable `varname` of the entire dataset `ds`, which is used for chunking. The dataset can span multiple files.
#
@@ -56,7 +65,7 @@ class Tzis:
# In[3]:
def __get_size_of_var_timeseries__(self, ds, varname):
def _get_size_of_var_timeseries(self, ds, varname):
return ds[varname].nbytes
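# Illustration with hypothetical sizes: for a float32 variable of shape
# (time=1200, lat=192, lon=384), ds[varname].nbytes is
# 1200 * 192 * 384 * 4 bytes, roughly 354 MB, i.e. the uncompressed series
# size that the chunking below has to split up.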
@@ -77,8 +86,8 @@ class Tzis:
# In[4]:
def __calc_chunk_bytes__(self, ds, varname, chunkdim, target_mb):
n_bytes = self.__get_size_of_var_timeseries__(ds, varname)
def _calc_chunk_bytes(self, ds, varname, chunkdim, target_mb):
n_bytes = self._get_size_of_var_timeseries(ds, varname)
return math.ceil(len(ds[chunkdim]) / math.ceil(n_bytes / (target_mb* (2 ** 20))))
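# Worked example (same hypothetical variable, target_mb=100): with
# n_bytes of about 353,894,400 the series is split into
# math.ceil(353894400 / (100 * 2**20)) = 4 chunks, so each chunk spans
# math.ceil(1200 / 4) = 300 steps along chunkdim.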
@@ -98,8 +107,8 @@ class Tzis:
# In[5]:
def __rechunk__(self, ds, varname, chunkdim, target_mb):
chunk_length = self.__calc_chunk_bytes__(ds, varname, chunkdim, target_mb)
def _rechunk(self, ds, varname, chunkdim, target_mb):
chunk_length = self._calc_chunk_bytes(ds, varname, chunkdim, target_mb)
chunk_rule = {chunkdim: chunk_length}
if self.verbose:
@@ -124,7 +133,7 @@ class Tzis:
# In[6]:
def __drop_vars_without_chunkdim__(self,ds, chunkdim):
def _drop_vars_without_chunkdim(self,ds, chunkdim):
droplist=[var for var in ds.variables if chunkdim not in ds[var].coords]
if "time_bnds" in ds.variables:
droplist+=["time_bnds"]
@@ -146,7 +155,7 @@ class Tzis:
# In[7]:
def __sel_range_for_chunk_by_time__(ds, starttimeindex, endtimeindex):
def _sel_range_for_chunk_by_time(ds, starttimeindex, endtimeindex):
return ds.isel(time=slice(starttimeindex,endtimeindex))
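# Note that slice(start, end) in isel is end-exclusive: selecting indices
# 0..299 requires slice(0, 300), which matches how the chunk boundaries are
# accumulated in write_by_region below.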
@@ -168,9 +177,9 @@ class Tzis:
# In[8]:
def __sel_range_for_chunk__(self, ds, startindex, endindex, chunkdim):
def _sel_range_for_chunk(self, ds, startindex, endindex, chunkdim):
if chunkdim == "time" :
return self.__sel_range_for_chunk_by_time__(ds, startindex, endindex)
return self._sel_range_for_chunk_by_time(ds, startindex, endindex)
else:
raise ValueError('Other chunk dimensions than "time" are not supported yet.')
@@ -192,7 +201,7 @@ class Tzis:
# In[9]:
def __write_chunk_by_region__(self, towrite, store, chunkdim, startindex, endindex):
def _write_chunk_by_region(self, towrite, store, chunkdim, startindex, endindex):
try:
towrite.to_zarr(store=store,region={chunkdim: slice(startindex, endindex)})
towrite.close()
@@ -200,7 +209,7 @@ class Tzis:
except:
return chunk_no
def __open_or_initialize_swift_dset__(self, store, ds, chunkdim):
def _open_or_initialize_swift_dset(self, store, ds, chunkdim):
try:
return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
except:
@@ -209,33 +218,25 @@ class Tzis:
except:
print("Could not initialize dataset.")
def open_store(self, os_container, os_name, os_url, os_token):
auth = {
"preauthurl": os_url,
"preauthtoken": os_token,
}
store = SwiftStore(container=os_container, prefix=os_name, storage_options=auth)
return store
# `write_by_region` writes chunk-wise data into an initialized dataset in swift.
# 'Initialized' means that a first `to_zarr` call has been executed which writes all coordinates and metadata for the dataset into the store. The subsequent `to_zarr` calls performed by `write_by_region` use this information so that they know how the chunks have to be named. If a region has already been written, it will be overwritten by `write_by_region`.
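# Minimal sketch of this pattern (slice boundaries below are placeholders):
#   ds.to_zarr(store, compute=False, consolidated=True)     # metadata and coordinates only
#   part = ds.isel(time=slice(0, 300))                       # one chunk of the series
#   part.to_zarr(store, region={"time": slice(0, 300)})      # write just that region
# Variables that do not carry the region dimension have to be dropped first,
# which is what _drop_vars_without_chunkdim takes care of.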
def write_by_region(self, chunked_ds, store, startchunk, validity_check, chunkdim, varname):
already = self.__open_or_initialize_swift_dset__(store, chunked_ds, chunkdim)
already = self._open_or_initialize_swift_dset(store, chunked_ds, chunkdim)
try:
already=self.__drop_vars_without_chunkdim__(already, chunkdim)
already=self._drop_vars_without_chunkdim(already, chunkdim)
except:
print("Could not drop vars without chunkdim.",
" This is not an issue if you initialized the dataset in the cloud.")
chunked_ds=self.__drop_vars_without_chunkdim__(chunked_ds, chunkdim) #
chunked_ds=self._drop_vars_without_chunkdim(chunked_ds, chunkdim) #
all_chunks=chunked_ds.chunks[chunkdim]
chunksum=0
for chunk_no in tqdm(range(0,len(all_chunks))):
chunksum+=all_chunks[chunk_no]
if chunk_no < startchunk :
continue
towrite = self.__sel_range_for_chunk__(chunked_ds, chunksum-all_chunks[chunk_no],chunksum, chunkdim) #.load()
incloud = self.__sel_range_for_chunk__(already, chunksum-all_chunks[chunk_no],chunksum, chunkdim) #.load()
towrite = self._sel_range_for_chunk(chunked_ds, chunksum-all_chunks[chunk_no],chunksum, chunkdim) #.load()
incloud = self._sel_range_for_chunk(already, chunksum-all_chunks[chunk_no],chunksum, chunkdim) #.load()
#if towrite.broadcast_equals(incloud):
#if towrite[varname].size == incloud[varname].size :
if towrite[varname].identical(incloud[varname]):
@@ -246,7 +247,7 @@ class Tzis:
print("Dsets at chunk {0} from {1} are different!".format(chunk_no, len(all_chunks)))
return chunk_no
incloud.close()
write_status = self.__write_chunk_by_region__(towrite, store, chunkdim, chunksum-all_chunks[chunk_no], chunksum)
write_status = self._write_chunk_by_region(towrite, store, chunkdim, chunksum-all_chunks[chunk_no], chunksum)
if write_status > 0 :
return write_status
return 0
@@ -283,14 +284,14 @@ class Tzis:
if len(timevars) > 0:
self.write_with_validation_and_retries(self.mf_dset,
self.varname,
self.store,
self._SwiftStore,
chunkdim,
target_mb,
startchunk,
validity_check,
maxretries)
else:
self.write_directly(self.mf_dset, self.store)
self.write_directly(self.mf_dset, self._SwiftStore)
self.mf_dset.close()
# In[17]: