Commit ddb2b754 authored by Fabian Wachsmann's avatar Fabian Wachsmann

Moved all functions into a class

parent f0f333bc
#!/usr/bin/env python
# coding: utf-8

# In[1]:

import zarr
from zarrswift import SwiftStore
import xarray
import os
import shutil
import math
from tqdm import tqdm
class Tzis:
    def __init__(self,
                 mf_dset=None,
                 varname=None,
                 verbose=False,
                 os_token=None,
                 os_url=None,
                 os_container=None,
                 os_name=None,
                 ):
        self.verbose = verbose
        #
        self.mf_dset = self.__open_mf_dataset__(mf_dset)
        self.varname = self.__get_varname__(varname)
        self.store = self.open_store(os_container, os_name, os_url, os_token)

    def __get_varname__(self, varname=None):
        if not varname:
            # fall back to the first variable name of the dataset if none was given
            varname = list(self.mf_dset.variables)[0]
        if self.verbose:
            print("We use variable {0} in case we need to rechunk.".format(varname))
        return varname

    def __open_mf_dataset__(self, mf):
        #if type(mf) != list and type(mf) != str :
        #    raise ValueError("Dataset '{0}' must either be a string or a list of strings")
        return xarray.open_mfdataset(mf,
                                     decode_cf=True,
                                     use_cftime=True,
                                     concat_dim="time",
                                     data_vars='minimal',
                                     coords='minimal',
                                     compat='override')
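    # Note: `mf` may be a single path or glob pattern as well as a list of file paths;
    # xarray.open_mfdataset concatenates all given files along the "time" dimension.
    # A hypothetical input would be
    # ["tas_day_19790101-19881231.nc", "tas_day_19890101-19981231.nc"].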
# `__get_size_of_var_timeseries__`
# returns the size in bytes of the variable `varname` of the entire dataset `ds`, used for chunking. The dataset can consist of multiple files.
@@ -27,8 +56,8 @@ globalverbose=False
# In[3]:

    def __get_size_of_var_timeseries__(self, ds, varname):
        return ds[varname].nbytes
# `__calc_chunk_bytes__`
@@ -48,9 +77,9 @@ def get_size_of_var_timeseries(ds, varname):
# In[4]:

    def __calc_chunk_bytes__(self, ds, varname, chunkdim, target_mb):
        n_bytes = self.__get_size_of_var_timeseries__(ds, varname)
        return math.ceil(len(ds[chunkdim]) / math.ceil(n_bytes / (target_mb * (2 ** 20))))
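    # Worked example of the formula above (hypothetical numbers, not taken from the source):
    # a float32 variable with shape (time=3650, lat=192, lon=384) occupies
    # n_bytes = 3650 * 192 * 384 * 4 bytes, which is roughly 1027 MB.
    # With target_mb=1000, math.ceil(n_bytes / (1000 * 2**20)) = 2 chunks are needed,
    # so the chunk length becomes math.ceil(3650 / 2) = 1825 time steps per chunk.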
# `__rechunk__`
@@ -69,18 +98,18 @@ def calc_chunk_bytes(ds, varname, chunkdim, target_mb):
# In[5]:

    def __rechunk__(self, ds, varname, chunkdim, target_mb):
        chunk_length = self.__calc_chunk_bytes__(ds, varname, chunkdim, target_mb)
        chunk_rule = {chunkdim: chunk_length}
        if self.verbose:
            print("Chunking into chunks of {0} {1} steps".format(chunk_length, chunkdim))
        chunked_ds = ds.chunk(chunk_rule)
        for var_id in chunked_ds.variables:
            chunked_ds[var_id].unify_chunks()
        return chunked_ds
# `__drop_vars_without_chunkdim__`
@@ -95,11 +124,11 @@ def rechunk(ds, varname, chunkdim, target_mb):
# In[6]:

    def __drop_vars_without_chunkdim__(self, ds, chunkdim):
        droplist = [var for var in ds.variables if chunkdim not in ds[var].coords]
        if "time_bnds" in ds.variables:
            droplist += ["time_bnds"]
        return ds.drop(droplist)
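    # For example, in a typical CORDEX file the grid-mapping variable (e.g. `rotated_pole`)
    # and static fields such as `lon`/`lat` carry no "time" coordinate, so they are dropped
    # here and only written once when the dataset is initialized in the cloud
    # (the variable names are illustrative).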
# `__sel_range_for_chunk_by_time__`
@@ -117,8 +146,8 @@ def drop_vars_without_chunkdim(ds, chunkdim):
# In[7]:

    def __sel_range_for_chunk_by_time__(self, ds, starttimeindex, endtimeindex):
        return ds.isel(time=slice(starttimeindex, endtimeindex))
# `__sel_range_for_chunk__`
@@ -139,11 +168,11 @@ def sel_range_for_chunk_by_time(ds, starttimeindex, endtimeindex):
# In[8]:

    def __sel_range_for_chunk__(self, ds, startindex, endindex, chunkdim):
        if chunkdim == "time":
            return self.__sel_range_for_chunk_by_time__(ds, startindex, endindex)
        else:
            raise ValueError('Other chunk dimensions than "time" are not supported yet.')
# `__write_chunk_by_region__`
@@ -163,153 +192,106 @@ def sel_range_for_chunk(ds, startindex, endindex, chunkdim):
# In[9]:

    def __write_chunk_by_region__(self, towrite, store, chunkdim, startindex, endindex):
        try:
            towrite.to_zarr(store=store, region={chunkdim: slice(startindex, endindex)})
            towrite.close()
            return 0
        except:
            # Signal failure to the caller, which knows the number of the chunk being written.
            return -1
    def __open_or_initialize_swift_dset__(self, store, ds, chunkdim):
        try:
            return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
        except:
            try:
                return ds.to_zarr(store, compute=False, consolidated=True, mode='w')
            except:
                print("Could not initialize dataset.")

    def open_store(self, os_container, os_name, os_url, os_token):
        auth = {
            "preauthurl": os_url,
            "preauthtoken": os_token,
        }
        store = SwiftStore(container=os_container, prefix=os_name, storage_options=auth)
        return store
# `write_by_region` writes chunk-wise data into an initialized dataset in swift.
# 'Initialized' means that a first `to_zarr` call has been executed which writes all coordinates and metadata for the dataset into the store. The subsequent `to_zarr` calls performed by `write_by_region` use this information so that they know how the chunks have to be named. If a region has already been written, it will be overwritten by `write_by_region`.
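# A minimal sketch of the underlying xarray pattern (the local path and dummy data are
# illustrative and not part of this module):
#
#   import numpy, pandas, xarray
#   ds = xarray.Dataset(
#       {"tas": (("time", "x"), numpy.zeros((10, 4)))},
#       coords={"time": pandas.date_range("2000-01-01", periods=10)},
#   ).chunk({"time": 5})
#   ds.to_zarr("example.zarr", compute=False, consolidated=True, mode="w")   # initialize metadata only
#   ds.isel(time=slice(0, 5)).to_zarr("example.zarr", region={"time": slice(0, 5)})
#   ds.isel(time=slice(5, 10)).to_zarr("example.zarr", region={"time": slice(5, 10)})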
    def write_by_region(self, chunked_ds, store, startchunk, validity_check, chunkdim, varname):
        already = self.__open_or_initialize_swift_dset__(store, chunked_ds, chunkdim)
        try:
            already = self.__drop_vars_without_chunkdim__(already, chunkdim)
        except:
            print("Could not drop vars without chunkdim.",
                  " This is not an issue if you initialized the dataset in the cloud.")
        chunked_ds = self.__drop_vars_without_chunkdim__(chunked_ds, chunkdim)
        all_chunks = chunked_ds.chunks[chunkdim]
        chunksum = 0
        for chunk_no in tqdm(range(0, len(all_chunks))):
            chunksum += all_chunks[chunk_no]
            if chunk_no < startchunk:
                continue
            towrite = self.__sel_range_for_chunk__(chunked_ds, chunksum - all_chunks[chunk_no], chunksum, chunkdim)  # .load()
            incloud = self.__sel_range_for_chunk__(already, chunksum - all_chunks[chunk_no], chunksum, chunkdim)  # .load()
            #if towrite.broadcast_equals(incloud):
            #if towrite[varname].size == incloud[varname].size :
            if towrite[varname].identical(incloud[varname]):
                if self.verbose:
                    print("datasets for chunk {0} are equal".format(chunk_no + 1))
                continue
            elif validity_check:
                print("Dsets at chunk {0} from {1} are different!".format(chunk_no, len(all_chunks)))
                return chunk_no
            incloud.close()
            write_status = self.__write_chunk_by_region__(towrite, store, chunkdim, chunksum - all_chunks[chunk_no], chunksum)
            if write_status != 0:
                # Report the failing chunk number so that a retry can resume from it.
                return chunk_no
        return 0

    def write_directly(self, dset=None, store=None):
        dset.to_zarr(store=store, mode='w', consolidated=True)

    def write_with_validation_and_retries(self, ds, varname, store, chunkdim, target_mb, startchunk, validity_check, maxretries):
        chunked_ds = self.__rechunk__(ds, varname, chunkdim, target_mb)
        #
        retries = 0
        success = -1
        if startchunk != 0:
            success = startchunk
        while (success != 0 and retries < maxretries):
            success = self.write_by_region(chunked_ds, store, success, validity_check, chunkdim, varname)
            retries += 1
            if self.verbose and success != 0:
                print("Write by region failed. Now retry number {}.".format(retries))
        if success != 0:
            raise RuntimeError("Max retries {0} all failed at chunk no {1}".format(maxretries, success))
        if self.verbose:
            print("Start validation of write process")
        if self.write_by_region(chunked_ds, store, startchunk, True, chunkdim, varname) != 0:
            raise RuntimeError("Validation failed.")
    def write_to_swift(self,
                       chunkdim="time",
                       target_mb=1000,
                       startchunk=0,
                       validity_check=False,
                       maxretries=3):
        timevars = [var for var in self.mf_dset.variables if "time" in self.mf_dset[var].coords]
        if len(timevars) > 0:
            self.write_with_validation_and_retries(self.mf_dset,
                                                   self.varname,
                                                   self.store,
                                                   chunkdim,
                                                   target_mb,
                                                   startchunk,
                                                   validity_check,
                                                   maxretries)
        else:
            self.write_directly(self.mf_dset, self.store)
        self.mf_dset.close()
# In[17]:

...
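# Example usage of the Tzis class defined above (a sketch only; the token, URL,
# container, prefix, variable name and input files are placeholders, not part of this commit).

if __name__ == "__main__":
    files = ["tas_day_19790101-19881231.nc", "tas_day_19890101-19981231.nc"]
    writer = Tzis(mf_dset=files,
                  varname="tas",
                  verbose=True,
                  os_token="<OS_AUTH_TOKEN>",
                  os_url="<OS_STORAGE_URL>",
                  os_container="cordex-zarr",
                  os_name="tas_day_example")
    writer.write_to_swift(chunkdim="time", target_mb=1000)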