Skip to content
Snippets Groups Projects
Commit 0903e5f0 authored by antarcticrainforest's avatar antarcticrainforest
Browse files

Initial commit

parent 0b7770ec
No related branches found
No related tags found
No related merge requests found
## 2022.03.08
Added plugin template repository
Security:
-
Features:
-
-
Fix:
-
LICENSE 0 → 100644
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. You just DO WHAT THE FUCK YOU WANT TO.
setup.py 0 → 100644
#!/usr/bin/env python3
"""Setup script for packaging checkin."""
import json
from pathlib import Path
from setuptools import setup, find_packages
def read(*parts):
    """Read the content of a file located relative to this script.

    Parameters
    ----------
    *parts:
        Path components, joined onto the directory that contains this
        script.

    Returns
    -------
    str: The text content of the file.
    """
    script_path = Path(__file__).parent
    # Decode explicitly as UTF-8 so the result does not depend on the locale
    # of the machine that builds the package.
    return script_path.joinpath(*parts).read_text(encoding="utf-8")
def find_key(pck_name: str = "rechunk_data", key: str = "__version__"):
    """Return the value assigned to ``key`` in a package's ``__init__.py``.

    The file ``src/<pck_name>/__init__.py`` (relative to this script) is
    scanned line by line for an assignment whose left-hand side is exactly
    ``key``; the right-hand side is decoded as JSON, so the value has to be
    written as a JSON literal (e.g. a double-quoted string).

    Parameters
    ----------
    pck_name: str
        Name of the package directory below ``src``.
    key: str
        Name of the module-level variable to look up.

    Returns
    -------
    The JSON-decoded value of the first matching assignment.

    Raises
    ------
    ValueError: If no assignment to ``key`` is found.
    """
    init_file = Path(__file__).parent / "src" / pck_name / "__init__.py"
    with init_file.open(encoding="utf-8") as stream:
        for line in stream:
            # Split on the first "=" only, so values containing "=" survive.
            target, sep, value = line.partition("=")
            # Compare the assigned name itself (minus an optional annotation
            # such as "name: str") rather than searching the whole line, so
            # lines that merely mention the key are skipped.
            if sep and target.split(":")[0].strip() == key:
                return json.loads(value.strip())
    raise ValueError(f"{key} not found in {pck_name}")
# Package metadata. The version and the console-script name are read from
# src/rechunk_data/__init__.py via find_key so they are defined only once.
setup(
    name="rechunk_data",
    version=find_key("rechunk_data"),
    author="Martin Bergemann",
    author_email="bergemann@dkrz.de",
    maintainer="Martin Bergemann",
    # NOTE(review): this URL names a different project ("install-kernelspec");
    # confirm the intended repository address.
    url="https://gitlab.dkrz.de/k204230/install-kernelspec",
    description="Rechunk netCDF4 to optimal chunksize.",
    long_description=read("README.md"),
    # The long description comes from a Markdown file; declare it so package
    # indexes do not try to render it as reStructuredText.
    long_description_content_type="text/markdown",
    license="WTFPL",
    packages=find_packages("src"),
    package_dir={"": "src"},
    entry_points={
        "console_scripts": [f"{find_key(key='ProgramName')} = rechunk_data:cli"]
    },
    # argparse is part of the standard library for all supported Python
    # versions (>=3.6), so it is not listed as a dependency.
    install_requires=["dask", "xarray", "h5netcdf", "netCDF4"],
    python_requires=">=3.6",
    classifiers=[
        "Development Status :: 3 - Alpha",
        "Environment :: Console",
        "Intended Audience :: Developers",
        "Intended Audience :: Science/Research",
        # Matches license="WTFPL" above; the previous BSD classifier
        # contradicted the shipped LICENSE file.
        "License :: Freely Distributable",
        "Operating System :: POSIX :: Linux",
        "Programming Language :: Python :: 3",
    ],
)
"""CLI for rechunking netcdf data."""
import argparse
import logging
import os
from pathlib import Path
from typing import Iterator, Optional
from dask.utils import format_bytes
import xarray as xr
# NOTE: setup.py's find_key() locates the next two assignments by scanning
# this file line by line and JSON-decoding the text after "=". Keep each of
# them on a single line with a double-quoted value and no trailing comment.
__version__ = "2206.0.1"
ProgramName = "rechunk-data"
# The root logger is configured at ERROR level; parse_args() later sets the
# level of the program logger according to the number of ``-v`` flags.
logging.basicConfig(
    format="%(name)s - %(levelname)s - %(message)s", level=logging.ERROR
)
logger = logging.getLogger(ProgramName)
def parse_args(argv: Optional[list] = None) -> argparse.Namespace:
    """Parse arguments for the command line parser.

    As a side effect the level of the module logger is set from the number
    of ``-v`` flags: none gives WARNING, ``-v`` INFO, ``-vv`` and more DEBUG.

    Parameters
    ----------
    argv: list, optional
        Command line arguments to parse. If None (default) the arguments
        are taken from ``sys.argv``.

    Returns
    -------
    argparse.Namespace: The parsed arguments (``input``, ``output``,
        ``netcdf_engine`` and ``v``).
    """
    parser = argparse.ArgumentParser(
        prog=ProgramName,
        description=(
            "Rechunk input netcdf data to optimal chunk-size."
            " approx. 126 MB per chunk"
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "input",
        type=Path,
        help=(
            "Input file/directory. If a directory is given "
            "all ``.nc`` files in all sub directories will be "
            "processed"
        ),
    )
    parser.add_argument(
        "--output",
        type=Path,
        help=(
            "Output file/directory of the chunked netcdf "
            "file(s). Note: If ``input`` is a directory output should be a"
            " directory. If None given (default) the ``input`` is overidden."
        ),
        default=None,
    )
    parser.add_argument(
        "--netcdf-engine",
        help=("The netcdf engine used to create the new netcdf file."),
        choices=("h5netcdf", "netcdf4"),
        default="h5netcdf",
        type=str,
    )
    parser.add_argument(
        "-v",
        action="count",
        default=0,
        help="Increase verbosity, can be given multiple times.",
    )
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version="%(prog)s {version}".format(version=__version__),
    )
    args = parser.parse_args(argv)
    # Clamp at DEBUG: a level of 0 (NOTSET) would make the logger defer to the
    # root logger's level, so too many -v flags would silence the output again.
    logger.setLevel(max(logging.ERROR - (10 + args.v * 10), logging.DEBUG))
    return args
def _search_for_nc_files(input_path: Path) -> Iterator[Path]:
    """Yield all netCDF files that ``input_path`` refers to.

    Parameters
    ----------
    input_path: Path
        An existing directory (searched recursively), an existing file, or
        otherwise a path whose last component is used as a glob pattern
        that is searched recursively below the path's parent.

    Yields
    ------
    Path: Every candidate whose suffix is ``.nc`` or ``.nc4``.
    """
    # Path.suffix includes the leading dot, so every entry needs one too.
    suffixes = (".nc", ".nc4")
    input_path = input_path.expanduser().absolute()
    if input_path.is_dir():
        nc_iter = input_path.rglob("*.*")
    elif input_path.is_file():
        nc_iter = [input_path]
    else:
        # This could be a path with a glob pattern, let's try to construct it
        nc_iter = input_path.parent.rglob(input_path.name)
    for ncfile in nc_iter:
        if ncfile.suffix in suffixes:
            yield ncfile
def _rechunk_dataset(dset: xr.Dataset, engine: str = "h5netcdf") -> xr.Dataset:
    """Rechunk every data variable of a dataset and load it into memory.

    For each data variable a per-axis chunk specification is built: axes
    whose dimension name contains ``lon``, ``lat`` or ``bnds`` (case
    insensitive) get ``None``, all other axes get ``"auto"``. The variable's
    data is rechunked accordingly and the resulting chunk size, together
    with zlib compression at level 4, is stored in the variable's encoding.

    Parameters
    ----------
    dset: xr.Dataset
        Dataset to rechunk; it is modified in place. The data of each
        variable must provide a ``rechunk`` method (presumably dask arrays
        -- verify against the caller).
    engine: str
        Accepted for interface compatibility; not used by this function.

    Returns
    -------
    xr.Dataset: The result of calling ``load()`` on the dataset.
    """
    unsplit_markers = ("lon", "lat", "bnds")
    for var in dset.data_vars:
        logger.debug("Rechunking variable %s", var)
        chunks = {
            axis: None
            if any(marker in dim.lower() for marker in unsplit_markers)
            else "auto"
            for axis, dim in enumerate(dset[var].dims)
        }
        dset[var].data = dset[var].data.rechunk(chunks)
        logger.debug("Settings encoding of variable %s", var)
        encoding = dset[var].encoding
        encoding["chunksizes"] = dset[var].data.chunksize
        encoding["zlib"] = True
        encoding["complevel"] = 4
    logger.debug("Loading data into memory (%s).", format_bytes(dset.nbytes))
    return dset.load()
def rechunk_netcdf_file(
    input_path: os.PathLike,
    output_path: Optional[os.PathLike] = None,
    engine: str = "h5netcdf",
) -> None:
    """Rechunk netcdf files.

    Every file found for ``input_path`` is opened, rechunked, loaded into
    memory and written out again. The output always keeps the suffix of the
    corresponding input file.

    Parameters
    ----------
    input_path: os.PathLike
        Input file/directory. If a directory is given all ``.nc`` in all sub
        directories will be processed
    output_path: os.PathLike
        Output file/directory of the chunked netcdf file(s). Note: If ``input``
        is a directory output should be a directory. If None given (default)
        the ``input`` is overridden. If ``input`` is not a single existing
        file, the output is always treated as a directory (and created if
        needed) so that several inputs never end up in the same output file.
    engine: The netcdf engine used to create the new netcdf file.
    """
    input_path = Path(input_path).expanduser().absolute()
    # Input files are placed in the output directory relative to this root.
    # For a single file or a glob pattern that is the parent directory, which
    # keeps ``relative_to`` valid and never yields an empty relative path.
    input_root = input_path if input_path.is_dir() else input_path.parent
    output_root: Optional[Path] = None
    if output_path is not None:
        output_root = Path(output_path).expanduser().absolute()
    to_directory = output_root is not None and (
        output_root.is_dir() or not input_path.is_file()
    )
    for input_file in _search_for_nc_files(input_path):
        logger.info("Working on file: %s", input_file)
        if output_root is None:
            output_file = input_file
        elif to_directory:
            output_file = output_root / input_file.relative_to(input_root)
        else:
            output_file = output_root
        output_file = output_file.with_suffix(input_file.suffix)
        output_file.parent.mkdir(exist_ok=True, parents=True)
        with xr.open_mfdataset(input_file, parallel=True) as nc_data:
            new_data = _rechunk_dataset(nc_data, engine=engine)
        # _rechunk_dataset returns the loaded dataset, so the input is closed
        # before writing; this matters when the input file is overwritten.
        logger.debug("Saving file ot %s", output_file)
        new_data.to_netcdf(output_file, engine=engine)
def cli() -> None:
    """Run the rechunking with the options given on the command line."""
    options = parse_args()
    rechunk_netcdf_file(
        input_path=options.input,
        output_path=options.output,
        engine=options.netcdf_engine,
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment