diff --git a/tzis/catalog.py b/tzis/catalog.py index cd16d9104543783a297312940c8daff37d55f998..41a04a175a8ee00995ef25e9e0c2ffb302d63ef3 100644 --- a/tzis/catalog.py +++ b/tzis/catalog.py @@ -4,15 +4,16 @@ from datetime import datetime import json import copy import fsspec +from .swifthandling import * -def write_index_file(store, url,pattern=None, contact=None): +def write_index_file(store, url, pattern=None, contact=None): index_name = "INDEX.csv" header = [ "contact", "prefix", "size", - #"first_modification", + # "first_modification", "last_modification", "url", ] @@ -22,14 +23,12 @@ def write_index_file(store, url,pattern=None, contact=None): writer.writerow(header) - all_zarr_datasets = [entry["name"].split('/')[-1] - for entry in store.listdir(url) - ] + all_zarr_datasets = [entry["name"].split("/")[-1] for entry in store.listdir(url)] catalog = next((zd for zd in all_zarr_datasets if "catalog" in zd), None) if catalog: all_zarr_datasets.remove(catalog) if index_name in all_zarr_datasets: - store.rm('/'.join([url,index_name])) + store.rm("/".join([url, index_name])) all_zarr_datasets.remove(index_name) if pattern: all_zarr_datasets = [zd for zd in all_zarr_datasets if pattern in zd] @@ -38,33 +37,36 @@ def write_index_file(store, url,pattern=None, contact=None): all_zarr_datasets = [zd for zd in all_zarr_datasets if outsort not in zd] for zd in all_zarr_datasets: - sumbytes = store.du('/'.join([url,zd])) - modified=0 + sumbytes = store.du("/".join([url, zd])) + modified = 0 try: - modified = [entry["last_modified"] - for entry in store.ls('/'.join([url,zd])) - if entry["name"].endswith(".zmetadata")] - if modified : - modified=modified[0] + modified = [ + entry["last_modified"] + for entry in store.ls("/".join([url, zd])) + if entry["name"].endswith(".zmetadata") + ] + if modified: + modified = modified[0] except Exception as e: print(e) print("Modification time is set to 0") - + writer.writerow( [ contact, zd, sumbytes, - #min(modiflist), + # min(modiflist), modified, "/".join([url, zd]), ] ) # all_zarr_datasets_bytes = u'\n'.join(all_zarr_datasets).encode('utf8').strip() - with store.open("/".join([url, index_name]),"wb") as f: + with store.open("/".join([url, index_name]), "wb") as f: f.write(bytes(output.getvalue(), "utf8")) + def write_catalog( store, fsmap, @@ -83,16 +85,19 @@ def write_catalog( "url": None, } + if not isinstance(fsmap, fsspec.FSMap): + fsmap = fsspec.get_mapper(fsmap) returnurl = "/".join([fsmap.root, catalogname]) + returnurl = get_os_url(returnurl) if not len(delim) == 1: raise ValueError( "Delim must have lenght 1 but {0} has length {1}".format(delim, len(delim)) ) - all_zarr_datasets = [entry["name"].split('/')[-1] - for entry in store.listdir(fsmap.root) - ] + all_zarr_datasets = [ + entry["name"].split("/")[-1] for entry in store.listdir(fsmap.root) + ] b_append = True if not catalogname in all_zarr_datasets: @@ -129,7 +134,7 @@ def write_catalog( b_usedelim = True if b_append: - catalog=json.loads(fsmap[catalogname]) + catalog = json.loads(fsmap[catalogname]) if any(prefix == entry["prefix"] for entry in catalog["catalog_dict"]): if verbose: print("Dataset {0} already in catalog".format(prefix)) @@ -168,8 +173,10 @@ def write_catalog( if max_col_length > len(columns) and verbose: print("Too less column values specified in columns") - template_url = "swift://swift.dkrz.de/dkrz_a44962e3ba914c309a7421573a6949a6/intake-esm/" - intakemap=fsspec.get_mapper(template_url) + template_url = ( + 
"swift://swift.dkrz.de/dkrz_a44962e3ba914c309a7421573a6949a6/intake-esm/" + ) + intakemap = fsspec.get_mapper(template_url) catalog = json.loads(intakemap["dkrz_tzis_template.json"]) catalog["catalog_dict"] = [] diff --git a/tzis/daskhandling.py b/tzis/daskhandling.py index e936777bb4da1c397b1741ae1afa737e036df71b..701e89214a1019fe9b7c7632c3d825631e210c05 100644 --- a/tzis/daskhandling.py +++ b/tzis/daskhandling.py @@ -2,8 +2,11 @@ from dask.distributed import get_client, Client, system from dask.system import CPU_COUNT import math -def reset_dask_chunk_sizes(client, target_mb, verbose, DASK_CHUNK_SIZE, other_chunk_dims): - new_dsc=int(DASK_CHUNK_SIZE/other_chunk_dims) + +def reset_dask_chunk_sizes( + client, target_mb, verbose, DASK_CHUNK_SIZE, other_chunk_dims +): + new_dsc = int(DASK_CHUNK_SIZE / other_chunk_dims) worker_info = client.scheduler_info()["workers"] avail_mem = ( sum([worker["memory_limit"] for key, worker in worker_info.items()]) @@ -29,18 +32,21 @@ def reset_dask_chunk_sizes(client, target_mb, verbose, DASK_CHUNK_SIZE, other_ch ) return new_dsc + def get_client_and_config(target_mb, DASK_CHUNK_SIZE, verbose, other_chunk_dims): b_islocalclient = True try: client = get_client() b_islocalclient = False except: - cpus=math.ceil(CPU_COUNT/2) + cpus = min(math.ceil(CPU_COUNT / 2), 4) client = Client( n_workers=cpus, - #threads_per_worker=cpus, - memory_limit=math.ceil(system.MEMORY_LIMIT/2/cpus) + # threads_per_worker=cpus, + memory_limit=min(math.ceil(system.MEMORY_LIMIT / 2 / cpus), 20 * 10**9), ) print(client) - new_dsc=reset_dask_chunk_sizes(client, target_mb, verbose, DASK_CHUNK_SIZE, other_chunk_dims) - return client, b_islocalclient, new_dsc \ No newline at end of file + new_dsc = reset_dask_chunk_sizes( + client, target_mb, verbose, DASK_CHUNK_SIZE, other_chunk_dims + ) + return client, b_islocalclient, new_dsc diff --git a/tzis/openmf.py b/tzis/openmf.py index 3ce68baada077b8a0b1e422c0cd0915d07438c33..a6f54a0a4f40c45b3f3aad4c8d9216b97201158a 100644 --- a/tzis/openmf.py +++ b/tzis/openmf.py @@ -2,6 +2,7 @@ import fsspec import xarray from .provenance import Provenance from .rechunker import calc_chunk_length +import glob OPEN_MFDATASET_KWARGS = dict( decode_cf=True, @@ -12,6 +13,7 @@ OPEN_MFDATASET_KWARGS = dict( combine_attrs="drop_conflicts", ) + class open_mfdataset_optimize: """ Opens the dataset with xarrays `open_mfdataset` @@ -32,8 +34,8 @@ class open_mfdataset_optimize: chunkdim=None : str chunkdim is the chunk dimension used for rechunking. Only set this in combination with target_mb. target_mb=None : int - target_mb is the desired size of one chunk in the target storage in megabytes. - Only set this in combination with chunkdim. + target_mb is the desired size of one chunk in the target storage in megabytes. + Only set this in combination with chunkdim. xarray_kwargs=None : dict xarray_kwargs are unpacked within `open_mfdataset`. @@ -43,79 +45,91 @@ class open_mfdataset_optimize: Dataset or None The xarray dataset with optimized chunk setting and attributes. 
""" - def __init__(self, - mf, - varname, - target_fsmap, - chunkdim=None, - target_mb=None, - xarray_kwargs=None, - verbose=False): - - # if type(mf) != list and type(mf) != str : - # raise ValueError("Dataset '{0}' must either be a string or a list of strings") - self.mf=mf - self.target_fsmap=target_fsmap - self.varname=varname - self.mf_dset=None - self.original_size=None - self.provenance=None + + def __init__( + self, + mf, + varname, + target_fsmap, + chunkdim=None, + target_mb=None, + xarray_kwargs=None, + verbose=False, + ): + + # if type(mf) != list and type(mf) != str : + # raise ValueError("Dataset '{0}' must either be a string or a list of strings") + self.mf = mf + self.target_fsmap = target_fsmap + if not isinstance(target_fsmap, fsspec.FSMap): + self.target_fsmap = fsspec.get_mapper(target_fsmap) + self.varname = varname + self.mf_dset = None + self.original_size = None + self.provenance = None if verbose: print("Resetting disk size and compression ratio") + imf = mf if type(mf) == str: - mf = [mf] + imf = glob.glob(mf) - if verbose: - print("Calculating chunk size") + if verbose: + print("Opening test dataset") if xarray_kwargs is not None: - testds=xarray.open_dataset(mf[0], **xarray_kwargs) - else: - testds=xarray.open_dataset(mf[0]) + OPEN_MFDATASET_KWARGS.update(xarray_kwargs) + + testds = xarray.open_mfdataset(imf, **OPEN_MFDATASET_KWARGS) if not varname: varname = get_varname(testds) else: if varname not in list(testds.data_vars): raise ValueError( - "Given variable name {0} not in dataset {1}".format( - varname, mf[0] + "Given variable name {0} not in dataset.".format( + varname, ) ) - if xarray_kwargs: - OPEN_MFDATASET_KWARGS.update(xarray_kwargs) + if verbose: + print(f"Calculating size of {mf}") - l_chunksset=False + tempfs = fsspec.get_mapper(imf[0]).fs + self.original_size = sum([tempfs.du(mf_input) for mf_input in imf]) + + if verbose: + print("Calculating chunk size") + + l_chunksset = False if chunkdim and target_mb: - size=fsspec.get_mapper(mf[0]).fs.du(mf[0])/1024/1024 + size = self.original_size / 1024 / 1024 if size < target_mb: - print(f"In open_mfdataset_optimize we cannot set target_mb {target_mb} because " - f"the size of the testfile {mf[0]} is {size}MB and therefore smaller.") - else : - chunk_length = calc_chunk_length(testds, - varname, - chunkdim, - target_mb, - 1) + print( + f"In open_mfdataset_optimize we cannot set target_mb {target_mb} because " + f"the size of the test dataset {mf} is {size}MB and therefore smaller." 
+ ) + else: + chunk_length = calc_chunk_length( + testds, varname, chunkdim, target_mb, 1 + ) if verbose: print(f"Set chunks to {chunkdim}: {chunk_length}") OPEN_MFDATASET_KWARGS.update(dict(chunks={chunkdim: chunk_length})) - l_chunksset=True + l_chunksset = True testds.close() - mf_dset = xarray.open_mfdataset(mf, **OPEN_MFDATASET_KWARGS) # , + mf_dset = xarray.open_mfdataset(imf, **OPEN_MFDATASET_KWARGS) # , if chunkdim and target_mb and l_chunksset: mf_dset = mf_dset.chunk({chunkdim: chunk_length}) for var_id in mf_dset.variables: mf_dset[var_id].unify_chunks() - conflict_attrs = get_conflict_attrs(mf, mf_dset, xarray_kwargs) + conflict_attrs = get_conflict_attrs(imf, mf_dset, xarray_kwargs) self.provenance = Provenance(target_fsmap.root) @@ -124,12 +138,10 @@ class open_mfdataset_optimize: "input_tracking_id", conflict_attrs["tracking_id"] ) else: - self.provenance.gen_input("input_file_name", mf) + self.provenance.gen_input("input_file_name", imf) - self.original_size=sum([fsspec.get_mapper(mf_input).fs.du(mf_input) - for mf_input in mf] - ) from .tzis import __version__ as tzis_version + mf_dset.attrs["tracking_id"] = self.provenance.tracking_id mf_dset.attrs.setdefault("history", "") mf_dset.attrs[ @@ -145,15 +157,18 @@ class open_mfdataset_optimize: # for coord in mf_dset.coords: # mf_dset[coord].load() - self.mf_dset=mf_dset + self.mf_dset = mf_dset + def __str__(self): return self.mf_dset.__str__() + def __repr__(self): return self.mf_dset.__repr__() + def get_conflict_attrs(mf, mf_dset, xarray_kwargs): """ - Collects attributes which conflict within all single dsets in `mf`. + Collects attributes which conflict within all single dsets in `mf`. It opens all elements of `mf` with `xarray.open_dataset` and collects attributes in a dictionary and their values in lists. @@ -180,13 +195,11 @@ def get_conflict_attrs(mf, mf_dset, xarray_kwargs): digitstring = "{0:0" + str(maxdigits) + "}" for fileno, dset in enumerate(mf): if xarray_kwargs is not None: - ds=xarray.open_dataset(dset,**xarray_kwargs) + ds = xarray.open_dataset(dset, **xarray_kwargs) else: - ds=xarray.open_dataset(dset) + ds = xarray.open_dataset(dset) dset_attrs = ds.attrs - missing_attrs = { - k: v for k, v in dset_attrs.items() if k not in mf_dset.attrs - } + missing_attrs = {k: v for k, v in dset_attrs.items() if k not in mf_dset.attrs} for k, v in missing_attrs.items(): # attr_prefix=" File "+digitstring.format(fileno)+ ": " # if dset_attrs["tracking_id"]: @@ -200,6 +213,7 @@ def get_conflict_attrs(mf, mf_dset, xarray_kwargs): # print("Could not collect all attributes.") return conflict_attrs + def get_varname(mf_dset): return_varname = "" varlist = list(mf_dset.data_vars) @@ -218,7 +232,5 @@ def get_varname(mf_dset): "Could not find any variable to write to swift. Please specify varname." 
) if verbose: - print( - "We use variable {0} in case we need to rechunk.".format(return_varname) - ) - return return_varname \ No newline at end of file + print("We use variable {0} in case we need to rechunk.".format(return_varname)) + return return_varname diff --git a/tzis/provenance.py b/tzis/provenance.py index 705c4cad6801c1936440e0400dc8c47de380e9ad..b7b60ab4b70ae8ed4978ffbc46837da8c0e0af61 100644 --- a/tzis/provenance.py +++ b/tzis/provenance.py @@ -29,6 +29,7 @@ DCTERMS_SOURCE = DCTERMS["source"] # tzis namespace TZIS = Namespace("tzis", uri="urn:tzis:") + class Provenance(object): def __init__(self, url): from .tzis import __version__ as tzis_version @@ -36,17 +37,13 @@ class Provenance(object): self._identifier = None self.tzis_version = tzis_version - self.url=url - self.container=None - self.prefix=None + self.url = url + self.container = None + self.prefix = None if self.url.startswith("swift"): - self.url=os.path.dirname( - os.path.dirname(url) - ) - self.container=os.path.basename( - os.path.dirname(url) - ) - self.prefix=os.path.basename(url) + self.url = os.path.dirname(os.path.dirname(url)) + self.container = os.path.basename(os.path.dirname(url)) + self.prefix = os.path.basename(url) # Create an empyty prov document self.doc = prov.ProvDocument() @@ -118,27 +115,24 @@ class Provenance(object): output = io.StringIO() self.doc.serialize(destination=output) # - store=get_swift_mapper(self.url, - os.getenv("OS_AUTH_TOKEN"), - self.container, - "provenance") + store = get_swift_mapper( + self.url, os.getenv("OS_AUTH_TOKEN"), self.container, "provenance" + ) store[f"provenance_{self.prefix}_{self.tracking_id}.json"] = bytes( output.getvalue(), "utf8" ) def write_png(self): figure = prov_to_dot(self.doc) - store=get_swift_mapper(self.url, - os.getenv("OS_AUTH_TOKEN"), - self.container, - "provenance") + store = get_swift_mapper( + self.url, os.getenv("OS_AUTH_TOKEN"), self.container, "provenance" + ) try: - store[f"provenance_{self.prefix}_{self.tracking_id}.png"] = figure.create_png() + store[ + f"provenance_{self.prefix}_{self.tracking_id}.png" + ] = figure.create_png() except: - print( - "Could not create provenance png." - ) - + print("Could not create provenance png.") def get_provn(self): return self.doc.get_provn() diff --git a/tzis/rechunker.py b/tzis/rechunker.py index ff7a5ae7c8e2122113fb1ff2d68f0ded83583beb..1cea83d8172b70b33838da97b713833362c2fa69 100644 --- a/tzis/rechunker.py +++ b/tzis/rechunker.py @@ -1,6 +1,7 @@ import xarray import math + def calc_chunk_length(ds, varname, chunkdim, target_mb, other_chunks): """Estimates the length of one chunk of `varname` of dataset `ds`in the chunk dimension `chunkdim` to match the targeted chunk size `target_mb` in the cloud. @@ -31,12 +32,13 @@ def calc_chunk_length(ds, varname, chunkdim, target_mb, other_chunks): len(ds[chunkdim]) * other_chunks / math.ceil(n_bytes / (target_mb * (2**20))) ) + def calc_other_dim_chunks(orig_chunks_dict, chunkdim): - other_dim_chunks=1 + other_dim_chunks = 1 for k, v in orig_chunks_dict.items(): - chunklen=len(v) + chunklen = len(v) if k != chunkdim and chunklen > 1: - other_dim_chunks*=chunklen + other_dim_chunks *= chunklen return other_dim_chunks @@ -61,11 +63,11 @@ def rechunk(ds, varname, chunkdim, target_mb, verbose): An `xarray` dataset with unified and rechunked chunks according. 
""" b_is_safe_chunk = True - orig_chunks_dict=dict(ds[varname].chunksizes.items()) + orig_chunks_dict = dict(ds[varname].chunksizes.items()) orig_chunks = [len(v) for k, v in orig_chunks_dict.items()] orig_no_of_chunks = 1 - for len_of_chunk in orig_chunks : - orig_no_of_chunks*=len_of_chunk + for len_of_chunk in orig_chunks: + orig_no_of_chunks *= len_of_chunk orig_chunk_size = ds[varname].nbytes / orig_no_of_chunks / (2**20) if math.ceil(orig_chunk_size) < target_mb: print( @@ -77,22 +79,20 @@ def rechunk(ds, varname, chunkdim, target_mb, verbose): ) b_is_safe_chunk = False # return ds - + other_dim_chunks = calc_other_dim_chunks(orig_chunks_dict, chunkdim) if other_dim_chunks > 1 and verbose: print("Please consider: more than one dimension chunked.") - + chunk_length = calc_chunk_length(ds, varname, chunkdim, target_mb, other_dim_chunks) chunk_rule = {chunkdim: chunk_length} if verbose: - print( - "Chunking into chunks of {0} {1} steps".format(chunk_length, chunkdim) - ) + print("Chunking into chunks of {0} {1} steps".format(chunk_length, chunkdim)) chunked_ds = ds.chunk(chunk_rule) for var_id in chunked_ds.variables: chunked_ds[var_id].unify_chunks() - return chunked_ds, b_is_safe_chunk \ No newline at end of file + return chunked_ds, b_is_safe_chunk diff --git a/tzis/swifthandling.py b/tzis/swifthandling.py index 44f1a73296816006f2f2d7d8033862acab2a6ee0..e6a45b1d0ee5d060f8c4699b6cc65936a5c39f5a 100644 --- a/tzis/swifthandling.py +++ b/tzis/swifthandling.py @@ -13,15 +13,20 @@ from copy import deepcopy import stat from swiftclient import Connection + async def get_client(**kwargs): import aiohttp import aiohttp_retry + retry_options = aiohttp_retry.ExponentialRetry( - attempts=3, - exceptions={OSError, aiohttp.ServerDisconnectedError}) - retry_client = aiohttp_retry.RetryClient(raise_for_status=False, retry_options=retry_options) + attempts=3, exceptions={OSError, aiohttp.ServerDisconnectedError} + ) + retry_client = aiohttp_retry.RetryClient( + raise_for_status=False, retry_options=retry_options + ) return retry_client + def get_storestring_and_options(store): """ToDo: Check if string is required. If swift is the protocol, use aiohttp_retry client to overcome writing errors. @@ -38,47 +43,50 @@ def get_storestring_and_options(store): Backend options for to_zarr depending on the protocol as dictionary. 
""" - storage_options=None - storestring=store + storage_options = None + storestring = store if store.fs.protocol == "swift": - storestring=store.root - storage_options={"get_client": get_client} + storestring = store.root + storage_options = {"get_client": get_client} return storestring, storage_options + def get_swift_mapper(url, os_token, container, os_name=None): - """ - """ + """ """ fs_url = get_fsspec_url(url) os_url = get_os_url(url) os.environ["OS_STORAGE_URL"] = os_url os.environ["OS_AUTH_TOKEN"] = os_token - mapper_url="/".join(list(filter(None,[fs_url,container,os_name]))) - fsspec_map=fsspec.get_mapper(mapper_url) - mapper_mkdirs(fs_url,container,prefix=os_name) + mapper_url = "/".join(list(filter(None, [fs_url, container, os_name]))) + fsspec_map = fsspec.get_mapper(mapper_url) + mapper_mkdirs(fs_url, container, prefix=os_name) test_fsspec(fsspec_map) return fsspec_map -def mapper_mkdirs(url,container,prefix=None): - mapper_url="/".join(list(filter(None,[url,container,prefix]))) - fsmap=fsspec.get_mapper(mapper_url) - test_fsspec(fsmap) - #create container if necessary + +def mapper_mkdirs(url, container, prefix=None): + mapper_url = "/".join(list(filter(None, [url, container, prefix]))) + fsmap = fsspec.get_mapper(mapper_url) + # create container if necessary if fsmap.fs.protocol == "swift": getenv = os.environ.get - conn = Connection(preauthurl=getenv('OS_STORAGE_URL'), - preauthtoken=getenv('OS_AUTH_TOKEN')) - listings=fsmap.fs.listdir(url) - if not container in [a["name"].split('/')[-1] - for a in listings]: + conn = Connection( + preauthurl=getenv("OS_STORAGE_URL"), preauthtoken=getenv("OS_AUTH_TOKEN") + ) + listings = fsmap.fs.listdir(url) + if not container in [a["name"].split("/")[-1] for a in listings]: conn.put_container(container) else: with fsmap.fs.transaction: - fsmap.fs.mkdirs(mapper_url,exist_ok=True) + fsmap.fs.mkdirs(mapper_url, exist_ok=True) + + test_fsspec(fsmap) + def test_fsspec(fsmap): try: @@ -91,7 +99,7 @@ def test_fsspec(fsmap): def get_fsspec_url(url): - fs_url=deepcopy(url) + fs_url = deepcopy(url) if fs_url.startswith("https"): fs_url = fs_url.replace("https", "swift") elif fs_url.startswith("http"): @@ -99,8 +107,9 @@ def get_fsspec_url(url): fs_url = fs_url.replace("v1/", "") return fs_url + def get_os_url(url): - os_url=deepcopy(url) + os_url = deepcopy(url) if os_url.startswith("swift"): os_url = os_url.replace("swift:/", "https:/") if not ".de/v1" in os_url: @@ -109,11 +118,11 @@ def get_os_url(url): def _check_connection(swift_base, account, user, password=None): - SWIFT_BASE="https://swift.dkrz.de/" + SWIFT_BASE = "https://swift.dkrz.de/" SWIFT_VERS = "v1.0" - + if swift_base == "jsc": - SWIFT_BASE="https://just-keystone.fz-juelich.de:8080/" + SWIFT_BASE = "https://just-keystone.fz-juelich.de:8080/" SWIFT_VERS = "v1.0" elif swift_base != "dkrz": raise Exception("You have to provide either dkrz' or 'jsc' for swift_base") @@ -237,14 +246,16 @@ def get_token(swift_base, account, username=None, **kwargs): ): return env create_token(swift_base, account, username or None) - env,expires = _get_envtoken(**kwargs) + env, expires = _get_envtoken(**kwargs) return env + def toggle_public(container): "toggle container public read_acl settings" getenv = os.environ.get - conn = Connection(preauthurl=getenv('OS_STORAGE_URL'), - preauthtoken=getenv('OS_AUTH_TOKEN')) + conn = Connection( + preauthurl=getenv("OS_STORAGE_URL"), preauthtoken=getenv("OS_AUTH_TOKEN") + ) acl = conn.head_container(container).get("x-container-read", "") if ".r:*" in acl: acl = 
acl.replace(".r:*", "") diff --git a/tzis/tzis.py b/tzis/tzis.py index 80481d33fd78bc56e64febe95daac2f09ae8c5c0..c00727dd63f15e99a621b4a768db0267e4020453 100644 --- a/tzis/tzis.py +++ b/tzis/tzis.py @@ -4,7 +4,8 @@ import fsspec import copy import zarr -#from zarrswift import SwiftStore + +# from zarrswift import SwiftStore from swiftclient import Connection from .swifthandling import * from .daskhandling import * @@ -25,12 +26,8 @@ from .provenance import Provenance from .catalog import * from .openmf import * -OPEN_ZARR_KWARGS = dict( - consolidated=True, - decode_cf=True, - use_cftime=True -) - +OPEN_ZARR_KWARGS = dict(consolidated=True, decode_cf=True, use_cftime=True) + # version: # from xarray: @@ -58,37 +55,39 @@ class write_zarr(fsspec.FSMap): startchunk=0, validity_check=False, maxretries=3, - trusted=True + trusted=True, ): self.DASK_CHUNK_SIZE = 70 - self._cratio=None - self.original_size=None + self._cratio = None + self.original_size = None self.verbose = verbose self.fsspec_map = fsmap + if not isinstance(fsmap, fsspec.FSMap): + self.fsspec_map = fsspec.get_mapper(fsmap) self.fs = self.fsspec_map.fs - + self.varname = varname - self.mf_dset=mf_dset + self.mf_dset = mf_dset self.provenance = None - if isinstance(self.mf_dset,open_mfdataset_optimize): + if isinstance(self.mf_dset, open_mfdataset_optimize): print(self.mf_dset) - self.provenance=self.mf_dset.provenance - self.original_size=self.mf_dset.provenance - self.recent_chunk=None - - target=self.write_zarr_func( + self.provenance = self.mf_dset.provenance + self.original_size = self.mf_dset.provenance + self.recent_chunk = None + + target = self.write_zarr_func( chunkdim=chunkdim, target_mb=target_mb, startchunk=startchunk, validity_check=validity_check, maxretries=maxretries, - trusted=trusted + trusted=trusted, ) - super().__init__(target.root,target.fs) + super().__init__(target.root, target.fs) def _drop_vars_without_chunkdim(self, ds, chunkdim): """Drops all variables which do not depend on the chunk dimension. @@ -140,18 +139,18 @@ class write_zarr(fsspec.FSMap): if chunkdim == "time": return ds.isel(time=slice(startindex, endindex)) else: - return ds.isel(**{chunkdim:slice(startindex, endindex)}) - #raise ValueError( + return ds.isel(**{chunkdim: slice(startindex, endindex)}) + # raise ValueError( # 'Other chunk dimensions than "time" are not supported yet.' 
- #) + # ) def _init_dataset(self, ds, store, chunkdim, overwrite): - storestring,storage_options=get_storestring_and_options(store) + storestring, storage_options = get_storestring_and_options(store) if overwrite: try: with self.fs.transaction: - self.fs.rm(self.fsspec_map.root,recursive=True) - #with self.fs.transaction: + self.fs.rm(self.fsspec_map.root, recursive=True) + # with self.fs.transaction: # self.fs.mkdirs('/'.join([self.os_url,self.container,self.prefix]),exist_ok=True) except Exception as e: print(e) @@ -160,12 +159,23 @@ class write_zarr(fsspec.FSMap): pass try: # try fsspec store: - ds.to_zarr(storestring, compute=False, consolidated=True, mode="w", storage_options=storage_options) + ds.to_zarr( + storestring, + compute=False, + consolidated=True, + mode="w", + storage_options=storage_options, + ) chunk_independent_list = [ var for var in ds.variables if chunkdim not in ds[var].coords ] - if chunk_independent_list : - ds[chunk_independent_list].to_zarr(storestring, consolidated=True, mode="r+", storage_options=storage_options) + if chunk_independent_list: + ds[chunk_independent_list].to_zarr( + storestring, + consolidated=True, + mode="r+", + storage_options=storage_options, + ) except: raise ValueError("Could not initialize dataset") @@ -176,7 +186,7 @@ class write_zarr(fsspec.FSMap): def _open_or_initialize_swift_dset( self, store, ds, chunkdim, validity_check, overwrite=False ): - storestring,storage_options=get_storestring_and_options(store) + storestring, storage_options = get_storestring_and_options(store) if not overwrite: try: return ( @@ -221,54 +231,54 @@ class write_zarr(fsspec.FSMap): print("Chunk setting has changed.") return True - already_coords=already.coords - faulty_coords=[] + already_coords = already.coords + faulty_coords = [] for coord in chunked_ds.coords: - if chunkdim in coord : + if chunkdim in coord: continue try: chunked_ds[coord].load() if not chunked_ds[coord].equals(already[coord]): faulty_coords.append(coord) - + except: - print( - f"Could not check if {coord} is equal to target store.\n" - ) - #return True + print(f"Could not check if {coord} is equal to target store.\n") + # return True if faulty_coords: - raise ValueError("Source and target coordinates: \n"+','.join(faulty_coords)+"\n differ.") + raise ValueError( + "Source and target coordinates: \n" + + ",".join(faulty_coords) + + "\n differ." 
+ ) a = already.attrs.copy() c = chunked_ds.attrs.copy() l_differ = False - - for key in ["tracking_id","hasProvenance"] : - if key in c.keys() : + + for key in ["tracking_id", "hasProvenance"]: + if key in c.keys(): del c[key] - if key in a.keys() : + if key in a.keys(): del a[key] if not a == c or l_differ: if self.verbose: - akeys=a.keys() - ckeys=c.keys() - notinc=[key for key in akeys - if key not in ckeys] - notina=[key for key in ckeys - if key not in akeys] + akeys = a.keys() + ckeys = c.keys() + notinc = [key for key in akeys if key not in ckeys] + notina = [key for key in ckeys if key not in akeys] print("Attributes of target store and source dataset differ.") - if notina : + if notina: print(f"Attribute not in source: {','.join(notina)}") - if notinc : + if notinc: print(f"Attribute not in target: {','.join(notinc)}") if notina and notinc is not None: - for key in list(set(list(a.keys()))&set(list(c.keys()))): + for key in list(set(list(a.keys())) & set(list(c.keys()))): if a[key] != c[key]: print(f"Values of attribute {key} differ:") print(f"source: {c[key]} target: {a[key]} ") - #return True + # return True if self.verbose: print("target will not be overwritten. target will be appended if needed") @@ -295,14 +305,18 @@ class write_zarr(fsspec.FSMap): intarget.close() if b_isidentical: if self.verbose: - print(f"Variable subset for index interval {start_chunkdim_index} - {end_chunkdim_index} " - f"of chunkdim {chunkdim} are equal.") - #if not validity_check: + print( + f"Variable subset for index interval {start_chunkdim_index} - {end_chunkdim_index} " + f"of chunkdim {chunkdim} are equal." + ) + # if not validity_check: # print("Chunk {0} is skipped.".format(chunk_no + 1)) return -1 elif validity_check: - print(f"Variable subset for index interval {start_chunkdim_index} - {end_chunkdim_index} " - f"of chunkdim {chunkdim} are different.") + print( + f"Variable subset for index interval {start_chunkdim_index} - {end_chunkdim_index} " + f"of chunkdim {chunkdim} are different." + ) return start_chunkdim_index return 0 @@ -321,18 +335,20 @@ class write_zarr(fsspec.FSMap): print( "Finding the start chunk can take long. You better provide the startchunk yourself" ) - chunked_ds_check=chunked_ds.copy() - already_check=already.copy() + chunked_ds_check = chunked_ds.copy() + already_check = already.copy() if chunked_ds.coords.dtypes != already.coords.dtypes: - if self.verbose : - print("Dtypes of coords of target and write dset differ." - "Will try to remove coords before execute identical.") - + if self.verbose: + print( + "Dtypes of coords of target and write dset differ." + "Will try to remove coords before execute identical." 
+ ) + for coord in already.coords: - try : - chunked_ds_check=chunked_ds_check.drop_vars(coord) - already_check=already_check.drop_vars(coord) + try: + chunked_ds_check = chunked_ds_check.drop_vars(coord) + already_check = already_check.drop_vars(coord) except: break @@ -346,7 +362,7 @@ class write_zarr(fsspec.FSMap): status_equality = self._check_for_equality( towrite, already_check, - chunksum-all_chunks[chunk_no], + chunksum - all_chunks[chunk_no], chunksum, chunkdim, validity_check, @@ -359,37 +375,58 @@ class write_zarr(fsspec.FSMap): return status_equality, chunk_no chunksum -= all_chunks[chunk_no] break - chunk_no+=1 + chunk_no += 1 return 0, chunk_no def _reset_already_if_overwrite( - self, validity_check, already, chunked_ds, store, isnew, chunkdim, varname, l_append + self, + validity_check, + already, + chunked_ds, + store, + isnew, + chunkdim, + varname, + l_append, ): - storestring,storage_options=get_storestring_and_options(store) + storestring, storage_options = get_storestring_and_options(store) b_isnew = isnew if not validity_check: overwrite = True if l_append: - + chunk_independent_list = [ - var for var in chunked_ds.variables - if ((chunkdim not in chunked_ds[var].coords) & - (var not in already.variables) - ) + var + for var in chunked_ds.variables + if ( + (chunkdim not in chunked_ds[var].coords) + & (var not in already.variables) + ) ] - if chunk_independent_list : - chunked_ds[chunk_independent_list].to_zarr(storestring, consolidated=True, mode="a", - storage_options=storage_options) - - chunked_ds[[varname]].to_zarr(storestring, compute=False, consolidated=True, mode="a", - storage_options=storage_options) + if chunk_independent_list: + chunked_ds[chunk_independent_list].to_zarr( + storestring, + consolidated=True, + mode="a", + storage_options=storage_options, + ) + + chunked_ds[[varname]].to_zarr( + storestring, + compute=False, + consolidated=True, + mode="a", + storage_options=storage_options, + ) overwrite = False - - if overwrite : + + if overwrite: if b_isnew: overwrite = False else: - overwrite = self._check_for_overwrite(already, chunked_ds, varname, chunkdim) + overwrite = self._check_for_overwrite( + already, chunked_ds, varname, chunkdim + ) if overwrite: already, dummy = self._open_or_initialize_swift_dset( store, chunked_ds, chunkdim, validity_check, overwrite=True @@ -401,7 +438,7 @@ class write_zarr(fsspec.FSMap): # 'Initiliazed' means that a first `to_zarr` call has been executed which writes all coordinates and metadata for the dataset into the chunk. The subsequent `to_zarr` calls performed by `write_to_region` uses this information so that it knows how chunks have to be named. If a region has already been written, it will be overwritten by write_to_region. 
def write_by_region(self, towrite, store, chunkdim, chunkdimvalues): - storestring,storage_options=get_storestring_and_options(store) + storestring, storage_options = get_storestring_and_options(store) towrite_chunks = towrite[chunkdim].values startindex = [ idx @@ -416,27 +453,33 @@ class write_zarr(fsspec.FSMap): towrite.drop_vars(chunkdim).to_zarr( store=storestring, region={chunkdim: slice(startindex[0], endindex[0] + 1)}, - storage_options=storage_options + storage_options=storage_options, ) # , return towrite def write_by_region_dask( - self, istartchunk, all_chunks, chunked_ds, chunkdim, store, target_mb, varname, other_chunk_dims + self, + istartchunk, + all_chunks, + chunked_ds, + chunkdim, + store, + target_mb, + varname, + other_chunk_dims, ): - + client, b_islocalclient, work_dcs = get_client_and_config( target_mb, self.DASK_CHUNK_SIZE, self.verbose, other_chunk_dims ) - size=0 - for start_dask_chunk in tqdm( - range(istartchunk, len(all_chunks), work_dcs) - ): + size = 0 + for start_dask_chunk in tqdm(range(istartchunk, len(all_chunks), work_dcs)): end_dask_chunk = start_dask_chunk + work_dcs if end_dask_chunk > len(all_chunks): end_chunk = sum(all_chunks) end_dask_chunk = len(all_chunks) else: - end_chunk = sum(all_chunks[0 : end_dask_chunk]) + end_chunk = sum(all_chunks[0:end_dask_chunk]) start_chunk = sum(all_chunks[0:start_dask_chunk]) to_write = self._sel_range_for_chunk( chunked_ds, start_chunk, end_chunk, chunkdim @@ -451,25 +494,32 @@ class write_zarr(fsspec.FSMap): self.write_by_region(to_write, store, chunkdim, chunked_ds[chunkdim].values) endtime = time.time() if self.verbose: - assumed_mb=target_mb + assumed_mb = target_mb if self._cratio: - assumed_mb*=self._cratio - if start_dask_chunk < 3*work_dcs: - newsize=self.fs.du('/'.join([self.fsspec_map.root,self.varname]))/1024/1024 - assumed_mb=newsize-size - size=newsize + assumed_mb *= self._cratio + if start_dask_chunk < 3 * work_dcs: + newsize = ( + self.fs.du("/".join([self.fsspec_map.root, self.varname])) + / 1024 + / 1024 + ) + assumed_mb = newsize - size + size = newsize print( "Stored data with {0}mb/s".format( - assumed_mb / (endtime - starttime) + assumed_mb / (endtime - starttime) ) ) else: print( "Estimated writing speed {0}mb/s".format( - assumed_mb * other_chunk_dims * (end_dask_chunk - start_dask_chunk) / (endtime - starttime) + assumed_mb + * other_chunk_dims + * (end_dask_chunk - start_dask_chunk) + / (endtime - starttime) ) - ) - self.recent_chunk=start_dask_chunk + ) + self.recent_chunk = start_dask_chunk if b_islocalclient: client.close() @@ -493,7 +543,14 @@ class write_zarr(fsspec.FSMap): print("Reset target if overwrite and drop vars without chunkdim.") already, isnew = self._reset_already_if_overwrite( - validity_check, already, chunked_ds, store, isnew, chunkdim, varname, l_append + validity_check, + already, + chunked_ds, + store, + isnew, + chunkdim, + varname, + l_append, ) try: @@ -508,10 +565,10 @@ class write_zarr(fsspec.FSMap): all_chunks = chunked_ds.chunks[chunkdim] chunksum = 0 istartchunk = 0 - + if self.recent_chunk: - istartchunk=self.recent_chunk - else : + istartchunk = self.recent_chunk + else: if not isnew and not validity_check and not l_append: if self.verbose: print("Get start chunk") @@ -529,12 +586,13 @@ class write_zarr(fsspec.FSMap): if status_equality > 0: return status_equality - if istartchunk < len(all_chunks) : + if istartchunk < len(all_chunks): if self.verbose: print(f"Start Looping over chunks beginning with {istartchunk}") - 
other_chunk_dims=calc_other_dim_chunks(dict(chunked_ds[varname].chunksizes.items()), - chunkdim) + other_chunk_dims = calc_other_dim_chunks( + dict(chunked_ds[varname].chunksizes.items()), chunkdim + ) if validity_check: client, b_islocalclient, work_dcs = get_client_and_config( @@ -549,7 +607,7 @@ class write_zarr(fsspec.FSMap): end_dask_chunk = len(all_chunks) end_chunkdim_index = sum(all_chunks) else: - end_chunkdim_index = sum(all_chunks[0 : end_dask_chunk]) + end_chunkdim_index = sum(all_chunks[0:end_dask_chunk]) start_chunkdim_index = sum(all_chunks[0:start_dask_chunk]) to_check = self._sel_range_for_chunk( @@ -563,7 +621,7 @@ class write_zarr(fsspec.FSMap): end_chunkdim_index, chunkdim, validity_check, - 0, #dummy + 0, # dummy varname, ) if status_equality < 0: @@ -573,7 +631,14 @@ class write_zarr(fsspec.FSMap): else: try: self.write_by_region_dask( - istartchunk, all_chunks, chunked_ds, chunkdim, store, target_mb, varname, other_chunk_dims + istartchunk, + all_chunks, + chunked_ds, + chunkdim, + store, + target_mb, + varname, + other_chunk_dims, ) except Exception as e: print(e) @@ -581,9 +646,13 @@ class write_zarr(fsspec.FSMap): return 0 def write_directly(self, dset=None, store=None): - storestring,storage_options=get_storestring_and_options(store) - dset.to_zarr(store=storestring, mode="w", consolidated=True, - storage_options=storage_options) + storestring, storage_options = get_storestring_and_options(store) + dset.to_zarr( + store=storestring, + mode="w", + consolidated=True, + storage_options=storage_options, + ) def write_with_validation_and_retries( self, @@ -599,23 +668,25 @@ class write_zarr(fsspec.FSMap): ): is_safe_chunk = True chunked_ds = ds - - if target_mb != 0 : - chunked_ds, is_safe_chunk = rechunk(ds, varname, chunkdim, target_mb, self.verbose) + + if target_mb != 0: + chunked_ds, is_safe_chunk = rechunk( + ds, varname, chunkdim, target_mb, self.verbose + ) elif target_mb == 0: - orig_chunks_dict=dict(ds[varname].chunksizes.items()) + orig_chunks_dict = dict(ds[varname].chunksizes.items()) orig_chunks = [len(v) for k, v in orig_chunks_dict.items()] orig_no_of_chunks = 1 - for len_of_chunk in orig_chunks : - orig_no_of_chunks*=len_of_chunk + for len_of_chunk in orig_chunks: + orig_no_of_chunks *= len_of_chunk target_mb = ds[self.varname].nbytes / orig_no_of_chunks / (2**20) if self.verbose: print(f"original dask chunk size {target_mb}mb is used for chunking.") - if target_mb > 2000 : + if target_mb > 2000: printf(f"Warning: Your target_mb {target_mb} is too big for swift (max 2GB") # - l_append=False + l_append = False retries = 0 success = -1 if startchunk != 0: @@ -625,16 +696,20 @@ class write_zarr(fsspec.FSMap): store, chunked_ds, chunkdim, validity_check ) if not b_isnew: - if varname not in already.variables : + if varname not in already.variables: if self.verbose: - print(f"Varname {varname} not in target variables." - f"Will append {varname} to target store.") - l_append=True + print( + f"Varname {varname} not in target variables." + f"Will append {varname} to target store." 
+ ) + l_append = True if l_append: - coords_merge=list(set(list(already.coords)+list(chunked_ds.coords))) - if not coords_merge and not already[coords_merge].identical(chunked_ds[coords_merge]): + coords_merge = list(set(list(already.coords) + list(chunked_ds.coords))) + if not coords_merge and not already[coords_merge].identical( + chunked_ds[coords_merge] + ): raise ValueError("Cannot append if coords of variables differ") - + if l_append and validity_check: raise ValueError("Cannot validity check against new variable") # if validity_check and b_isnew GIVES a runtimerror from _open_or_initialize_swift_dset @@ -706,9 +781,9 @@ class write_zarr(fsspec.FSMap): raise RuntimeError("Validiation failed.") def write_provenance(self, pargs): - #try: + # try: if not self.provenance: - self.provenance=Provenance(self.fsspec_map.root) + self.provenance = Provenance(self.fsspec_map.root) if self.mf_dset.attrs: if "tracking_id" in self.mf_dset.attrs: self.provenance.gen_input( @@ -733,15 +808,19 @@ class write_zarr(fsspec.FSMap): os.path.dirname(self.fsspec_map.root), "provenance", "_".join( - ["provenance", self.provenance.prefix, self.provenance.tracking_id + ".json"] + [ + "provenance", + self.provenance.prefix, + self.provenance.tracking_id + ".json", + ] ), ] ) - #except Exception as e: + # except Exception as e: # print(e) # print("Could not create full provenance") # pass - + def write_zarr_func( self, chunkdim="time", @@ -749,29 +828,33 @@ class write_zarr(fsspec.FSMap): startchunk=0, validity_check=False, maxretries=3, - trusted=True + trusted=True, ): self.recent_chunk = None - if not self.mf_dset : - raise ValueError("Do not know what to write. Please define object mf_dset first.") - + if not self.mf_dset: + raise ValueError( + "Do not know what to write. Please define object mf_dset first." + ) + if not self.varname: - self.varname = self._get_varname(self.mf_dset) + self.varname = self._get_varname(self.mf_dset) if self.verbose: - print("You did not specify a varname. Take the first data var out of the dset"+ - f" which is {self.varname}") + print( + "You did not specify a varname. Take the first data var out of the dset" + + f" which is {self.varname}" + ) dset_to_write = self.mf_dset[[self.varname]] - - totalsize=dset_to_write.nbytes - + + totalsize = dset_to_write.nbytes + if self.original_size and not self._cratio: - self._cratio=self.original_size/totalsize + self._cratio = self.original_size / totalsize if self.verbose: - print(f"Compression ratio was estimated by disk size to {self._cratio}") + print(f"Compression ratio was estimated by disk size to {self._cratio}") if "chunks" in dset_to_write[self.varname].encoding.keys(): del dset_to_write[self.varname].encoding["chunks"] pargs = locals() - + if not validity_check: self.write_provenance(pargs) @@ -794,16 +877,18 @@ class write_zarr(fsspec.FSMap): ) else: self.write_directly(dset_to_write, self.fsspec_map) -# self.mf_dset.close() - -# try: - write_index_file(self.fsspec_map.fs, - os.path.dirname(self.fsspec_map.root), - pattern=None, - contact=None) -# except Exception as e: -# print(e) -# print("Could not write index file") -# pass - - return copy.deepcopy(self.fsspec_map) \ No newline at end of file + # self.mf_dset.close() + + # try: + write_index_file( + self.fsspec_map.fs, + os.path.dirname(self.fsspec_map.root), + pattern=None, + contact=None, + ) + # except Exception as e: + # print(e) + # print("Could not write index file") + # pass + + return copy.deepcopy(self.fsspec_map)
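
For orientation, a minimal usage sketch of the entry points touched above. The storage URL, token, container, file pattern and variable name are hypothetical placeholders; the calls only illustrate that open_mfdataset_optimize now expands a plain glob string via glob.glob and that write_zarr takes the opened object plus keyword arguments forwarded to write_zarr_func:

import os

from tzis.swifthandling import get_swift_mapper
from tzis.openmf import open_mfdataset_optimize
from tzis.tzis import write_zarr

# Build the target mapper for a swift container (all values are placeholders).
target_map = get_swift_mapper(
    "https://swift.dkrz.de/v1/dkrz_ACCOUNT",
    os.environ["OS_AUTH_TOKEN"],  # a valid OS token, e.g. obtained via get_token
    "my-container",
    "tas.zarr",
)

# A plain glob string is expanded inside the constructor; a list of files
# still works as before.
opened = open_mfdataset_optimize(
    "/work/hypothetical/tas_*.nc",
    "tas",
    target_map,
    chunkdim="time",
    target_mb=100,
    verbose=True,
)

# Rechunk and upload chunk by chunk; keyword names follow the attributes set
# in write_zarr.__init__ above.
target = write_zarr(
    fsmap=target_map,
    mf_dset=opened,
    varname="tas",
    chunkdim="time",
    target_mb=100,
    verbose=True,
)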
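
The chunk length estimate in rechunker.calc_chunk_length splits the chunk dimension into as many pieces as are needed to reach roughly target_mb per chunk. A worked example with made-up numbers, assuming n_bytes is the total size of the variable in bytes:

import math

n_bytes = 6 * 2**30    # a 6 GiB variable
len_chunkdim = 12000   # length of the "time" dimension
other_chunks = 1       # no chunking along other dimensions
target_mb = 100

# number of target chunks needed to stay at roughly 100 MB each
n_target_chunks = math.ceil(n_bytes / (target_mb * 2**20))    # 62
# resulting chunk length along "time"
chunk_length = len_chunkdim * other_chunks / n_target_chunks  # ~193 steps
print(n_target_chunks, chunk_length)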
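
The comment above write_by_region describes zarr region writing: a first to_zarr call with compute=False lays out metadata and coordinates, and later calls fill chunk-aligned index ranges via region. A self-contained local sketch of that pattern with synthetic data and a hypothetical store path, independent of the swift backend:

import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"tas": (("time", "lat"), np.random.rand(100, 10))},
    coords={"time": np.arange(100)},
).chunk({"time": 25})

# Step 1: initialise the store. With compute=False only the array metadata and
# the eagerly loaded coordinate are written; no dask chunk is computed yet.
ds.to_zarr("example.zarr", compute=False, consolidated=True, mode="w")

# Step 2: write one chunk-aligned slice into its region. The chunk dimension is
# dropped because the region mapping already fixes where the slice belongs.
subset = ds.isel(time=slice(0, 25))
subset.drop_vars("time").to_zarr("example.zarr", region={"time": slice(0, 25)})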