#!/usr/bin/env python
# coding: utf-8

import zarr
from zarrswift import SwiftStore
import xarray
import os
import shutil
import math
from tqdm import tqdm

class Tzis(SwiftStore):
    def __init__(self,                 
                 os_url,
                 os_token,
                 os_container,
                 mf_dset=None,
                 varname=None,
                 verbose=False
                ):
        auth = {
            "preauthurl": os_url,
            "preauthtoken": os_token,
        }
        SwiftStore.__init__(self, os_container, "", auth)
        # Keep the connection details so that ``open_store`` can build
        # per-dataset stores later on.
        self.os_container = os_container
        self.auth = auth
        self.verbose=verbose
        #
        self.mf_dset=self.open_mf_dataset(mf_dset)
        self.varname=self._get_varname(varname)
    
    def open_mf_dataset(self, mf):
        #if type(mf) != list and type(mf) != str :
        #    raise ValueError("Dataset '{0}' must either be a string or a list of strings")
        if mf:
            return xarray.open_mfdataset(mf,
                                         decode_cf=True,
                                         use_cftime=True,
                                         # ``combine="nested"`` is required by newer
                                         # xarray versions whenever ``concat_dim`` is given.
                                         combine="nested",
                                         concat_dim="time",
                                         data_vars='minimal',
                                         coords='minimal',
                                         compat='override')
        return None
    
    def _get_varname(self,
                     varname=None):
        if not varname and self.mf_dset is not None and self.mf_dset.variables:
            # ``Dataset.variables`` is a mapping and cannot be indexed by
            # position, so take the first variable name from its keys.
            varname = list(self.mf_dset.variables)[0]
        if varname and self.verbose:
            print("We use variable {0} in case we need to rechunk.".format(varname))
        return varname
    
    def open_store(self, os_name):
        # ``os_name`` is the object prefix inside the container, i.e. the name
        # of the zarr dataset in the cloud.
        self._SwiftStore = SwiftStore(container=self.os_container,
                                      prefix=os_name,
                                      storage_options=self.auth)
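
    # ``open_store`` has to be called once before ``write_to_swift`` because the
    # latter writes into ``self._SwiftStore``. A hypothetical call sequence
    # (all names are illustrative, not from the source):
    #
    #     tzis = Tzis(os_url, os_token, "my-container", mf_dset=files, varname="pr")
    #     tzis.open_store("my-zarr-dataset")
    #     tzis.write_to_swift()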
# `_get_size_of_var_timeseries`
# 
# returns the size in bytes of the variable `varname` of the entire dataset `ds`,
# which is used for chunking. The dataset can consist of multiple files.
# 
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset` and internally passed.
# - varname
#   - is a string and an unchanged input from the user

# In[3]:


    def _get_size_of_var_timeseries(self, ds, varname):
        return ds[varname].nbytes


# `_calc_chunk_bytes`
# 
# estimates the length of one chunk of `varname` of dataset `ds` in the chunk dimension `chunkdim`
# such that one chunk in the cloud matches the targeted chunk size `target_mb`.
# 
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset` and internally passed.
# - varname
#   - is a string and an unchanged input from the user
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is `time`
# - target_mb
#   - is an integer and the targeted size of one chunk in the cloud in megabytes. The default is 1000.

# In[4]:


    def _calc_chunk_bytes(self, ds, varname, chunkdim, target_mb):
        n_bytes = self._get_size_of_var_timeseries(ds, varname)
        # Divide the length of the chunk dimension by the number of chunks
        # needed to keep each chunk at about ``target_mb`` megabytes.
        return math.ceil(len(ds[chunkdim]) / math.ceil(n_bytes / (target_mb * (2 ** 20))))
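
    # A worked example of the formula (numbers are illustrative, not from the
    # source): a variable of 40 GB = 40960 MB spanning 36500 time steps with
    # target_mb=1000 needs ceil(40960 / 1000) = 41 chunks, hence a chunk
    # length of ceil(36500 / 41) = 891 time steps per chunk.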


# `_rechunk`
# 
# rechunks the original dataset so that the chunk sizes are optimized for the cloud.
# It sets the desired configuration with the `xarray` function `chunk` and then
# unifies the chunks of all variables in the dataset with `unify_chunks`.
# 
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`
# - varname
#   - is a string and an input from the user
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'
# - target_mb
#   - is an integer and the targeted size of one chunk in the cloud in megabytes

# In[5]:


    def _rechunk(self, ds, varname, chunkdim, target_mb):
        chunk_length = self._calc_chunk_bytes(ds, varname, chunkdim, target_mb)
        chunk_rule = {chunkdim: chunk_length}
        
        if self.verbose:
            print("Chunking into chunks of {0} {1} steps".format(chunk_length, chunkdim))
        chunked_ds = ds.chunk(chunk_rule)
        
        # ``unify_chunks`` returns a new object instead of working in place,
        # so reassign the result rather than discarding it per variable.
        chunked_ds = chunked_ds.unify_chunks()
            
        return chunked_ds
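
    # A minimal sketch of the effect (``ds`` and the numbers are illustrative):
    # ``ds.chunk({"time": 891})`` turns each variable into a dask array whose
    # blocks span 891 time steps, and ``to_zarr`` later maps those dask blocks
    # one-to-one onto zarr chunks in the object store.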


# `_drop_vars_without_chunkdim`
# 
# drops all variables which do not depend on the chunk dimension. This is required
# because those variables cannot be written chunkwise with `write_by_region`. The
# coordinate `time_bnds` also has to be dropped because `xarray` already writes it
# when the dataset is initialized in the swift storage. **Returns** the dataset
# without the dropped variables.
# 
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'

# In[6]:


    def _drop_vars_without_chunkdim(self, ds, chunkdim):
        droplist=[var for var in ds.variables if chunkdim not in ds[var].coords]
        if "time_bnds" in ds.variables:
            droplist+=["time_bnds"]
        # ``drop_vars`` replaces the deprecated ``Dataset.drop`` for variables.
        return ds.drop_vars(droplist)


# `_sel_range_for_chunk_by_time`
# 
# Selects the time interval spanned from `starttimeindex` to `endtimeindex` from dataset `ds`.
# The indices are controlled in a loop and chosen such that exactly one chunk is matched.
# 
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`
# - starttimeindex
#   - is an integer and the dimension index of the interval start
# - endtimeindex
#   - is an integer and the dimension index of the interval end

# In[7]:


    def _sel_range_for_chunk_by_time(self, ds, starttimeindex, endtimeindex):
        return ds.isel(time=slice(starttimeindex, endtimeindex))


# `_sel_range_for_chunk`
# 
# Selects an interval spanned from `startindex` to `endindex` within the chunk dimension `chunkdim` from dataset `ds`.
# The indices are controlled in a loop and chosen such that exactly one chunk is matched.
# **Returns** the dataset slice or raises a ValueError.
# 
# - ds
#   - is the `xarray` object returned by `xarray.open_mfdataset`
# - startindex
#   - is an integer and the dimension index of the interval start
# - endindex
#   - is an integer and the dimension index of the interval end
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'

# In[8]:


    def _sel_range_for_chunk(self, ds, startindex, endindex, chunkdim):
        if chunkdim == "time":
            return self._sel_range_for_chunk_by_time(ds, startindex, endindex)
        else:
            raise ValueError('Other chunk dimensions than "time" are not supported yet.')


# `_write_chunk_by_region`
# 
# Writes the dataset slice `towrite` into the region of the zarr store that spans
# from `startindex` to `endindex` within the chunk dimension `chunkdim`.
# **Returns** 0 on success and the number of the failed chunk otherwise.
# 
# - towrite
#   - is an `xarray` dataset object and the slice to be written to the cloud.
# - store
#   - is the zarr store to write into.
# - chunkdim
#   - is a string and the coordinate used for chunking. The default is 'time'
# - startindex
#   - is an integer and the dimension index of the interval start
# - endindex
#   - is an integer and the dimension index of the interval end
# - chunk_no
#   - is an integer and the number of the chunk, reported back on failure

# In[9]:


    def _write_chunk_by_region(self, towrite, store, chunkdim, startindex, endindex, chunk_no):
        try:
            towrite.to_zarr(store=store, region={chunkdim: slice(startindex, endindex)})
            towrite.close()
            return 0
        except Exception:
            # ``chunk_no`` is passed in by the caller because it is not
            # defined in this scope; returning it marks the failed chunk.
            return chunk_no

    def _open_or_initialize_swift_dset(self, store, ds, chunkdim):
        try:
            # If the dataset already exists in the cloud, just open it.
            return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
        except Exception:
            try:
                # Otherwise initialize it: with ``compute=False`` only the
                # coordinates and metadata are written, so the regions can be
                # filled chunk by chunk afterwards. Re-open the store so that
                # the caller always receives a dataset.
                ds.to_zarr(store, compute=False, consolidated=True, mode='w')
                return xarray.open_zarr(store, consolidated=True, decode_cf=True, use_cftime=True)
            except Exception:
                print("Could not initialize dataset.")

# `write_by_region` writes chunk-wise data into an initialized dataset in swift.
# 'Initialized' means that a first `to_zarr` call has been executed which writes all
# coordinates and metadata for the dataset into the store. The subsequent `to_zarr`
# calls performed by `write_by_region` use this information so that they know how the
# chunks have to be named. If a region has already been written, it is overwritten
# by `write_by_region`.
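# 
# A minimal sketch of this two-step pattern outside of the class (``store``, ``ds``
# and the dropped coordinates are illustrative placeholders):
# 
#     ds.to_zarr(store, compute=False, consolidated=True, mode='w')  # metadata only
#     part = ds.isel(time=slice(0, 100)).drop_vars(["lat", "lon"])   # keep time-dependent vars
#     part.to_zarr(store, region={"time": slice(0, 100)})            # fill one region
# 
# The first call lays out the full array shape and chunk grid; each region write then
# fills exactly the zarr chunks that fall into its slice, which is why variables
# without the chunk dimension have to be dropped first.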

    def write_by_region(self, chunked_ds, store, startchunk, validity_check, chunkdim, varname):
        already = self._open_or_initialize_swift_dset(store, chunked_ds, chunkdim)
        try:
            already=self._drop_vars_without_chunkdim(already, chunkdim)
        except Exception:
            print("Could not drop vars without chunkdim.",
                  "This is not an issue if you initialized the dataset in the cloud.")
        chunked_ds=self._drop_vars_without_chunkdim(chunked_ds, chunkdim)
        all_chunks=chunked_ds.chunks[chunkdim]
        chunksum=0
        for chunk_no in tqdm(range(0,len(all_chunks))):
            chunksum+=all_chunks[chunk_no]
            if chunk_no < startchunk :
                continue
            towrite = self._sel_range_for_chunk(chunked_ds, chunksum-all_chunks[chunk_no],chunksum, chunkdim) #.load()
            incloud = self._sel_range_for_chunk(already, chunksum-all_chunks[chunk_no],chunksum, chunkdim) #.load()
            #if towrite.broadcast_equals(incloud):
            #if towrite[varname].size == incloud[varname].size :
            if towrite[varname].identical(incloud[varname]):
                if self.verbose:
                    print("datasets for chunk {0} are equal".format(chunk_no+1))
                continue
            elif validity_check :
                print("Dsets at chunk {0} from {1} are different!".format(chunk_no, len(all_chunks)))
                return chunk_no
            incloud.close()
            write_status = self._write_chunk_by_region(towrite, store, chunkdim, chunksum-all_chunks[chunk_no], chunksum, chunk_no)
            if write_status > 0 :
                return write_status
        # Report success only after the loop has processed all chunks.
        return 0

    def write_directly(self, dset=None, store=None):
        dset.to_zarr(store=store, mode='w', consolidated=True)

    def write_with_validation_and_retries(self, ds, varname, store, chunkdim, target_mb, startchunk, validity_check, maxretries):
        chunked_ds = self._rechunk(ds, varname, chunkdim, target_mb)
        retries = 0
        success = -1
        if startchunk != 0:
            success = startchunk
        while ( success != 0 and retries < maxretries ) :
            success = self.write_by_region(chunked_ds, store, success, validity_check, chunkdim, varname)
            retries += 1
            if self.verbose and success != 0:
                print("Write by region failed. Now retry number {}.".format(retries))
        if success != 0 :
            raise RuntimeError("Max retries {0} all failed at chunk no {1}".format(maxretries, success))
        if self.verbose:
            print("Start validation of write process")
        if self.write_by_region(chunked_ds, store, startchunk, True, chunkdim, varname) != 0:
            raise RuntimeError("Validiation failed.")

    def write_to_swift(self,
                       chunkdim="time",
                       target_mb=1000,
                       startchunk=0,
                       validity_check=False,
                       maxretries=3):
        timevars=[var for var in self.mf_dset.variables if "time" in self.mf_dset[var].coords]
        if len(timevars) > 0:
            self.write_with_validation_and_retries(self.mf_dset,
                                                   self.varname,
                                                   self._SwiftStore,
                                                   chunkdim,
                                                   target_mb,
                                                   startchunk,
                                                   validity_check,
                                                   maxretries)
        else:
            self.write_directly(self.mf_dset, self._SwiftStore)
        self.mf_dset.close()

# In[17]:

#cordex_path = "/mnt/lustre01/work/kd0956/CORDEX/data//cordex/output/EUR-11/GERICS//NCC-NorESM1-M/rcp26/r1i1p1/GERICS-REMO2015/v1/day/pr/v20190925/"

#mfs_towrite=[cordex_path +filename for filename in os.listdir(cordex_path)]


# In[18]:


#write_to_swift(mfs=mfs_towrite,outid="CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr",
#               varname="pr",
#               verbose=True)
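

# The commented call above predates the class interface. A hedged sketch of the
# corresponding workflow with the current methods (URL, token and container are
# illustrative placeholders, not from the source):

#tzis = Tzis(os_url="https://swift.example.org/v1/AUTH_abc",
#            os_token="<OS_AUTH_TOKEN>",
#            os_container="my-container",
#            mf_dset=mfs_towrite,
#            varname="pr",
#            verbose=True)
#tzis.open_store("CORDEX.GERICS.NCC-NorESM1-M.rcp26.day.gn.pr")
#tzis.write_to_swift(chunkdim="time", target_mb=1000)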