Commit d68eed01 authored by wachsylon

Refactored to modularize

parent fc7d7020
%% Cell type:markdown id:d8957c6c-7375-4bff-8985-475be95a245f tags:
# Tzis - *T*o *Z*arr *i*n *S*torages
`tzis` is a small python package originally designed for
1. converting data into the [zarr](https://zarr.readthedocs.io/en/stable/) format and
1. writing it to the DKRZ's cloud storage space [swift](https://swiftbrowser.dkrz.de/)
in one step. It is based on a script which uses [xarray](http://xarray.pydata.org/en/stable/index.html), the `fsspec` [implementation for swift](https://github.com/d70-t/swiftspec) from Tobias Kölling and the [dask](https://docs.dask.org/en/stable/) distributed scheduler. `tzis` is optimized for DKRZ's High Performance Computer but can also be used from local computers.
%% Cell type:markdown id:5ae0977b-54a0-48d0-bb20-f8f370aa2f4a tags:
`tzis` features
- **Parallel writing** with `dask`: one `write_zarr` call converts a zarr dataset that consists of many files.
- **No intermediate writing** to temporary storage, e.g. for reformatting, because tzis writes the dataset directly into the target storage.
- **Appending** variables to existing zarr datasets, which allows you to add grids afterwards.
- Optimized **rechunking** along one dimension such that chunk sizes match the limits of the target storage and dask graphs stay small.
- **Consolidated metadata**: the intersection of many files' metadata is saved in one `.zmetadata` JSON file.
- **Swiftspec** implementation for `fsspec`, enabling Unix-like file system operations on the object store (like `listdir`).
- **All kinds of other target storages** (filesystem, s3, gcs, zip, ...) by setting the `.fsspec_map` attribute manually.
- Writing of **different input file formats**. \*
- **Provenance**: provenance files based on the `prov` library are saved inside the container in the virtual directory `provenance` for all `write_zarr` calls.
- **Catalog creation**: make your zarr datasets easily browsable and accessible via `intake` catalogs.
- **Modular design**: a common look and feel from fsspec and xarray; import only what you need.
\* All files that can be passed to
```python
xarray.open_mfdataset()
```
can be used.
%% Cell type:markdown id:6f216a5e-dd27-42ad-ad9b-baa4b0434f86 tags:
In this tutorial, you will learn
- the [meaning](#define) of `zarr` and the `swift object storage`
- why you can [benefit](#moti) from `zarr` in cloud storage
- [when](#when) it is a good idea to write into cloud
- how to [initialize the swift store](#token) for `tzis` including creating a token
- how to [open and configure](#source) the source dataset
- how to [write](#write) data to swift
- how to [set options](#output) for the zarr output
- how to [access](#access) and use data from swift
- how to work with the [SwiftStore](#swiftstore) similar to file systems
%% Cell type:markdown id:8d6a1c4f-66b5-466a-84b6-8a011ac5bd82 tags:
<a class="anchor" id="define"></a>
# Definition
**Zarr** is a *cloud-optimised* format for climate data. By using *chunk*-based data access, `zarr` enables arrays that can be larger than memory. Both input and output operations can be parallelised. It features *customizable* compression methods and stores.
The **Swift** cloud object storage is a 🔑 *key-value* store where the key is a globally unique identifier and the value a representation of binary data. In contrast to a file system 📁, there are no files or directories but *objects and containers/buckets*. Data access is possible via the internet, i.e. `http`.
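To make the key-value idea concrete: a zarr dataset in swift is nothing but a set of objects whose keys follow the zarr layout. A minimal sketch using the `fsspec` mapping that `tzis` works with (`fsmap` is the mapper created later in this tutorial; the keys only exist once a dataset has been written):
```python
# a zarr store is a mapping of keys to binary values, e.g.
#   '.zmetadata'   -> consolidated metadata (JSON bytes)
#   'tas/.zarray'  -> array metadata of the variable 'tas'
#   'tas/0.0.0'    -> the first compressed chunk of 'tas'
print(list(fsmap)[:5])
print(fsmap[".zmetadata"][:100])
```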
%% Cell type:markdown id:82d19de7-889b-423a-beb2-0ed10972c90e tags:
<a class="anchor" id="moti"></a>
# Motivation
In recent years, cloud object storage systems have become an alternative to traditional file systems because of
- **Independence** from computational resources: users can access and download data from anywhere without needing HPC access or resources.
- **Scalability**: no file system manager has to care about the connected disks.
- **A lack of storage** space in general because of increasing model output volume.
- **No namespace conflicts** because data is accessed via globally unique identifiers.

Large Earth System Science databases like the CMIP Data Pool at DKRZ contain [netCDF](https://github.com/Unidata/netcdf-c) formatted data. Access to and transfer of such data from an object storage can only be conducted at file level, which results in heavy download volumes and less reproducible workflows.
%% Cell type:markdown id:58a88ea9-8d3c-4e6c-8369-ce890a66c495 tags:
The cloud-optimised climate data format [Zarr](https://zarr.readthedocs.io/en/stable/) solves these problems by
- enabling **flexible and modular** compression, storage and storage protocol combinations
- allowing **asynchronous work** so that users can already work on output that is not yet complete
- allowing programs to identify _chunks_ corresponding to the desired subset of the data before the download so that the **volume of data transfer is reduced**.
- allowing users to access the data via `http` so that neither **authentication** nor software on the cloud repository side is required
- saving **metadata** next to the binary data. That allows programs to quickly create a virtual representation of large and complex datasets.
Zarr formatted data in the cloud makes the data as *analysis ready* as possible.
With `tzis`, we developed a package that enables the use of DKRZ's institutional cloud storage as a back-end storage for Earth System Science data. It combines `swiftclient` based scripts, a *Zarr storage* implementation and a high-level `xarray` application including *rechunking*. Download velocity can reach the order of **1000 MB/s** if the network allows it. Additional validation of the data transfer ensures its completeness.
%% Cell type:markdown id:87d9ffaf-55c4-4013-b961-3bfb94d9c99a tags:
<a class="anchor" id="when"></a>
# Which type of data is suitable?
Datasets in the cloud are useful if
- the cloud place can be their final location. Moving data in the cloud is very inefficient.
- they will not be *prepended*. Data in the cloud can be easily *appended* but *prepending* most likely requires moving which is not efficient.
- they are *open*. One advantage comes from the easy access via http. This is even easier when users do not have to log in.
%% Cell type:markdown id:77feab5f-a512-4449-95c7-4daa0762f25f tags:
<a class="anchor" id="token"></a>
# Swift authentication and initialization
Central `tzis` functions require that you specify an `OS_AUTH_TOKEN` which allows the program to connect to the swift storage with your credentials. This token is valid for one month by default. Without it, you would have to log in for each new session. When you work with `swift`, this token is saved in the hidden file `~/.swiftenv` which contains the following parameters:
- `OS_STORAGE_URL` which is the URL associated with the storage space of the project-user-account. Note that this URL cannot be opened like a *swiftbrowser* link but instead it can be used within programs like `tzis`. Only one URL is saved so that you have to reconfigure if you would like to use another account.
- `OS_AUTH_TOKEN`.
**Be careful** with the token. It should stay only readable for you. Especially, do not push it into git repos.
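A small sketch of how to keep the file private (equivalent to `chmod 600`; adapt the path if your configuration lives elsewhere):
```python
import os
import stat

# restrict ~/.swiftenv to be readable and writable by the owner only
swiftenv = os.path.expanduser("~/.swiftenv")
os.chmod(swiftenv, stat.S_IRUSR | stat.S_IWUSR)
```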
%% Cell type:markdown id:aa1c7b7f-6c8e-44db-9c29-7bb9eee88a84 tags:
<a class="anchor" id="token"></a>
## `swifthandling`: Get token and url
`Tzis` includes a function to get the token or, if not available, create the token:
```python
from tzis import swifthandling
token=swifthandling.get_token(host, account, username=USERNAME)
```
where `host` is either "dkrz" or "jsc".
When calling `get_token`,
1. it tries to read the configuration file `~/.swiftenv`
1. if there is such a file, it checks whether the found configuration matches the specified *account*
1. if no file was found or the configuration is invalid, it creates a token
1. it asks you for a password
1. it writes two files: `~/.swiftenv` with the configuration and `~/.swiftenv_useracc` which contains the account and user specification for that token.
1. it returns a dictionary with all configuration variables
%% Cell type:code id:dc780e96-8284-4be0-a9d6-59619aa67290 tags:
``` python
from tzis import swifthandling
token=swifthandling.get_token("dkrz","ik1017",username="k204210")
```
%% Cell type:markdown id:13396f94-6da3-4eb9-9602-1045fc9540c5 tags:
# Initialize swift mapper
After successfully creating the authentication for swift, we *initialize* an `fsspec` mapper which points to a swift container in which we will save the data. The container is created if it does not exist yet. We do that with
```python
fsmap=swifthandling.get_swift_mapper(
os_url,
os_token,
os_container,
os_name=os_name
)
```
The mandatory arguments are:
- `os_url` is the `OS_STORAGE_URL`
- `os_token` is the `OS_AUTH_TOKEN`
- `os_container` is the *container name* / the *bucket*. A container is the highest of two store levels in the swift object store.
These will connect you to the swift store and initialize/create the container.
You can
- already specify an `os_name`, which is the *zarr dataset name*, i.e. the *object prefix* under which the data will be stored in the end.
E.g.:
```python
fsmap=swifthandling.get_swift_mapper(
token["OS_STORAGE_URL"],
token["OS_AUTH_TOKEN"],
"tzistest",
os_name="zarrfile"
)
```
%% Cell type:code id:68d2c5f4-f248-401b-b300-85a37f748f49 tags:
You can check if your mapper works by applying
```python
list(fsmap.keys())
```
%% Cell type:markdown id:c2380bfc-22ab-41c0-96a6-aa9ce2203b13 tags:
which will print *all* existing elements in the container.
# Setting a zarr dataset name (an object prefix)
You can switch to different zarr dataset output names within one container by using fsspec:
```python
import fsspec

zarr_dset_name="test"
fsmap=fsspec.get_mapper(
    '/'.join(
        [fsmap.root,
         zarr_dset_name
        ]
    )
)
```
Use `os.path.dirname(fsmap.root)` instead of `fsmap.root` if you set another object prefix already.
%% Cell type:markdown id:9ec4e347-4d16-4916-8cd0-355ddd512fe2 tags:
<a class="anchor" id="source"></a>
# `openmf`: Open and configure the source dataset
We can use `open_mfdataset_optimize` from the module `tzis.openmf` for opening the dataset. `tzis` uses `xarray`'s `open_mfdataset` for reading the source file(s). The corresponding call is:
```python
def open_mfdataset_optimize(
mf,
varname,
target_fsmap,
chunkdim=None,
target_mb=None,
xarray_kwargs=None,
verbose=False
)
```
E.g.:
```python
import os
from tzis import openmf

varname="tas"
path_to_dataset = f"/mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/{varname}/gn/v20190710/"
mfs_towrite=[path_to_dataset + filename for filename in os.listdir(path_to_dataset)]
omo=openmf.open_mfdataset_optimize(
    mfs_towrite,
    varname,
    fsmap,
    chunkdim="time",
    target_mb=100,
    verbose=True
)
omo
```
Note that the actual dataset can be retrieved via `omo.mf_dset`.
%% Cell type:markdown id:41f810c1-17ec-4d94-97ae-2cbf90a90013 tags:
It uses a configuration which tries to **merge** many source files into one dataset **without producing errors**. Therefore, *conflicts* are ignored and only structures and attributes that are common to all source files are combined. The internal function call looks like:
```python
OPEN_MFDATASET_KWARGS = dict(
decode_cf=True,
use_cftime=True,
data_vars="minimal",
coords="minimal",
compat="override",
combine_attrs="drop_conflicts",
)
mf_dset = xarray.open_mfdataset(mf,
**OPEN_MFDATASET_KWARGS)
```
- Additionally, tzis will try to collect the **tracking_id** attributes of all input files and save them in the **provenance**. If none are available, the **names** of the files given in `mf` are used for provenance instead.
- The **history** attribute of the target zarr data set will be set to `f'Converted and written to swift cloud with tzis version {tzis.__version__}'`
%% Cell type:markdown id:f4e2f1b9-f4a4-4aa6-a384-e93762a41567 tags:
The mandatory arguments of `open_mfdataset_optimize` are
- `mf`: the dataset file(s), i.e. a `str` or a `list` of source files which can be opened with `xarray.open_mfdataset`
- `varname`: the variable from the dataset which will be selected and then written into the object store
- `target_fsmap`: the `fsspec` mapping of the target storage

If `chunkdim` and `target_mb` are set, an **optimized** chunk size is already used for opening the dataset in order to match the specified target chunk size.
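A minimal sketch of the idea behind this chunk-size estimate (a simplification, not the exact `calc_chunk_length` implementation of `tzis.rechunker`; compression is ignored and the helper name is illustrative):
```python
import numpy as np

def estimate_chunk_length(ds, varname, chunkdim, target_mb):
    """Choose a chunk length along `chunkdim` so that one chunk is roughly `target_mb` MB."""
    var = ds[varname]
    # uncompressed bytes of one step along the chunk dimension
    bytes_per_step = var.dtype.itemsize * int(
        np.prod([size for dim, size in var.sizes.items() if dim != chunkdim])
    )
    return max(1, int(target_mb * 1024**2 // bytes_per_step))
```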
%% Cell type:markdown id:565f0ac4-eed3-4212-b761-09c36b9703e3 tags:
## Attributes
*Attributes* of the dataset are handled in a `dict`ionary in the `omo.mf_dset` variable via `xarray`. You can **add** or **delete** attributes just like items from a dictionary:
```python
#add an attribute
omo.mf_dset.attrs["new_attribute"]="New value of attribute"
print(omo.mf_dset.attrs["new_attribute"])
#delete the attribute
del omo.mf_dset.attrs["new_attribute"]
```
%% Cell type:markdown id:6736e17c-c4e5-4393-8ef9-13590c2397fe tags:
## Grib input
If you want to use `grb` input files, you can specify `cfgrib` as an **engine** for `xarray`.
```python
openmf.open_mfdataset_optimize(list_of_grib_files, "pr", fsmap, xarray_kwargs=dict(engine="cfgrib"))
```
%% Cell type:markdown id:4ec0cf68-ebb9-455a-b215-f58b4ff24a06 tags:
## Chunking and rechunking - best practices
A correct chunk configuration is **essential** to enable users to access, analyse and download zarr data with the **highest possible performance**. With `tzis`, you can set
- `chunkdim`: the dimension, which is used for rechunking
- `target_mb`: the target size of one chunk
To match the `target_mb`, tzis estimates the resulting chunk sizes from the size of the variable's array. However, the compression ratio can only rarely be taken into account. If the size of the source dataset can be retrieved via `fsspec`, an estimated compression ratio from the source is applied to the target chunk size.
%% Cell type:markdown id:2db64ae8-6e2d-4b07-a171-4ca36cfd3c65 tags:
### Chunk sizes
<div class="alert alert-block alert-info">
<b> Note: </b> If you set target_mb=0, rechunking is switched off. That also allows using multiple chunk dimensions with a corresponding dask setting (see the arguments of open_mfdataset), and tzis is able to write them to the final zarr dataset.
</div>
%% Cell type:markdown id:e9c5d5e5-564a-4de1-97b1-e2a09801e035 tags:
The size of a chunk has technical limitations on both interval limits:
- Sizes **larger than 2GB** are not supported by swift
- Chunks **smaller than 10 MB** may lead to problems when writing the data due to many small `PUT` requests.
Within this interval, the size optimization is a network trade-off between **latency** and traffic **volume**.
- If users are interested in accessing and downloading the **entire** dataset, you should set `target_mb` to the maximum in order to **reduce latency**. E.g., writing zarr data for DKRZ's HSM archive should be done with maximal `target_mb`.
- If users are interested in **small subsets** of the data, **lower the `target_mb`** so that the transferred **volume** is reduced.

If both use cases are possible, choose a chunk size in between. The highest **throughput** rates ( **O(1 GB/s)** ) can be achieved when the chunk size is above **100 MB**; however, this requires high network volume.
<div class="alert alert-block alert-info">
<b> Note: </b> If the rechunk dimension is not `time` and multiple files were opened for the input dataset,
the zarr output will have `time` as an additional chunk dimension.
</div>
%% Cell type:markdown id:f55d0fea-e94a-48c3-8499-3e56f71d83ea tags:
### Chunk dimension
The chunk dimension configuration highly depends on the use cases of the final zarr data:
- if users mainly subselect on spatial dimensions rather than temporal ones, chunk over a spatial dimension (**lat** or **lon**). A use case is **calculating climate indices** for a specific location.
- if users mainly subselect on the temporal dimension rather than spatial ones, chunk over time. This is recommended if you have long simulations covering many years at high frequencies.

It also depends on the shape of the input arrays:
- if the data is available on a **high-resolution unstructured grid**, i.e. on a **long single spatial dimension**, it might be worth considering rechunking over this axis. This can be tested e.g. for high-resolution ICON simulations.
%% Cell type:markdown id:49c056f9-f038-496f-b3cc-3bb16ba41ae4 tags:
# Rechunking
For *rechunking* an xarray dataset along the chunk dimension `chunkdim`, using the variable `varname` and target chunk size `target_mb`, you can use `rechunk` from `tzis.rechunker`:
```python
from tzis import rechunker
ds_rechunked=rechunker.rechunk(
ds,
varname,
chunkdim,
target_mb,
verbose
)
```
%% Cell type:markdown id:76171636-1f6c-453b-942e-c62d2b49467d tags:
<a class="anchor" id="write"></a>
# Writing to swift
After we have
1. initialized the swift mapper
1. opened (and optionally rechunked) the source dataset

we can **write** it into the cloud. The conversion to `zarr` is done on the way. We can specify all necessary configuration options within the `write_zarr` function:
```python
def write_zarr(
self,
fsmap,
mf_dset,
varname,
verbose=False,
chunkdim="time",
target_mb=0,
startchunk=0,
validity_check=False,
maxretries=3,
trusted=True
)
```
The function allows you
- to set `chunkdim` which is the *dimension* used for chunking.
- to set the target size `target_mb` of a data chunk. A *chunk* corresponds to an object in the swift object storage.
- to set the `startchunk`. If the write process was interrupted - e.g. because your dataset is very large, you can specify at which chunk the write process should restart.
- to set the number of *retries* if the transfer is interrupted.
- to set `validity_check=True` which will validate the transfer **instead of writing** the data. This checks whether the data in the chunks is equal to the input data.
- to set `trusted=False` which validates the transfer **after writing** the data into the cloud. The validation is the same as with `validity_check=True`.
E.g.
```python
from tzis import tzis
outstore=tzis.write_zarr(
omo.target_fsmap,
omo.mf_dset,
omo.varname,
verbose=True,
target_mb=0
)
```
%% Cell type:markdown id:9f045270-f61d-450d-8bc5-dd9a725c7dfb tags:
The output `outstore` of `write_zarr` is a new variable for the output **zarr storage**. Packages like `xarray` which support `zarr` can identify and open the *consolidated* dataset from the cloud with that store. The target mapper can afterwards be pointed to another `os_name` while `outstore` still points to the written dataset.
%% Cell type:markdown id:5230a651-4f6d-4c12-a0d1-bb9bb790877d tags:
## Parallelization
Dask is used to parallelize the `write_zarr` function. Tzis tries to get a running client with dask's `get_client` function. If that fails, it opens a local client with half of the maximally available CPU and memory resources.
Dask processes a default batch of 70 chunks at once. Tzis resizes the batch if necessary so that the memory is unlikely to be overloaded.
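If you want control over the resources, you can start your own dask client before calling `write_zarr`; `get_client` will then pick it up. A minimal sketch (worker numbers and memory limit are just examples):
```python
from dask.distributed import Client

# started before write_zarr, this client is found by dask's get_client()
client = Client(n_workers=4, threads_per_worker=2, memory_limit="8GB")
```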
%% Cell type:markdown id:e494d109-82fa-448b-ac0d-ce4f77565949 tags:
## Compression
Compression configuration can be set from outside of `tzis` via the `zarr` and `numcodecs` libs.
[From Zarr docs:](https://zarr.readthedocs.io/en/v2.10.2/tutorial.html#compressors)
> If you don’t specify a compressor, by default Zarr uses the [Blosc](https://github.com/Blosc) compressor. Blosc is generally very fast and can be configured in a variety of ways to improve the compression ratio for different types of data. Blosc is in fact a *“meta-compressor”*, which means that it can use a number of different compression algorithms internally to compress the data. A list of the internal compression libraries available within Blosc can be obtained via:
```python
from numcodecs import blosc
blosc.list_compressors()
['blosclz', 'lz4', 'lz4hc', 'snappy', 'zlib', 'zstd']
```
> The default compressor can be changed by setting the value of the zarr.storage.default_compressor variable, e.g.:
```python
import zarr.storage
from numcodecs import Zstd, Blosc
# switch to using Zstandard
zarr.storage.default_compressor = Zstd(level=1)
```
> A number of different compressors can be used with Zarr. A separate package called [NumCodecs](http://numcodecs.readthedocs.io/) is available which provides a common interface to various compressor libraries including Blosc, Zstandard, LZ4, Zlib, BZ2 and LZMA. Different compressors can be provided via the compressor keyword argument accepted by all array creation functions.
Alistair Miles has made a [benchmark test](http://alimanfoo.github.io/2016/09/21/genotype-compression-benchmark.html) for compression.
%% Cell type:markdown id:c020d5f4-5c91-477f-bac9-22a8debb6854 tags:
## Overwriting or appending?
`write_zarr()` **appends** data by default if possible. It calls `xarray`'s `to_zarr()` function *for each chunk* by setting the `region` argument. Before a chunk is written, it is checked whether there is already a chunk for exactly the **slice** of the dataset that should be written. If so, the chunk is skipped. Therefore, recalling `write_zarr` only overwrites chunks if they cover a different slice of the source dataset.
<div class="alert alert-block alert-info">
<b>Note: </b> Determining the <b>startchunk</b> automatically can take very long.
We recommend setting `startchunk` manually whenever possible in order to skip the comparison of a range of chunks.
Then, the function will jump to `startchunk` and start appending there.
</div>
When data is appended, the zarr dataset is opened with
```python
OPEN_ZARR_KWARGS = dict(
consolidated=True,
decode_cf=True,
use_cftime=True
)
xarray.open_zarr(omo.target_fsmap,
**OPEN_ZARR_KWARGS)
```
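The chunk-wise appending described above roughly follows xarray's region-writing pattern. A simplified sketch of that mechanism (not the exact `tzis` code; `chunk_length` is illustrative and tzis additionally validates and consolidates the result):
```python
import xarray as xr

ds = omo.mf_dset           # source dataset
target = omo.target_fsmap  # target zarr store
chunk_length = 120         # illustrative chunk length along "time"

# 1) write the dataset skeleton (metadata and coordinates) without computing the data
ds.to_zarr(target, compute=False, mode="w")

# 2) write one slice after the other into its region of the target arrays
for start in range(0, ds.sizes["time"], chunk_length):
    stop = min(start + chunk_length, ds.sizes["time"])
    ds.isel(time=slice(start, stop)).drop_vars(
        [v for v in ds.variables if "time" not in ds[v].dims]
    ).to_zarr(target, region={"time": slice(start, stop)})
```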
%% Cell type:markdown id:9a3955e2-4748-46d1-9b8d-1a755ea45024 tags:
1. **Overwriting conditions**
- if the chunk setting for the variable which is to be written has changed
2. **Error raising conditions**
- if the coordinates of the source dataset differ from the target dataset
3. Still **Appending cases**:
- if the global attributes of the datasets differ. Such attributes are printed.
%% Cell type:markdown id:fb5c5296-12a4-4096-8c64-5ed02e40b4cc tags:
## Writing or appending a second variable
1. If the second variable has either coordinates with the same name but different values or different metadata, point the mapper to a new prefix `new_zarr_dset_name` first:
```python
fsmap= fsspec.get_mapper(
'/'.join(
[
fsmap.root, #Or use `os.path.dirname(fsmap.root)` if you set another object prefix already
new_zarr_dset_name
]
)
)
```
2. If necessary, rechunk the source dataset for the other variable:
```python
omo.mf_dset=rechunker.rechunk(
omo.mf_dset,
new_varname,
new_chunkdim,
new_target_mb,
verbose
)
```
3. Write to the target storage:
```python
outstore=tzis.write_zarr(fsmap, omo.mf_dset, new_varname)
```
The variable will be appended if possible.
%% Cell type:markdown id:83f05439-12fe-43ac-8efa-5da0755eee6f tags:
## Using another source dataset
You can simply open another source dataset with the same target mapper. E.g.:
```python
omo=openmf.open_mfdataset_optimize(
other_mfs_towrite,
varname,
fsmap,
chunkdim="time",
target_mb=100,
verbose=True
)
```
Afterwards, you can write into the same target storage by repeating your `tzis.write_zarr()` command.
%% Cell type:markdown id:f06222ba-fb91-40b8-8063-5dd88d18bc85 tags:
## Provenance
The provenance created by tzis describes what the operation `write_zarr` has done:
- it saves the version of `tzis` and the time of execution
- it gathers the input files and uses either their names or their `tracking_id` as identifier
- it connects the input files to the new zarr dataset which is identified by a newly created `uuid`
- it saves the values of the keyword arguments of the operation ("chunkdim", "startchunk", "target_mb"), and, if used, the keyword arguments of `tzis`'s `open_mfdataset` function.
Two files are created in the virtual directory "provenance" in the container:
- `f"provenance_{dset}_{self.uuid}.json"`
- `f"provenance_{dset}_{self.uuid}.png"`
Where `dset` is the output prefix and `self.uuid` the newly generated uuid for the zarr dataset.
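A small sketch of how these records could be read back (assuming `fsmap` points at a dataset prefix inside the container, so that its parent directory is the container itself):
```python
import json
import os
import fsspec

# the virtual "provenance" directory lives on container level
prov_map = fsspec.get_mapper('/'.join([os.path.dirname(fsmap.root), "provenance"]))
for key in prov_map:
    if key.endswith(".json"):
        record = json.loads(prov_map[key].decode("utf8"))
        print(key, list(record.keys()))
```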
%% Cell type:markdown id:b4e19440-3fd9-406f-80e2-752070e2e060 tags:
<a class="anchor" id="access"></a>
# Access and use your Zarr dataset
1. You can open the *consolidated zarr datasets* with `xarray` using a URL-prefix-like string constructed as
```python
import xarray
zarrinput='/'.join([OS_STORAGE_URL,os_container,os_name])
xarray.open_zarr(zarrinput, consolidated=True, decode_times=True)
```
This is possible if the container is *public* (see *how to make a container public*).
Tzis automatically generates an index file which contains URLs for all prefixes of the container (see the sketch after this list for how to read it). Its location is
```python
'/'.join([OS_STORAGE_URL,os_container,"INDEX.csv"])
```
2. If your container is *private*, you have to use tokens and fsspec. If you create environment variables for OS_AUTH_TOKEN and OS_STORAGE_URL, you can get an fsspec mapper which can then be used with xarray:
```python
#note: this is working in notebooks:
%env OS_AUTH_TOKEN={token["OS_AUTH_TOKEN"]}
%env OS_STORAGE_URL={token["OS_STORAGE_URL"]}
import fsspec
import xarray
targetstore = fsspec.get_mapper(zarrinput)
zarr_dset = xarray.open_zarr(targetstore, consolidated=True, decode_times=True)
zarr_dset
```
3. You can download data from the [swiftbrowser](https://swiftbrowser.dkrz.de) manually.
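As referenced in the list above, the index file is a plain CSV and can be read over `http`, e.g. with `pandas` (a sketch; this only works if the container is public):
```python
import pandas as pd

index_url = '/'.join([OS_STORAGE_URL, os_container, "INDEX.csv"])
index = pd.read_csv(index_url)
print(index.head())
```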
%% Cell type:markdown id:594b8d3e-8d95-4bcd-ad0c-55baef5b7915 tags:
## Catalogs
`tzis` features the `catalog` module with a `write_catalog` function:
```python
import os
from tzis import catalog

catalog.write_catalog(
    fsmap.fs,
    fsmap,
    os.path.basename(fsmap.root),
    catalogname="catalog.json",
    pattern=None,
    delim=".",
    columns=[],
    mode="a",
    verbose=True,
)
```
which creates an `intake` catalog named `catalogname`. This will try to collect all virtual directories on the container level and interpret them as zarr datasets, i.e. the written zarr datasets should not have a virtual DRS (directory structure).
- `pattern` allows to subset the directories of the container by a pattern. E.g., if `pattern="CMIP6"`, only those directories are selected which have `CMIP6` in their names.
- `delim` will be used to split the directory names and put each value into a single catalog column.
- `columns` are the headers of the columns.
- `mode` is either `a` for append or `w` for overwrite the catalog.
<div class="alert alert-block alert-info">
<b>Note: </b> `delim` will only work when all directories have the same number of `delim` separators in their names
and the length of the `columns` list matches the number of `delim` occurrences in the directory names.
</div>
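The resulting catalog can then be opened with `intake`. A sketch, assuming the catalog was written in the intake-esm datastore format (if it is a plain intake catalog, use `intake.open_catalog` instead):
```python
import intake

cat_url = '/'.join([OS_STORAGE_URL, os_container, "catalog.json"])
cat = intake.open_esm_datastore(cat_url)
print(cat.df.head())
```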
%% Cell type:markdown id:c976d55c-502d-47ab-b3ef-67842f6aea11 tags:
## Coordinates
Sometimes, you have to *reset* the coordinates because this information gets lost on the transfer to zarr:
```python
precoords = set(
["lat_bnds", "lev_bnds", "ap", "b", "ap_bnds", "b_bnds", "lon_bnds"]
)
coords = [x for x in zarr_dset.data_vars.variables if x in precoords]
zarr_dset = zarr_dset.set_coords(coords)
```
%% Cell type:markdown id:91a744a5-eb11-4d21-a2be-9f7ac7284c21 tags:
# Reconvert to NetCDF
The basic reconversion to netCDF can be done with `xarray`, where `written` is the dataset opened from the zarr store:
```python
written.to_netcdf(outputfilename)
```
## Compression and encoding:
Often, the original netCDF was compressed. You can set different compressions in an **encoding** dictionary. For using `zlib` with compression level 1, you can set:
```python
encoding = {var: written[var].encoding for var in written.data_vars}
var_dict = dict(zlib=True, complevel=1)
for var in written.data_vars:
encoding[var].update(var_dict)
```
## FillValue
`to_netcdf` might write out *FillValue*s for coordinates, which is not CF-compliant. In order to prevent that, set an encoding as follows:
```python
coord_dict = dict(_FillValue=False)
for var in written.coords:
encoding[var].update(coord_dict)
```
## Unlimited dimensions
Last but not least, one key element of netCDF is the **unlimited dimension**. You can set it via a *keyword argument* of the `to_netcdf` command. E.g., for rewriting a zarr CMIP6 dataset into netCDF, include compression and the FillValue in the encoding and run
```python
written.to_netcdf("testcase.nc",
format="NETCDF4_CLASSIC",
unlimited_dims="time",
encoding=encoding)
```
%% Cell type:markdown id:1eff5a21-b2dd-43b6-9e00-88779638b6aa tags:
<a class="anchor" id="swiftstore"></a>
# Container handling with the swiftstore - `chmod`, `ls`, `rm`, `mv`
%% Cell type:markdown id:39f029d4-9efd-442d-8262-9ded55cf0a3d tags:
## Saving a list of all zarr datasets
You can get all `os_name`s of a container with `ls` of `fsspec`'s filesystem (use a mapper that points at the container level, i.e. one created without `os_name`):
```python
all_zarr_datasets=fsmap.fs.ls(fsmap.root)
print(all_zarr_datasets)
```
In case the data is free, you should save the list as follows:
```python
with open("zarrsets.txt", "w") as f:
    for os_name in all_zarr_datasets:
        f.write(OS_STORAGE_URL+"/"+os_container+"/"+os_name+"\n")
```
This will enable you to [simply open](#access) the zarr datasets with `xarray` afterwards.
%% Cell type:markdown id:03a2f2e1-a4d7-4016-abe9-224663271a40 tags:
## How to make a container public
- use tzis:
```python
%env OS_AUTH_TOKEN={token["OS_AUTH_TOKEN"]}
%env OS_STORAGE_URL={token["OS_STORAGE_URL"]}
tzis.toggle_public("testtzis")
```
This will either make the container of the outstore *public* if it was not public, or make it *private* by removing all access control lists if it was public. Note that only containers as a whole can be made public or private.
- By hand:
1. Log in at https://swiftbrowser.dkrz.de/login/ .
2. In the line of the target container, click on the arrow on the right side with the red background and click on `share`.
3. Again, click on the arrow on the right side and click on `make public`.
%% Cell type:markdown id:966d03c4-74a0-4f63-87ed-49ba6f4b29ae tags:
## Remove a zarr-`store` i.e. all objects with `os_name` prefix
- use the mapper (pointing at the container level):
```python
del fsmap[os_name]
```
- By hand:
1. Log in at https://swiftbrowser.dkrz.de/login/ .
2. Either
   - on the line of the target container, click on the arrow on the right side and click on `Delete container`, or
   - click on the target container, select the store to be deleted, click on the arrow on the right side and click on `Delete`.
%% Cell type:code id:56bf4fef-58eb-413f-833a-20594b515fb4 tags:
``` python
```
......
%% Cell type:code id: tags:
``` python
from tzis import tzis
from tzis import *
import fsspec
import os
import pytest
import xarray as xr
```
%% Cell type:code id: tags:
``` python
OS_STORAGE_URL="https://swift.dkrz.de/v1/dkrz_0b2a0dcc-1430-4a8a-9f25-a6cb8924d92b"
SWIFT_TOKEN=os.environ.get('SWIFT_TOKEN')
container_name="tzis-tests_output-created"
init_config=[(OS_STORAGE_URL, SWIFT_TOKEN, container_name)]
```
%% Cell type:code id: tags:
``` python
@pytest.mark.parametrize("OS_STORAGE_URL, SWIFT_TOKEN, container_name", init_config)
class TestTzisInit:
def test_config(self, OS_STORAGE_URL, SWIFT_TOKEN, container_name):
assert(OS_STORAGE_URL and SWIFT_TOKEN and container_name)
def test_init(self, OS_STORAGE_URL, SWIFT_TOKEN, container_name):
assert(tzis.Tzis(OS_STORAGE_URL,
SWIFT_TOKEN,
container_name))
```
%% Cell type:code id: tags:
``` python
config=[]
testfiledir="files"
fsmap=swifthandling.get_swift_mapper(
OS_STORAGE_URL,
SWIFT_TOKEN,
container_name
)
fsmap.fs.rm(fsmap.root+"/*",recursive=True)
for file in os.listdir(testfiledir):
files=testfiledir+"/"+file
varname=None
zarrobject=file+".zarr"
xarray_kwargs=None
file_ending=file.split('.')[-1]
if file_ending == "grb":
xarray_kwargs={"engine":"cfgrib"}
elif file_ending != "nc":
os.remove(files)
continue
config.append((fsmap,files,varname,xarray_kwargs,zarrobject))
```
%% Cell type:code id: tags:
``` python
@pytest.mark.parametrize("container,files,varname,xarray_kwargs,zarrobject", config)
@pytest.mark.parametrize("fsmap,files,varname,xarray_kwargs,zarrobject", config)
class TestTzisFunctions:
def test_open_mf_dataset(self,
fsmap,
files,
varname,
xarray_kwargs,
zarrobject):
assert(
openmf.open_mfdataset_optimize(
files,
varname,
fsspec.get_mapper(
'/'.join(
[
fsmap.root,
zarrobject
]
)
),
xarray_kwargs=xarray_kwargs
)
)
```
%% Cell type:code id: tags:
``` python
@pytest.mark.parametrize("container,files,varname,xarray_kwargs,zarrobject", config)
@pytest.mark.parametrize("fsmap,files,varname,xarray_kwargs,zarrobject", config)
@pytest.mark.parametrize("target_mb,validity_check",
[(10,True),(2000,True),
(10,False),(2000,False)])
class TestTzisWrite:
def test_write_zarr(self,
fsmap,
files,
varname,
xarray_kwargs,
zarrobject,
target_mb,
validity_check):
targetstore=fsspec.get_mapper(
'/'.join(
[
fsmap.root,
zarrobject,
str(target_mb)
]
)
)
omo = openmf.open_mfdataset_optimize(
files,
varname,
targetstore,
xarray_kwargs=xarray_kwargs
)
outstore = tzis.write_zarr(
targetstore,
omo,
varname,
verbose=True,
chunkdim="time",
target_mb=target_mb,
startchunk=0,
validity_check=validity_check,
maxretries=3
)
assert(True)
```
%% Cell type:code id: tags:
``` python
pytest.main(["-x", "./test_functions.py"])
```
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id: tags:
``` python
from tzis import tzis
from tzis import *
import os
import pytest
import xarray as xr
import subprocess
import fsspec
```
%% Cell type:code id: tags:
``` python
OS_STORAGE_URL="https://swift.dkrz.de/v1/dkrz_0b2a0dcc-1430-4a8a-9f25-a6cb8924d92b"
SWIFT_TOKEN=os.environ.get('SWIFT_TOKEN')
fsmap_name="tzis-tests_output-created"
```
%% Cell type:code id: tags:
``` python
config=[]
testfiledir="files"
fsmap=swifthandling.get_swift_mapper(
OS_STORAGE_URL,
SWIFT_TOKEN,
fsmap_name
)
alltests=fsmap.fs.listdir(fsmap.root)
for zarr_test_output in alltests:
if zarr_test_output["type"] == "directory":
print(zarr_test_output)
config.append(
(fsmap,
zarr_test_output["name"].split('/')[-1]
)
)
```
%% Cell type:code id: tags:
``` python
def set_coords(zarr_dset):
precoords = set(
["lat_bnds", "lev_bnds", "ap", "b", "ap_bnds", "b_bnds", "lon_bnds"]
)
coords = [x for x in zarr_dset.data_vars.variables if x in precoords]
return zarr_dset.set_coords(coords)
def get_encoding(zarr_dset):
encoding = {
var:{
key : zarr_dset[var].encoding[key]
for key in zarr_dset[var].encoding.keys()
if key not in ['chunks', 'preferred_chunks', 'compressor', 'filters', 'coordinates']
}
for var in list(zarr_dset.variables.keys())
}
for var in zarr_dset.data_vars:
encoding[var].update(dict(zlib=True, complevel=1))
for var in zarr_dset.coords:
encoding[var].update(dict(_FillValue=False))
return encoding
```
%% Cell type:code id: tags:
``` python
@pytest.mark.parametrize("container,zarr_test_output", config)
@pytest.mark.parametrize("fsmap,zarr_test_output", config)
class TestTzisValidate:
def test_validate_tzis_open(self,
fsmap,
zarr_test_output
):
zarr_fsmap=fsspec.get_mapper(
'/'.join(
[
fsmap.root,
zarr_test_output
]
)
)
zarr_dset=xr.open_zarr(zarr_fsmap,
consolidated=True,
#decode_times=True,
use_cftime=True)
assert True
def test_validate_tzis_tonetcdf(self,
fsmap,
zarr_test_output
):
zarr_fsmap=fsspec.get_mapper(
'/'.join(
[
fsmap.root,
zarr_test_output
]
)
)
zarr_dset=xr.open_zarr(zarr_fsmap,
consolidated=True,
#decode_times=True,
use_cftime=True)
zarr_dset=set_coords(zarr_dset)
encoding=get_encoding(zarr_dset)
netcdf_rewritten=zarr_test_output+".nc"
zarr_dset.isel(time=slice(0,10)).to_netcdf(netcdf_rewritten,
format="NETCDF4_CLASSIC",
unlimited_dims="time",
encoding=encoding)
assert(True)
def test_validate_tzis_compare(self,
fsmap,
zarr_test_output
):
netcdf_rewritten=zarr_test_output+".nc"
netcdf_orig=os.path.join(testfiledir, '.'.join(zarr_test_output.split('.')[:-1]))
with open("cdo_diff.log", "a") as f:
f.write(netcdf_rewritten)
subprocess.run(["cdo", "diff", netcdf_rewritten, "-seltimestep,1/10", netcdf_orig],
stdout=f,
stderr=subprocess.STDOUT)
subprocess.run(["rm", netcdf_rewritten])
assert(True)
```
%% Cell type:code id: tags:
``` python
pytest.main(["-x", "./test_validations.py"])
```
%% Cell type:code id: tags:
``` python
```
......
......@@ -4,6 +4,7 @@ setup.py
tzis/__init__.py
tzis/catalog.py
tzis/daskhandling.py
tzis/openmf.py
tzis/provenance.py
tzis/rechunker.py
tzis/swifthandling.py
......
......@@ -12,4 +12,5 @@ aiohttp_retry
pandas<1.4.0
swiftspec@ git+https://github.com/fsspec/swiftspec
python-swiftclient>=3.10.0
pytest
lxml
......@@ -6,7 +6,7 @@ import copy
import fsspec
def write_index_file(store, fsmap,pattern=None, contact=None):
def write_index_file(store, url,pattern=None, contact=None):
index_name = "INDEX.csv"
header = [
"contact",
......@@ -23,13 +23,13 @@ def write_index_file(store, fsmap,pattern=None, contact=None):
writer.writerow(header)
all_zarr_datasets = [entry["name"].split('/')[-1]
for entry in store.listdir(fsmap.root)
for entry in store.listdir(url)
]
catalog = next((zd for zd in all_zarr_datasets if "catalog" in zd), None)
if catalog:
all_zarr_datasets.remove(catalog)
if index_name in all_zarr_datasets:
del fsmap[index_name]
store.rm('/'.join([url,index_name]))
all_zarr_datasets.remove(index_name)
if pattern:
all_zarr_datasets = [zd for zd in all_zarr_datasets if pattern in zd]
......@@ -38,11 +38,11 @@ def write_index_file(store, fsmap,pattern=None, contact=None):
all_zarr_datasets = [zd for zd in all_zarr_datasets if outsort not in zd]
for zd in all_zarr_datasets:
sumbytes = store.du('/'.join([fsmap.root,zd]))
sumbytes = store.du('/'.join([url,zd]))
modified=0
try:
modified = [entry["last_modified"]
for entry in store.ls('/'.join([fsmap.root,zd]))
for entry in store.ls('/'.join([url,zd]))
if entry["name"].endswith(".zmetadata")]
if modified :
modified=modified[0]
......@@ -57,12 +57,13 @@ def write_index_file(store, fsmap,pattern=None, contact=None):
sumbytes,
#min(modiflist),
modified,
"/".join([fsmap.root, zd]),
"/".join([url, zd]),
]
)
# all_zarr_datasets_bytes = u'\n'.join(all_zarr_datasets).encode('utf8').strip()
fsmap[index_name] = bytes(output.getvalue(), "utf8")
with store.open("/".join([url, index_name]),"wb") as f:
f.write(bytes(output.getvalue(), "utf8"))
def write_catalog(
store,
......
import fsspec
import xarray
from .provenance import Provenance
from .rechunker import calc_chunk_length
OPEN_MFDATASET_KWARGS = dict(
decode_cf=True,
use_cftime=True,
data_vars="minimal",
coords="minimal",
compat="override",
combine_attrs="drop_conflicts",
)
class open_mfdataset_optimize:
"""
Opens the dataset with xarrays `open_mfdataset`
- with optimized chunks by estimating sizes with a test file `mf[0]` if `chunkdim` and `target_mb` are set.
- with OPEN_MFDATASET_KWARGS and `xarray_kwargs` if provided
It saves the original size of the source if available for estimating compression ratio.
It initializes the provenance.
It collects conflicting attributes and saves it as a new attribute.
It sets a new `tracking_id` and appends to the history attribute.
Parameters
----------
mf : list or str
mf is converted to a list and used as the first argument of `xarray.open_mfdataset`.
varname: str
varname is the variable which is used for rechunking and which should be written to the target storage.
chunkdim=None : str
chunkdim is the chunk dimension used for rechunking. Only set this in combination with target_mb.
target_mb=None : int
target_mb is the desired size of one chunk in the target storage in megabytes.
Only set this in combination with chunkdim.
xarray_kwargs=None : dict
xarray_kwargs are unpacked within `open_mfdataset`.
Returns
-------
Dataset or None
The xarray dataset with optimized chunk setting and attributes.
"""
def __init__(self,
mf,
varname,
target_fsmap,
chunkdim=None,
target_mb=None,
xarray_kwargs=None,
verbose=False):
# if type(mf) != list and type(mf) != str :
# raise ValueError("Dataset '{0}' must either be a string or a list of strings")
self.mf=mf
self.target_fsmap=target_fsmap
self.varname=varname
self.mf_dset=None
self.original_size=None
self.provenance=None
if verbose:
print("Resetting disk size and compression ratio")
if type(mf) == str:
mf = [mf]
if verbose:
print("Calculating chunk size")
if xarray_kwargs is not None:
testds=xarray.open_dataset(mf[0], **xarray_kwargs)
else:
testds=xarray.open_dataset(mf[0])
if not varname:
varname = get_varname(testds)
else:
if varname not in list(testds.data_vars):
raise ValueError(
"Given variable name {0} not in dataset {1}".format(
varname, mf[0]
)
)
if xarray_kwargs:
OPEN_MFDATASET_KWARGS.update(xarray_kwargs)
l_chunksset=False
if chunkdim and target_mb:
size=fsspec.get_mapper(mf[0]).fs.du(mf[0])/1024/1024
if size < target_mb:
print(f"In open_mfdataset_optimize we cannot set target_mb {target_mb} because "
f"the size of the testfile {mf[0]} is {size}MB and therefore smaller.")
else :
chunk_length = calc_chunk_length(testds,
varname,
chunkdim,
target_mb,
1)
if verbose:
print(f"Set chunks to {chunkdim}: {chunk_length}")
OPEN_MFDATASET_KWARGS.update(dict(chunks={chunkdim: chunk_length}))
l_chunksset=True
testds.close()
mf_dset = xarray.open_mfdataset(mf, **OPEN_MFDATASET_KWARGS) # ,
if chunkdim and target_mb and l_chunksset:
mf_dset = mf_dset.chunk({chunkdim: chunk_length})
for var_id in mf_dset.variables:
mf_dset[var_id].unify_chunks()
conflict_attrs = get_conflict_attrs(mf, mf_dset, xarray_kwargs)
self.provenance = Provenance(target_fsmap.root)
if "tracking_id" in conflict_attrs:
self.provenance.gen_input(
"input_tracking_id", conflict_attrs["tracking_id"]
)
else:
self.provenance.gen_input("input_file_name", mf)
self.original_size=sum([fsspec.get_mapper(mf_input).fs.du(mf_input)
for mf_input in mf]
)
from .tzis import __version__ as tzis_version
mf_dset.attrs["tracking_id"] = self.provenance.tracking_id
mf_dset.attrs.setdefault("history", "")
mf_dset.attrs[
"history"
] += "Converted and written to swift target with tzis version {0}".format(
tzis_version
)
# if keep_attrs and conflict_attrs:
# for k, v in conflict_attrs.items():
# mf_dset.attrs[k] = v
# for coord in mf_dset.coords:
# mf_dset[coord].load()
self.mf_dset=mf_dset
def __str__(self):
return self.mf_dset.__str__()
def __repr__(self):
return self.mf_dset.__repr__()
def get_conflict_attrs(mf, mf_dset, xarray_kwargs):
"""
Collects attributes which conflict within all single dsets in `mf`.
It opens all elements of `mf` with `xarray.open_dataset` and collects
attributes in a dictionary and their values in lists.
Parameters
----------
mf : list or str
mf is converted to a list and used as the first argument of `xarray.open_mfdataset`.
mf_dset: Dataset
`mf_dset` is the `xarray` object returned by `xarray.open_mfdataset` which does not include
the conflicting attributes.
xarray_kwargs=None : dict
xarray_kwargs are unpacked within `open_mfdataset`.
Returns
-------
Dict
All conflicting attributes and its values.
"""
conflict_attrs = {}
# try:
maxdigits = len(str(len(mf)))
digitstring = "{0:0" + str(maxdigits) + "}"
for fileno, dset in enumerate(mf):
if xarray_kwargs is not None:
ds=xarray.open_dataset(dset,**xarray_kwargs)
else:
ds=xarray.open_dataset(dset)
dset_attrs = ds.attrs
missing_attrs = {
k: v for k, v in dset_attrs.items() if k not in mf_dset.attrs
}
for k, v in missing_attrs.items():
# attr_prefix=" File "+digitstring.format(fileno)+ ": "
# if dset_attrs["tracking_id"]:
# attr_prefix=" "+dset_attrs["tracking_id"] + ": "
# conflict_attrs[k]=conflict_attrs[k] + attr_prefix + v + ","
conflict_attrs.setdefault(k, [])
conflict_attrs[k].append(v)
# except:
# if verbose:
# print("Could not collect all attributes.")
return conflict_attrs
def get_varname(mf_dset):
return_varname = ""
varlist = list(mf_dset.data_vars)
for var in varlist:
cinv = False
for coord in ["ap", "b", "ps", "bnds"]:
if coord in var:
cinv = True
break
if cinv:
continue
return_varname = var
break
if not return_varname:
raise ValueError(
"Could not find any variable to write to swift. Please specify varname."
)
if verbose:
print(
"We use variable {0} in case we need to rechunk.".format(return_varname)
)
return return_varname
\ No newline at end of file
......@@ -6,7 +6,7 @@ import json
import io
import copy
from datetime import datetime
from .swifthandling import mapper_mkdirs
from .swifthandling import get_swift_mapper
from prov.identifier import Namespace
import prov.model as prov
......@@ -29,17 +29,24 @@ DCTERMS_SOURCE = DCTERMS["source"]
# tzis namespace
TZIS = Namespace("tzis", uri="urn:tzis:")
class Provenance(object):
def __init__(self, url,container,prefix):
def __init__(self, url):
from .tzis import __version__ as tzis_version
self._identifier = None
self.tzis_version = tzis_version
self.url=url
self.container=container
self.prefix=prefix
self.container=None
self.prefix=None
if self.url.startswith("swift"):
self.url=os.path.dirname(
os.path.dirname(url)
)
self.container=os.path.basename(
os.path.dirname(url)
)
self.prefix=os.path.basename(url)
# Create an empyty prov document
self.doc = prov.ProvDocument()
......@@ -111,14 +118,20 @@ class Provenance(object):
output = io.StringIO()
self.doc.serialize(destination=output)
#
store=mapper_mkdirs(self.url,self.container,"provenance")
store=get_swift_mapper(self.url,
os.getenv("OS_AUTH_TOKEN"),
self.container,
"provenance")
store[f"provenance_{self.prefix}_{self.tracking_id}.json"] = bytes(
output.getvalue(), "utf8"
)
def write_png(self):
figure = prov_to_dot(self.doc)
store=mapper_mkdirs(self.url,self.container,"provenance")
store=get_swift_mapper(self.url,
os.getenv("OS_AUTH_TOKEN"),
self.container,
"provenance")
try:
store[f"provenance_{self.prefix}_{self.tracking_id}.png"] = figure.create_png()
except:
......
......@@ -9,16 +9,76 @@ from datetime import datetime
from pathlib import Path
import fsspec
import time
from copy import deepcopy
import stat
from swiftclient import Connection
def mapper_mkdirs(url,container,prefix):
async def get_client(**kwargs):
import aiohttp
import aiohttp_retry
retry_options = aiohttp_retry.ExponentialRetry(
attempts=3,
exceptions={OSError, aiohttp.ServerDisconnectedError})
retry_client = aiohttp_retry.RetryClient(raise_for_status=False, retry_options=retry_options)
return retry_client
def get_storestring_and_options(store):
"""ToDo: Check if string is required. If swift is the protocol, use aiohttp_retry client to
overcome writing errors.
Parameters
----------
store : `fsspec` mapping.
Returns
-------
storestring
The root of the mapping as str.
storage_options
Backend options for to_zarr depending on the protocol as dictionary.
"""
storage_options=None
storestring=store
if store.fs.protocol == "swift":
storestring=store.root
storage_options={"get_client": get_client}
return storestring, storage_options
def get_swift_mapper(url, os_token, container, os_name=None):
"""
"""
fs_url = get_fsspec_url(url)
os_url = get_os_url(url)
os.environ["OS_STORAGE_URL"] = os_url
os.environ["OS_AUTH_TOKEN"] = os_token
mapper_url="/".join(list(filter(None,[fs_url,container,os_name])))
fsspec_map=fsspec.get_mapper(mapper_url)
mapper_mkdirs(fs_url,container,prefix=os_name)
test_fsspec(fsspec_map)
return fsspec_map
def mapper_mkdirs(url,container,prefix=None):
mapper_url="/".join(list(filter(None,[url,container,prefix])))
fsmap=fsspec.get_mapper(mapper_url)
test_fsspec(fsmap)
if not fsmap.fs.protocol == "swift":
fsmap.fs.mkdirs(mapper_url,exist_ok=True)
return fsmap
#create container if necessary
if fsmap.fs.protocol == "swift":
getenv = os.environ.get
conn = Connection(preauthurl=getenv('OS_STORAGE_URL'),
preauthtoken=getenv('OS_AUTH_TOKEN'))
listings=fsmap.fs.listdir(url)
if not container in [a["name"].split('/')[-1]
for a in listings]:
conn.put_container(container)
else:
with fsmap.fs.transaction:
fsmap.fs.mkdirs(mapper_url,exist_ok=True)
def test_fsspec(fsmap):
try:
......@@ -30,12 +90,21 @@ def test_fsspec(fsmap):
raise ValueError("Could not use fsspec implementation")
def get_fsspec_url(os_url):
if os_url.startswith("https"):
os_url = os_url.replace("https", "swift")
elif os_url.startswith("http"):
os_url = os_url.replace("http", "swift")
os_url = os_url.replace("v1/", "")
def get_fsspec_url(url):
fs_url=deepcopy(url)
if fs_url.startswith("https"):
fs_url = fs_url.replace("https", "swift")
elif fs_url.startswith("http"):
fs_url = fs_url.replace("http", "swift")
fs_url = fs_url.replace("v1/", "")
return fs_url
def get_os_url(url):
os_url=deepcopy(url)
if os_url.startswith("swift"):
os_url = os_url.replace("swift:/", "https:/")
if not ".de/v1" in os_url:
os_url = os_url.replace(".de/", ".de/v1/")
return os_url
......
......@@ -21,26 +21,9 @@ import stat
import sys
import time
import json
from . import provenance
from . import catalog
async def get_client(**kwargs):
import aiohttp
import aiohttp_retry
retry_options = aiohttp_retry.ExponentialRetry(
attempts=3,
exceptions={OSError, aiohttp.ServerDisconnectedError})
retry_client = aiohttp_retry.RetryClient(raise_for_status=False, retry_options=retry_options)
return retry_client
OPEN_MFDATASET_KWARGS = dict(
decode_cf=True,
use_cftime=True,
data_vars="minimal",
coords="minimal",
compat="override",
combine_attrs="drop_conflicts",
)
from .provenance import Provenance
from .catalog import *
from .openmf import *
OPEN_ZARR_KWARGS = dict(
consolidated=True,
......@@ -63,356 +46,49 @@ except Exception:
__version__ = "999"
class Tzis:
class write_zarr(fsspec.FSMap):
def __init__(
self,
os_url,
os_token,
os_container,
os_name=None,
mf_dset=None,
varname=None,
fsmap,
mf_dset,
varname,
verbose=False,
open_kwargs=None,
xarray_kwargs=None
chunkdim="time",
target_mb=0,
startchunk=0,
validity_check=False,
maxretries=3,
trusted=True
):
self.DASK_CHUNK_SIZE = 70
self._cratio=None
self._original_size=None
self.original_size=None
self.verbose = verbose
self.provenance = None
self.fsspec_map = None
self.varname = varname
self.auth = {
"preauthurl": os_url,
"preauthtoken": os_token,
}
self.url=os_url
self.os_url = get_fsspec_url(os_url)
os.environ["OS_STORAGE_URL"] = os_url
os.environ["OS_AUTH_TOKEN"] = os_token
self.fsspec_map = fsmap
self.fs = self.fsspec_map.fs
self.container = os_container
#self.prefix=os_name happens in open_store
#self.store= fsspec.filesystem("swift") happens in open_store
self.fsspec_map = self.open_store(self.os_url,self.container,os_name)
self.varname = varname
#
if open_kwargs:
self.mf_dset = self.open_mf_dataset(
mf_dset, self.varname, xarray_kwargs=xarray_kwargs,**open_kwargs
)
else:
self.mf_dset = self.open_mf_dataset(
mf_dset, self.varname, xarray_kwargs=xarray_kwargs
)
self.mf_dset=mf_dset
self.provenance = None
if isinstance(self.mf_dset,open_mfdataset_optimize):
print(self.mf_dset)
self.provenance=self.mf_dset.provenance
self.original_size=self.mf_dset.original_size
self.recent_chunk=None
def get_storestring_and_options(self,store):
"""ToDo: Check if string is required. If swift is the protocol, use aiohttp_retry client to
overcome writing errors.
Parameters
----------
store : `fsspec` mapping.
Returns
-------
storestring
The root of the mapping as str.
storage_options
Backend options for to_zarr depending on the protocol as dictionary.
"""
storage_options=None
storestring=store
if self.store.protocol == "swift":
storestring=store.root
storage_options={"get_client": get_client}
return storestring, storage_options
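# A short sketch of how the two return values feed into `to_zarr` (assuming a
# dataset `dset` and an fsspec mapping `store` exist in the caller's scope):
#
#     storestring, storage_options = self.get_storestring_and_options(store)
#     dset.to_zarr(store=storestring, mode="w", consolidated=True,
#                  storage_options=storage_options)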
def init_verify_provenance(self, reset=False):
"""
Initializes a provenance document if reset is True or self.provenance is None.
Parameters
----------
reset : If True, provenance is recreated. Default is False.
Returns
-------
status
True if provenance was newly created, False if not.
"""
if self.provenance and reset:
if self.verbose:
print(
"Opening a fresh provenance."
)
self.provenance = None
if not self.provenance:
self.provenance = provenance.Provenance(self.os_url,self.container,self.prefix)
return True
return False
def open_mf_dataset(self, mf, varname, chunkdim=None, target_mb=None, xarray_kwargs=None):
"""
Opens the dataset with xarray's `open_mfdataset`
- with optimized chunks by estimating sizes with a test file `mf[0]` if `chunkdim` and `target_mb` are set.
- with OPEN_MFDATASET_KWARGS and `xarray_kwargs` if provided
It saves the original size of the source, if available, for estimating the compression ratio.
It initializes the provenance.
It collects conflicting attributes and saves them as new attributes.
It sets a new `tracking_id` and appends to the history attribute.
Parameters
----------
mf : list or str
mf is converted to a list and used as the first argument of `xarray.open_mfdataset`.
varname: str
varname is the variable which is used for rechunking and which should be written to the target storage.
chunkdim=None : str
chunkdim is the chunk dimension used for rechunking. Only set this in combination with target_mb.
target_mb=None : int
target_mb is the desired size of one chunk in the target storage in megabytes.
Only set this in combination with chunkdim.
xarray_kwargs=None : dict
xarray_kwargs are unpacked within `open_mfdataset`.
Returns
-------
Dataset or None
The xarray dataset with optimized chunk setting and attributes.
"""
# if type(mf) != list and type(mf) != str :
# raise ValueError("Dataset '{0}' must either be a string or a list of strings")
if mf:
if self.verbose:
print("Resetting disk size and compression ratio")
self._original_size=None
self._cratio=None
if type(mf) == str:
mf = [mf]
if self.verbose:
print("Calculating chunk size")
if xarray_kwargs is not None:
testds=xarray.open_dataset(mf[0], **xarray_kwargs)
else:
testds=xarray.open_dataset(mf[0])
if not varname:
self.varname = self._get_varname(testds)
else:
if varname not in list(testds.data_vars):
raise ValueError(
"Given variable name {0} not in dataset {1}".format(
varname, mf[0]
)
)
self.varname = varname
if xarray_kwargs:
OPEN_MFDATASET_KWARGS.update(xarray_kwargs)
l_chunksset=False
if chunkdim and target_mb:
size=fsspec.get_mapper(mf[0]).fs.du(mf[0])/1024/1024
if size < target_mb:
print(f"In open_mf_dataset we cannot set target_mb {target_mb} because "
f"the size of the testfile {mf[0]} is {size}MB and therefore smaller.")
else :
chunk_length = calc_chunk_length(testds,
self.varname,
chunkdim,
target_mb,
1)
if self.verbose:
print(f"Set chunks to {chunkdim}: {chunk_length}")
OPEN_MFDATASET_KWARGS.update(dict(chunks={chunkdim: chunk_length}))
l_chunksset=True
testds.close()
mf_dset = xarray.open_mfdataset(mf, **OPEN_MFDATASET_KWARGS)
if chunkdim and target_mb and l_chunksset:
mf_dset = mf_dset.chunk({chunkdim: chunk_length})
for var_id in mf_dset.variables:
mf_dset[var_id].unify_chunks()
conflict_attrs = self.get_conflict_attrs(mf, mf_dset, xarray_kwargs)
self.init_verify_provenance(reset=True)
if "tracking_id" in conflict_attrs:
self.provenance.gen_input(
"input_tracking_id", conflict_attrs["tracking_id"]
)
else:
self.provenance.gen_input("input_file_name", mf)
self._original_size=sum([fsspec.get_mapper(mf_input).fs.du(mf_input)
for mf_input in mf]
)
mf_dset.attrs["tracking_id"] = self.provenance.tracking_id
mf_dset.attrs.setdefault("history", "")
mf_dset.attrs[
"history"
] += "Converted and written to swift target with tzis version {0}".format(
__version__
)
# if keep_attrs and conflict_attrs:
# for k, v in conflict_attrs.items():
# mf_dset.attrs[k] = v
# for coord in mf_dset.coords:
# mf_dset[coord].load()
return mf_dset
elif self.verbose:
print("You did not provide anything to open.")
return None
def get_conflict_attrs(self, mf, mf_dset, xarray_kwargs):
"""
Collects attributes which conflict between the single datasets in `mf`.
It opens all elements of `mf` with `xarray.open_dataset` and collects
attributes in a dictionary and their values in lists.
Parameters
----------
mf : list or str
mf is converted to a list and used as the first argument of `xarray.open_mfdataset`.
mf_dset: Dataset
`mf_dset` is the `xarray` object returned by `xarray.open_mfdataset` which does not include
the conflicting attributes.
xarray_kwargs=None : dict
xarray_kwargs are unpacked within `open_mfdataset`.
Returns
-------
Dict
All conflicting attributes and their values.
"""
conflict_attrs = {}
# try:
maxdigits = len(str(len(mf)))
digitstring = "{0:0" + str(maxdigits) + "}"
for fileno, dset in enumerate(mf):
if xarray_kwargs is not None:
ds=xarray.open_dataset(dset,**xarray_kwargs)
else:
ds=xarray.open_dataset(dset)
dset_attrs = ds.attrs
missing_attrs = {
k: v for k, v in dset_attrs.items() if k not in mf_dset.attrs
}
for k, v in missing_attrs.items():
# attr_prefix=" File "+digitstring.format(fileno)+ ": "
# if dset_attrs["tracking_id"]:
# attr_prefix=" "+dset_attrs["tracking_id"] + ": "
# conflict_attrs[k]=conflict_attrs[k] + attr_prefix + v + ","
conflict_attrs.setdefault(k, [])
conflict_attrs[k].append(v)
# except:
# if self.verbose:
# print("Could not collect all attributes.")
return conflict_attrs
def _get_varname(self, mf_dset):
return_varname = ""
varlist = list(mf_dset.data_vars)
for var in varlist:
cinv = False
for coord in ["ap", "b", "ps", "bnds"]:
if coord in var:
cinv = True
break
if cinv:
continue
return_varname = var
break
if not return_varname:
raise ValueError(
"Could not find any variable to write to swift. Please specify varname."
)
if self.verbose:
print(
"We use variable {0} in case we need to rechunk.".format(return_varname)
)
return return_varname
def open_store(self, os_url, container, os_name):
"""
Opens the target storage as an `fsspec` mapping.
It builds the mapper URL from `os_url`, `container` and `os_name`,
creates the swift container (or, for other protocols, the directory) if necessary
and tests the mapping.
Parameters
----------
os_url : str
The `fsspec` compatible URL of the object store.
container : str
The container (bucket) into which the zarr dataset will be written.
os_name : str or None
An optional prefix inside the container which serves as the zarr dataset name.
Returns
-------
fsspec mapping
The mapping which is used as the target store for writing zarr.
"""
if not all([os_url,container]):
raise ValueError(f"Specify at least os_url and container for open_store")
if os_name:
if type(os_name) != str:
raise ValueError(f"{f} must be a sting")
self.prefix = os_name
mapper_url="/".join(list(filter(None,[os_url,container,self.prefix])))
self.fsspec_map=fsspec.get_mapper(mapper_url)
self.store=self.fsspec_map.fs
#create container if necessary
if self.store.protocol == "swift":
getenv = os.environ.get
conn = Connection(preauthurl=getenv('OS_STORAGE_URL'),
preauthtoken=getenv('OS_AUTH_TOKEN'))
listings=self.store.listdir(self.os_url)
if not self.container in [a["name"].split('/')[-1]
for a in listings]:
conn.put_container(self.container)
else:
with self.store.transaction:
self.store.mkdirs(mapper_url,exist_ok=True)
test_fsspec(self.fsspec_map)
return self.fsspec_map
target=self.write_zarr_func(
chunkdim=chunkdim,
target_mb=target_mb,
startchunk=startchunk,
validity_check=validity_check,
maxretries=maxretries,
trusted=trusted
)
super().__init__(target.root,target.fs)
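# A hedged usage sketch of the refactored class (names are illustrative; `fsmap`
# would come from get_swift_mapper and `ds` from openmf's open_mfdataset_optimize):
#
#     outstore = write_zarr(
#         fsmap,              # fsspec mapping of the target store
#         ds,                 # dataset to be written
#         "tas",              # hypothetical variable name
#         verbose=True,
#         target_mb=100,
#     )
#     # Since write_zarr subclasses fsspec.FSMap, `outstore` can be used like any
#     # other mapping, e.g. passed to xarray.open_zarr(outstore).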
def _drop_vars_without_chunkdim(self, ds, chunkdim):
"""Drops all variables which do not depend on the chunk dimension.
@@ -470,15 +146,13 @@ class Tzis:
#)
def _init_dataset(self, ds, store, chunkdim, overwrite):
storestring,storage_options=self.get_storestring_and_options(store)
storestring,storage_options=get_storestring_and_options(store)
if overwrite:
try:
#prefix = self.prefix
with self.store.transaction:
self.store.rm('/'.join([self.os_url,self.container,self.prefix]),recursive=True)
#with self.store.transaction:
# self.store.mkdirs('/'.join([self.os_url,self.container,self.prefix]),exist_ok=True)
#self.prefix = prefix
with self.fs.transaction:
self.fs.rm(self.fsspec_map.root,recursive=True)
#with self.fs.transaction:
# self.fs.mkdirs('/'.join([self.os_url,self.container,self.prefix]),exist_ok=True)
except Exception as e:
print(e)
if self.verbose:
@@ -502,7 +176,7 @@ class Tzis:
def _open_or_initialize_swift_dset(
self, store, ds, chunkdim, validity_check, overwrite=False
):
storestring,storage_options=self.get_storestring_and_options(store)
storestring,storage_options=get_storestring_and_options(store)
if not overwrite:
try:
return (
@@ -514,7 +188,7 @@ class Tzis:
except:
if validity_check:
raise ValueError(
f"validity_check=True but target dataset {self.prefix} cannot be opened."
f"validity_check=True but target dataset {self.fsspec_map.root} cannot be opened."
)
elif self.verbose:
print("A new zarr dataset is created at store.")
@@ -691,7 +365,7 @@ class Tzis:
def _reset_already_if_overwrite(
self, validity_check, already, chunked_ds, store, isnew, chunkdim, varname, l_append
):
storestring,storage_options=self.get_storestring_and_options(store)
storestring,storage_options=get_storestring_and_options(store)
b_isnew = isnew
if not validity_check:
overwrite = True
@@ -727,7 +401,7 @@ class Tzis:
# 'Initialized' means that a first `to_zarr` call has been executed which writes all coordinates and metadata for the dataset into the chunk. The subsequent `to_zarr` calls performed by `write_to_region` use this information so that they know how chunks have to be named. If a region has already been written, it will be overwritten by write_to_region.
def write_by_region(self, towrite, store, chunkdim, chunkdimvalues):
storestring,storage_options=self.get_storestring_and_options(store)
storestring,storage_options=get_storestring_and_options(store)
towrite_chunks = towrite[chunkdim].values
startindex = [
idx
@@ -781,7 +455,7 @@ class Tzis:
if self._cratio:
assumed_mb*=self._cratio
if start_dask_chunk < 3*work_dcs:
newsize=self.store.du('/'.join([self.fsspec_map.root,self.varname]))/1024/1024
newsize=self.fs.du('/'.join([self.fsspec_map.root,self.varname]))/1024/1024
assumed_mb=newsize-size
size=newsize
print(
@@ -907,7 +581,7 @@ class Tzis:
return 0
def write_directly(self, dset=None, store=None):
storestring,storage_options=self.get_storestring_and_options(store)
storestring,storage_options=get_storestring_and_options(store)
dset.to_zarr(store=storestring, mode="w", consolidated=True,
storage_options=storage_options)
@@ -1033,8 +707,8 @@ class Tzis:
def write_provenance(self, pargs):
#try:
is_init = self.init_verify_provenance()
if is_init:
if not self.provenance:
self.provenance=Provenance(self.fsspec_map.root)
if self.mf_dset.attrs:
if "tracking_id" in self.mf_dset.attrs:
self.provenance.gen_input(
@@ -1056,10 +730,10 @@ class Tzis:
self.mf_dset.attrs.setdefault("hasProvenance", "")
self.mf_dset.attrs["hasProvenance"] += " " + "/".join(
[
self.url,
os.path.dirname(self.fsspec_map.root),
"provenance",
"_".join(
["provenance", self.prefix, self.provenance.tracking_id + ".json"]
["provenance", self.provenance.prefix, self.provenance.tracking_id + ".json"]
),
]
)
@@ -1068,14 +742,14 @@ class Tzis:
# print("Could not create full provenance")
# pass
def write_zarr(
def write_zarr_func(
self,
chunkdim="time",
target_mb=0,
startchunk=0,
validity_check=False,
maxretries=3,
trusted=True,
trusted=True
):
self.recent_chunk = None
if not self.mf_dset :
@@ -1090,17 +764,14 @@ class Tzis:
totalsize=dset_to_write.nbytes
if self._original_size and not self._cratio:
self._cratio=self._original_size/totalsize
if self.original_size and not self._cratio:
self._cratio=self.original_size/totalsize
if self.verbose:
print(f"Compression ratio was estimated by disk size to {self._cratio}")
if "chunks" in dset_to_write[self.varname].encoding.keys():
del dset_to_write[self.varname].encoding["chunks"]
pargs = locals()
if not self.prefix:
raise ValueError("First open a target store with open_store()")
if not validity_check:
self.write_provenance(pargs)
@@ -1125,31 +796,14 @@ class Tzis:
self.write_directly(dset_to_write, self.fsspec_map)
# self.mf_dset.close()
try:
self.write_index_file()
except Exception as e:
print(e)
print("Could not write index file")
pass
return copy.deepcopy(self.fsspec_map)
def write_index_file(self, pattern=None, contact=None):
fsmap=mapper_mkdirs(self.os_url,self.container, None)
catalog.write_index_file(fsmap.fs, fsmap, pattern, contact)
def write_catalog(
self, catalogname="catalog.json", pattern=None, delim=".", columns=[], mode="a"
):
fsmap=mapper_mkdirs(self.os_url,self.container,None)
catalog.write_catalog(
fsmap.fs,
fsmap,
self.prefix,
catalogname="catalog.json",
pattern=None,
delim=".",
columns=[],
mode="a",
verbose=self.verbose,
)
\ No newline at end of file
# try:
write_index_file(self.fsspec_map.fs,
os.path.dirname(self.fsspec_map.root),
pattern=None,
contact=None)
# except Exception as e:
# print(e)
# print("Could not write index file")
# pass
return copy.deepcopy(self.fsspec_map)
\ No newline at end of file
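# A follow-up sketch (assuming the write completed and consolidated metadata was
# written): the returned mapping can be opened again directly from the store.
#
#     import xarray as xr
#     ds_out = xr.open_zarr(outstore, consolidated=True)
#     print(ds_out)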