diff --git a/notebooks/tzis-tutorial.ipynb b/notebooks/tzis-tutorial.ipynb index e72f0ec36fb561399eba9e07530567089a8b3895..7a2a06fe465d392d417f406432461e02a63ac582 100644 --- a/notebooks/tzis-tutorial.ipynb +++ b/notebooks/tzis-tutorial.ipynb @@ -23,14 +23,16 @@ "`tzis` features\n", "\n", "- **Parallel writing** of a zarr dataset which contains many files per `write_zarr` call with `dask` .\n", + "- **Avoid intermediate writing** to temporary storage for e.g. reformatting purposes because tzis directly writes the dataset into the target storage.\n", "- **Appending** variables to existing zarr datasets which allows to add grids afterwards\n", "- Optimized **Rechunking** along one dimension such that chunk sizes match quota of the target storage and dask graphs are kept small\n", "- **Consolidated metadata** by saving the intersection of many files' metadata into one .zmetadata json file.\n", - "- **Swiftspec** implementation for `fsspec` enabling linux-like file system operations on the object store (like `listdir`)\n", "- **All kind of other target storage**s (filesystem, s3, gcs, zip,...) by setting the `.fsspec_map` attribute manually.\n", "- Writing of **different input file formats**. \\*\n", - "- **Provenance**: Provencance files based on the `prov` lib are saved inside the container in the virtual directory `provenance` for all `write_zarr` calls.\n", - "- **Catalog creation**: Make your zarr datasets easy browsable and accessible via `intake` catalogs\n", + "- **Modular design**: Common feel from fsspec and xarray. Import only what you need:\n", + " - **Swiftspec** implementation for `fsspec` enabling linux-like file system operations on the object store (like `listdir`)\n", + " - **Provenance**: Provencance files based on the `prov` lib are saved inside the container in the virtual directory `provenance` for all `write_zarr` calls.\n", + " - **Catalog creation**: Make your zarr datasets easy browsable and accessible via `intake` catalogs\n", "\n", "\\* All files that can be passed to \n", "```python\n", @@ -83,7 +85,7 @@ "In recent years, cloud object storage systems became an alternative to traditional file systems because of\n", "\n", "- **Independency** from computational ressources. Users can access and download data from anywhere without the need of HPC access or resources\n", - "- **Scalability** because no filesystem or system manager has to care about the connected disks.\n", + "- **Scalability** because no filesystem has to care about the connected disks.\n", "- **A lack of storage** space in general because of increasing model output volume.\n", "- **No namespace conflicts** because data is accessed via global unique identifier\n", "\n", @@ -148,13 +150,13 @@ "source": [ "<a class=\"anchor\" id=\"token\"></a>\n", "\n", - "## Get token and url\n", + "## `swifthandling`: Get token and url\n", "\n", "`Tzis` includes a function to get the token or, if not available, create the token:\n", "\n", "```python\n", - "from tzis import tzis\n", - "token=tzis.get_token(host, account, username=USERNAME)\n", + "from tzis import swifthandling\n", + "token=swifthandling.get_token(host, account, username=USERNAME)\n", "```\n", "\n", "where `host` is either \"dkrz\" or \"jsc\".\n", @@ -168,19 +170,33 @@ "1. 
it returns a dictionary with all configuration variables" ] }, + { + "cell_type": "code", + "execution_count": 29, + "id": "dc780e96-8284-4be0-a9d6-59619aa67290", + "metadata": {}, + "outputs": [], + "source": [ + "from tzis import swifthandling\n", + "token=swifthandling.get_token(\"dkrz\",\"ik1017\",username=\"k204210\")" + ] + }, { "cell_type": "markdown", "id": "13396f94-6da3-4eb9-9602-1045fc9540c5", "metadata": {}, "source": [ - "# Initializing an output container\n", + "# Initialize swift mapper \n", "\n", "After successfully creating the authentication for swift, we *initialize* a swift container in which we will save the data. We do that with\n", "\n", "```python\n", - "container = tzis.Tzis(os_url, os_token, os_container, \n", - " os_name=None, mf_dset=None, varname=None, verbose=False, \n", - " xarray_kwargs=None)\n", + "fsmap=swifthandling.get_swift_mapper(\n", + " os_url,\n", + " os_token,\n", + " os_container,\n", + " os_name=os_name\n", + ")\n", "```\n", "\n", "The mandatory arguments are:\n", @@ -197,21 +213,23 @@ "- decide whether you want to run the write process in *Verbose* mode by specifying `verbose=True`\n", "\n", "E.g.:\n", + "\n", "```python\n", - "container = tzis.Tzis(token[\"OS_STORAGE_URL\"], token[\"OS_AUTH_TOKEN\"], \"tzistest\",\n", - " verbose=True)\n", - "```\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "68d2c5f4-f248-401b-b300-85a37f748f49", - "metadata": {}, - "outputs": [], - "source": [ - "from tzis import tzis\n", - "help(tzis.Tzis)" + "fsmap=swifthandling.get_swift_mapper(\n", + " token[\"OS_STORAGE_URL\"],\n", + " token[\"OS_AUTH_TOKEN\"],\n", + " \"tzistest\",\n", + " os_name=\"zarrfile\"\n", + ")\n", + "```\n", + "\n", + "You can check if your mapper works by applying\n", + "\n", + "```python\n", + "list(fsmap.keys())\n", + "```\n", + "\n", + "which will print *all* existing elements in the container." ] }, { @@ -219,13 +237,21 @@ "id": "c2380bfc-22ab-41c0-96a6-aa9ce2203b13", "metadata": {}, "source": [ - "\n", "# Setting a zarr dataset name (an object prefix)\n", "\n", - "You can switch to different zarr dataset output names within one container by overwriting the container's `store` attribute:\n", + "You can switch to different zarr dataset output names within one container by using fsspec:\n", + "\n", "```python\n", - "container.open_store(os_name):\n", - "```" + "zarr_dset_name=\"test\"\n", + "fsmap=fsspec.get_mapper(\n", + " '/'.join(\n", + " [fsmap.root, #Or use `os.path.dirname(fsmap.root)` if you set another object prefix already\n", + " zarr_dset_name\n", + " ]\n", + " )\n", + "```\n", + "\n", + "Use `os.path.dirname(fsmap.root)` instead of `fsmap.root` if you set another object prefix already." ] }, { @@ -235,64 +261,67 @@ "source": [ "<a class=\"anchor\" id=\"source\"></a>\n", "\n", - "# Open and configure the source dataset\n", - "\n", - "We need to connect the container with a source dataset which should be written into cloud. This is done by setting the container's attribute `container.mf_dset`. \n", + "# `openmf`: Open and configure the source dataset\n", "\n", - "We can use `tzis` for opening the dataset. `tzis` uses `xarray`'s `open_mfdataset` for reading the source file(s). It uses a configuration which tries to **merge** many source files into one dataset **without producing errors**. Therefore, *conflicts* are ignored and only structures and attributes are combined which are overcutting in all source files. 
The internal function looks like: \n",
+    "We can use `open_mfdataset_optimize` from module `tzis.openmf` for opening the dataset. `tzis` uses `xarray`'s `open_mfdataset` for reading the source file(s). The corresponding call is:\n",
     "\n",
     "```python\n",
-    "OPEN_MFDATASET_KWARGS = dict(\n",
-    "    decode_cf=True,\n",
-    "    use_cftime=True,\n",
-    "    data_vars=\"minimal\",\n",
-    "    coords=\"minimal\",\n",
-    "    compat=\"override\",\n",
-    "    combine_attrs=\"drop_conflicts\",\n",
+    "def open_mfdataset_optimize(\n",
+    "    mf,\n",
+    "    varname,\n",
+    "    target_fsmap,\n",
+    "    chunkdim=None,\n",
+    "    target_mb=None,\n",
+    "    xarray_kwargs=None,\n",
+    "    verbose=False\n",
     ")\n",
+    "```\n",
     "\n",
-    "mf_dset = xarray.open_mfdataset(mf,\n",
-    "                                **OPEN_MFDATASET_KWARGS)\n",
+    "E.g.:\n",
+    "\n",
+    "```python\n",
+    "from tzis import openmf\n",
+    "varname=\"tas\"\n",
+    "path_to_dataset = f\"/mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/{varname}/gn/v20190710/\"\n",
+    "mfs_towrite=[path_to_dataset + filename for filename in os.listdir(path_to_dataset)]\n",
+    "omo=openmf.open_mfdataset_optimize(\n",
+    "    mfs_towrite,\n",
+    "    varname,\n",
+    "    fsmap,\n",
+    "    chunkdim=\"time\",\n",
+    "    target_mb=100,\n",
+    "    verbose=True\n",
+    ")\n",
+    "omo\n",
     "```\n",
     "\n",
-    "- Additionally, tzis will try to grep the **tracking_id** attributes of all input files and save them in the **provenance**. If none are available, the **name** of the files given in `mf` are used for provenance instead.\n",
-    "- The **history** attribute of the target zarr data set will be set to `f'Converted and written to swift cloud with tzis version {tzis.__version__}'`"
+    "Note that the actual dataset can be retrieved via `omo.mf_dset`."
   ]
  },
  {
   "cell_type": "markdown",
-  "id": "f4e2f1b9-f4a4-4aa6-a384-e93762a41567",
+  "id": "41f810c1-17ec-4d94-97ae-2cbf90a90013",
   "metadata": {},
   "source": [
-    "`tzis` function for opening datasets can used at two points:\n",
-    "\n",
-    "1. Either with `tzis.open_mfdataset`:\n",
+    "It uses a configuration which tries to **merge** many source files into one dataset **without producing errors**. Therefore, *conflicts* are ignored and only structures and attributes are combined which are common to all source files. The internal function looks like: \n",
     "\n",
     "```python\n",
-    "def open_mf_dataset(self, mf, varname, xarray_kwargs=None):\n",
-    "```\n",
-    "\n",
-    "The mandatory arguments are\n",
-    "\n",
-    "- `mf`: The dataset file(s). A `str` or a `list` of source files which can be opened with\n",
-    "- `varname`: The variable from the dataset which will be selected and then written into the object store\n",
-    "\n",
-    "E.g.:\n",
+    "OPEN_MFDATASET_KWARGS = dict(\n",
+    "    decode_cf=True,\n",
+    "    use_cftime=True,\n",
+    "    data_vars=\"minimal\",\n",
+    "    coords=\"minimal\",\n",
+    "    compat=\"override\",\n",
+    "    combine_attrs=\"drop_conflicts\",\n",
+    ")\n",
     "\n",
-    "```python\n",
-    "varname=\"tas\"\n",
-    "path_to_dataset = f\"/mnt/lustre02/work/ik1017/CMIP6/data/CMIP6/ScenarioMIP/DKRZ/MPI-ESM1-2-HR/ssp370/r1i1p1f1/Amon/{varname}/gn/v20190710/\"\n",
-    "mfs_towrite=[path_var +filename for filename in os.listdir(path_to_dataset)]\n",
-    "container.mf_dataset=container.open_mf_dataset(mfs_towrite, varname)\n",
-    "container.mf_dataset\n",
+    "mf_dset = xarray.open_mfdataset(mf,\n",
+    "                                **OPEN_MFDATASET_KWARGS)\n",
     "```\n",
     "\n",
-    "2. When opening the container, e.g. 
using variables from example 1:\n", - "\n", - "```python\n", - "container = tzis.Tzis(os_url, os_token, os_container, \n", - " os_name=os_name, mf_dset=mfs_towrite, varname=varname)\n", - "```" + "- Additionally, tzis will try to grep the **tracking_id** attributes of all input files and save them in the **provenance**. If none are available, the **name** of the files given in `mf` are used for provenance instead.\n", + "- The **history** attribute of the target zarr data set will be set to `f'Converted and written to swift cloud with tzis version {tzis.__version__}'`\n", + "- An **optimized** chunk size is used already for opening the dataset in order to match the specified target chunk size" ] }, { @@ -304,15 +333,15 @@ "source": [ "## Attributes\n", "\n", - "*Attributes* of the dataset are handled in a `dict`ionary in the `container.mf_dset` variable via `xarray`. You can **add** or **delete** attributes just like items from a dictionary:\n", + "*Attributes* of the dataset are handled in a `dict`ionary in the `omo.mf_dset` variable via `xarray`. You can **add** or **delete** attributes just like items from a dictionary:\n", "\n", "```python\n", "#add an attribute\n", - "mf_dset.attrs[\"new_attribute\"]=\"New value of attribute\"\n", - "print(mf_dset.attrs[\"new_attribute\"])\n", + "omo.mf_dset.attrs[\"new_attribute\"]=\"New value of attribute\"\n", + "print(omo.mf_dset.attrs[\"new_attribute\"])\n", "\n", "#delete the attribute\n", - "del mf_dset.attrs[\"new_attribute\"]\n", + "del omo.mmf_dsetf.attrs[\"new_attribute\"]\n", "```" ] }, @@ -325,147 +354,189 @@ "\n", "If you want to use `grb` input files, you can specify `cfgrib` as an **engine** for `xarray`.\n", "```python\n", - "container.open_mf_dataset(list_of_grib_files, \"pr\", xarray_kwargs=**dict(engine=\"cfgrib\"))\n", + "openmf.open_mfdataset_optimize(list_of_grib_files, \"pr\", xarray_kwargs=dict(engine=\"cfgrib\"))\n", "```" ] }, { "cell_type": "markdown", - "id": "76171636-1f6c-453b-942e-c62d2b49467d", + "id": "e638da79-4cae-4db3-bfaf-398174dd13dd", "metadata": { "tags": [] }, "source": [ - "<a class=\"anchor\" id=\"write\"></a>\n", + "## Chunking and rechunking - best practices\n", "\n", - "# Writing to swift\n", - "\n", - "After we have initialized the container and opened the dataset, we can **write** it into cloud. The conversion to `zarr` is made on the way. We can specify all necessary configuration options within the `write` function:\n", - "\n", - "```python\n", - "def write_zarr(self,\n", - " chunkdim='time',\n", - " target_mb=1000, \n", - " startchunk=0,\n", - " validity_check=False,\n", - " maxretries=3,\n", - " trusted=True)\n", - "```\n", - "\n", - "The function allows you\n", + "A correct chunk configuration is **essential** to enable users to access, analysis and download zarr data with the **highest performance** possible. With `tzis`, you can set\n", "\n", - "- to set `chunkdim` which is the *dimension* used for chunking.\n", - "- to set the target size `target_mb` of a data chunk. A *chunk* corresponds to an object in the swift object storage.\n", - "- to set the `startchunk`. If the write process was interrupted - e.g. because your dataset is very large, you can specify at which chunk the write process should restart.\n", - "- to set the number of *retries* if the transfer is interrupted.\n", - "- to set `validity_check=True` which will validate the transfer **instead of writing** the data. 
This checks if the data in the chunks are equal to the input data.\n", - "- to set `trusted=False` which validates the transfer **after writing** the data into cloud. The validation is equal to `validity_check=True`\n", + "- `chunkdim`: the dimension, which is used for rechunking\n", + "- `target_mb`: the target size of one chunk\n", "\n", - "E.g.\n", - "```python\n", - "outstore=container.write_zarr()\n", - "```" + "To match the `target_mb`, tzis estimates the resulting chunk sizes by using the size of the array of the variable. <mark> However, the compression ratio can only rarely be taken into account. </mark> If the size of the source dataset can be retrieved via `fsspec`, an estimated compression ratio from the source is applied on the target chunk size." ] }, { "cell_type": "markdown", - "id": "9f045270-f61d-450d-8bc5-dd9a725c7dfb", + "id": "2db64ae8-6e2d-4b07-a171-4ca36cfd3c65", "metadata": { - "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ - "The output `outstore` of `write_zarr` is a new variable for the output **zarr storage**. Packages like `xarray` which are using `zarr` can identify and open the *consolidated* dataset from the cloud with that store. The `os_name` of `container` can now be changed while the `outstore` still points to the written `os_name`." + "### Chunk sizes\n", + "\n", + "<div class=\"alert alert-block alert-info\">\n", + " <b> Note: </b> If you set target_mb=0, rechunking is set off. That also allows to use multiple chunk dimensions with a corresponding dask setting (see arguments for open_mfdataset) and tzis is able to write them to the final zarr dataset.\n", + "</div>" ] }, { "cell_type": "markdown", - "id": "5230a651-4f6d-4c12-a0d1-bb9bb790877d", + "id": "e9c5d5e5-564a-4de1-97b1-e2a09801e035", "metadata": { "tags": [] }, "source": [ - "## Parallelization\n", + "The size of a chunk has technical limitations on both interval limits:\n", "\n", - "Dask is used to parallelize the `write_zarr` function. Tzis tries to get a running client with dask's `get_client` function. If that fails, it opens a local client with half of the maximum available CPU and memory resources. \n", + "- Sizes **larger than 2GB** are not supported by swift\n", + "- Chunks **smaller than 10 MB** may lead to problems when writing the data due to many small `PUT` requests.\n", "\n", - "Dask will process a default batch of 70 chunks at once. Most of the time, tzis resizes the batch so that it is most likely that the memory is not overloaded." + "In between this interval, the size optimization corresponds to network fine adjustment between **latency** and traffic **volume**.\n", + "\n", + "- If users are interested in accessing and downloading the **entire** dataset, you should set the `target_mb` to the maximum in order to **reduce latency**. E.g., writing zarr data for DKRZ's HSM archive should be done with maximal `target_mb`.\n", + "- If users are interested in **small subsets** of the data, **lower the `target_mb`** so that **volume** is reduced.\n", + "\n", + "If both use cases are possible, choose a chunk size in between. 
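+    "\n",
+    "As a rough, illustrative aid you can translate a `target_mb` into a chunk length along `time` yourself. This is plain xarray/numpy arithmetic and **not** tzis' internal `calc_chunk_length`; `ds` and `varname` are placeholders for an opened source dataset and its variable:\n",
+    "\n",
+    "```python\n",
+    "import math\n",
+    "\n",
+    "var = ds[varname]  # e.g. ds = xarray.open_dataset(one_source_file)\n",
+    "bytes_per_step = var.dtype.itemsize * math.prod(\n",
+    "    size for dim, size in var.sizes.items() if dim != 'time'\n",
+    ")\n",
+    "target_mb = 100\n",
+    "chunk_length = max(1, int(target_mb * 1024**2 // bytes_per_step))\n",
+    "chunk_length  # uncompressed time steps per ~100 MB chunk\n",
+    "```\n",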
The highest **throughput** rates ( **O(1GB/s)** ) can be acchieved when the chunk size is over **100 MB**, however, this requires high network volume.\n", + "\n", + "<div class=\"alert alert-block alert-info\">\n", + " <b> Note: </b> If the rechunk-dimension is not `time` and multiple files were opened for the input dataset,\n", + " the zarr output will have the addition chunking-dimensions `time`.\n", + "</div>" ] }, { "cell_type": "markdown", - "id": "4ec0cf68-ebb9-455a-b215-f58b4ff24a06", + "id": "f55d0fea-e94a-48c3-8499-3e56f71d83ea", "metadata": { "tags": [] }, "source": [ - "## Chunking - best practices\n", + "### Chunk dimension\n", "\n", - "A correct chunk configuration is **essential** to enable users to access, analysis and download zarr data with the **highest performance** possible. With `tzis`, you can set\n", + "Chunk dimension configuration highly depend on the use cases of the final zarr data:\n", "\n", - "- `chunkdim`: the dimension, which is used for rechunking\n", - "- `target_mb`: the target size of one chunk\n", + "- if users mainly subselect on spatial dimensions rather than temporal, chunk over a spatial dimension (**lat** or **lon**). A use case can be **calculating climate indices** for a special location.\n", + "- if users mainly subselect on temporal dimension rather than spatial, chunk over time. This is recommended if you have long simulations covering many years on high frequencies.\n", "\n", - "To match the `target_mb`, tzis estimates the resulting chunk sizes by using the size of the array of the variable. <mark> However, the compression ratio can only rarely be taken into account. </mark> If the size of the source dataset can be retrieved via `fsspec`, an estimated compression ratio from the source is applied on the target chunk size." + "It also depends on the shape of the input arrays:\n", + "\n", + "- if the data is available on an **high resolution unstructured grid** i.e. on a **long and single spatial dimension**, it might be worth to consider rechunking over this axis. This can be tested e.g. for high resoltion ICON simulations.\n", + "\n" ] }, { "cell_type": "markdown", - "id": "dc9c5429-29bc-4f83-b255-4bb2b1882939", + "id": "49c056f9-f038-496f-b3cc-3bb16ba41ae4", "metadata": { "tags": [] }, "source": [ - "### Chunk sizes\n", + "# Rechunking\n", "\n", - "<div class=\"alert alert-block alert-info\">\n", - " <b> Note: </b> If you set target_mb=0, rechunking is set off. 
That also allows to use multiple chunk dimensions with a corresponding dask setting (see arguments for open_mfdataset) and tzis is able to write them to the final zarr dataset.\n", - "</div>" + "For *rechunking* an xarray dataset along chunk dimension `chunkdim` by using variable `varname` and target chunk size `target_mb`, you can use `rechunk` from tzis.rechunker:\n", + "\n", + "```python\n", + "from tzis import rechunker\n", + "ds_rechunked=rechunker.rechunk(\n", + " ds,\n", + " varname,\n", + " chunkdim,\n", + " target_mb,\n", + " verbose\n", + ")\n", + "```" ] }, { "cell_type": "markdown", - "id": "923551a9-7546-42e7-bcf7-970e9d957a50", + "id": "76171636-1f6c-453b-942e-c62d2b49467d", "metadata": { "tags": [] }, "source": [ - "The size of a chunk has technical limitations on both interval limits:\n", + "<a class=\"anchor\" id=\"write\"></a>\n", "\n", - "- Sizes **larger than 2GB** are not supported by swift\n", - "- Chunks **smaller than 10 MB** may lead to problems when writing the data due to many small `PUT` requests.\n", + "# Writing to swift\n", "\n", - "In between this interval, the size optimization corresponds to network fine adjustment between **latency** and traffic **volume**.\n", + "After we have\n", "\n", - "- If users are interested in accessing and downloading the **entire** dataset, you should set the `target_mb` to the maximum in order to **reduce latency**. E.g., writing zarr data for DKRZ's HSM archive should be done with maximal `target_mb`.\n", - "- If users are interested in **small subsets** of the data, **lower the `target_mb`** so that **volume** is reduced.\n", + "1. initialized the container \n", + "1. opened or rechunked the dataset\n", "\n", - "If both use cases are possible, choose a chunk size in between. The highest **throughput** rates ( **O(1GB/s)** ) can be acchieved when the chunk size is over **100 MB**, however, this requires high network volume.\n", + "we can **write** it into cloud. The conversion to `zarr` is made on the way. We can specify all necessary configuration options within the `write` function:\n", "\n", - "<div class=\"alert alert-block alert-info\">\n", - " <b> Note: </b> If the rechunk-dimension is not `time` and multiple files were opened for the input dataset,\n", - " the zarr output will have the addition chunking-dimensions `time`.\n", - "</div>" + "```python\n", + "def write_zarr(\n", + " self,\n", + " fsmap,\n", + " mf_dset,\n", + " varname,\n", + " verbose=False,\n", + " chunkdim=\"time\",\n", + " target_mb=0,\n", + " startchunk=0,\n", + " validity_check=False,\n", + " maxretries=3,\n", + " trusted=True\n", + ")\n", + "```\n", + "\n", + "The function allows you\n", + "\n", + "- to set `chunkdim` which is the *dimension* used for chunking.\n", + "- to set the target size `target_mb` of a data chunk. A *chunk* corresponds to an object in the swift object storage.\n", + "- to set the `startchunk`. If the write process was interrupted - e.g. because your dataset is very large, you can specify at which chunk the write process should restart.\n", + "- to set the number of *retries* if the transfer is interrupted.\n", + "- to set `validity_check=True` which will validate the transfer **instead of writing** the data. This checks if the data in the chunks are equal to the input data.\n", + "- to set `trusted=False` which validates the transfer **after writing** the data into cloud. 
The validation is equal to `validity_check=True`\n", + "\n", + "E.g.\n", + "\n", + "```python\n", + "from tzis import tzis\n", + "outstore=tzis.write_zarr(\n", + " omo.target_fsmap,\n", + " omo.mf_dset,\n", + " omo.varname,\n", + " verbose=True,\n", + " target_mb=0\n", + ")\n", + "```" ] }, { "cell_type": "markdown", - "id": "6c805d48-f90e-4d4d-9468-6fc84d09d664", + "id": "9f045270-f61d-450d-8bc5-dd9a725c7dfb", "metadata": { + "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ - "### Chunk dimension\n", - "\n", - "Chunk dimension configuration highly depend on the use cases of the final zarr data:\n", - "\n", - "- if users mainly subselect on spatial dimensions rather than temporal, chunk over a spatial dimension (**lat** or **lon**). A use case can be **calculating climate indices** for a special location.\n", - "- if users mainly subselect on temporal dimension rather than spatial, chunk over time. This is recommended if you have long simulations covering many years on high frequencies.\n", + "The output `outstore` of `write_zarr` is a new variable for the output **zarr storage**. Packages like `xarray` which are using `zarr` can identify and open the *consolidated* dataset from the cloud with that store. The `os_name` of `container` can now be changed while the `outstore` still points to the written `os_name`." + ] + }, + { + "cell_type": "markdown", + "id": "5230a651-4f6d-4c12-a0d1-bb9bb790877d", + "metadata": { + "tags": [] + }, + "source": [ + "## Parallelization\n", "\n", - "It also depends on the shape of the input arrays:\n", + "Dask is used to parallelize the `write_zarr` function. Tzis tries to get a running client with dask's `get_client` function. If that fails, it opens a local client with half of the maximum available CPU and memory resources. \n", "\n", - "- if the data is available on an **high resolution unstructured grid** i.e. on a **long and single spatial dimension**, it might be worth to consider rechunking over this axis. This can be tested e.g. for high resoltion ICON simulations.\n", - "\n" + "Dask will process a default batch of 70 chunks at once. Most of the time, tzis resizes the batch so that it is most likely that the memory is not overloaded." ] }, { @@ -528,7 +599,7 @@ " decode_cf=True,\n", " use_cftime=True\n", ")\n", - "xarray.open_zarr(container.fsspec_map,\n", + "xarray.open_zarr(omo.mf,\n", " **OPEN_ZARR_KWARGS)\n", "```" ] @@ -559,15 +630,31 @@ "\n", "1. If the second variable has either coordinates with the same name but different values or different meta data, open a new prefix `new_os_name` first:\n", "```python\n", - "container.fsspec_map= container.open_store(os_url,os_container,new_os_name):\n", + "fsmap= fsspec.get_mapper(\n", + " '/'.join(\n", + " [\n", + " fsmap.root, #Or use `os.path.dirname(fsmap.root)` if you set another object prefix already\n", + " new_zarr_dset_name\n", + " ]\n", + " )\n", + ")\n", "```\n", - "2. Set another variable name `varname`:\n", + " \n", + "2. If necessary, rechunk the source dataset for the other variable:\n", + "\n", "```python\n", - "container.varname=varname\n", + "omo.mf_dset=rechunker.rechunk(\n", + " omo.mf_dset,\n", + " new_varname,\n", + " new_chunkdim,\n", + " new_target_mb,\n", + " verbose\n", + ")\n", "```\n", + "\n", "3. Write to the target storage:\n", "```python\n", - "container.write_zarr()\n", + "outstore=tzis.write_zarr()\n", "```\n", "\n", "The variable will be appended if possible." @@ -585,10 +672,17 @@ "After initializing your container, you can overwrite its variables. E.g. 
by\n", "\n", "```python\n", - "container.mf_dset=container.open_mf_dataset(...)\n", + "omo=openmf.open_mfdataset_optimized(\n", + " other_mfs_towrite,\n", + " varname,\n", + " fsmap,\n", + " chunkdim=\"time\",\n", + " target_mb=100,\n", + " verbose=True\n", + ")\n", "```\n", "\n", - "Afterwards, you can write into the same target storage." + "Afterwards, you can write into the same target storage by repeating your `tzis.write_zarr()` command." ] }, { @@ -639,7 +733,7 @@ "'/'.join([OS_STORAGE_URL,os_container,\"INDEX.csv\"])\n", "```\n", "\n", - "1. If your container is *private*, you have to use tokens and fsspec. If you create environment variables for OS_AUTH_TOKEN and OS_STORAGE_URL, you can get a fsspec mapper which than can be used for xarray:\n", + "2. If your container is *private*, you have to use tokens and fsspec. If you create environment variables for OS_AUTH_TOKEN and OS_STORAGE_URL, you can get a fsspec mapper which than can be used for xarray:\n", "\n", "```python\n", "#note: this is working in notebooks:\n", @@ -665,8 +759,16 @@ "`tzis` features\n", "\n", "```python\n", - " def write_catalog(\n", - " self, catalogname=\"catalog.json\", pattern=None, delim=\".\", columns=[], mode=\"a\"\n", + " catalog.write_catalog(\n", + " self.fsspec_map.fs,\n", + " self.fsspec_map,\n", + " os.basename(self.fsspec_map.root),\n", + " catalogname=\"catalog.json\",\n", + " pattern=None,\n", + " delim=\".\",\n", + " columns=[],\n", + " mode=\"a\",\n", + " verbose=self.verbose,\n", " )\n", "```\n", "\n", diff --git a/tests/test_functions.ipynb b/tests/test_functions.ipynb index d4e5dc3f32d281c0bb84bdb8aa45689db867c193..7519567e846f1214dda969ef2aed553d90d98d34 100644 --- a/tests/test_functions.ipynb +++ b/tests/test_functions.ipynb @@ -6,7 +6,7 @@ "metadata": {}, "outputs": [], "source": [ - "from tzis import tzis\n", + "from tzis import *\n", "import os\n", "import pytest\n", "import xarray as xr" @@ -49,11 +49,12 @@ "source": [ "config=[]\n", "testfiledir=\"files\"\n", - "container=tzis.Tzis(OS_STORAGE_URL,\n", - " SWIFT_TOKEN,\n", - " container_name,\n", - " verbose=True)\n", - "container.store.rm(container.fsspec_map.root+\"/*\",recursive=True)\n", + "fsmap=swifthandling.get_swift_mapper(\n", + " OS_STORAGE_URL,\n", + " SWIFT_TOKEN,\n", + " container_name\n", + ")\n", + "fsmap.fs.rm(fsmap.root+\"/*\",recursive=True)\n", "for file in os.listdir(testfiledir):\n", " files=testfiledir+\"/\"+file\n", " varname=None\n", @@ -65,34 +66,38 @@ " elif file_ending != \"nc\":\n", " os.remove(files)\n", " continue\n", - " config.append((container,files,varname,xarray_kwargs,zarrobject))" + " config.append((fsmap,files,varname,xarray_kwargs,zarrobject))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ - "@pytest.mark.parametrize(\"container,files,varname,xarray_kwargs,zarrobject\", config)\n", + "@pytest.mark.parametrize(\"fsmap,files,varname,xarray_kwargs,zarrobject\", config)\n", "class TestTzisFunctions:\n", " def test_open_mf_dataset(self,\n", - " container,\n", - " files,\n", - " varname,\n", - " xarray_kwargs,\n", - " zarrobject):\n", - " assert(container.open_mf_dataset(files,\n", - " varname,\n", - " xarray_kwargs=xarray_kwargs))\n", - " def test_open_store(self,\n", - " container,\n", + " fsmap,\n", " files,\n", " varname,\n", " xarray_kwargs,\n", " zarrobject):\n", - " container.open_store(zarrobject)\n", - " assert(True)" + " assert(\n", + " openmf.open_mfdataset_optimized(\n", + " files,\n", + " varname,\n", + " 
fsspec.get_mapper(\n", + " '/'.join(\n", + " [\n", + " fsmap.root,\n", + " zarrobject\n", + " ]\n", + " )\n", + " ),\n", + " xarray_kwargs=xarray_kwargs\n", + " )\n", + " )" ] }, { @@ -101,28 +106,45 @@ "metadata": {}, "outputs": [], "source": [ - "@pytest.mark.parametrize(\"container,files,varname,xarray_kwargs,zarrobject\", config)\n", + "@pytest.mark.parametrize(\"fsmap,files,varname,xarray_kwargs,zarrobject\", config)\n", "@pytest.mark.parametrize(\"target_mb,validity_check\",\n", " [(10,True),(2000,True),\n", " (10,False),(2000,False)])\n", "class TestTzisWrite:\n", " def test_write_zarr(self,\n", - " container,\n", + " fsmap,\n", " files,\n", " varname,\n", " xarray_kwargs,\n", " zarrobject,\n", " target_mb,\n", " validity_check):\n", - " container.mf_dset = container.open_mf_dataset(files,\n", - " varname,\n", - " xarray_kwargs=xarray_kwargs)\n", - " container.open_store(zarrobject+str(target_mb))\n", - " container.write_zarr(chunkdim=\"time\",\n", - " target_mb=target_mb,\n", - " startchunk=0,\n", - " validity_check=validity_check,\n", - " maxretries=3)\n", + " targetstore=fsspec.get_mapper(\n", + " '/'.join(\n", + " [\n", + " fsmap.root,\n", + " zarrobject,\n", + " str(target_mb)\n", + " ]\n", + " )\n", + " )\n", + " omo = openmf.open_mfdataset_optimized(\n", + " files,\n", + " varname,\n", + " targetstore,\n", + " xarray_kwargs=xarray_kwargs\n", + " )\n", + " outstore = tzis.write_zarr(\n", + " targetstore,\n", + " omo,\n", + " varname,\n", + " verbose=True,\n", + " chunkdim=\"time\",\n", + " target_mb=target_mb,\n", + " startchunk=0,\n", + " validity_check=validity_check,\n", + " maxretries=3\n", + " )\n", " assert(True)" ] }, @@ -159,7 +181,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.6" + "version": "3.10.0" } }, "nbformat": 4, diff --git a/tests/test_validations.ipynb b/tests/test_validations.ipynb index 7a9b9f39dff345bd7c9f666307b8c21f0566ec4a..0a5e7d40a168ab4b72965aa3649090d08c4bc5d8 100644 --- a/tests/test_validations.ipynb +++ b/tests/test_validations.ipynb @@ -2,15 +2,16 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ - "from tzis import tzis\n", + "from tzis import *\n", "import os\n", "import pytest\n", "import xarray as xr\n", - "import subprocess" + "import subprocess\n", + "import fsspec" ] }, { @@ -21,7 +22,7 @@ "source": [ "OS_STORAGE_URL=\"https://swift.dkrz.de/v1/dkrz_0b2a0dcc-1430-4a8a-9f25-a6cb8924d92b\"\n", "SWIFT_TOKEN=os.environ.get('SWIFT_TOKEN')\n", - "container_name=\"tzis-tests_output-created\"" + "fsmap_name=\"tzis-tests_output-created\"" ] }, { @@ -32,15 +33,20 @@ "source": [ "config=[]\n", "testfiledir=\"files\"\n", - "container=tzis.Tzis(OS_STORAGE_URL,\n", - " SWIFT_TOKEN,\n", - " container_name,\n", - " verbose=True)\n", - "alltests=container.store.listdir(container.fsspec_map.root)\n", + "fsmap=swifthandling.get_swift_mapper(\n", + " OS_STORAGE_URL,\n", + " SWIFT_TOKEN,\n", + " fsmap_name\n", + ")\n", + "alltests=fsmap.fs.listdir(fsmap.fsspec_map.root)\n", "for zarr_test_output in alltests:\n", " if zarr_test_output[\"type\"] == \"directory\":\n", " print(zarr_test_output)\n", - " config.append((container,zarr_test_output[\"name\"].split('/')[-1]))" + " config.append(\n", + " (fsmap,\n", + " zarr_test_output[\"name\"].split('/')[-1]\n", + " )\n", + " )" ] }, { @@ -79,26 +85,40 @@ "metadata": {}, "outputs": [], "source": [ - "@pytest.mark.parametrize(\"container,zarr_test_output\", config)\n", + 
"@pytest.mark.parametrize(\"fsmap,zarr_test_output\", config)\n", "class TestTzisValidate:\n", " def test_validate_tzis_open(self,\n", - " container,\n", + " fsmap,\n", " zarr_test_output\n", " ):\n", - " container.open_store(container.os_url, container.container, zarr_test_output)\n", - " zarr_dset=xr.open_zarr(container.fsspec_map,\n", + " zarr_fsmap=fsspec.get_mapper(\n", + " '/'.join(\n", + " [\n", + " fsmap.root,\n", + " zarr_test_output\n", + " ]\n", + " )\n", + " )\n", + " zarr_dset=xr.open_zarr(zarr_fsmap,\n", " consolidated=True,\n", " #decode_times=True,\n", " use_cftime=True)\n", " assert True\n", " \n", " def test_validate_tzis_tonetcdf(self,\n", - " container,\n", + " fsmap,\n", " zarr_test_output\n", " ):\n", - " container.open_store(container.os_url, container.container, zarr_test_output)\n", - " \n", - " zarr_dset=xr.open_zarr(container.fsspec_map,\n", + " zarr_fsmap=fsspec.get_mapper(\n", + " '/'.join(\n", + " [\n", + " fsmap.root,\n", + " zarr_test_output\n", + " ]\n", + " )\n", + " )\n", + " \n", + " zarr_dset=xr.open_zarr(zarr_fsmap,\n", " consolidated=True,\n", " #decode_times=True,\n", " use_cftime=True)\n", @@ -111,7 +131,7 @@ " encoding=encoding)\n", " assert(True)\n", " def test_validate_tzis_compare(self,\n", - " container,\n", + " fsmap,\n", " zarr_test_output \n", " ):\n", " netcdf_rewritten=zarr_test_output+\".nc\"\n", diff --git a/tzis.egg-info/SOURCES.txt b/tzis.egg-info/SOURCES.txt index 02bc679cc20072bb693bd33236ef819cd7637674..99d92410d620220e3e0f653357b85dee25d254c8 100644 --- a/tzis.egg-info/SOURCES.txt +++ b/tzis.egg-info/SOURCES.txt @@ -4,6 +4,7 @@ setup.py tzis/__init__.py tzis/catalog.py tzis/daskhandling.py +tzis/openmf.py tzis/provenance.py tzis/rechunker.py tzis/swifthandling.py diff --git a/tzis.egg-info/requires.txt b/tzis.egg-info/requires.txt index 4065e8ff5a0a97cc01f2c2472c01b59eb7df157e..19c6be224b91e186a816915fd4a8a1e5b6e70f9e 100644 --- a/tzis.egg-info/requires.txt +++ b/tzis.egg-info/requires.txt @@ -12,4 +12,5 @@ aiohttp_retry pandas<1.4.0 swiftspec@ git+https://github.com/fsspec/swiftspec python-swiftclient>=3.10.0 +pytest lxml diff --git a/tzis/catalog.py b/tzis/catalog.py index c64a849d931fc8c65228ec247bee2dff79906791..cd16d9104543783a297312940c8daff37d55f998 100644 --- a/tzis/catalog.py +++ b/tzis/catalog.py @@ -6,7 +6,7 @@ import copy import fsspec -def write_index_file(store, fsmap,pattern=None, contact=None): +def write_index_file(store, url,pattern=None, contact=None): index_name = "INDEX.csv" header = [ "contact", @@ -23,13 +23,13 @@ def write_index_file(store, fsmap,pattern=None, contact=None): writer.writerow(header) all_zarr_datasets = [entry["name"].split('/')[-1] - for entry in store.listdir(fsmap.root) + for entry in store.listdir(url) ] catalog = next((zd for zd in all_zarr_datasets if "catalog" in zd), None) if catalog: all_zarr_datasets.remove(catalog) if index_name in all_zarr_datasets: - del fsmap[index_name] + store.rm('/'.join([url,index_name])) all_zarr_datasets.remove(index_name) if pattern: all_zarr_datasets = [zd for zd in all_zarr_datasets if pattern in zd] @@ -38,11 +38,11 @@ def write_index_file(store, fsmap,pattern=None, contact=None): all_zarr_datasets = [zd for zd in all_zarr_datasets if outsort not in zd] for zd in all_zarr_datasets: - sumbytes = store.du('/'.join([fsmap.root,zd])) + sumbytes = store.du('/'.join([url,zd])) modified=0 try: modified = [entry["last_modified"] - for entry in store.ls('/'.join([fsmap.root,zd])) + for entry in store.ls('/'.join([url,zd])) if 
entry["name"].endswith(".zmetadata")] if modified : modified=modified[0] @@ -57,12 +57,13 @@ def write_index_file(store, fsmap,pattern=None, contact=None): sumbytes, #min(modiflist), modified, - "/".join([fsmap.root, zd]), + "/".join([url, zd]), ] ) # all_zarr_datasets_bytes = u'\n'.join(all_zarr_datasets).encode('utf8').strip() - fsmap[index_name] = bytes(output.getvalue(), "utf8") + with store.open("/".join([url, index_name]),"wb") as f: + f.write(bytes(output.getvalue(), "utf8")) def write_catalog( store, diff --git a/tzis/openmf.py b/tzis/openmf.py new file mode 100644 index 0000000000000000000000000000000000000000..3ce68baada077b8a0b1e422c0cd0915d07438c33 --- /dev/null +++ b/tzis/openmf.py @@ -0,0 +1,224 @@ +import fsspec +import xarray +from .provenance import Provenance +from .rechunker import calc_chunk_length + +OPEN_MFDATASET_KWARGS = dict( + decode_cf=True, + use_cftime=True, + data_vars="minimal", + coords="minimal", + compat="override", + combine_attrs="drop_conflicts", +) + +class open_mfdataset_optimize: + """ + Opens the dataset with xarrays `open_mfdataset` + - with optimized chunks by estimating sizes with a test file `mf[0]` if `chunkdim` and `target_mb` are set. + - with OPEN_MFDATASET_KWARGS and `xarray_kwargs` if provided + + It saves the original size of the source if available for estimating compression ratio. + It initializes the provenance. + It collects conflicting attributes and saves it as a new attribute. + It sets a new `tracking_id` and appends to the history attribute. + + Parameters + ---------- + mf : list or str + mf is converted to a list and used as the first argument of `xarray.open_mfdataset`. + varname: str + varname is the variable which is used for rechunking and which should be written to the target storage. + chunkdim=None : str + chunkdim is the chunk dimension used for rechunking. Only set this in combination with target_mb. + target_mb=None : int + target_mb is the desired size of one chunk in the target storage in megabytes. + Only set this in combination with chunkdim. + xarray_kwargs=None : dict + xarray_kwargs are unpacked within `open_mfdataset`. + + + Returns + ------- + Dataset or None + The xarray dataset with optimized chunk setting and attributes. 
+ """ + def __init__(self, + mf, + varname, + target_fsmap, + chunkdim=None, + target_mb=None, + xarray_kwargs=None, + verbose=False): + + # if type(mf) != list and type(mf) != str : + # raise ValueError("Dataset '{0}' must either be a string or a list of strings") + self.mf=mf + self.target_fsmap=target_fsmap + self.varname=varname + self.mf_dset=None + self.original_size=None + self.provenance=None + + if verbose: + print("Resetting disk size and compression ratio") + + if type(mf) == str: + mf = [mf] + + if verbose: + print("Calculating chunk size") + + if xarray_kwargs is not None: + testds=xarray.open_dataset(mf[0], **xarray_kwargs) + else: + testds=xarray.open_dataset(mf[0]) + + if not varname: + varname = get_varname(testds) + else: + if varname not in list(testds.data_vars): + raise ValueError( + "Given variable name {0} not in dataset {1}".format( + varname, mf[0] + ) + ) + + if xarray_kwargs: + OPEN_MFDATASET_KWARGS.update(xarray_kwargs) + + l_chunksset=False + if chunkdim and target_mb: + size=fsspec.get_mapper(mf[0]).fs.du(mf[0])/1024/1024 + if size < target_mb: + print(f"In open_mfdataset_optimize we cannot set target_mb {target_mb} because " + f"the size of the testfile {mf[0]} is {size}MB and therefore smaller.") + else : + chunk_length = calc_chunk_length(testds, + varname, + chunkdim, + target_mb, + 1) + if verbose: + print(f"Set chunks to {chunkdim}: {chunk_length}") + + OPEN_MFDATASET_KWARGS.update(dict(chunks={chunkdim: chunk_length})) + l_chunksset=True + + testds.close() + + mf_dset = xarray.open_mfdataset(mf, **OPEN_MFDATASET_KWARGS) # , + + if chunkdim and target_mb and l_chunksset: + mf_dset = mf_dset.chunk({chunkdim: chunk_length}) + for var_id in mf_dset.variables: + mf_dset[var_id].unify_chunks() + + conflict_attrs = get_conflict_attrs(mf, mf_dset, xarray_kwargs) + + self.provenance = Provenance(target_fsmap.root) + + if "tracking_id" in conflict_attrs: + self.provenance.gen_input( + "input_tracking_id", conflict_attrs["tracking_id"] + ) + else: + self.provenance.gen_input("input_file_name", mf) + + self.original_size=sum([fsspec.get_mapper(mf_input).fs.du(mf_input) + for mf_input in mf] + ) + from .tzis import __version__ as tzis_version + mf_dset.attrs["tracking_id"] = self.provenance.tracking_id + mf_dset.attrs.setdefault("history", "") + mf_dset.attrs[ + "history" + ] += "Converted and written to swift target with tzis version {0}".format( + tzis_version + ) + + # if keep_attrs and conflict_attrs: + # for k, v in conflict_attrs.items(): + # mf_dset.attrs[k] = v + + # for coord in mf_dset.coords: + # mf_dset[coord].load() + + self.mf_dset=mf_dset + def __str__(self): + return self.mf_dset.__str__() + def __repr__(self): + return self.mf_dset.__repr__() + +def get_conflict_attrs(mf, mf_dset, xarray_kwargs): + """ + Collects attributes which conflict within all single dsets in `mf`. + It opens all elements of `mf` with `xarray.open_dataset` and collects + attributes in a dictionary and their values in lists. + + Parameters + ---------- + mf : list or str + mf is converted to a list and used as the first argument of `xarray.open_mfdataset`. + mf_dset: Dataset + `mf_dset` is the `xarray` object returned by `xarray.open_mfdataset` which does not include + the conflicting attributes. + xarray_kwargs=None : dict + xarray_kwargs are unpacked within `open_mfdataset`. + + + Returns + ------- + Dict + All conflicting attributes and its values. 
+ """ + + conflict_attrs = {} + # try: + maxdigits = len(str(len(mf))) + digitstring = "{0:0" + str(maxdigits) + "}" + for fileno, dset in enumerate(mf): + if xarray_kwargs is not None: + ds=xarray.open_dataset(dset,**xarray_kwargs) + else: + ds=xarray.open_dataset(dset) + dset_attrs = ds.attrs + missing_attrs = { + k: v for k, v in dset_attrs.items() if k not in mf_dset.attrs + } + for k, v in missing_attrs.items(): + # attr_prefix=" File "+digitstring.format(fileno)+ ": " + # if dset_attrs["tracking_id"]: + # attr_prefix=" "+dset_attrs["tracking_id"] + ": " + # conflict_attrs[k]=conflict_attrs[k] + attr_prefix + v + "," + + conflict_attrs.setdefault(k, []) + conflict_attrs[k].append(v) + # except: + # if verbose: + # print("Could not collect all attributes.") + return conflict_attrs + +def get_varname(mf_dset): + return_varname = "" + varlist = list(mf_dset.data_vars) + for var in varlist: + cinv = False + for coord in ["ap", "b", "ps", "bnds"]: + if coord in var: + cinv = True + break + if cinv: + continue + return_varname = var + break + if not return_varname: + raise ValueError( + "Could not find any variable to write to swift. Please specify varname." + ) + if verbose: + print( + "We use variable {0} in case we need to rechunk.".format(return_varname) + ) + return return_varname \ No newline at end of file diff --git a/tzis/provenance.py b/tzis/provenance.py index d69dff8ac0174c3ac9bb84122b90d8ba42ef24d4..705c4cad6801c1936440e0400dc8c47de380e9ad 100644 --- a/tzis/provenance.py +++ b/tzis/provenance.py @@ -6,7 +6,7 @@ import json import io import copy from datetime import datetime -from .swifthandling import mapper_mkdirs +from .swifthandling import get_swift_mapper from prov.identifier import Namespace import prov.model as prov @@ -29,17 +29,24 @@ DCTERMS_SOURCE = DCTERMS["source"] # tzis namespace TZIS = Namespace("tzis", uri="urn:tzis:") - class Provenance(object): - def __init__(self, url,container,prefix): + def __init__(self, url): from .tzis import __version__ as tzis_version self._identifier = None self.tzis_version = tzis_version self.url=url - self.container=container - self.prefix=prefix + self.container=None + self.prefix=None + if self.url.startswith("swift"): + self.url=os.path.dirname( + os.path.dirname(url) + ) + self.container=os.path.basename( + os.path.dirname(url) + ) + self.prefix=os.path.basename(url) # Create an empyty prov document self.doc = prov.ProvDocument() @@ -111,14 +118,20 @@ class Provenance(object): output = io.StringIO() self.doc.serialize(destination=output) # - store=mapper_mkdirs(self.url,self.container,"provenance") + store=get_swift_mapper(self.url, + os.getenv("OS_AUTH_TOKEN"), + self.container, + "provenance") store[f"provenance_{self.prefix}_{self.tracking_id}.json"] = bytes( output.getvalue(), "utf8" ) def write_png(self): figure = prov_to_dot(self.doc) - store=mapper_mkdirs(self.url,self.container,"provenance") + store=get_swift_mapper(self.url, + os.getenv("OS_AUTH_TOKEN"), + self.container, + "provenance") try: store[f"provenance_{self.prefix}_{self.tracking_id}.png"] = figure.create_png() except: diff --git a/tzis/swifthandling.py b/tzis/swifthandling.py index 4765bf3354432aa94c39ae6dcfeed1728549aacb..44f1a73296816006f2f2d7d8033862acab2a6ee0 100644 --- a/tzis/swifthandling.py +++ b/tzis/swifthandling.py @@ -9,16 +9,76 @@ from datetime import datetime from pathlib import Path import fsspec import time +from copy import deepcopy import stat from swiftclient import Connection -def mapper_mkdirs(url,container,prefix): +async def 
get_client(**kwargs): + import aiohttp + import aiohttp_retry + retry_options = aiohttp_retry.ExponentialRetry( + attempts=3, + exceptions={OSError, aiohttp.ServerDisconnectedError}) + retry_client = aiohttp_retry.RetryClient(raise_for_status=False, retry_options=retry_options) + return retry_client + +def get_storestring_and_options(store): + """ToDo: Check if string is required. If swift is the protocol, use aiohttp_retry client to + overcome writing errors. + + Parameters + ---------- + store : `fsspec` mapping. + + Returns + ------- + storestring + The root of the mapping as str. + storage_options + Backend options for to_zarr depending on the protocol as dictionary. + """ + + storage_options=None + storestring=store + if store.fs.protocol == "swift": + storestring=store.root + storage_options={"get_client": get_client} + + return storestring, storage_options + +def get_swift_mapper(url, os_token, container, os_name=None): + """ + """ + fs_url = get_fsspec_url(url) + os_url = get_os_url(url) + + os.environ["OS_STORAGE_URL"] = os_url + os.environ["OS_AUTH_TOKEN"] = os_token + + mapper_url="/".join(list(filter(None,[fs_url,container,os_name]))) + fsspec_map=fsspec.get_mapper(mapper_url) + mapper_mkdirs(fs_url,container,prefix=os_name) + + test_fsspec(fsspec_map) + return fsspec_map + +def mapper_mkdirs(url,container,prefix=None): mapper_url="/".join(list(filter(None,[url,container,prefix]))) fsmap=fsspec.get_mapper(mapper_url) test_fsspec(fsmap) - if not fsmap.fs.protocol == "swift": - fsmap.fs.mkdirs(mapper_url,exist_ok=True) - return fsmap + #create container if necessary + if fsmap.fs.protocol == "swift": + getenv = os.environ.get + conn = Connection(preauthurl=getenv('OS_STORAGE_URL'), + preauthtoken=getenv('OS_AUTH_TOKEN')) + listings=fsmap.fs.listdir(url) + if not container in [a["name"].split('/')[-1] + for a in listings]: + conn.put_container(container) + + else: + with fsmap.fs.transaction: + fsmap.fs.mkdirs(mapper_url,exist_ok=True) def test_fsspec(fsmap): try: @@ -30,12 +90,21 @@ def test_fsspec(fsmap): raise ValueError("Could not use fsspec implementation") -def get_fsspec_url(os_url): - if os_url.startswith("https"): - os_url = os_url.replace("https", "swift") - elif os_url.startswith("http"): - os_url = os_url.replace("http", "swift") - os_url = os_url.replace("v1/", "") +def get_fsspec_url(url): + fs_url=deepcopy(url) + if fs_url.startswith("https"): + fs_url = fs_url.replace("https", "swift") + elif fs_url.startswith("http"): + fs_url = fs_url.replace("http", "swift") + fs_url = fs_url.replace("v1/", "") + return fs_url + +def get_os_url(url): + os_url=deepcopy(url) + if os_url.startswith("swift"): + os_url = os_url.replace("swift:/", "https:/") + if not ".de/v1" in os_url: + os_url = os_url.replace(".de/", ".de/v1/") return os_url diff --git a/tzis/tzis.py b/tzis/tzis.py index fc84843a2d9b0d5e5f3e618a259b593ebd7f54bf..80481d33fd78bc56e64febe95daac2f09ae8c5c0 100644 --- a/tzis/tzis.py +++ b/tzis/tzis.py @@ -21,26 +21,9 @@ import stat import sys import time import json -from . import provenance -from . 
import catalog - -async def get_client(**kwargs): - import aiohttp - import aiohttp_retry - retry_options = aiohttp_retry.ExponentialRetry( - attempts=3, - exceptions={OSError, aiohttp.ServerDisconnectedError}) - retry_client = aiohttp_retry.RetryClient(raise_for_status=False, retry_options=retry_options) - return retry_client - -OPEN_MFDATASET_KWARGS = dict( - decode_cf=True, - use_cftime=True, - data_vars="minimal", - coords="minimal", - compat="override", - combine_attrs="drop_conflicts", -) +from .provenance import Provenance +from .catalog import * +from .openmf import * OPEN_ZARR_KWARGS = dict( consolidated=True, @@ -63,356 +46,49 @@ except Exception: __version__ = "999" -class Tzis: +class write_zarr(fsspec.FSMap): def __init__( self, - os_url, - os_token, - os_container, - os_name=None, - mf_dset=None, - varname=None, + fsmap, + mf_dset, + varname, verbose=False, - open_kwargs=None, - xarray_kwargs=None + chunkdim="time", + target_mb=0, + startchunk=0, + validity_check=False, + maxretries=3, + trusted=True ): self.DASK_CHUNK_SIZE = 70 self._cratio=None - self._original_size=None + self.original_size=None self.verbose = verbose - self.provenance = None - self.fsspec_map = None - self.varname = varname - - - self.auth = { - "preauthurl": os_url, - "preauthtoken": os_token, - } - self.url=os_url - self.os_url = get_fsspec_url(os_url) - - os.environ["OS_STORAGE_URL"] = os_url - os.environ["OS_AUTH_TOKEN"] = os_token + self.fsspec_map = fsmap + self.fs = self.fsspec_map.fs - self.container = os_container - - #self.prefix=os_name happens in open_store - #self.store= fsspec.filesystem("swift") happens in open_store - self.fsspec_map = self.open_store(self.os_url,self.container,os_name) + self.varname = varname - # - if open_kwargs: - self.mf_dset = self.open_mf_dataset( - mf_dset, self.varname, xarray_kwargs=xarray_kwargs,**open_kwargs - ) - else: - self.mf_dset = self.open_mf_dataset( - mf_dset, self.varname, xarray_kwargs=xarray_kwargs - ) + self.mf_dset=mf_dset + self.provenance = None + if isinstance(self.mf_dset,open_mfdataset_optimize): + print(self.mf_dset) + self.provenance=self.mf_dset.provenance + self.original_size=self.mf_dset.provenance self.recent_chunk=None - - def get_storestring_and_options(self,store): - """ToDo: Check if string is required. If swift is the protocol, use aiohttp_retry client to - overcome writing errors. - - Parameters - ---------- - store : `fsspec` mapping. - - Returns - ------- - storestring - The root of the mapping as str. - storage_options - Backend options for to_zarr depending on the protocol as dictionary. - """ - - storage_options=None - storestring=store - if self.store.protocol == "swift": - storestring=store.root - storage_options={"get_client": get_client} - - return storestring, storage_options - def init_verify_provenance(self, reset=False): - """ - Initializes a provenance document if reset is True or self.provenance is None. - - Parameters - ---------- - reset : If True, provenance is recreated. Default is False. - - Returns - ------- - status - True if provenance was newly created, False if not. - """ - - if self.provenance and reset: - if self.verbose: - print( - "Opening a fresh provenance." 
- ) - self.provenance = None - if not self.provenance: - self.provenance = provenance.Provenance(self.os_url,self.container,self.prefix) - return True - return False - - def open_mf_dataset(self, mf, varname, chunkdim=None, target_mb=None, xarray_kwargs=None): - """ - Opens the dataset with xarrays `open_mfdataset` - - with optimized chunks by estimating sizes with a test file `mf[0]` if `chunkdim` and `target_mb` are set. - - with OPEN_MFDATASET_KWARGS and `xarray_kwargs` if provided - - It saves the original size of the source if available for estimating compression ratio. - It initializes the provenance. - It collects conflicting attributes and saves it as a new attribute. - It sets a new `tracking_id` and appends to the history attribute. - - Parameters - ---------- - mf : list or str - mf is converted to a list and used as the first argument of `xarray.open_mfdataset`. - varname: str - varname is the variable which is used for rechunking and which should be written to the target storage. - chunkdim=None : str - chunkdim is the chunk dimension used for rechunking. Only set this in combination with target_mb. - target_mb=None : int - target_mb is the desired size of one chunk in the target storage in megabytes. - Only set this in combination with chunkdim. - xarray_kwargs=None : dict - xarray_kwargs are unpacked within `open_mfdataset`. - - - Returns - ------- - Dataset or None - The xarray dataset with optimized chunk setting and attributes. - """ - - # if type(mf) != list and type(mf) != str : - # raise ValueError("Dataset '{0}' must either be a string or a list of strings") - - if mf: - if self.verbose: - print("Resetting disk size and compression ratio") - self._original_size=None - self._cratio=None - - if type(mf) == str: - mf = [mf] - - if self.verbose: - print("Calculating chunk size") - - if xarray_kwargs is not None: - testds=xarray.open_dataset(mf[0], **xarray_kwargs) - else: - testds=xarray.open_dataset(mf[0]) - - if not varname: - self.varname = self._get_varname(testds) - else: - if varname not in list(testds.data_vars): - raise ValueError( - "Given variable name {0} not in dataset {1}".format( - varname, mf[0] - ) - ) - self.varname = varname - - if xarray_kwargs: - OPEN_MFDATASET_KWARGS.update(xarray_kwargs) - - l_chunksset=False - if chunkdim and target_mb: - size=fsspec.get_mapper(mf[0]).fs.du(mf[0])/1024/1024 - if size < target_mb: - print(f"In open_mf_dataset we cannot set target_mb {target_mb} because " - f"the size of the testfile {mf[0]} is {size}MB and therefore smaller.") - else : - chunk_length = calc_chunk_length(testds, - self.varname, - chunkdim, - target_mb, - 1) - if self.verbose: - print(f"Set chunks to {chunkdim}: {chunk_length}") - - OPEN_MFDATASET_KWARGS.update(dict(chunks={chunkdim: chunk_length})) - l_chunksset=True - - testds.close() - - mf_dset = xarray.open_mfdataset(mf, **OPEN_MFDATASET_KWARGS) # , - - if chunkdim and target_mb and l_chunksset: - mf_dset = mf_dset.chunk({chunkdim: chunk_length}) - for var_id in mf_dset.variables: - mf_dset[var_id].unify_chunks() - - conflict_attrs = self.get_conflict_attrs(mf, mf_dset, xarray_kwargs) - - self.init_verify_provenance(reset=True) - - if "tracking_id" in conflict_attrs: - self.provenance.gen_input( - "input_tracking_id", conflict_attrs["tracking_id"] - ) - else: - self.provenance.gen_input("input_file_name", mf) - - self._original_size=sum([fsspec.get_mapper(mf_input).fs.du(mf_input) - for mf_input in mf] - ) - - mf_dset.attrs["tracking_id"] = self.provenance.tracking_id - 
mf_dset.attrs.setdefault("history", "") - mf_dset.attrs[ - "history" - ] += "Converted and written to swift target with tzis version {0}".format( - __version__ - ) - - # if keep_attrs and conflict_attrs: - # for k, v in conflict_attrs.items(): - # mf_dset.attrs[k] = v - - # for coord in mf_dset.coords: - # mf_dset[coord].load() - - return mf_dset - elif self.verbose: - print("You did not provide anything to open.") - return None - - def get_conflict_attrs(self, mf, mf_dset, xarray_kwargs): - """ - Collects attributes which conflict within all single dsets in `mf`. - It opens all elements of `mf` with `xarray.open_dataset` and collects - attributes in a dictionary and their values in lists. - - Parameters - ---------- - mf : list or str - mf is converted to a list and used as the first argument of `xarray.open_mfdataset`. - mf_dset: Dataset - `mf_dset` is the `xarray` object returned by `xarray.open_mfdataset` which does not include - the conflicting attributes. - xarray_kwargs=None : dict - xarray_kwargs are unpacked within `open_mfdataset`. - - - Returns - ------- - Dict - All conflicting attributes and its values. - """ - - conflict_attrs = {} - # try: - maxdigits = len(str(len(mf))) - digitstring = "{0:0" + str(maxdigits) + "}" - for fileno, dset in enumerate(mf): - if xarray_kwargs is not None: - ds=xarray.open_dataset(dset,**xarray_kwargs) - else: - ds=xarray.open_dataset(dset) - dset_attrs = ds.attrs - missing_attrs = { - k: v for k, v in dset_attrs.items() if k not in mf_dset.attrs - } - for k, v in missing_attrs.items(): - # attr_prefix=" File "+digitstring.format(fileno)+ ": " - # if dset_attrs["tracking_id"]: - # attr_prefix=" "+dset_attrs["tracking_id"] + ": " - # conflict_attrs[k]=conflict_attrs[k] + attr_prefix + v + "," - - conflict_attrs.setdefault(k, []) - conflict_attrs[k].append(v) - # except: - # if self.verbose: - # print("Could not collect all attributes.") - return conflict_attrs - - def _get_varname(self, mf_dset): - return_varname = "" - varlist = list(mf_dset.data_vars) - for var in varlist: - cinv = False - for coord in ["ap", "b", "ps", "bnds"]: - if coord in var: - cinv = True - break - if cinv: - continue - return_varname = var - break - if not return_varname: - raise ValueError( - "Could not find any variable to write to swift. Please specify varname." - ) - if self.verbose: - print( - "We use variable {0} in case we need to rechunk.".format(return_varname) - ) - return return_varname - - def open_store(self, os_url, container, os_name): - """ - Collects attributes which conflict within all single dsets in `mf`. - It opens all elements of `mf` with `xarray.open_dataset` and collects - attributes in a dictionary and their values in lists. - - Parameters - ---------- - mf : list or str - mf is converted to a list and used as the first argument of `xarray.open_mfdataset`. - mf_dset: Dataset - `mf_dset` is the `xarray` object returned by `xarray.open_mfdataset` which does not include - the conflicting attributes. - xarray_kwargs=None : dict - xarray_kwargs are unpacked within `open_mfdataset`. - - - Returns - ------- - Dict - All conflicting attributes and its values. 
- """ - if not all([os_url,container]): - raise ValueError(f"Specify at least os_url and container for open_store") - if os_name: - if type(os_name) != str: - raise ValueError(f"{f} must be a sting") - self.prefix = os_name - - mapper_url="/".join(list(filter(None,[os_url,container,self.prefix]))) - self.fsspec_map=fsspec.get_mapper(mapper_url) - self.store=self.fsspec_map.fs - - #create container if necessary - if self.store.protocol == "swift": - getenv = os.environ.get - conn = Connection(preauthurl=getenv('OS_STORAGE_URL'), - preauthtoken=getenv('OS_AUTH_TOKEN')) - listings=self.store.listdir(self.os_url) - if not self.container in [a["name"].split('/')[-1] - for a in listings]: - conn.put_container(self.container) - - else: - with self.store.transaction: - self.store.mkdirs(mapper_url,exist_ok=True) - - test_fsspec(self.fsspec_map) - return self.fsspec_map + target=self.write_zarr_func( + chunkdim=chunkdim, + target_mb=target_mb, + startchunk=startchunk, + validity_check=validity_check, + maxretries=maxretries, + trusted=trusted + ) + super().__init__(target.root,target.fs) def _drop_vars_without_chunkdim(self, ds, chunkdim): """Drops all variables which do not depend on the chunk dimension. @@ -470,15 +146,13 @@ class Tzis: #) def _init_dataset(self, ds, store, chunkdim, overwrite): - storestring,storage_options=self.get_storestring_and_options(store) + storestring,storage_options=get_storestring_and_options(store) if overwrite: try: - #prefix = self.prefix - with self.store.transaction: - self.store.rm('/'.join([self.os_url,self.container,self.prefix]),recursive=True) - #with self.store.transaction: - # self.store.mkdirs('/'.join([self.os_url,self.container,self.prefix]),exist_ok=True) - #self.prefix = prefix + with self.fs.transaction: + self.fs.rm(self.fsspec_map.root,recursive=True) + #with self.fs.transaction: + # self.fs.mkdirs('/'.join([self.os_url,self.container,self.prefix]),exist_ok=True) except Exception as e: print(e) if self.verbose: @@ -502,7 +176,7 @@ class Tzis: def _open_or_initialize_swift_dset( self, store, ds, chunkdim, validity_check, overwrite=False ): - storestring,storage_options=self.get_storestring_and_options(store) + storestring,storage_options=get_storestring_and_options(store) if not overwrite: try: return ( @@ -514,7 +188,7 @@ class Tzis: except: if validity_check: raise ValueError( - f"validity_check=True but target dataset {self.prefix} cannot be opened." + f"validity_check=True but target dataset {self.fsspec_map.root} cannot be opened." ) elif self.verbose: print("A new zarr dataset is created at store.") @@ -691,7 +365,7 @@ class Tzis: def _reset_already_if_overwrite( self, validity_check, already, chunked_ds, store, isnew, chunkdim, varname, l_append ): - storestring,storage_options=self.get_storestring_and_options(store) + storestring,storage_options=get_storestring_and_options(store) b_isnew = isnew if not validity_check: overwrite = True @@ -727,7 +401,7 @@ class Tzis: # 'Initiliazed' means that a first `to_zarr` call has been executed which writes all coordinates and metadata for the dataset into the chunk. The subsequent `to_zarr` calls performed by `write_to_region` uses this information so that it knows how chunks have to be named. If a region has already been written, it will be overwritten by write_to_region. 
 
     def write_by_region(self, towrite, store, chunkdim, chunkdimvalues):
-        storestring,storage_options=self.get_storestring_and_options(store)
+        storestring,storage_options=get_storestring_and_options(store)
         towrite_chunks = towrite[chunkdim].values
         startindex = [
             idx
@@ -781,7 +455,7 @@ class Tzis:
                 if self._cratio:
                     assumed_mb*=self._cratio
                 if start_dask_chunk < 3*work_dcs:
-                    newsize=self.store.du('/'.join([self.fsspec_map.root,self.varname]))/1024/1024
+                    newsize=self.fs.du('/'.join([self.fsspec_map.root,self.varname]))/1024/1024
                     assumed_mb=newsize-size
                     size=newsize
                     print(
@@ -907,7 +581,7 @@ class Tzis:
         return 0
 
     def write_directly(self, dset=None, store=None):
-        storestring,storage_options=self.get_storestring_and_options(store)
+        storestring,storage_options=get_storestring_and_options(store)
         dset.to_zarr(store=storestring, mode="w", consolidated=True, storage_options=storage_options)
@@ -1033,8 +707,8 @@ class Tzis:
 
     def write_provenance(self, pargs):
         #try:
-        is_init = self.init_verify_provenance()
-        if is_init:
+        if not self.provenance:
+            self.provenance=Provenance(self.fsspec_map.root)
             if self.mf_dset.attrs:
                 if "tracking_id" in self.mf_dset.attrs:
                     self.provenance.gen_input(
@@ -1056,10 +730,10 @@ class Tzis:
             self.mf_dset.attrs.setdefault("hasProvenance", "")
             self.mf_dset.attrs["hasProvenance"] += " " + "/".join(
                 [
-                    self.url,
+                    os.path.dirname(self.fsspec_map.root),
                     "provenance",
                     "_".join(
-                        ["provenance", self.prefix, self.provenance.tracking_id + ".json"]
+                        ["provenance", self.provenance.prefix, self.provenance.tracking_id + ".json"]
                     ),
                 ]
             )
@@ -1068,14 +742,14 @@ class Tzis:
        #    print("Could not create full provenance")
        #    pass
 
-    def write_zarr(
+    def write_zarr_func(
         self,
         chunkdim="time",
         target_mb=0,
         startchunk=0,
         validity_check=False,
         maxretries=3,
-        trusted=True,
+        trusted=True
     ):
         self.recent_chunk = None
         if not self.mf_dset :
@@ -1090,17 +764,14 @@ class Tzis:
         totalsize=dset_to_write.nbytes
 
-        if self._original_size and not self._cratio:
-            self._cratio=self._original_size/totalsize
+        if self.original_size and not self._cratio:
+            self._cratio=self.original_size/totalsize
             if self.verbose:
                 print(f"Compression ratio was estimated by disk size to {self._cratio}")
         if "chunks" in dset_to_write[self.varname].encoding.keys():
             del dset_to_write[self.varname].encoding["chunks"]
         pargs = locals()
-        if not self.prefix:
-            raise ValueError("First open a target store with open_store()")
-
         if not validity_check:
             self.write_provenance(pargs)
@@ -1125,31 +796,14 @@ class Tzis:
             self.write_directly(dset_to_write, self.fsspec_map)
 #        self.mf_dset.close()
 
-        try:
-            self.write_index_file()
-        except Exception as e:
-            print(e)
-            print("Could not write index file")
-            pass
-
-        return copy.deepcopy(self.fsspec_map)
-
-    def write_index_file(self, pattern=None, contact=None):
-        fsmap=mapper_mkdirs(self.os_url,self.container, None)
-        catalog.write_index_file(fsmap.fs, fsmap, pattern, contact)
-
-    def write_catalog(
-        self, catalogname="catalog.json", pattern=None, delim=".", columns=[], mode="a"
-    ):
-        fsmap=mapper_mkdirs(self.os_url,self.container,None)
-        catalog.write_catalog(
-            fsmap.fs,
-            fsmap,
-            self.prefix,
-            catalogname="catalog.json",
-            pattern=None,
-            delim=".",
-            columns=[],
-            mode="a",
-            verbose=self.verbose,
-        )
\ No newline at end of file
+#        try:
+        write_index_file(self.fsspec_map.fs,
+                         os.path.dirname(self.fsspec_map.root),
+                         pattern=None,
+                         contact=None)
+#        except Exception as e:
+#            print(e)
+#            print("Could not write index file")
+#            pass
+
+        return copy.deepcopy(self.fsspec_map)
\ No newline at end of file