Commit 723d15eb authored by Marco Kulüke

add package structure

parent 6048a94d
.ipynb_checkpoints/
__pycache__/
swiftenv*
Copyright 2021 Fabian Wachsmann, Marco Kulüke
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from setuptools import setup, find_packages

setup(
    name='tzis',
    version='0.1',
    packages=find_packages(exclude=['tests*']),
    license='MIT',
    description='Convert to Zarr in Swift Object Storage',
    long_description=open('README.md').read(),
    install_requires=['zarr', 'xarray'],
    url='???',
    author='Fabian Wachsmann, Marco Kulüke',
    author_email='wachsmann@dkrz.de'
)
from . import tzis
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import zarr
from zarrswift import SwiftStore
import xarray
import os
import shutil
from swiftenv import *
import math
from tqdm import tqdm
globalverbose = False
# `get_size_of_var_timeseries`
#
# returns the size in bytes of the variable `varname` over the entire dataset `ds`, used for chunking. The dataset can consist of multiple files.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset` and passed internally.
# - varname
#   - is a string and an unchanged input from the user.
#
# In[3]:
def get_size_of_var_timeseries(ds, varname):
    return ds[varname].nbytes
# `calc_chunk_bytes`
#
# estimates the length of one chunk of `varname` of dataset `ds` in the chunk dimension `chunkdim`
# to match the targeted chunk size `target_mb` in the cloud.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset` and passed internally.
# - varname
#   - is a string and an unchanged input from the user.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is `time`.
# - target_mb
#   - is an integer and the targeted size in MB of one chunk in the cloud. The default is 1000.
# In[4]:
def calc_chunk_bytes(ds, varname, chunkdim, target_mb):
    n_bytes = get_size_of_var_timeseries(ds, varname)
    # number of chunks needed to stay below target_mb, then chunkdim steps per chunk
    return math.ceil(len(ds[chunkdim]) / math.ceil(n_bytes / (target_mb * (2 ** 20))))
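# A worked example of this arithmetic (made-up numbers, not from this repository):
# a variable spanning 3650 time steps with roughly 7 GiB in total, targeted at
# `target_mb=1000`, needs ceil(7168/1000) = 8 chunks, so one chunk covers
# ceil(3650/8) = 457 time steps.
assert math.ceil(3650 / math.ceil(7 * 2**30 / (1000 * 2**20))) == 457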
# `rechunk`
#
# rechunks the original dataset in order to obtain optimized chunk sizes.
# It sets the desired configuration with the `xarray` function `chunk` and then unifies the chunks of all variables in the dataset with `unify_chunks`.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - varname
#   - is a string and an input from the user.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.
# - target_mb
#   - is an integer and the targeted size in MB of one chunk in the cloud.
# In[5]:
def rechunk(ds, varname, chunkdim, target_mb):
    chunk_length = calc_chunk_bytes(ds, varname, chunkdim, target_mb)
    chunk_rule = {chunkdim: chunk_length}
    if globalverbose:
        print("Chunking into chunks of {0} {1} steps".format(chunk_length, chunkdim))
    chunked_ds = ds.chunk(chunk_rule)
    # unify_chunks returns a new object rather than modifying in place,
    # so the result has to be reassigned.
    chunked_ds = chunked_ds.unify_chunks()
    return chunked_ds
# `drop_vars_without_chunkdim`
#
# drops all variables which do not depend on the chunk dimension. This is required because those variables cannot be written chunk-wise with `write_by_region`. The coordinate `time_bnds` also has to be dropped because `xarray` already writes it when the dataset is initialized in the swift storage. **Returns** the dataset without the dropped variables.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.
# In[6]:
def drop_vars_without_chunkdim(ds, chunkdim):
    droplist = [var for var in ds.variables if chunkdim not in ds[var].coords]
    if "time_bnds" in ds.variables:
        droplist += ["time_bnds"]
    return ds.drop(droplist)
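# For illustration, a hypothetical toy dataset (not part of the package) with one
# time-dependent and one static variable; only the static one is dropped:
#
# import numpy
# toy = xarray.Dataset(
#     {"pr": (("time", "x"), numpy.zeros((4, 2))),  # depends on time -> kept
#      "orog": (("x",), numpy.zeros(2))},           # no time dependency -> dropped
#     coords={"time": range(4), "x": range(2)})
# assert "orog" not in drop_vars_without_chunkdim(toy, "time").variables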
# `sel_range_for_chunk_by_time`
#
# Selects a time interval spanning `starttimeindex` to `endtimeindex` from dataset `ds`.
# The indices are controlled in a loop and chosen such that exactly one chunk is matched.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - starttimeindex
#   - is an integer and the dimension index of the interval start.
# - endtimeindex
#   - is an integer and the dimension index of the interval end.
# In[7]:
def sel_range_for_chunk_by_time(ds, starttimeindex, endtimeindex):
    return ds.isel(time=slice(starttimeindex, endtimeindex))
# `sel_range_for_chunk`
#
# Selects an interval spanning `startindex` to `endindex` within the chunk dimension `chunkdim` from dataset `ds`.
# The indices are controlled in a loop and chosen such that exactly one chunk is matched.
# **Returns** the dataset slice or raises a ValueError.
#
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`.
# - startindex
#   - is an integer and the dimension index of the interval start.
# - endindex
#   - is an integer and the dimension index of the interval end.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.
# In[8]:
def sel_range_for_chunk(ds, startindex, endindex, chunkdim):
    if chunkdim == "time":
        return sel_range_for_chunk_by_time(ds, startindex, endindex)
    else:
        raise ValueError('Other chunk dimensions than "time" are not supported yet.')
# `write_chunk_by_region`
#
# Writes the dataset slice `towrite` into the region spanning `startindex` to `endindex`
# of the chunk dimension `chunkdim` in the zarr store `store`.
# **Returns** 0 on success and the number of the failed chunk otherwise.
#
# - towrite
#   - is an `xarray` dataset object and the slice to be written to the cloud.
# - store
#   - is the zarr store in the swift object storage to write to.
# - chunk_no
#   - is an integer and the number of the chunk, returned if the write fails.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'.
# - startindex
#   - is an integer and the dimension index of the interval start.
# - endindex
#   - is an integer and the dimension index of the interval end.
# In[9]:
def write_chunk_by_region(towrite, store, chunk_no, chunkdim, startindex, endindex):
    try:
        towrite.to_zarr(store=store, region={chunkdim: slice(startindex, endindex)})
        towrite.close()
        return 0
    except Exception:
        # chunk_no is passed in so that the number of the failed chunk can be
        # reported to the caller (it was undefined in this scope before).
        return chunk_no
# `write_by_region` writes chunk-wise data into an initialized dataset in swift.
#
# 'Initialized' means that a first `to_zarr` call has been executed which writes all coordinates and metadata for the dataset into the store. The subsequent `to_zarr` calls performed by `write_chunk_by_region` use this information so that they know how the chunks have to be named. If a region has already been written, it will be overwritten by `write_chunk_by_region`.
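# A minimal sketch of this initialize-then-fill pattern with plain `xarray` and a
# local directory store (made-up data, not code from this package):
#
# import numpy
# sketch = xarray.Dataset({"pr": (("time",), numpy.arange(10.0))},
#                         coords={"time": range(10)}).chunk({"time": 5})
# # first call: write metadata and coordinates only
# sketch.to_zarr("sketch.zarr", compute=False, consolidated=True, mode="w")
# # subsequent calls: fill exactly one chunk-sized region each
# sketch.isel(time=slice(0, 5)).to_zarr("sketch.zarr", region={"time": slice(0, 5)})
# sketch.isel(time=slice(5, 10)).to_zarr("sketch.zarr", region={"time": slice(5, 10)})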
# In[10]:
def open_or_initialize_swift_dset(store, ds, chunkdim):
    try:
        return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
    except Exception:
        try:
            return ds.to_zarr(store, compute=False, consolidated=True, mode='w')
        except Exception:
            print("Could not initialize dataset.")
# In[11]:
def write_by_region(chunked_ds, store, startchunk, validity_check, chunkdim, varname):
    already = open_or_initialize_swift_dset(store, chunked_ds, chunkdim)
    try:
        already = drop_vars_without_chunkdim(already, chunkdim)
    except Exception:
        print("Could not drop vars without chunkdim.",
              "This is not an issue if you initialized the dataset in the cloud.")
    chunked_ds = drop_vars_without_chunkdim(chunked_ds, chunkdim)
    all_chunks = chunked_ds.chunks[chunkdim]
    chunksum = 0
    for chunk_no in tqdm(range(0, len(all_chunks))):
        chunksum += all_chunks[chunk_no]
        if chunk_no < startchunk:
            continue
        towrite = sel_range_for_chunk(chunked_ds, chunksum - all_chunks[chunk_no], chunksum, chunkdim)  # .load()
        incloud = sel_range_for_chunk(already, chunksum - all_chunks[chunk_no], chunksum, chunkdim)  # .load()
        # alternatives considered for the comparison:
        # if towrite.broadcast_equals(incloud):
        # if towrite[varname].size == incloud[varname].size:
        if towrite[varname].identical(incloud[varname]):
            if globalverbose:
                print("datasets for chunk {0} are equal".format(chunk_no + 1))
            continue
        elif validity_check:
            print("Dsets at chunk {0} of {1} are different!".format(chunk_no, len(all_chunks)))
            return chunk_no
        incloud.close()
        write_status = write_chunk_by_region(towrite, store, chunk_no, chunkdim, chunksum - all_chunks[chunk_no], chunksum)
        if write_status > 0:
            return write_status
    return 0
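# To illustrate the region bookkeeping with made-up chunk sizes: for
# `all_chunks = (100, 100, 50)`, `chunksum` evolves as 100, 200, 250, so the loop
# writes the regions [0:100], [100:200] and [200:250]; e.g. for chunk_no 2 the
# slice is (250 - 50, 250) == (200, 250).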
# In[12]:
def write_directly(ds, store):
    ds.to_zarr(store=store, mode='w', consolidated=True)
# In[13]:
def write_with_validation_and_retries(ds, varname, chunkdim, target_mb, store, startchunk, validity_check, maxretries):
    chunked_ds = rechunk(ds, varname, chunkdim, target_mb)
    #
    retries = 0
    success = -1
    if startchunk != 0:
        success = startchunk
    while success != 0 and retries < maxretries:
        success = write_by_region(chunked_ds, store, success, validity_check, chunkdim, varname)
        retries += 1
        if globalverbose and success != 0:
            print("Write by region failed. Now retry number {}.".format(retries))
    if success != 0:
        raise RuntimeError("Max retries {0} all failed at chunk no {1}".format(maxretries, success))
    if globalverbose:
        print("Start validation of write process")
    if write_by_region(chunked_ds, store, startchunk, True, chunkdim, varname) != 0:
        raise RuntimeError("Validation failed.")
# In[14]:
def open_store(outid=None):
    # OS_STORAGE_URL and OS_AUTH_TOKEN are expected to be defined by swiftenv
    auth = {
        "preauthurl": OS_STORAGE_URL,
        "preauthtoken": OS_AUTH_TOKEN,
    }
    store = SwiftStore(container='cordex-zarr', prefix=outid, storage_options=auth)
    return store
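# `swiftenv` is kept out of version control (see the `swiftenv*` entry in
# `.gitignore` above) and is expected to provide the Swift credentials imported
# via `from swiftenv import *`. A hypothetical minimal `swiftenv.py` (placeholder
# values only, not real endpoints):
#
# OS_STORAGE_URL = "https://swift.example.org/v1/AUTH_<project>"
# OS_AUTH_TOKEN = "<token from the OpenStack identity service>"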
# In[15]:
def open_mf_dataset(mf):
    # if type(mf) != list and type(mf) != str:
    #     raise ValueError("Dataset '{0}' must either be a string or a list of strings")
    return xarray.open_mfdataset(mf,
                                 decode_cf=True,
                                 use_cftime=True,
                                 concat_dim="time",
                                 data_vars='minimal',
                                 coords='minimal',
                                 compat='override')
# In[16]:
def write_to_swift(mfs=None,
                   outid=None,
                   varname=None,
                   target_mb=1000,
                   startchunk=0,
                   validity_check=False,
                   chunkdim="time",
                   maxretries=3,
                   verbose=False):
    global globalverbose
    ds = open_mf_dataset(mfs)
    if verbose:
        globalverbose = True
    if not varname:
        # ds.variables is a mapping keyed by name, so positional indexing fails;
        # take the name of the first data variable instead.
        varname = list(ds.data_vars)[0]
    if globalverbose:
        print("We use variable {0} in case we need to rechunk.".format(varname))
    store = open_store(outid)
    #
    timevars = [var for var in ds.variables if "time" in ds[var].coords]
    if len(timevars) > 0:
        write_with_validation_and_retries(ds, varname, chunkdim, target_mb, store, startchunk, validity_check, maxretries)
    else:
        write_directly(ds, store)
    ds.close()
# In[17]:
cordex_path = "/mnt/lustre01/work/kd0956/CORDEX/data//cordex/output/EUR-11/GERICS//NCC-NorESM1-M/rcp26/r1i1p1/GERICS-REMO2015/v1/day/pr/v20190925/"
mfs_towrite=[cordex_path +filename for filename in os.listdir(cordex_path)]
# In[18]:
write_to_swift(mfs=mfs_towrite,
               outid="CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr",
               varname="pr",
               verbose=True)
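# Hypothetical follow-up calls (not in the original commit): resume an aborted
# upload from a reported chunk, or only verify an existing transfer.
#
# write_to_swift(mfs=mfs_towrite, outid="CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr",
#                varname="pr", startchunk=5, verbose=True)    # resume at chunk 5
# write_to_swift(mfs=mfs_towrite, outid="CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr",
#                varname="pr", validity_check=True)           # compare cloud and source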