From 3e4425126925e5b8fef2b4df8df01a2ffdfe8fa7 Mon Sep 17 00:00:00 2001 From: Fabian Wachsmann <k204210@l10395.lvt.dkrz.de> Date: Wed, 5 Jul 2023 16:21:01 +0200 Subject: [PATCH] Updated tzis --- tzis.egg-info/PKG-INFO | 19 +++++++++---------- tzis.egg-info/requires.txt | 2 +- tzis/daskhandling.py | 4 ++-- tzis/provenance.py | 7 ++++--- tzis/rechunker.py | 17 +++++++++++++---- tzis/swifthandling.py | 2 +- tzis/tzis.py | 24 ++++++++++++++++++------ 7 files changed, 48 insertions(+), 27 deletions(-) diff --git a/tzis.egg-info/PKG-INFO b/tzis.egg-info/PKG-INFO index d16fb7b..248240f 100644 --- a/tzis.egg-info/PKG-INFO +++ b/tzis.egg-info/PKG-INFO @@ -21,7 +21,7 @@ pip install git+https://gitlab.dkrz.de/data-infrastructure-services/tzis.git#egg ```python from tzis import swifthandling, openmf, tzis,catalog -import glob +import glob, os import xarray as xr #set credentials, tzis will ask for a password @@ -37,7 +37,7 @@ glob_path_var = "/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ss varname = "tas" chunkdim = "time" -target_mb = 10 +target_mb = 16 # @@ -47,7 +47,7 @@ token=swifthandling.get_token( user ) -fsmap=swifthandling.get_swift_mapper( +target_fsmap=swifthandling.get_swift_mapper( token["OS_STORAGE_URL"], token["OS_AUTH_TOKEN"], container_name, @@ -57,7 +57,7 @@ fsmap=swifthandling.get_swift_mapper( omo = openmf.open_mfdataset_optimize( glob_path_var, varname, - fsmap, + target_fsmap, chunkdim=chunkdim, target_mb=target_mb ) @@ -73,14 +73,13 @@ outstore=tzis.write_zarr( xr.open_zarr( outstore, consolidated=True -) +) + catalog.write_catalog( - catalogname="catalog.json", - pattern=None, - delim=".", - columns=None, - mode="a" + omo.target_fsmap.fs, + os.path.dirname(omo.target_fsmap.root), + prefix_for_object_storage ) ``` diff --git a/tzis.egg-info/requires.txt b/tzis.egg-info/requires.txt index 19c6be2..8d34870 100644 --- a/tzis.egg-info/requires.txt +++ b/tzis.egg-info/requires.txt @@ -9,7 +9,7 @@ fsspec dask[distributed] intake aiohttp_retry 
-pandas<1.4.0 +pandas swiftspec@ git+https://github.com/fsspec/swiftspec python-swiftclient>=3.10.0 pytest diff --git a/tzis/daskhandling.py b/tzis/daskhandling.py index 701e892..e6aa56a 100644 --- a/tzis/daskhandling.py +++ b/tzis/daskhandling.py @@ -6,7 +6,7 @@ import math def reset_dask_chunk_sizes( client, target_mb, verbose, DASK_CHUNK_SIZE, other_chunk_dims ): - new_dsc = int(DASK_CHUNK_SIZE / other_chunk_dims) + new_dsc = math.ceil(DASK_CHUNK_SIZE / other_chunk_dims) worker_info = client.scheduler_info()["workers"] avail_mem = ( sum([worker["memory_limit"] for key, worker in worker_info.items()]) @@ -22,7 +22,7 @@ def reset_dask_chunk_sizes( ) required_mb = new_dsc * target_mb * 5 if avail_mem < required_mb: - new_dsc = int(avail_mem / (target_mb * 5)) + new_dsc = math.ceil(avail_mem / (target_mb * 5)) if verbose: print( f"""Reducing dask tasks size from {DASK_CHUNK_SIZE} to diff --git a/tzis/provenance.py b/tzis/provenance.py index d52a132..18685fc 100644 --- a/tzis/provenance.py +++ b/tzis/provenance.py @@ -41,15 +41,16 @@ class Provenance(object): self.container = None self.prefix = None if self.url.startswith("swift"): - self.url = os.path.dirname(os.path.dirname(url)) - self.container = os.path.basename(os.path.dirname(url)) - self.prefix = os.path.basename(url) if len(self.url.split('/')) < 6: if len(self.url.split('/')) < 5: raise ValueError(f"The provided {url} does not include a container") self.url = os.path.dirname(url) self.container = os.path.basename(url) self.prefix = self._identifier + else: + self.url = os.path.dirname(os.path.dirname(url)) + self.container = os.path.basename(os.path.dirname(url)) + self.prefix = os.path.basename(url) diff --git a/tzis/rechunker.py b/tzis/rechunker.py index 9e3c7f9..887e2e0 100644 --- a/tzis/rechunker.py +++ b/tzis/rechunker.py @@ -28,11 +28,20 @@ def calc_chunk_length(ds, varname, chunkdim, target_mb, other_chunks): ------ """ n_bytes = ds[varname].nbytes - target_mb_matching_len=math.ceil( + 
bytes_per_onestep=math.ceil(n_bytes / ( len(ds[chunkdim]) * other_chunks)) + if bytes_per_onestep > math.ceil(1.5*target_mb * (2**20)) : + raise ValueError( + f"You specified a chunkdim {chunkdim} which would contain {bytes_per_onestep/(2**20)}MB " + f"which is more than the specified {target_mb}mb for variable {varname} per one step." + ) + target_mb_matching_len=math.floor( len(ds[chunkdim]) * other_chunks / math.ceil(n_bytes / (target_mb * (2**20))) ) - return math.ceil(len(ds[chunkdim])/(math.ceil(len(ds[chunkdim]) / target_mb_matching_len))) - + #return_len=math.floor(len(ds[chunkdim])/(math.ceil(len(ds[chunkdim]) / target_mb_matching_len))) + if target_mb_matching_len== 0: + return 1 + return target_mb_matching_len + def calc_other_dim_chunks(orig_chunks_dict, chunkdim): other_dim_chunks = 1 @@ -96,4 +105,4 @@ def rechunk(ds, varname, chunkdim, target_mb, verbose): for var_id in chunked_ds.variables: chunked_ds[var_id].unify_chunks() - return chunked_ds, b_is_safe_chunk + return chunked_ds diff --git a/tzis/swifthandling.py b/tzis/swifthandling.py index f0eca7d..af184c4 100644 --- a/tzis/swifthandling.py +++ b/tzis/swifthandling.py @@ -77,7 +77,7 @@ def mapper_mkdirs(url, container, prefix=None): conn = Connection( preauthurl=getenv("OS_STORAGE_URL"), preauthtoken=getenv("OS_AUTH_TOKEN") ) - print(f"Check if {container} is in {url}. 
Otherwise make it.") + print(f"Check if container '{container}' is in url '{url}' or create it.") listings = fsmap.fs.listdir(url) if not container in [a["name"].split("/")[-1] for a in listings]: conn.put_container(container) diff --git a/tzis/tzis.py b/tzis/tzis.py index 09f6406..9e4afc9 100644 --- a/tzis/tzis.py +++ b/tzis/tzis.py @@ -177,6 +177,7 @@ class write_zarr(fsspec.FSMap): chunk_independent_list = [ var for var in ds.variables if chunkdim not in ds[var].coords ] + print(chunk_independent_list) if chunk_independent_list: ds[chunk_independent_list].to_zarr( storestring, @@ -320,13 +321,19 @@ class write_zarr(fsspec.FSMap): # if not validity_check: # print("Chunk {0} is skipped.".format(chunk_no + 1)) return -1 - elif validity_check: + else : print( f"Variable subset for index interval {start_chunkdim_index} - {end_chunkdim_index} " f"of chunkdim {chunkdim} are different." ) - return start_chunkdim_index - return 0 + if self.verbose: + print("to write:") + print(towrite[varname]) + print("in target:") + print(intarget[varname]) + if validity_check: + return start_chunkdim_index + return 0 def _get_start_chunk( self, @@ -339,7 +346,7 @@ class write_zarr(fsspec.FSMap): validity_check, varname, ): - if not start_chunk and self.verbose: + if start_chunk <= 0 and self.verbose: print( "Finding the start chunk can take long. 
You better provide the startchunk yourself" ) @@ -635,7 +642,12 @@ class write_zarr(fsspec.FSMap): if status_equality < 0: continue elif status_equality > 0: + if b_islocalclient: + client.close() return status_equality + if b_islocalclient: + client.close() + else: try: self.write_by_region_dask( @@ -678,7 +690,7 @@ class write_zarr(fsspec.FSMap): chunked_ds = ds if target_mb != 0: - chunked_ds, is_safe_chunk = rechunk( + chunked_ds = rechunk( ds, varname, chunkdim, target_mb, self.verbose ) elif target_mb == 0: @@ -691,7 +703,7 @@ class write_zarr(fsspec.FSMap): if self.verbose: print(f"original dask chunk size {target_mb}mb is used for chunking.") if target_mb > 2000: - printf(f"Warning: Your target_mb {target_mb} is too big for swift (max 2GB") + print(f"Warning: Your target_mb {target_mb} is too big for swift (max 2GB)") # l_append = False -- GitLab