From 14d1102376c68c0424c5dc90340723e625fd8978 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Wed, 19 Mar 2025 18:26:06 +0100 Subject: [PATCH 01/15] feat: add option to force horizontal rechunking, add function to check chunksizes --- src/rechunk_data/__init__.py | 28 ++- src/rechunk_data/_rechunk.py | 347 +++++++++++++++++++++++++++++++++-- 2 files changed, 356 insertions(+), 19 deletions(-) diff --git a/src/rechunk_data/__init__.py b/src/rechunk_data/__init__.py index 8e68bac..66645e5 100644 --- a/src/rechunk_data/__init__.py +++ b/src/rechunk_data/__init__.py @@ -6,11 +6,11 @@ from pathlib import Path from typing import List, Optional from ._rechunk import ( rechunk_netcdf_file, - rechunk_dataset, + check_chunk_size, logger, ) -__version__ = "2310.0.1" +__version__ = "2503.0.1" PROGRAM_NAME = "rechunk-data" @@ -35,12 +35,13 @@ def parse_args(argv: Optional[List[str]]) -> argparse.Namespace: ), ) parser.add_argument( + "-o", "--output", type=Path, help=( "Output file/directory of the chunked netcdf " "file(s). Note: If ``input`` is a directory output should be a" - " directory. If None given (default) the ``input`` is overidden." + " directory. If None given (default) the ``input`` is overridden." ), default=None, ) @@ -57,6 +58,20 @@ def parse_args(argv: Optional[List[str]]) -> argparse.Namespace: action="store_true", default=False, ) + parser.add_argument( + "--force-horizontal", + "-fh", + help="Force horizontal chunking (~126 MB per chunk).", + action="store_true", + default=False, + ) + parser.add_argument( + "--check-chunk-size", + "-c", + help="Check the chunk size of the input dataset (in MB).", + action="store_true", + default=False, + ) parser.add_argument( "-v", action="count", @@ -77,9 +92,16 @@ def parse_args(argv: Optional[List[str]]) -> argparse.Namespace: def cli(argv: Optional[List[str]] = None) -> None: """Command line interface calling the rechunking method.""" args = parse_args(argv) + if args.check_chunk_size: + import xarray as xr + + with xr.open_dataset(args.input, chunks={}) as dset: + check_chunk_size(dset) + return rechunk_netcdf_file( args.input, args.output, engine=args.netcdf_engine, decode_cf=args.skip_cf_convention is False, + force_horizontal=args.force_horizontal, ) diff --git a/src/rechunk_data/_rechunk.py b/src/rechunk_data/_rechunk.py index 311ed25..8e48a7f 100644 --- a/src/rechunk_data/_rechunk.py +++ b/src/rechunk_data/_rechunk.py @@ -9,6 +9,7 @@ from typing_extensions import Literal from dask.utils import format_bytes from dask.array.core import Array import xarray as xr +import numpy as np logging.basicConfig( @@ -44,6 +45,23 @@ ENCODINGS = dict( def _search_for_nc_files(input_path: Path) -> Generator[Path, None, None]: + """Yield all netCDF files in the given input_path. + + If the input is a directory, search recursively for all files with the + suffixes .nc or .nc4. If the input is a file, yield only the file itself. + If the input is a path with a glob pattern, construct it and search for + netCDF files in the resulting directory. + + Parameters + ---------- + input_path: Path + The path to search for netCDF files + + Yields + ------ + Path + The path to a netCDF file + """ suffixes = [".nc", "nc4"] input_path = input_path.expanduser().absolute() if input_path.is_dir() and input_path.exists(): @@ -65,6 +83,28 @@ def _save_dataset( engine: Literal["netcdf4", "h5netcdf"], override: bool = False, ) -> None: + """ + Save the given xarray dataset to a netCDF file with specified encoding. 
+ + Parameters + ---------- + dset : xr.Dataset + The dataset to be saved. + file_name : Path + The path where the netCDF file will be saved. + encoding : Dict[Hashable, Dict[str, Any]] + Encoding options for each variable in the dataset. + engine : Literal["netcdf4", "h5netcdf"] + The engine to use for writing the netCDF file. + override : bool, optional + If True, save the dataset even if no encoding is provided. Default is False. + + Returns + ------- + None + + Logs a debug message indicating successful save or an error message if saving fails. + """ if not encoding and not override: logger.debug("Chunk size already optimized for %s", file_name.name) return @@ -79,10 +119,218 @@ def _save_dataset( logger.error("Saving to file failed: %s", str(error)) +def _optimal_chunks( + time_size: int, + y_size: int, + x_size: int, + target_mb: float = 126.0, + dtype: np.dtype = np.dtype("float32"), + only_horizontal: bool = True, +) -> Tuple[int, int, int]: + """ + Compute optimal chunk sizes for time, y (lat), and x (lon) adjusted to a + certain size in MB. + Optionally, allow forcing chunking only in horizontal dimensions + while keeping the time dimension as a single chunk. + + Parameters + ---------- + time_size: int + Total size of the time dimension (kept as a single chunk). + y_size: int + Total size of the y (latitude) dimension. + x_size: int + Total size of the x (longitude) dimension. + target_mb: float, default: 126MB + Desired chunk size in MB. + dtype: np.dtype + Encoding dtype. + only_horizontal: bool + Whether to force chunking only in the horizontal dimensions. + + Returns + ------- + Tuple[int, int, int] + Optimal chunk sizes for time, y, and x. + """ + dtype_size = dtype.itemsize + target_elements = (target_mb * (1024**2)) / dtype_size + + if y_size == 1 and x_size == 1: + only_horizontal = False + if only_horizontal: + factor = np.sqrt(target_elements / (time_size * y_size * x_size)) + time_chunk = time_size + else: + factor = np.cbrt(target_elements / (time_size * y_size * x_size)) + time_chunk = max(1, int(time_size * factor)) + y_chunk = max(1, int(y_size * factor)) + x_chunk = max(1, int(x_size * factor)) + + y_chunk = min(y_chunk, y_size) + x_chunk = min(x_chunk, x_size) + time_chunk = min(time_chunk, time_size) + + return (time_chunk, y_chunk, x_chunk) + + +def _map_chunks( + chunks: dict, dim_mapping: dict, chunk_tuple: Tuple[int, int, int] +) -> dict: + """ + Update chunk sizes in `chunks` using the dimension mapping. + + Parameters + ---------- + chunks: dict + Dictionary with initial chunking setup (e.g., {0: 'auto', 1: None, 2: None}). + dim_mapping: dict + Mapping of dimension names to chunk indices (e.g., {'time': 0, 'y': 1, 'x': 2}). + chunk_tuple: Tuple[int, int, int] + Desired chunk size for (time, y, x) dimensions. + + Returns + ------- + dict + Updated chunk dictionary with appropriate chunk sizes. + """ + updated_chunks = chunks.copy() + + for dim, index in dim_mapping.items(): + if "time" in dim.lower(): + updated_chunks[index] = chunk_tuple[0] + elif "y" in dim.lower() or "lat" in dim.lower(): + updated_chunks[index] = chunk_tuple[1] + elif "x" in dim.lower() or "lon" in dim.lower(): + updated_chunks[index] = chunk_tuple[2] + + return updated_chunks + + +def _check_horizontal_unchanged(orig_size: dict, new_chunksizes: dict): + """ + Checks if time chunking is reduced while any horizontal dimension remains unchanged. + + Parameters + ---------- + orig_size: dict + Original sizes of dataset dimensions. 
+ new_chunksizes: dict + New chunk sizes after rechunking. + + Returns + ------- + bool + True if time chunking is reduced and any horizontal chunk size is unchanged. + """ + time_dims = [dim for dim in orig_size if "time" in dim.lower()] + horizontal_dims = [ + dim + for dim in orig_size + if any(keyword in dim.lower() for keyword in ["lon", "lat", "x", "y"]) + ] + + time_reduced = any( + new_chunksizes.get(dim, float("inf")) + < orig_size.get(dim, float("inf")) + for dim in time_dims + ) + horizontal_unchanged = any( + new_chunksizes.get(dim) == orig_size.get(dim) + for dim in horizontal_dims + ) + return time_reduced and horizontal_unchanged + + +def _horizontal_chunks( + da: xr.DataArray, chunks: dict, chunksizes: Tuple[int, int, int] +) -> dict: + """ + Updates chunk sizes, forcing horizontal chunking whenever possible. + + Parameters + ---------- + da: xr.DataArray + Data variable from the dataset. + chunks: dict + Original chunking dictionary. + chunksizes: Tuple[int, int, int] + Chunk sizes obtained with the chunks dict. + + Returns + ------- + dict + Updated chunk dictionary with appropriate chunk sizes. + """ + orig_size = dict(da.sizes) + chunksizes_dict = dict(zip(da.dims, chunksizes)) + chunk_order = dict(zip(da.dims, chunks)) + + try: + if _check_horizontal_unchanged(orig_size, chunksizes_dict): + time_size = next( + ( + value + for key, value in orig_size.items() + if isinstance(key, str) and "time" in key.lower() + ), + 1, + ) + + y_size = next( + ( + value + for key, value in orig_size.items() + if isinstance(key, str) + and any(k in key.lower() for k in ["lat", "y"]) + ), + 1, + ) + + x_size = next( + ( + value + for key, value in orig_size.items() + if isinstance(key, str) + and any(k in key.lower() for k in ["lon", "x"]) + ), + 1, + ) + dtype = da.encoding.get("dtype", np.dtype("float32")) + chunk_tuple = _optimal_chunks( + time_size, y_size, x_size, dtype=dtype + ) + return _map_chunks(chunks, chunk_order, chunk_tuple) + else: + return chunks + + except Exception as e: + logger.error(f"Error in _horizontal_chunks: {e}", exc_info=True) + return chunks + + def _rechunk_dataset( dset: xr.Dataset, engine: Literal["h5netcdf", "netcdf4"], + force_horizontal: bool = False, ) -> Tuple[xr.Dataset, Dict[Hashable, Dict[str, Any]]]: + """ + Rechunks a dataset to optimize chunk sizes for storage and computation. + + Parameters + ---------- + dset: xr.Dataset + The dataset to be rechunked. + engine: str, default: netcdf4 + The NetCDF engine used for encoding the new dataset. + force_horizontal: bool, default: False + If True, forces horizontal chunking whenever possible. + + Returns + ------- + Tuple[xr.Dataset, Dict[Hashable, Dict[str, Any]]] + A tuple containing the rechunked dataset and the updated encoding dictionary. 
+ """ encoding: Dict[Hashable, Dict[str, Any]] = {} try: _keywords = ENCODINGS[engine] @@ -102,12 +350,18 @@ def _rechunk_dataset( logger.debug("Rechunking variable %s", var) chunks: Dict[int, Optional[str]] = {} for i, dim in enumerate(map(str, dset[var].dims)): - if "lon" in dim.lower() or "lat" in dim.lower() or "bnds" in dim.lower(): + if any( + keyword in dim.lower() + for keyword in ["lon", "lat", "bnds", "x", "y"] + ): chunks[i] = None else: chunks[i] = "auto" old_chunks = dset[var].encoding.get("chunksizes") new_chunks = dset[var].data.rechunk(chunks).chunksize + if force_horizontal: + chunks = _horizontal_chunks(dset[var], chunks, new_chunks) + new_chunks = dset[var].data.rechunk(chunks).chunksize if new_chunks == old_chunks: logger.debug("%s: chunk sizes already optimized, skipping", var) continue @@ -120,15 +374,22 @@ def _rechunk_dataset( ) logger.debug("Settings encoding of variable %s", var) encoding[data_var] = { - str(k): v for k, v in dset[var].encoding.items() if str(k) in _keywords + str(k): v + for k, v in dset[var].encoding.items() + if str(k) in _keywords } - if engine != "netcdf4" or encoding[data_var].get("contiguous", False) is False: + if ( + engine != "netcdf4" + or encoding[data_var].get("contiguous", False) is False + ): encoding[data_var]["chunksizes"] = new_chunks return dset, encoding def rechunk_dataset( - dset: xr.Dataset, engine: Literal["h5netcdf", "netcdf4"] = "netcdf4" + dset: xr.Dataset, + engine: Literal["h5netcdf", "netcdf4"] = "netcdf4", + force_horizontal: bool = False, ) -> xr.Dataset: """Rechunk a xarray dataset. @@ -138,12 +399,14 @@ def rechunk_dataset( Input dataset that is going to be rechunked engine: str, default: netcdf4 The netcdf engine used to create the new netcdf file. + force_horizontal: bool, default: False + If True, forces horizontal chunking whenever possible. Returns ------- xarray.Dataset: rechunked dataset """ - data, _ = _rechunk_dataset(dset.chunk(), engine) + data, _ = _rechunk_dataset(dset.chunk(), engine, force_horizontal) return data @@ -152,23 +415,32 @@ def rechunk_netcdf_file( output_path: Optional[os.PathLike] = None, decode_cf: bool = True, engine: Literal["h5netcdf", "netcdf4"] = "netcdf4", + force_horizontal: bool = False, ) -> None: - """Rechunk netcdf files. + """Rechunk NetCDF files. Parameters ---------- input_path: os.PathLike - Input file/directory. If a directory is given all ``.nc`` in all sub - directories will be processed - output_path: os.PathLike - Output file/directory of the chunked netcdf file(s). Note: If ``input`` - is a directory output should be a directory. If None given (default) - the ``input`` is overidden. + Input file or directory. If a directory is given, all ``.nc`` files in all + subdirectories will be processed. + output_path: os.PathLike, optional + Output file or directory for the chunked NetCDF file(s). If ``input_path`` + is a directory, ``output_path`` should also be a directory. If None (default), + the ``input_path`` will be overwritten. decode_cf: bool, default: True - Whether to decode these variables, assuming they were saved according - to CF conventions. + Whether to decode variables, assuming they were saved according to CF + conventions. engine: str, default: netcdf4 - The netcdf engine used to create the new netcdf file. + The NetCDF engine used to create the new NetCDF file. + force_horizontal: bool, default: False + If True, forces horizontal chunking whenever possible. 
+
+    Returns
+    -------
+    None
+        The function processes and saves the rechunked dataset(s) to the specified
+        ``output_path``.
     """
     input_path = Path(input_path).expanduser().absolute()
     for input_file in _search_for_nc_files(input_path):
@@ -187,7 +459,9 @@
             parallel=True,
             decode_cf=decode_cf,
         ) as nc_data:
-            new_data, encoding = _rechunk_dataset(nc_data, engine)
+            new_data, encoding = _rechunk_dataset(
+                nc_data, engine, force_horizontal
+            )
             if encoding:
                 logger.debug(
                     "Loading data into memory (%s).",
@@ -208,3 +482,44 @@
                 engine,
                 override=output_file != input_file,
             )
+
+
+def check_chunk_size(dset: xr.Dataset) -> None:
+    """
+    Estimates the chunk size of a dataset in MB.
+
+    Parameters
+    ----------
+    dset: xr.Dataset
+        The dataset for which to estimate the chunk size.
+
+    Returns
+    -------
+    None
+        This function prints out the estimated chunk size for each variable in a
+        dataset. The chunk size is estimated by multiplying the size of a single
+        element of the data type with the product of all chunk sizes.
+    """
+    for var in dset.data_vars:
+        print(f"\n{var}:\t{dict(dset[var].sizes)}")
+        dtype_size = np.dtype(dset[var].dtype).itemsize
+        chunksizes = dset[var].encoding.get("chunksizes")
+
+        if chunksizes is None:
+            print(
+                f"  ⚠️ Warning: No chunk sizes found for {var}, skipping...\n"
+            )
+            continue
+
+        chunksizes = tuple(
+            filter(lambda x: isinstance(x, (int, np.integer)), chunksizes)
+        )
+        chunk_size_bytes = np.prod(chunksizes) * dtype_size
+        chunk_size_mb = chunk_size_bytes / (1024**2)  # Convert to MB
+
+        chunks = dset[var].chunks or tuple(() for _ in dset[var].dims)
+
+        print(f"  * Chunk Sizes: {chunksizes}")
+        print(f"  * Estimated Chunk Size: {chunk_size_mb:.2f} MB")
+        print(f"  * Chunks: {dict(zip(dset[var].dims, map(tuple, chunks)))}")
+        print(f"\n----------\n{dset}\n")
-- GitLab

From ca14198cad2d4f556f69ca6ce011260a1d3940dc Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Wed, 19 Mar 2025 18:26:42 +0100
Subject: [PATCH 02/15] add: Makefile for dev env
---
 Makefile | 19 +++++++++++++++++++
 setup.py |  5 +++--
 2 files changed, 22 insertions(+), 2 deletions(-)
 create mode 100644 Makefile

diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..233482b
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,19 @@
+# makefile used for testing
+#
+#
+all: install test
+
+install:
+	python3 -m pip install -e .[test]
+
+test:
+	python3 -m pytest -vv \
+		--cov=$(PWD)/src/rechunk_data --cov-report=html:coverage_report \
+		--junitxml report.xml --cov-report xml \
+		$(PWD)/src/rechunk_data/tests
+	python3 -m coverage report
+
+lint:
+	mypy --install-types --non-interactive
+	black --check -t py310 -l 79 src
+	flake8 src/rechunk_data --count --max-complexity=15 --max-line-length=88 --statistics --doctests
\ No newline at end of file
diff --git a/setup.py b/setup.py
index ae0a411..9aa3f8e 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-"""Setup script for packaging checkin."""
+"""Setup script for packaging check-in."""

 import json
 from pathlib import Path
@@ -51,9 +51,10 @@ setup(
             "pytest-cov",
             "testpath",
             "types-mock",
+            "flake8",
         ],
     },
-    python_requires=">=3.6",
+    python_requires=">=3.9",
     classifiers=[
         "Development Status :: 3 - Alpha",
         "Environment :: Console",
-- GitLab

From d3c01637dc7639b85210f2418348286a03586b1b Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Wed, 19 Mar 2025 18:27:31 +0100
Subject: [PATCH 03/15] fix: update info in README.md, fix typos
---
 README.md | 15
+++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index b75b5d6..fc317c9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Rechunking NetCDF data. -Rechunking of exsisting netcdf files to an optimal chunk size. This code provides +Rechunking of existing netcdf files to an optimal chunk size. This code provides a simple command line interface (cli) to rechunk existing netcdf data to an optimal chunksize of around 128 MB. @@ -28,8 +28,8 @@ new_data = rechunk_dataset(dset) ### Using the command line interface: ```bash -rechunk-data --help -usage: rechunk-data [-h] [--output OUTPUT] [--netcdf-engine {h5netcdf,netcdf4}] [--skip-cf-convention] [-v] [-V] input +rechunk-data --help +usage: rechunk-data [-h] [-o OUTPUT] [--netcdf-engine {h5netcdf,netcdf4}] [--skip-cf-convention] [--force-horizontal] [--check-chunk-size] [-v] [-V] input Rechunk input netcdf data to optimal chunk-size. approx. 126 MB per chunk @@ -38,12 +38,15 @@ positional arguments: options: -h, --help show this help message and exit - --output OUTPUT Output file/directory of the chunked netcdf file(s). - Note: If ``input`` is a directory output should be a directory. - If None given (default) the ``input`` is overidden. (default: None) + -o, --output OUTPUT Output file/directory of the chunked netcdf file(s). Note: If ``input`` is a directory output should be a directory. If None given (default) the + ``input`` is overridden. (default: None) --netcdf-engine {h5netcdf,netcdf4} The netcdf engine used to create the new netcdf file. (default: netcdf4) --skip-cf-convention Do not assume assume data variables follow CF conventions. (default: False) + --force-horizontal, -fh + Force horizontal chunking (~126 MB per chunk). (default: False) + --check-chunk-size, -c + Check the chunk size of the input dataset (in MB). 
(default: False)
  -v                    Increase verbosity (default: 0)
  -V, --version         show program's version number and exit
```
-- GitLab

From d3b7ea7d1785bc42ca12fd68a192adbd0bc379f7 Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Wed, 19 Mar 2025 18:28:14 +0100
Subject: [PATCH 04/15] fix: lint in testfiles, TODO: make sure they run, update test coverage
---
 src/rechunk_data/tests/test_rechunk_netcdf.py | 5 ++++-
 src/rechunk_data/tests/test_rechunking.py     | 1 +
 src/rechunk_data/tests/test_search_files.py   | 1 +
 3 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/rechunk_data/tests/test_rechunk_netcdf.py b/src/rechunk_data/tests/test_rechunk_netcdf.py
index 3544d3d..afc8f7d 100644
--- a/src/rechunk_data/tests/test_rechunk_netcdf.py
+++ b/src/rechunk_data/tests/test_rechunk_netcdf.py
@@ -1,4 +1,5 @@
 """Test the actual rechunk method."""
+
 import logging
 from pathlib import Path
 import time
@@ -24,7 +25,9 @@ def test_rechunk_data_dir_without_overwrite(data_dir: Path) -> None:
     """Testing the creation of new datafiles from a folder."""
     with TemporaryDirectory() as temp_dir:
         rechunk_netcdf_file(data_dir, Path(temp_dir))
-        new_files = sorted(f.relative_to(temp_dir) for f in Path(temp_dir).rglob(".nc"))
+        new_files = sorted(
+            f.relative_to(temp_dir) for f in Path(temp_dir).rglob(".nc")
+        )
         old_files = sorted(f.relative_to(data_dir) for f in data_dir.rglob(".nc"))
         assert new_files == old_files

diff --git a/src/rechunk_data/tests/test_rechunking.py b/src/rechunk_data/tests/test_rechunking.py
index 945bb0d..1eba813 100644
--- a/src/rechunk_data/tests/test_rechunking.py
+++ b/src/rechunk_data/tests/test_rechunking.py
@@ -1,4 +1,5 @@
 """Unit tests for rechunking the data."""
+
 import dask
 import xarray as xr

diff --git a/src/rechunk_data/tests/test_search_files.py b/src/rechunk_data/tests/test_search_files.py
index cff79c0..297ee64 100644
--- a/src/rechunk_data/tests/test_search_files.py
+++ b/src/rechunk_data/tests/test_search_files.py
@@ -1,4 +1,5 @@
 """Unit tests for searching for files."""
+
 from pathlib import Path
 from rechunk_data._rechunk import _search_for_nc_files
-- GitLab

From 76521cdd2816c5e9cabfd861f76fa6362561572d Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Thu, 20 Mar 2025 13:45:32 +0100
Subject: [PATCH 05/15] fix: horizontal force-chunking now disregards the original time chunksize, simplify printout in check_chunk_size
---
 src/rechunk_data/_rechunk.py | 30 ++++++++++++++----------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/src/rechunk_data/_rechunk.py b/src/rechunk_data/_rechunk.py
index 8e48a7f..f14e997 100644
--- a/src/rechunk_data/_rechunk.py
+++ b/src/rechunk_data/_rechunk.py
@@ -207,39 +207,32 @@ def _save_dataset(
     return updated_chunks
 
 
-def _check_horizontal_unchanged(orig_size: dict, new_chunksizes: dict):
+def _check_horizontal_unchanged(orig_size: dict, chunksizes: dict):
     """
-    Checks if time chunking is reduced while any horizontal dimension remains unchanged.
+    Checks if any horizontal dimension chunksize = dimension size
 
     Parameters
     ----------
     orig_size: dict
         Original sizes of dataset dimensions.
-    new_chunksizes: dict
-        New chunk sizes after rechunking.
+    chunksizes: dict
+        Chunk sizes of dataset dimensions.
 
     Returns
     -------
     bool
-        True if time chunking is reduced and any horizontal chunk size is unchanged.
+        True if any horizontal chunk size = original size.
""" - time_dims = [dim for dim in orig_size if "time" in dim.lower()] horizontal_dims = [ dim for dim in orig_size if any(keyword in dim.lower() for keyword in ["lon", "lat", "x", "y"]) ] - - time_reduced = any( - new_chunksizes.get(dim, float("inf")) - < orig_size.get(dim, float("inf")) - for dim in time_dims - ) horizontal_unchanged = any( - new_chunksizes.get(dim) == orig_size.get(dim) + chunksizes.get(dim) == orig_size.get(dim) for dim in horizontal_dims ) - return time_reduced and horizontal_unchanged + return horizontal_unchanged def _horizontal_chunks( @@ -247,6 +240,12 @@ def _horizontal_chunks( ) -> dict: """ Updates chunk sizes, forcing horizontal chunking whenever possible. + + .. Note: + + This function will not take in account level dimensions such as + ''lev'' or ''depth'' in the chunking. probably resulting on a chunksize + of 1 for those dimensions. Parameters ---------- @@ -521,5 +520,4 @@ def check_chunk_size(dset: xr.Dataset) -> None: print(f" * Chunk Sizes: {chunksizes}") print(f" * Estimated Chunk Size: {chunk_size_mb:.2f} MB") - print(f" * Chunks: {dict(zip(dset[var].dims, map(tuple, chunks)))}") - print(f"\n----------\n{dset}\n") + print(f" * Chunks: {dict(zip(dset[var].dims, map(tuple, chunks)))}\n") -- GitLab From 4cfa8b65677127521648676f95b851f3f386c2f1 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Thu, 20 Mar 2025 13:46:36 +0100 Subject: [PATCH 06/15] fix: bug in testing, test ci now for py>=3.9 --- .gitlab-ci.yml | 28 ++++++++++++++++++---------- src/rechunk_data/__init__.py | 1 + 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 86ea821..6d32edd 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -16,42 +16,50 @@ lint: - black --check src - pylint --fail-under 8.5 src/rechunk_data/__init__.py -test_36: +test_39: << : *py_test before_script: - - conda create -q -p /tmp/test python=3.6 pip dask -y + - conda create -q -p /tmp/test python=3.9 pip dask -y - /tmp/test/bin/python -m pip install -e .[test] script: - /tmp/test/bin/python -m pytest -vv -test_37: +test_310: << : *py_test before_script: - - conda create -q -p /tmp/test python=3.7 pip dask -y + - conda create -q -p /tmp/test python=3.10 pip dask -y - /tmp/test/bin/python -m pip install -e .[test] script: - /tmp/test/bin/python -m pytest -vv -test_38: +test_310: << : *py_test before_script: - - conda create -q -p /tmp/test python=3.8 pip dask -y + - conda create -q -p /tmp/test python=3.10 pip dask -y - /tmp/test/bin/python -m pip install -e .[test] script: - /tmp/test/bin/python -m pytest -vv -test_39: +test_311: << : *py_test before_script: - - conda create -q -p /tmp/test python=3.9 pip dask -y + - conda create -q -p /tmp/test python=3.11 pip dask -y - /tmp/test/bin/python -m pip install -e .[test] script: - /tmp/test/bin/python -m pytest -vv -test_310: +test_312: << : *py_test before_script: - - conda create -q -p /tmp/test python=3.10 pip dask -y + - conda create -q -p /tmp/test python=3.12 pip dask -y + - /tmp/test/bin/python -m pip install -e .[test] + script: + - /tmp/test/bin/python -m pytest -vv + +test_313: + << : *py_test + before_script: + - conda create -q -p /tmp/test python=3.13 pip dask -y - /tmp/test/bin/python -m pip install -e .[test] script: - /tmp/test/bin/python -m pytest -vv diff --git a/src/rechunk_data/__init__.py b/src/rechunk_data/__init__.py index 66645e5..bf28946 100644 --- a/src/rechunk_data/__init__.py +++ b/src/rechunk_data/__init__.py @@ -7,6 +7,7 @@ from typing import List, Optional from ._rechunk 
import ( rechunk_netcdf_file, check_chunk_size, + rechunk_dataset, logger, ) -- GitLab From 5935922b10eb8437e587b2fe119c3674f43c8054 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Thu, 20 Mar 2025 14:09:51 +0100 Subject: [PATCH 07/15] fix: linting --- src/rechunk_data/_rechunk.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rechunk_data/_rechunk.py b/src/rechunk_data/_rechunk.py index f14e997..2193d1b 100644 --- a/src/rechunk_data/_rechunk.py +++ b/src/rechunk_data/_rechunk.py @@ -229,8 +229,7 @@ def _check_horizontal_unchanged(orig_size: dict, chunksizes: dict): if any(keyword in dim.lower() for keyword in ["lon", "lat", "x", "y"]) ] horizontal_unchanged = any( - chunksizes.get(dim) == orig_size.get(dim) - for dim in horizontal_dims + chunksizes.get(dim) == orig_size.get(dim) for dim in horizontal_dims ) return horizontal_unchanged @@ -240,10 +239,10 @@ def _horizontal_chunks( ) -> dict: """ Updates chunk sizes, forcing horizontal chunking whenever possible. - - .. Note: - This function will not take in account level dimensions such as + .. Note: + + This function will not take in account level dimensions such as ''lev'' or ''depth'' in the chunking. probably resulting on a chunksize of 1 for those dimensions. -- GitLab From f7a9e598c6fb4ae395e902afcfaf7687c452e7c2 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Thu, 20 Mar 2025 15:30:17 +0100 Subject: [PATCH 08/15] add: code coverage for new functions, linting --- src/rechunk_data/__init__.py | 9 +- src/rechunk_data/_rechunk.py | 7 +- .../tests/test_check_chunksize.py | 37 +++++++ src/rechunk_data/tests/test_cli.py | 17 ++- src/rechunk_data/tests/test_rechunking.py | 102 +++++++++++++++++- 5 files changed, 165 insertions(+), 7 deletions(-) create mode 100644 src/rechunk_data/tests/test_check_chunksize.py diff --git a/src/rechunk_data/__init__.py b/src/rechunk_data/__init__.py index bf28946..210fa90 100644 --- a/src/rechunk_data/__init__.py +++ b/src/rechunk_data/__init__.py @@ -7,10 +7,17 @@ from typing import List, Optional from ._rechunk import ( rechunk_netcdf_file, check_chunk_size, - rechunk_dataset, + rechunk_dataset, # noqa logger, ) +__all__ = [ + "rechunk_netcdf_file", + "check_chunk_size", + "rechunk_dataset", + "logger", +] + __version__ = "2503.0.1" PROGRAM_NAME = "rechunk-data" diff --git a/src/rechunk_data/_rechunk.py b/src/rechunk_data/_rechunk.py index 2193d1b..13d0fc6 100644 --- a/src/rechunk_data/_rechunk.py +++ b/src/rechunk_data/_rechunk.py @@ -260,11 +260,10 @@ def _horizontal_chunks( dict Updated chunk dictionary with appropriate chunk sizes. 
""" - orig_size = dict(da.sizes) - chunksizes_dict = dict(zip(da.dims, chunksizes)) - chunk_order = dict(zip(da.dims, chunks)) - try: + orig_size = dict(da.sizes) + chunk_order = dict(zip(da.dims, chunks)) + chunksizes_dict = dict(zip(da.dims, chunksizes)) if _check_horizontal_unchanged(orig_size, chunksizes_dict): time_size = next( ( diff --git a/src/rechunk_data/tests/test_check_chunksize.py b/src/rechunk_data/tests/test_check_chunksize.py new file mode 100644 index 0000000..78a9365 --- /dev/null +++ b/src/rechunk_data/tests/test_check_chunksize.py @@ -0,0 +1,37 @@ +"""Unit tests for checking the chunksize of the data.""" + +import xarray as xr +import numpy as np + +from rechunk_data import check_chunk_size + + +def test_check_chunk_size(capsys) -> None: + """Test the check_chunk_size function with valid and invalid chunksizes.""" + + data1 = np.random.rand(100, 1100, 1200) + da1 = xr.DataArray(data1, dims=("time", "lat", "lon"), name="valid_var") + dset1 = xr.Dataset({"valid_var": da1}) + dset1 = dset1.chunk({"time": 10, "lat": 550, "lon": 600}) + dset1["valid_var"].encoding = { + "chunksizes": (10, 550, 600), + "dtype": "float32", + "zlib": True, + } + + data2 = np.random.rand(100, 1100, 1200) + da2 = xr.DataArray(data2, dims=("time", "lat", "lon"), name="invalid_var") + dset2 = xr.Dataset({"invalid_var": da2}) + + combined_dset = xr.merge([dset1, dset2]) + + check_chunk_size(combined_dset) + captured = capsys.readouterr() + + assert "valid_var" in captured.out + assert "invalid_var" in captured.out + assert ( + "âš ï¸ Warning: No chunk sizes found for invalid_var, skipping..." + in captured.out + ) + assert "Estimated Chunk Size: 25.18 MB" in captured.out diff --git a/src/rechunk_data/tests/test_cli.py b/src/rechunk_data/tests/test_cli.py index cdf7c51..e428fd2 100644 --- a/src/rechunk_data/tests/test_cli.py +++ b/src/rechunk_data/tests/test_cli.py @@ -2,9 +2,10 @@ from pathlib import Path from tempfile import TemporaryDirectory - +from io import StringIO import pytest from rechunk_data import cli +import sys def test_command_line_interface(data_dir: Path) -> None: @@ -20,3 +21,17 @@ def test_command_line_interface(data_dir: Path) -> None: cli([str(data_dir), "--output", temp_dir]) new_files = sorted(Path(temp_dir).rglob("*.nc")) assert len(data_files) == len(new_files) + + +def test_check_chunk_size(data_dir: Path) -> None: + """Test the --check-chunk-size argument.""" + + data_files = sorted(data_dir.rglob("*.nc"))[0] + captured_output = StringIO() + sys.stdout = captured_output + cli([str(data_files), "--check-chunk-size"]) + sys.stdout = sys.__stdout__ + output = captured_output.getvalue() + assert "tas:" in output + assert "Chunk Sizes: (1, 1, 24, 24)" in output + assert "Estimated Chunk Size: 0.00 MB" in output diff --git a/src/rechunk_data/tests/test_rechunking.py b/src/rechunk_data/tests/test_rechunking.py index 1eba813..7a4f505 100644 --- a/src/rechunk_data/tests/test_rechunking.py +++ b/src/rechunk_data/tests/test_rechunking.py @@ -2,8 +2,14 @@ import dask import xarray as xr +import numpy as np -from rechunk_data._rechunk import _rechunk_dataset +from rechunk_data._rechunk import ( + _rechunk_dataset, + _optimal_chunks, + _check_horizontal_unchanged, + _horizontal_chunks, +) def test_rechunking_small_data( @@ -36,3 +42,97 @@ def test_rechunking_large_data( dset, encoding = _rechunk_dataset(large_chunk_data, "h5netcdf") assert encoding[variable_name]["chunksizes"] == chunks assert dset[variable_name].data.chunksize == chunks + + +def test_optimal_chunks(): + """Test the 
optimal chunk calculation.""" + + time_size = 100 + y_size = 192 + x_size = 384 + dtype = np.dtype("float32") + + time_chunk, y_chunk, x_chunk = _optimal_chunks( + time_size, y_size, x_size, dtype=dtype, only_horizontal=False + ) + assert time_chunk <= time_size + assert y_chunk <= y_size + assert x_chunk <= x_size + + time_chunk, y_chunk, x_chunk = _optimal_chunks( + time_size, y_size, x_size, dtype=dtype, only_horizontal=True + ) + assert time_chunk == time_size + assert y_chunk <= y_size + assert x_chunk <= x_size + + time_size = 100_000_000 + y_size = 1 + x_size = 1 + time_chunk, y_chunk, x_chunk = _optimal_chunks( + time_size, y_size, x_size, dtype=dtype, only_horizontal=True + ) + assert time_chunk < time_size + + +def test_check_horizontal_unchanged(): + """Test detection of horizontal chunks remaining unchanged.""" + orig_size = {"time": 100, "lat": 192, "lon": 384} + + chunksizes = {"time": 50, "lat": 192, "lon": 384} + assert _check_horizontal_unchanged(orig_size, chunksizes) is True + + chunksizes = {"time": 50, "lat": 192, "lon": 128} + assert _check_horizontal_unchanged(orig_size, chunksizes) is True + + chunksizes = {"time": 50, "lat": 128, "lon": 128} + assert _check_horizontal_unchanged(orig_size, chunksizes) is False + + +def test_horizontal_chunks(): + """Test horizontal chunking function and _rechunk_dataset() with + force_horizontal.""" + data = np.random.rand(100, 1100, 1200) + da = xr.DataArray(data, dims=("time", "lat", "lon"), name="test_var") + + chunks = {0: "auto", 1: None, 2: None} + chunksizes = (1, 100, 1200) + + updated_chunks = _horizontal_chunks(da, chunks, chunksizes) + assert updated_chunks[0] == 100 + assert updated_chunks[1] < 1100 + assert updated_chunks[2] < 1200 + + chunksizes = (50, 100, 600) + updated_chunks = _horizontal_chunks(da, chunks, chunksizes) + assert updated_chunks == chunks + + updated_chunks = _horizontal_chunks("invalid_data", chunks, chunksizes) + assert updated_chunks == chunks + + dset = xr.Dataset({"test_var": da}) + chunksizes = (10, 550, 1200) + dset = dset.chunk( + {"time": chunksizes[0], "lat": chunksizes[1], "lon": chunksizes[2]} + ) + encoding = { + "test_var": { + "zlib": True, + "complevel": 4, + "shuffle": True, + "dtype": "float32", + "chunksizes": chunksizes, + "_FillValue": np.nan, + } + } + dset["test_var"].encoding = encoding + _, encoding = _rechunk_dataset( + dset, engine="netcdf4", force_horizontal=True + ) + assert "test_var" in encoding + + chunksizes_applied = encoding["test_var"].get("chunksizes", None) + assert chunksizes_applied is not None + assert chunksizes_applied[0] == 100 + assert chunksizes_applied[1] < 1100 + assert chunksizes_applied[2] < 1200 -- GitLab From 5e65aca0f04ec3ed3c98ca5571cf943941348e46 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Thu, 20 Mar 2025 15:33:17 +0100 Subject: [PATCH 09/15] doc: update README --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fc317c9..c4cc18b 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,10 @@ Use the `--user` flag if you do not have super user rights and are not using `an ### Using the python module ```python -from rechunk_data import rechunk_dataset +from rechunk_data import rechunk_dataset, check_chunk_size import xarray as xr dset = xr.open_mfdataset("/data/*", parallel=True, combine="by_coords") +check_chunk_size(dset) # to print the chunksizes of the original set-up new_data = rechunk_dataset(dset) ``` -- GitLab From 136944d090492aee04208799b26594c3f779e740 Mon Sep 17 
00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Thu, 20 Mar 2025 15:43:58 +0100
Subject: [PATCH 10/15] fix: black in CI to -l 79, to go with local lint conf
---
 .gitlab-ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 6d32edd..2f87970 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -13,7 +13,7 @@ lint:
    - pip install .[test]
  script:
    - mypy
-    - black --check src
+    - black --check -l 79 src
    - pylint --fail-under 8.5 src/rechunk_data/__init__.py
 
 test_39:
-- GitLab

From f3505027ddc070957e4bdb7c0e9fb23042e43421 Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Thu, 20 Mar 2025 15:52:25 +0100
Subject: [PATCH 11/15] fix: correctly assess dimension mapping in _map_chunks()
---
 src/rechunk_data/_rechunk.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/rechunk_data/_rechunk.py b/src/rechunk_data/_rechunk.py
index 13d0fc6..9bb3456 100644
--- a/src/rechunk_data/_rechunk.py
+++ b/src/rechunk_data/_rechunk.py
@@ -199,9 +199,9 @@ def _map_chunks(
     for dim, index in dim_mapping.items():
         if "time" in dim.lower():
             updated_chunks[index] = chunk_tuple[0]
-        elif "y" in dim.lower() or "lat" in dim.lower():
+        if "y" in dim.lower() or "lat" in dim.lower():
             updated_chunks[index] = chunk_tuple[1]
-        elif "x" in dim.lower() or "lon" in dim.lower():
+        if "x" in dim.lower() or "lon" in dim.lower():
             updated_chunks[index] = chunk_tuple[2]
 
     return updated_chunks
-- GitLab

From f10e9801e5b53c1067ca83204c5f9a370337a451 Mon Sep 17 00:00:00 2001
From: k204229 <lucio-eceiza@dkrz.de>
Date: Fri, 11 Apr 2025 12:01:30 +0200
Subject: [PATCH 12/15] add: option of autochunk, size, also in cli, remove unnecessary cross-check for force_horizontal
---
 src/rechunk_data/__init__.py |  27 +++-
 src/rechunk_data/_rechunk.py | 258 +++++++++++++++++++----------------
 2 files changed, 160 insertions(+), 125 deletions(-)

diff --git a/src/rechunk_data/__init__.py b/src/rechunk_data/__init__.py
index 210fa90..da4524a 100644
--- a/src/rechunk_data/__init__.py
+++ b/src/rechunk_data/__init__.py
@@ -4,12 +4,9 @@
 import argparse
 import logging
 from pathlib import Path
 from typing import List, Optional
-from ._rechunk import (
-    rechunk_netcdf_file,
-    check_chunk_size,
-    rechunk_dataset,  # noqa
-    logger,
-)
+
+from ._rechunk import rechunk_dataset  # noqa
+from ._rechunk import check_chunk_size, logger, rechunk_netcdf_file
 
 __all__ = [
     "rechunk_netcdf_file",
@@ -29,7 +26,7 @@ def parse_args(argv: Optional[List[str]]) -> argparse.Namespace:
         prog=PROGRAM_NAME,
         description=(
             "Rechunk input netcdf data to optimal chunk-size."
-            " approx. 126 MB per chunk"
+            " approx. 126 MB per chunk. By default it only optimises time."
), formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) @@ -60,6 +57,20 @@ def parse_args(argv: Optional[List[str]]) -> argparse.Namespace: default="netcdf4", type=str, ) + parser.add_argument( + "--size", + "-s", + help=("Specify chunk-size (in MiB)."), + default=None, + type=str, + ) + parser.add_argument( + "--auto-chunks", + "-ac", + help="Allow Dask to determine optimal chunk sizes for all dimensions.", + action="store_true", + default=False, + ) parser.add_argument( "--skip-cf-convention", help="Do not assume assume data variables follow CF conventions.", @@ -110,6 +121,8 @@ def cli(argv: Optional[List[str]] = None) -> None: args.input, args.output, engine=args.netcdf_engine, + size=args.size, + auto_chunks=args.auto_chunks, decode_cf=args.skip_cf_convention is False, force_horizontal=args.force_horizontal, ) diff --git a/src/rechunk_data/_rechunk.py b/src/rechunk_data/_rechunk.py index 9bb3456..42384d1 100644 --- a/src/rechunk_data/_rechunk.py +++ b/src/rechunk_data/_rechunk.py @@ -1,16 +1,16 @@ """Rechunking module.""" -import os import logging +import os from pathlib import Path -from typing import cast, Any, Dict, Hashable, Generator, Optional, Tuple -from typing_extensions import Literal +from typing import Any, Dict, Generator, Hashable, Optional, Tuple, cast -from dask.utils import format_bytes -from dask.array.core import Array -import xarray as xr +import dask import numpy as np - +import xarray as xr +from dask.array.core import Array +from dask.utils import format_bytes +from typing_extensions import Literal logging.basicConfig( format="%(name)s - %(levelname)s - %(message)s", level=logging.ERROR @@ -42,6 +42,7 @@ ENCODINGS = dict( "dtype", }, ) +default_chunk_size = 126.0 # in MiB def _search_for_nc_files(input_path: Path) -> Generator[Path, None, None]: @@ -123,7 +124,7 @@ def _optimal_chunks( time_size: int, y_size: int, x_size: int, - target_mb: float = 126.0, + size: float = default_chunk_size, dtype: np.dtype = np.dtype("float32"), only_horizontal: bool = True, ) -> Tuple[int, int, int]: @@ -141,7 +142,7 @@ def _optimal_chunks( Total size of the y (latitude) dimension. x_size: int Total size of the x (longitude) dimension. - target_mb: float, default: 126MB + size: float, default: 126MB Desired chunk size in MB. dtype: np.dtype Encoding dtype. @@ -154,7 +155,7 @@ def _optimal_chunks( Optimal chunk sizes for time, y, and x. """ dtype_size = dtype.itemsize - target_elements = (target_mb * (1024**2)) / dtype_size + target_elements: float = (size * (1024**2)) / dtype_size if y_size == 1 and x_size == 1: only_horizontal = False @@ -170,7 +171,6 @@ def _optimal_chunks( y_chunk = min(y_chunk, y_size) x_chunk = min(x_chunk, x_size) time_chunk = min(time_chunk, time_size) - return (time_chunk, y_chunk, x_chunk) @@ -207,35 +207,11 @@ def _map_chunks( return updated_chunks -def _check_horizontal_unchanged(orig_size: dict, chunksizes: dict): - """ - Checks if any horizontal dimension chunksize = dimension size - - Parameters - ---------- - orig_size: dict - Original sizes of dataset dimensions. - chunksizes: dict - Chunk sizes of dataset dimensions. - - Returns - ------- - bool - True if any horizontal chunk size = original size. 
- """ - horizontal_dims = [ - dim - for dim in orig_size - if any(keyword in dim.lower() for keyword in ["lon", "lat", "x", "y"]) - ] - horizontal_unchanged = any( - chunksizes.get(dim) == orig_size.get(dim) for dim in horizontal_dims - ) - return horizontal_unchanged - - def _horizontal_chunks( - da: xr.DataArray, chunks: dict, chunksizes: Tuple[int, int, int] + da: xr.DataArray, + chunks: dict, + size: Optional[float] = None, + only_horizontal: bool = True, ) -> dict: """ Updates chunk sizes, forcing horizontal chunking whenever possible. @@ -252,8 +228,11 @@ def _horizontal_chunks( Data variable from the dataset. chunks: dict Original chunking dictionary. - chunksizes: Tuple[int, int, int] - Chunk sizes obtained with the chunks dict. + size: float, default: None + Desired chunk size in MB. + only_horizontal: bool default: True + Whether to force chunking only in the horizontal dimensions, + e.g. disregarding time. Returns ------- @@ -263,44 +242,44 @@ def _horizontal_chunks( try: orig_size = dict(da.sizes) chunk_order = dict(zip(da.dims, chunks)) - chunksizes_dict = dict(zip(da.dims, chunksizes)) - if _check_horizontal_unchanged(orig_size, chunksizes_dict): - time_size = next( - ( - value - for key, value in orig_size.items() - if isinstance(key, str) and "time" in key.lower() - ), - 1, - ) - - y_size = next( - ( - value - for key, value in orig_size.items() - if isinstance(key, str) - and any(k in key.lower() for k in ["lat", "y"]) - ), - 1, - ) + time_size = next( + ( + value + for key, value in orig_size.items() + if isinstance(key, str) and "time" in key.lower() + ), + 1, + ) - x_size = next( - ( - value - for key, value in orig_size.items() - if isinstance(key, str) - and any(k in key.lower() for k in ["lon", "x"]) - ), - 1, - ) - dtype = da.encoding.get("dtype", np.dtype("float32")) - chunk_tuple = _optimal_chunks( - time_size, y_size, x_size, dtype=dtype - ) - return _map_chunks(chunks, chunk_order, chunk_tuple) - else: - return chunks + y_size = next( + ( + value + for key, value in orig_size.items() + if isinstance(key, str) + and any(k in key.lower() for k in ["lat", "y"]) + ), + 1, + ) + x_size = next( + ( + value + for key, value in orig_size.items() + if isinstance(key, str) + and any(k in key.lower() for k in ["lon", "x"]) + ), + 1, + ) + dtype = da.encoding.get("dtype", np.dtype("float32")) + chunk_tuple = _optimal_chunks( + time_size, + y_size, + x_size, + size=size if size is not None else default_chunk_size, + dtype=dtype, + only_horizontal=only_horizontal, + ) + return _map_chunks(chunks, chunk_order, chunk_tuple) except Exception as e: logger.error(f"Error in _horizontal_chunks: {e}", exc_info=True) return chunks @@ -309,17 +288,23 @@ def _horizontal_chunks( def _rechunk_dataset( dset: xr.Dataset, engine: Literal["h5netcdf", "netcdf4"], + size: Optional[int] = None, + auto_chunks: bool = False, force_horizontal: bool = False, ) -> Tuple[xr.Dataset, Dict[Hashable, Dict[str, Any]]]: """ - Rechunks a dataset to optimize chunk sizes for storage and computation. + Rechunk a xarray dataset. Parameters ---------- - dset: xr.Dataset - The dataset to be rechunked. + dset: xarray.Dataset + Input dataset that is going to be rechunked engine: str, default: netcdf4 - The NetCDF engine used for encoding the new dataset. + The netcdf engine used to create the new netcdf file. + size: int, default: None + Desired chunk size in MB. If None, computed by Dask or default_chunk_size. + auto_chunks: bool, default: False + If True, Dask automatically determines the optimal chunk size. 
force_horizontal: bool, default: False If True, forces horizontal chunking whenever possible. @@ -345,20 +330,39 @@ def _rechunk_dataset( logger.debug("Skipping rechunking variable %s", var) continue logger.debug("Rechunking variable %s", var) - chunks: Dict[int, Optional[str]] = {} - for i, dim in enumerate(map(str, dset[var].dims)): - if any( - keyword in dim.lower() - for keyword in ["lon", "lat", "bnds", "x", "y"] - ): - chunks[i] = None - else: - chunks[i] = "auto" old_chunks = dset[var].encoding.get("chunksizes") - new_chunks = dset[var].data.rechunk(chunks).chunksize + chunks: Dict[int, Optional[str]] = { + i: "auto" for i, _ in enumerate(dset[var].dims) + } if force_horizontal: - chunks = _horizontal_chunks(dset[var], chunks, new_chunks) + if auto_chunks: + only_horizontal = False + else: + only_horizontal = True + chunks = _horizontal_chunks( + dset[var], + chunks, + size=size, + only_horizontal=only_horizontal, + ) new_chunks = dset[var].data.rechunk(chunks).chunksize + else: + if not auto_chunks: + for i, dim in enumerate(map(str, dset[var].dims)): + if any( + keyword in dim.lower() + for keyword in ["lon", "lat", "bnds", "x", "y"] + ): + chunks[i] = None + if size: + with dask.config.set(array__chunk_size=f"{size}MiB"): + new_chunks = ( + dset[var] + .data.rechunk(chunks, balance=True, method="tasks") + .chunksize + ) + else: + new_chunks = dset[var].data.rechunk(chunks).chunksize if new_chunks == old_chunks: logger.debug("%s: chunk sizes already optimized, skipping", var) continue @@ -386,52 +390,70 @@ def _rechunk_dataset( def rechunk_dataset( dset: xr.Dataset, engine: Literal["h5netcdf", "netcdf4"] = "netcdf4", + size: Optional[int] = None, + auto_chunks: bool = False, force_horizontal: bool = False, ) -> xr.Dataset: - """Rechunk a xarray dataset. + """ + Rechunk a xarray dataset. Parameters ---------- - dset: xarray.Dataset - Input dataset that is going to be rechunked - engine: str, default: netcdf4 - The netcdf engine used to create the new netcdf file. - force_horizontal: bool, default: False - If True, forces horizontal chunking whenever possible. + dset : xr.Dataset + The dataset to be rechunked. + engine : Literal["h5netcdf", "netcdf4"], optional + The engine to use for writing the netCDF file. Defaults to "netcdf4". + size : Optional[int], optional + The desired chunk size in MiB. If None, the default chunk size is used. + Defaults to None. + auto_chunks : bool, optional + If True, determine the chunk size automatically using Dask. Defaults to False. + force_horizontal : bool, optional + If True, force the chunk size to be in the horizontal dimensions + (y and x). Defaults to False. Returns ------- - xarray.Dataset: rechunked dataset + xr.Dataset + The rechunked dataset. """ - data, _ = _rechunk_dataset(dset.chunk(), engine, force_horizontal) + data, _ = _rechunk_dataset( + dset.chunk(), engine, size, auto_chunks, force_horizontal + ) return data def rechunk_netcdf_file( input_path: os.PathLike, output_path: Optional[os.PathLike] = None, - decode_cf: bool = True, engine: Literal["h5netcdf", "netcdf4"] = "netcdf4", + size: Optional[int] = None, + auto_chunks: bool = False, + decode_cf: bool = True, force_horizontal: bool = False, ) -> None: - """Rechunk NetCDF files. + """ + Rechunk a netCDF file. Parameters ---------- - input_path: os.PathLike - Input file or directory. If a directory is given, all ``.nc`` files in all - subdirectories will be processed. - output_path: os.PathLike, optional - Output file or directory for the chunked NetCDF file(s). 
If ``input_path`` - is a directory, ``output_path`` should also be a directory. If None (default), - the ``input_path`` will be overwritten. - decode_cf: bool, default: True - Whether to decode variables, assuming they were saved according to CF - conventions. - engine: str, default: netcdf4 - The NetCDF engine used to create the new NetCDF file. - force_horizontal: bool, default: False - If True, forces horizontal chunking whenever possible. + input_path : os.PathLike + The path to the netCDF file or directory to be rechunked. + output_path : Optional[os.PathLike], optional + The path to the directory or file where the rechunked data will be saved. + If None, the file is overwritten. Defaults to None. + engine : Literal["h5netcdf", "netcdf4"], optional + The engine to use for writing the netCDF file. Defaults to "netcdf4". + size : Optional[int], optional + The desired chunk size in MiB. If None, the default chunk size is used. + Defaults to None. + auto_chunks : bool, optional + If True, determine the chunk size automatically using Dask. Defaults to False. + decode_cf : bool, optional + Whether to decode CF conventions. Defaults to True. + force_horizontal : bool, optional + If True, force the chunk size to be in the horizontal dimensions + (y and x). Defaults to False. Returns ------- @@ -457,7 +479,7 @@ def rechunk_netcdf_file( decode_cf=decode_cf, ) as nc_data: new_data, encoding = _rechunk_dataset( - nc_data, engine, force_horizontal + nc_data, engine, size, auto_chunks, force_horizontal ) if encoding: logger.debug( -- GitLab From 2687a49b15f155c1a665f5fb50b9d8ee31d34ad7 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Fri, 11 Apr 2025 12:01:50 +0200 Subject: [PATCH 13/15] update tests --- src/rechunk_data/__init__.pyi | 5 +- src/rechunk_data/tests/conftest.py | 4 +- .../tests/test_check_chunksize.py | 2 +- src/rechunk_data/tests/test_cli.py | 6 +- src/rechunk_data/tests/test_rechunk_netcdf.py | 13 ++-- src/rechunk_data/tests/test_rechunking.py | 75 ++++++++++++------- 6 files changed, 66 insertions(+), 39 deletions(-) diff --git a/src/rechunk_data/__init__.pyi b/src/rechunk_data/__init__.pyi index 8a11300..8bdf7f4 100644 --- a/src/rechunk_data/__init__.pyi +++ b/src/rechunk_data/__init__.pyi @@ -3,17 +3,16 @@ import os from pathlib import Path from typing import ( Any, - Generator, - Mapping, Dict, + Generator, Hashable, List, Optional, Tuple, ) -from typing_extensions import Literal import xarray as xr +from typing_extensions import Literal def parse_args() -> argparse.Namespace: ... def _search_for_nc_files(input_path: Path) -> Generator[Path, None, None]: ... 
diff --git a/src/rechunk_data/tests/conftest.py b/src/rechunk_data/tests/conftest.py index 02d03b1..0a6612b 100644 --- a/src/rechunk_data/tests/conftest.py +++ b/src/rechunk_data/tests/conftest.py @@ -1,12 +1,12 @@ """pytest definitions to run the unittests.""" from pathlib import Path -from tempfile import TemporaryDirectory, NamedTemporaryFile +from tempfile import NamedTemporaryFile, TemporaryDirectory from typing import Generator, Tuple import dask -import pytest import numpy as np +import pytest import xarray as xr diff --git a/src/rechunk_data/tests/test_check_chunksize.py b/src/rechunk_data/tests/test_check_chunksize.py index 78a9365..22d6edb 100644 --- a/src/rechunk_data/tests/test_check_chunksize.py +++ b/src/rechunk_data/tests/test_check_chunksize.py @@ -1,7 +1,7 @@ """Unit tests for checking the chunksize of the data.""" -import xarray as xr import numpy as np +import xarray as xr from rechunk_data import check_chunk_size diff --git a/src/rechunk_data/tests/test_cli.py b/src/rechunk_data/tests/test_cli.py index e428fd2..6ed1aaf 100644 --- a/src/rechunk_data/tests/test_cli.py +++ b/src/rechunk_data/tests/test_cli.py @@ -1,11 +1,13 @@ """Unit tests for the cli.""" +import sys +from io import StringIO from pathlib import Path from tempfile import TemporaryDirectory -from io import StringIO + import pytest + from rechunk_data import cli -import sys def test_command_line_interface(data_dir: Path) -> None: diff --git a/src/rechunk_data/tests/test_rechunk_netcdf.py b/src/rechunk_data/tests/test_rechunk_netcdf.py index afc8f7d..3bc542c 100644 --- a/src/rechunk_data/tests/test_rechunk_netcdf.py +++ b/src/rechunk_data/tests/test_rechunk_netcdf.py @@ -1,13 +1,14 @@ """Test the actual rechunk method.""" import logging -from pathlib import Path import time +from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory import dask import pytest -from rechunk_data import rechunk_netcdf_file, rechunk_dataset + +from rechunk_data import rechunk_dataset, rechunk_netcdf_file from rechunk_data._rechunk import _save_dataset @@ -62,13 +63,15 @@ def test_wrong_or_format(small_chunk_data, caplog) -> None: _, loglevel, message = caplog.record_tuples[-1] assert loglevel == logging.ERROR assert "Error while" in message - _save_dataset(small_chunk_data, temp_file, {}, "foo") + _save_dataset(small_chunk_data, temp_file, {}, "foo") # type: ignore[arg-type] _, loglevel, message = caplog.record_tuples[-1] - _save_dataset(small_chunk_data, temp_file, {"foo": "bar"}, "foo") + _save_dataset( + small_chunk_data, temp_file, {"foo": "bar"}, "foo" + ) # type: ignore[arg-type] _, loglevel, message = caplog.record_tuples[-1] assert loglevel == logging.ERROR def test_wrong_engine(small_chunk_data) -> None: with pytest.raises(ValueError): - rechunk_dataset(small_chunk_data, engine="foo") + rechunk_dataset(small_chunk_data, engine="foo") # type: ignore[arg-type] diff --git a/src/rechunk_data/tests/test_rechunking.py b/src/rechunk_data/tests/test_rechunking.py index 7a4f505..4f0e9e1 100644 --- a/src/rechunk_data/tests/test_rechunking.py +++ b/src/rechunk_data/tests/test_rechunking.py @@ -1,14 +1,13 @@ """Unit tests for rechunking the data.""" import dask -import xarray as xr import numpy as np +import xarray as xr from rechunk_data._rechunk import ( - _rechunk_dataset, - _optimal_chunks, - _check_horizontal_unchanged, _horizontal_chunks, + _optimal_chunks, + _rechunk_dataset, ) @@ -75,20 +74,6 @@ def test_optimal_chunks(): assert time_chunk < time_size -def test_check_horizontal_unchanged(): - 
"""Test detection of horizontal chunks remaining unchanged.""" - orig_size = {"time": 100, "lat": 192, "lon": 384} - - chunksizes = {"time": 50, "lat": 192, "lon": 384} - assert _check_horizontal_unchanged(orig_size, chunksizes) is True - - chunksizes = {"time": 50, "lat": 192, "lon": 128} - assert _check_horizontal_unchanged(orig_size, chunksizes) is True - - chunksizes = {"time": 50, "lat": 128, "lon": 128} - assert _check_horizontal_unchanged(orig_size, chunksizes) is False - - def test_horizontal_chunks(): """Test horizontal chunking function and _rechunk_dataset() with force_horizontal.""" @@ -96,18 +81,12 @@ def test_horizontal_chunks(): da = xr.DataArray(data, dims=("time", "lat", "lon"), name="test_var") chunks = {0: "auto", 1: None, 2: None} - chunksizes = (1, 100, 1200) - - updated_chunks = _horizontal_chunks(da, chunks, chunksizes) + updated_chunks = _horizontal_chunks(da, chunks) assert updated_chunks[0] == 100 assert updated_chunks[1] < 1100 assert updated_chunks[2] < 1200 - chunksizes = (50, 100, 600) - updated_chunks = _horizontal_chunks(da, chunks, chunksizes) - assert updated_chunks == chunks - - updated_chunks = _horizontal_chunks("invalid_data", chunks, chunksizes) + updated_chunks = _horizontal_chunks("invalid_data", chunks) assert updated_chunks == chunks dset = xr.Dataset({"test_var": da}) @@ -136,3 +115,47 @@ def test_horizontal_chunks(): assert chunksizes_applied[0] == 100 assert chunksizes_applied[1] < 1100 assert chunksizes_applied[2] < 1200 + + dset["test_var"].encoding = encoding + _, encoding = _rechunk_dataset( + dset, engine="netcdf4", force_horizontal=True, auto_chunks=True + ) + chunksizes_applied = encoding["test_var"].get("chunksizes", None) + assert chunksizes_applied[0] < 100 + assert chunksizes_applied[1] < 1100 + assert chunksizes_applied[2] < 1200 + + +def test_auto_size_chunks(): + """ + Test the automatic chunking and size adjustment functionality. 
+ """ + data = np.random.rand(100, 1100, 1200) + da = xr.DataArray(data, dims=("time", "lat", "lon"), name="test_var") + dset = xr.Dataset({"test_var": da}) + chunksizes = (1, 1, 1200) + dset = dset.chunk( + {"time": chunksizes[0], "lat": chunksizes[1], "lon": chunksizes[2]} + ) + encoding = { + "test_var": { + "zlib": True, + "complevel": 4, + "shuffle": True, + "dtype": "float32", + "chunksizes": chunksizes, + "_FillValue": np.nan, + } + } + dset["test_var"].encoding = encoding + _, encoding = _rechunk_dataset(dset, engine="netcdf4", auto_chunks=True) + chunksizes_applied = encoding["test_var"].get("chunksizes", None) + assert chunksizes_applied[0] == 100 + assert chunksizes_applied[2] == 1200 + + dset["test_var"].encoding = encoding + _, encoding = _rechunk_dataset(dset, engine="netcdf4", size=20) + chunksizes_applied = encoding["test_var"].get("chunksizes", None) + assert 1 < chunksizes_applied[0] < 100 + assert 1 < chunksizes_applied[1] < 1100 + assert chunksizes_applied[2] == 1200 -- GitLab From 254ad92854f84193c8c67aad86f4f0efdf501f3c Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Fri, 11 Apr 2025 12:02:35 +0200 Subject: [PATCH 14/15] add isort to formatting, and ipython to dev env --- Makefile | 5 +++++ setup.py | 18 +++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 233482b..73c495a 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,12 @@ test: $(PWD)/src/rechunk_data/tests python3 -m coverage report +format: + isort --profile=black src + black -t py310 -l 79 src + lint: mypy --install-types --non-interactive + isort --check-only --profile=black src black --check -t py310 -l 79 src flake8 src/rechunk_data --count --max-complexity=15 --max-line-length=88 --statistics --doctests \ No newline at end of file diff --git a/setup.py b/setup.py index 9aa3f8e..3bbc8c6 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,8 @@ import json from pathlib import Path -from setuptools import setup, find_packages + +from setuptools import find_packages, setup def read(*parts): @@ -35,14 +36,25 @@ setup( packages=find_packages("src"), package_dir={"": "src"}, entry_points={ - "console_scripts": [f"{find_key(key='PROGRAM_NAME')} = rechunk_data:cli"] + "console_scripts": [ + f"{find_key(key='PROGRAM_NAME')} = rechunk_data:cli" + ] }, - install_requires=["argparse", "dask", "xarray", "h5netcdf", "netCDF4", "typing_extensions"], + install_requires=[ + "argparse", + "dask", + "xarray", + "h5netcdf", + "netCDF4", + "typing_extensions", + ], extras_require={ "test": [ "black", + "isort", "mock", "mypy", + "ipython", "nbformat", "pytest", "pylint", -- GitLab From 1b2446bc2d86179afcd756003562b2d877e41c36 Mon Sep 17 00:00:00 2001 From: k204229 <lucio-eceiza@dkrz.de> Date: Fri, 11 Apr 2025 12:40:14 +0200 Subject: [PATCH 15/15] update README --- README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c4cc18b..fbef163 100644 --- a/README.md +++ b/README.md @@ -29,10 +29,12 @@ new_data = rechunk_dataset(dset) ### Using the command line interface: ```bash -rechunk-data --help -usage: rechunk-data [-h] [-o OUTPUT] [--netcdf-engine {h5netcdf,netcdf4}] [--skip-cf-convention] [--force-horizontal] [--check-chunk-size] [-v] [-V] input +rechunk-data --help +usage: rechunk-data [-h] [-o OUTPUT] [--netcdf-engine {h5netcdf,netcdf4}] [--size SIZE] [--auto-chunks] [--skip-cf-convention] [--force-horizontal] [--check-chunk-size] + [-v] [-V] + input -Rechunk input netcdf data to optimal chunk-size. 
approx. 126 MB per chunk +Rechunk input netcdf data to optimal chunk-size. approx. 126 MB per chunk. By default it only optimises time. positional arguments: input Input file/directory. If a directory is given all ``.nc`` files in all sub directories will be processed @@ -43,6 +45,8 @@ options: ``input`` is overridden. (default: None) --netcdf-engine {h5netcdf,netcdf4} The netcdf engine used to create the new netcdf file. (default: netcdf4) + --size, -s SIZE Specify chunk-size (in MiB). (default: None) + --auto-chunks, -ac Allow Dask to determine optimal chunk sizes for all dimensions. (default: False) --skip-cf-convention Do not assume assume data variables follow CF conventions. (default: False) --force-horizontal, -fh Force horizontal chunking (~126 MB per chunk). (default: False) -- GitLab