Commit 74cde724 authored by Martin Bergemann

Merge branch 'fix_erros' into 'main'

Better error handling

See merge request !3
parents e96c82b9 2c7f7787
@@ -4,9 +4,10 @@ import argparse
 import logging
 import os
 from pathlib import Path
-from typing import Iterator, Optional
+from typing import Any, Dict, Iterator, Optional, Tuple
 from dask.utils import format_bytes
+from dask.array.core import Array
 import xarray as xr

 __version__ = "2206.0.2"
@@ -86,8 +87,31 @@ def _search_for_nc_files(input_path: Path) -> Iterator[Path]:
         yield ncfile


-def _rechunk_dataset(dset: xr.Dataset, engine: str = "h5netcdf") -> xr.Dataset:
+def _save_dataset(
+    dset: xr.Dataset, file_name: Path, encoding: dict[str, Any], engine: str
+) -> None:
+    if not encoding:
+        logger.debug("Chunk size already optimized for %s", file_name.name)
+        return
+    logger.debug("Saving file to %s", file_name)
+    try:
+        dset.to_netcdf(
+            file_name,
+            engine=engine,
+            encoding=encoding,
+        )
+    except Exception as error:
+        logger.warning("Saving to file failed: %s", error)
+
+
+def _rechunk_dataset(dset: xr.Dataset) -> Tuple[xr.Dataset, Dict[str, Any]]:
+    encoding: Dict[str, Any] = {}
     dset_chunks = dset.chunks
     for var in dset.data_vars:
+        skip_var: bool = False
+        if not isinstance(dset[var].data, Array):
+            logger.debug("Skipping rechunking variable %s", var)
+            continue
+        logger.debug("Rechunking variable %s", var)
         chunks = {}
         for i, dim in enumerate(dset[var].dims):
@@ -96,16 +120,29 @@ def _rechunk_dataset(dset: xr.Dataset, engine: str = "h5netcdf") -> xr.Dataset:
             else:
                 chunks[i] = "auto"
         try:
-            dset[var].data = dset[var].data.rechunk(chunks)
+            old_chunks = dset[var].encoding.get("chunksizes")
+            new_chunks = dset[var].data.rechunk(chunks).chunksize
+            if new_chunks == old_chunks:
+                logger.debug("%s: chunk sizes already optimized, skipping", var)
+                skip_var = True
+                continue
+            dset[var] = dset[var].chunk(dict(zip(dset[var].dims, new_chunks)))
+            logger.debug(
+                "%s: old chunk size: %s, new chunk size: %s",
+                var,
+                old_chunks,
+                new_chunks,
+            )
         except Exception as error:
-            logger.warning("Could not set chunk size for %s: ", var, error.__str__())
+            logger.warning("Could not set chunk size for %s: %s", var, error)
             continue
-        logger.debug("Settings encoding of variable %s", var)
-        dset[var].encoding["chunksizes"] = dset[var].data.chunksize
-        dset[var].encoding["zlib"] = True
-        dset[var].encoding["complevel"] = 4
-    logger.debug("Loading data into memory (%s).", format_bytes(dset.nbytes))
-    return dset.load()
+        if not skip_var:
+            logger.debug("Setting encoding of variable %s", var)
+            dset[var].encoding["chunksizes"] = new_chunks
+            dset[var].encoding["zlib"] = True
+            dset[var].encoding["complevel"] = 4
+            encoding[var] = dset[var].encoding
+    return (dset, encoding)


 def rechunk_netcdf_file(
@@ -137,10 +174,22 @@ def rechunk_netcdf_file(
     else:
         output_file = Path(output_path)
     output_file.parent.mkdir(exist_ok=True, parents=True)
-    with xr.open_mfdataset(input_file, parallel=True) as nc_data:
-        new_data = _rechunk_dataset(nc_data)
-        logger.debug("Saving file ot %s", output_file.with_suffix(input_file.suffix))
-        new_data.to_netcdf(output_file.with_suffix(input_file.suffix), engine=engine)
+    try:
+        with xr.open_mfdataset(input_file, parallel=True) as nc_data:
+            new_data, encoding = _rechunk_dataset(nc_data)
+            if encoding:
+                logger.debug(
+                    "Loading data into memory (%s).", format_bytes(new_data.nbytes)
+                )
+                new_data = new_data.load()
+    except Exception as error:
+        logger.error(
+            "Error while processing file %s: %s", input_file, error
+        )
+        return  # new_data/encoding are undefined if opening or rechunking failed
+    _save_dataset(
+        new_data, output_file.with_suffix(input_file.suffix), encoding, engine
+    )


 def cli() -> None:
......
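The merge decouples rechunking from writing: _rechunk_dataset no longer saves anything itself but returns the dataset together with an encoding dict, and _save_dataset treats an empty dict as "chunk layout already optimal, nothing to write". A minimal sketch of how the two helpers fit together, assuming they are imported from the module under review (the diff does not show the package name) and using made-up test data:

    from pathlib import Path

    import numpy as np
    import xarray as xr

    # Dask-backed dataset, so the isinstance(dset[var].data, Array) guard passes.
    dset = xr.Dataset(
        {"tas": (("time", "lat", "lon"), np.zeros((120, 96, 192), dtype="float32"))}
    ).chunk({"time": 1})

    # New contract: the rechunked dataset comes back together with an encoding
    # dict; an empty dict signals that the chunk sizes were already optimal.
    new_data, encoding = _rechunk_dataset(dset)

    # _save_dataset skips the write for an empty encoding dict and logs a
    # warning instead of raising if to_netcdf fails.
    _save_dataset(new_data, Path("tas_rechunked.nc"), encoding, engine="h5netcdf")

With this change, errors while opening or rechunking a file are logged via logger.error instead of propagating out of rechunk_netcdf_file, which fits the cli() entry point visible at the end of the diff.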