Skip to content
Snippets Groups Projects
stream.c 57.09 KiB
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif

#ifdef HAVE_LIBFDB5
#include "cdi_fdb.h"
#endif

#include <sys/stat.h>  // struct stat
#include <ctype.h>
#include <string.h>

#include "binary.h"
#include "cdi.h"
#include "cdi_across.h"
#include "cdi_int.h"
#include "cdi_cksum.h"
#include "cdf.h"
#include "dmemory.h"
#include "error.h"
#include "stream_cgribex.h"
#include "stream_grb.h"
#include "stream_cdf.h"
#include "stream_srv.h"
#include "stream_ext.h"
#include "stream_ieg.h"
#include "file.h"
#include "cgribex.h"
#include "gribapi.h"
#include "vlist.h"
#include "serialize.h"
#include "resource_handle.h"
#include "resource_unpack.h"
#include "namespace.h"
#include "async_worker.h"

// Forward declaration: creates a stream entry and registers it under a resource handle (defined later in this file).
static stream_t *stream_new_entry(int resH);
// Forward declarations of the callbacks collected in streamOps below.
static int streamCompareP(void *streamptr1, void *streamptr2);
static void streamDestroyP(void *streamptr);
static void streamPrintP(void *streamptr, FILE *fp);
static int streamGetPackSize(void *streamptr, void *context);
static void streamPack(void *streamptr, void *buff, int size, int *position, void *context);
static int streamTxCode(void *streamptr);

// Operations table for stream resources; it is the table handed to reshPut, reshReplace, reshRemove and reshSetStatus in this file.
const resOps streamOps = { streamCompareP, streamDestroyP, streamPrintP, streamGetPackSize, streamPack, streamTxCode };

static int
getByteorder(int byteswap)
{
  int byteorder = -1;

  switch (HOST_ENDIANNESS)
    {
    case CDI_BIGENDIAN: byteorder = byteswap ? CDI_LITTLEENDIAN : CDI_BIGENDIAN; break;
    case CDI_LITTLEENDIAN: byteorder = byteswap ? CDI_BIGENDIAN : CDI_LITTLEENDIAN; break;
    /* FIXME: does not currently adjust for PDP endianness */
    case CDI_PDPENDIAN:
    default: Error("unhandled endianness");
    }

  return byteorder;
}

// used also in CDO
int
cdiGetFiletype(const char *uri, int *byteorder)
{
  // clang-format off
  int filetype = CDI_EUFTYPE;
  int swap = 0;
  int version;
  long recpos;

  const char *filename;
  int protocol = cdiGetProtocol(uri, &filename);

  switch (protocol)
    {
    case CDI_PROTOCOL_ACROSS: return CDI_FILETYPE_GRB2;
    case CDI_PROTOCOL_FDB:    return CDI_FILETYPE_GRB2;
    case CDI_PROTOCOL_OTHER:  return CDI_FILETYPE_NC4;  // support for NetCDF remote types and ESDM
    case CDI_PROTOCOL_FILE:
      // handled below;
      break;
    }

  int fileID = fileOpen(filename, "r");

  if      (fileID == CDI_UNDEFID) return CDI_ESYSTEM;
  else if (fileID == -2)          return CDI_ETMOF;

  char buffer[8];
  if (fileRead(fileID, buffer, 8) != 8)
    {
      struct stat buf;
      if (stat(filename, &buf) == 0)
        {
          if (buf.st_size == 0)      return CDI_EISEMPTY;
          if (buf.st_mode & S_IFDIR) return CDI_EISDIR;
        }

      return CDI_EUFTYPE;
    }

  fileRewind(fileID);

  if (memcmp(buffer, "GRIB", 4) == 0)
    {
      version = buffer[7];
      if      (version <= 1) filetype = CDI_FILETYPE_GRB;
      else if (version == 2) filetype = CDI_FILETYPE_GRB2;
    }
  else if (memcmp(buffer, "CDF\001", 4) == 0) { filetype = CDI_FILETYPE_NC; }
  else if (memcmp(buffer, "CDF\002", 4) == 0) { filetype = CDI_FILETYPE_NC2; }
  else if (memcmp(buffer, "CDF\005", 4) == 0) { filetype = CDI_FILETYPE_NC5; }
  else if (memcmp(buffer + 1, "HDF", 3) == 0) { filetype = CDI_FILETYPE_NC4; }
#ifdef HAVE_LIBSERVICE
  else if (srvCheckFiletype(fileID, &swap))   { filetype = CDI_FILETYPE_SRV; }
#endif
#ifdef HAVE_LIBEXTRA
  else if (extCheckFiletype(fileID, &swap))   { filetype = CDI_FILETYPE_EXT; }
#endif
#ifdef HAVE_LIBIEG
  else if (iegCheckFiletype(fileID, &swap))   { filetype = CDI_FILETYPE_IEG; }
#endif
#ifdef HAVE_LIBGRIB
  else if (gribCheckSeek(fileID, &recpos, &version) == 0)
    {
      if      (version <= 1) filetype = CDI_FILETYPE_GRB;
      else if (version == 2) filetype = CDI_FILETYPE_GRB2;
    }
#endif
  // clang-format on

  if (CDI_Debug && filetype != CDI_EUFTYPE) Message("found %s file = %s", strfiletype(filetype), filename);
  fileClose(fileID);

  *byteorder = getByteorder(swap);

  return filetype;
}

/*
@Function  streamInqFiletype
@Title     Get the filetype

@Prototype int streamInqFiletype(int streamID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.

@Description
The function @func{streamInqFiletype} returns the filetype of a stream.

@Result
@func{streamInqFiletype} returns the type of the file format,
one of the set of predefined CDI file format types.
The valid CDI file format types are @func{CDI_FILETYPE_GRB}, @func{CDI_FILETYPE_GRB2}, @func{CDI_FILETYPE_NC},
@func{CDI_FILETYPE_NC2}, @func{CDI_FILETYPE_NC4}, @func{CDI_FILETYPE_NC4C}, @func{CDI_FILETYPE_NC5},
@func{CDI_FILETYPE_NCZARR}, @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} and @func{CDI_FILETYPE_IEG}.
@EndFunction
*/
int
streamInqFiletype(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);
  return streamptr->filetype;
}

int
getByteswap(int byteorder)
{
  int byteswap = -1;

  switch (byteorder)
    {
    case CDI_BIGENDIAN:
    case CDI_LITTLEENDIAN:
    case CDI_PDPENDIAN: byteswap = (HOST_ENDIANNESS != byteorder); break;
    case -1: break;
    default: Error("unexpected byteorder %d query!", byteorder);
    }

  return byteswap;
}

// Store maxSteps in the stream entry; negative values are ignored.
void
streamDefMaxSteps(int streamID, int maxSteps)
{
  if (maxSteps < 0) return;

  stream_t *stream = stream_to_pointer(streamID);
  stream->maxSteps = maxSteps;
}

// Return the number of timesteps of a stream. When the count is still
// undefined, timesteps are inquired one after another until
// streamInqTimestep reports none left.
int
streamInqNumSteps(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);
  long numSteps = streamptr->ntsteps;

  if (numSteps == (long) CDI_UNDEFID)
    {
      // Re-read the stream's counter after every successful inquiry.
      for (int tsID = 0; streamInqTimestep(streamID, tsID); ++tsID) numSteps = streamptr->ntsteps;
    }

  return (int) numSteps;
}

// Compute a record total for the stream: the record count of the first
// timestep plus the record count of the second timestep for each further step.
// Returns -1 when the stream has no timestep table.
static long
get_max_global_recs(stream_t *streamptr)
{
  const tsteps_t *steps = streamptr->tsteps;
  if (!steps) return -1;

  long total = steps[0].nrecs;
  long stepCount = streamptr->ntsteps;
  if (stepCount > 1) total += steps[1].nrecs * (stepCount - 1);

  return total;
}

// Set the number of asynchronous workers of a stream opened for reading.
// Non-positive requests and streams not in 'r' mode are left unchanged;
// file types without a matching branch below get zero workers.
void
streamDefNumWorker(int streamID, int numWorker)
{
  if (numWorker <= 0) return;

  stream_t *streamptr = stream_to_pointer(streamID);
  const int filetype = streamptr->filetype;

  if (streamptr->filemode != 'r') return;

  if (cdiBaseFiletype(filetype) == CDI_FILETYPE_GRIB)
    {
      // Inquire the number of steps before the record total is computed.
      (void) streamInqNumSteps(streamID);
      streamptr->maxGlobalRecs = get_max_global_recs(streamptr);
    }
#ifdef HAVE_LIBNETCDF
  else if (filetype == CDI_FILETYPE_NCZARR || (CDI_Test && cdiBaseFiletype(filetype) == CDI_FILETYPE_NETCDF))
    {
      streamptr->maxGlobalRecs = get_max_global_recs(streamptr);
      if (CDI_Test)
        {
          Message("numWorker=%d", numWorker);
          Message("maxGlobalRecs=%ld", streamptr->maxGlobalRecs);
        }
      if (streamptr->maxGlobalRecs == -1) xabort("Internal error: number of timesteps missing!");

      // Clamp the request step by step; later rules override earlier ones.
      if (streamptr->maxGlobalRecs == 1) numWorker = 0;
      if (numWorker > streamptr->maxGlobalRecs) numWorker = (int) streamptr->maxGlobalRecs;
      if (streamptr->chunkSizeTdim > 1 && numWorker > streamptr->nvars) numWorker = streamptr->nvars;
      if (streamptr->chunkSizeZdim > 1) numWorker = 0;
      if (CDI_Test) Message("chunkSizeTdim=%d chunkSizeZdim=%d", streamptr->chunkSizeTdim, streamptr->chunkSizeZdim);
    }
#endif
  else
    {
      numWorker = 0;
    }

  streamptr->numWorker = numWorker;
  if (CDI_Debug || CDI_Test) Message("Number of asynchronous worker: %d", numWorker);
}

/*
@Function  streamDefByteorder
@Title     Define the byte order

@Prototype void streamDefByteorder(int streamID, int byteorder)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenWrite}.
    @Item  byteorder The byte order of a dataset, one of the CDI constants @func{CDI_BIGENDIAN} and
                     @func{CDI_LITTLEENDIAN}.

@Description
The function @func{streamDefByteorder} defines the byte order of a binary dataset
with the file format type @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} or @func{CDI_FILETYPE_IEG}.

@EndFunction
*/
void
streamDefByteorder(int streamID, int byteorder)
{
  stream_t *streamptr = stream_to_pointer(streamID);
  streamptr->byteorder = byteorder;

  // For SERVICE, EXTRA and IEG streams the record object carries its own
  // byteswap flag, which is updated to match the new byte order.
  switch (streamptr->filetype)
    {
#ifdef HAVE_LIBSERVICE
    case CDI_FILETYPE_SRV:
      {
        srvrec_t *srvRecord = (srvrec_t *) streamptr->record->objectp;
        srvRecord->byteswap = getByteswap(byteorder);
        break;
      }
#endif
#ifdef HAVE_LIBEXTRA
    case CDI_FILETYPE_EXT:
      {
        extrec_t *extRecord = (extrec_t *) streamptr->record->objectp;
        extRecord->byteswap = getByteswap(byteorder);
        break;
      }
#endif
#ifdef HAVE_LIBIEG
    case CDI_FILETYPE_IEG:
      {
        iegrec_t *iegRecord = (iegrec_t *) streamptr->record->objectp;
        iegRecord->byteswap = getByteswap(byteorder);
        break;
      }
#endif
    }

  // Flag the resource as changed.
  reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}

/*
@Function  streamInqByteorder
@Title     Get the byte order

@Prototype int streamInqByteorder(int streamID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.

@Description
The function @func{streamInqByteorder} returns the byte order of a binary dataset
with the file format type @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} or @func{CDI_FILETYPE_IEG}.

@Result
@func{streamInqByteorder} returns the type of the byte order.
The valid CDI byte order types are @func{CDI_BIGENDIAN} and @func{CDI_LITTLEENDIAN}.

@EndFunction
*/
int
streamInqByteorder(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);
  return streamptr->byteorder;
}

// Return the filename suffix (including the dot) for the base type of the
// given file type, or an empty string for base types without a suffix here.
const char *
streamFilesuffix(int filetype)
{
  const int baseFiletype = cdiBaseFiletype(filetype);

  if (baseFiletype == CDI_FILETYPE_GRIB) return ".grb";
  if (baseFiletype == CDI_FILETYPE_SRV) return ".srv";
  if (baseFiletype == CDI_FILETYPE_EXT) return ".ext";
  if (baseFiletype == CDI_FILETYPE_IEG) return ".ieg";
  if (baseFiletype == CDI_FILETYPE_NETCDF) return ".nc";

  return "";
}

const char *
streamFilename(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);
  return streamptr->filename;
}

// Read the contents of an opened stream with the reader matching its base
// file type (and, for GRIB, its protocol). Returns the reader's status, or
// CDI_ELIBNAVAIL when no reader for the file type is compiled in. On status 0
// the taxis of the first timestep is copied into the taxis of the stream's vlist.
static long
cdiInqContents(stream_t *streamptr)
{
  if (streamptr->lockIO) CDI_IO_LOCK();

  const int filetype = streamptr->filetype;
  long status = 0;

  switch (cdiBaseFiletype(filetype))
    {
#ifdef HAVE_LIBGRIB
    case CDI_FILETYPE_GRIB:
      {
        switch (streamptr->protocol)
          {
#ifdef HAVE_LIBFDB5
          case CDI_PROTOCOL_FDB: status = fdbInqContents(streamptr); break;
#endif
          case CDI_PROTOCOL_ACROSS:  // TODO read from ACROSS
          case CDI_PROTOCOL_OTHER:
          case CDI_PROTOCOL_FILE: status = grbInqContents(streamptr); break;
          }
        break;
      }
#endif
#ifdef HAVE_LIBSERVICE
    case CDI_FILETYPE_SRV: status = srvInqContents(streamptr); break;
#endif
#ifdef HAVE_LIBEXTRA
    case CDI_FILETYPE_EXT: status = extInqContents(streamptr); break;
#endif
#ifdef HAVE_LIBIEG
    case CDI_FILETYPE_IEG: status = iegInqContents(streamptr); break;
#endif
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NETCDF: status = cdfInqContents(streamptr); break;
#endif
    default:
      status = CDI_ELIBNAVAIL;
      if (CDI_Debug) Message("%s support not compiled in!", strfiletype(filetype));
      break;
    }

  if (streamptr->lockIO) CDI_IO_UNLOCK();

  if (status != 0) return status;

  const int taxisID = vlistInqTaxis(streamptr->vlistID);
  if (taxisID != CDI_UNDEFID)
    {
      taxis_t *firstStepTaxis = &streamptr->tsteps[0].taxis;
      taxis_t *vlistTaxis = taxisPtr(taxisID);
      ptaxisCopy(vlistTaxis, firstStepTaxis);
    }

  return status;
}

/*
 * Split a URI into protocol and filename.
 *
 * uri       plain path or "<scheme>://<rest>"; must not be NULL.
 * filename  receives the part CDI should open: the text after "://" for the
 *           "fdb" and "across" schemes, the unchanged uri otherwise.
 *
 * Returns CDI_PROTOCOL_FILE when uri has no "://", CDI_PROTOCOL_FDB or
 * CDI_PROTOCOL_ACROSS for exactly those schemes, CDI_PROTOCOL_OTHER for any
 * other scheme.
 */
int
cdiGetProtocol(const char *uri, const char **filename)
{
  static const char fdbScheme[] = "fdb";
  static const char acrossScheme[] = "across";

  const char *pos = strstr(uri, "://");
  if (pos == NULL)
    {
      *filename = uri;
      return CDI_PROTOCOL_FILE;
    }

  const size_t protocollen = (size_t) (pos - uri);
  *filename = pos + 3;

  // The scheme must match in full: comparing only protocollen characters would
  // also accept an empty scheme or a mere prefix such as "f://" or "acr://".
  // if (strncmp(uri, "file", protocollen) == 0) return CDI_PROTOCOL_FILE; // file is already used in NetCDF
  if (protocollen == sizeof(fdbScheme) - 1 && strncmp(uri, fdbScheme, protocollen) == 0) return CDI_PROTOCOL_FDB;
  if (protocollen == sizeof(acrossScheme) - 1 && strncmp(uri, acrossScheme, protocollen) == 0) return CDI_PROTOCOL_ACROSS;

  // Unknown schemes keep the complete URI as the filename.
  *filename = uri;

  return CDI_PROTOCOL_OTHER;
}

/*
 * Default backend for opening a stream.
 *
 * uri                     filename or protocol URI (see cdiGetProtocol).
 * filemode                single mode character handed to the format-specific open call.
 * filetype                requested CDI_FILETYPE_* constant.
 * streamptr               stream entry; protocol, filetype and (optionally) record are set here.
 * recordBufIsToBeCreated  when non-zero, a Record is allocated for the formats that use one.
 *
 * Returns the ID produced by the format-specific open call, or a negative
 * CDI error code (CDI_EUFTYPE, CDI_ESYSTEM, CDI_ELIBNAVAIL).
 *
 * Every Record allocated here has both buffer and objectp initialised, so no
 * branch leaves objectp with an indeterminate value.
 */
int
cdiStreamOpenDefaultDelegate(const char *uri, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
{
  int fileID;
  char temp[2] = { filemode, 0 };  // mode as a NUL-terminated string for the open calls

  const char *filename;
  streamptr->protocol = cdiGetProtocol(uri, &filename);

  switch (streamptr->protocol)
    {
    case CDI_PROTOCOL_ACROSS:
#if defined(HAVE_ACROSS) && defined(HAVE_LIBGRIB_API)
      if (filetype != CDI_FILETYPE_GRB2)
        {
          Warning("ACROSS needs to be used with GRIB2");
          return CDI_EUFTYPE;
        }
      fileID = across_connect(filename, filemode, streamptr);
      if (fileID >= 0)
        {
          streamptr->filetype = filetype;
          if (recordBufIsToBeCreated)
            {
              streamptr->record = (Record *) Malloc(sizeof(Record));
              streamptr->record->buffer = NULL;
              streamptr->record->objectp = NULL;
            }
        }
      return fileID;
#else
#ifdef HAVE_ACROSS
      Warning("ecCodes support not compiled in (Needed for ACROSS)!");
#else
      Warning("ACROSS support not compiled in!");
#endif
      return CDI_ELIBNAVAIL;
#endif

    case CDI_PROTOCOL_FDB:
#if defined(HAVE_LIBFDB5) && defined(HAVE_LIBGRIB_API)

      if (filetype != CDI_FILETYPE_GRB && filetype != CDI_FILETYPE_GRB2)
        {
          Warning("FDB5 needs to be used with GRIB or GRIB2");
          return CDI_EUFTYPE;
        }

      check_fdb_error(fdb_initialise());
      check_fdb_error(fdb_new_handle((fdb_handle_t **) &(streamptr->protocolData)));
      streamptr->filetype = filetype;
      if (recordBufIsToBeCreated)
        {
          streamptr->record = (Record *) Malloc(sizeof(Record));
          streamptr->record->buffer = NULL;
          streamptr->record->objectp = NULL;
        }
      // No file is opened on this path; a fixed non-negative value is returned as the ID.
      return 88;

#else  // !(defined(HAVE_LIBFDB5) && defined(HAVE_LIBGRIB_API))

#ifdef HAVE_LIBFDB5
      Warning("ecCodes support not compiled in (Needed for FDB5)!");
#else
      Warning("FDB5 support not compiled in!");
#endif

      return CDI_ELIBNAVAIL;
#endif

    case CDI_PROTOCOL_OTHER:
    case CDI_PROTOCOL_FILE:
      // handled below;
      break;
    }

  switch (filetype)
    {
#if defined(HAVE_LIBGRIB) && (defined(HAVE_LIBCGRIBEX) || defined(HAVE_LIBGRIB_API))
    case CDI_FILETYPE_GRB:
      {
        fileID = gribOpen(filename, temp);
        if (fileID < 0) return CDI_ESYSTEM;
        if (recordBufIsToBeCreated)
          {
            streamptr->record = (Record *) Malloc(sizeof(Record));
            streamptr->record->buffer = NULL;
#ifdef HAVE_LIBCGRIBEX
            streamptr->record->objectp = cgribexNew();
#else
            streamptr->record->objectp = NULL;
#endif
          }
        break;
      }
#ifdef HAVE_LIBGRIB_API
    case CDI_FILETYPE_GRB2:
      {
        fileID = gribOpen(filename, temp);
        if (fileID < 0) return CDI_ESYSTEM;
        if (recordBufIsToBeCreated)
          {
            streamptr->record = (Record *) Malloc(sizeof(Record));
            streamptr->record->buffer = NULL;
            streamptr->record->objectp = NULL;
          }
        break;
      }
#endif
#endif
#ifdef HAVE_LIBSERVICE
    case CDI_FILETYPE_SRV:
      {
        fileID = fileOpen(filename, temp);
        if (fileID < 0) return CDI_ESYSTEM;
        if (recordBufIsToBeCreated)
          {
            streamptr->record = (Record *) Malloc(sizeof(Record));
            streamptr->record->buffer = NULL;
            streamptr->record->objectp = srvNew();
          }
        break;
      }
#endif
#ifdef HAVE_LIBEXTRA
    case CDI_FILETYPE_EXT:
      {
        fileID = fileOpen(filename, temp);
        if (fileID < 0) return CDI_ESYSTEM;
        if (recordBufIsToBeCreated)
          {
            streamptr->record = (Record *) Malloc(sizeof(Record));
            streamptr->record->buffer = NULL;
            streamptr->record->objectp = extNew();
          }
        break;
      }
#endif
#ifdef HAVE_LIBIEG
    case CDI_FILETYPE_IEG:
      {
        fileID = fileOpen(filename, temp);
        if (fileID < 0) return CDI_ESYSTEM;
        if (recordBufIsToBeCreated)
          {
            streamptr->record = (Record *) Malloc(sizeof(Record));
            streamptr->record->buffer = NULL;
            streamptr->record->objectp = iegNew();
          }
        break;
      }
#endif
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NC:
    case CDI_FILETYPE_NC2:
    case CDI_FILETYPE_NC5:
      {
        // The result of the NetCDF open call is returned as is; no Record is created.
        fileID = cdfOpen(filename, temp, filetype);
        break;
      }
    case CDI_FILETYPE_NC4:
    case CDI_FILETYPE_NC4C:
    case CDI_FILETYPE_NCZARR:
      {
        // cdf4Open receives a pointer to filetype and may therefore adjust it.
        fileID = cdf4Open(filename, temp, &filetype);
        break;
      }
#endif
    default:
      {
        if (CDI_Debug) Message("%s support not compiled in!", strfiletype(filetype));
        return CDI_ELIBNAVAIL;
      }
    }

  streamptr->filetype = filetype;

  return fileID;
}

// Create an internal vlist for the stream, attach the optional query, and read
// the stream contents into it. On a non-negative status the vlist receives the
// stream's timestep count and is made immutable.
// Returns CDI_ELIMIT when no vlist could be created, otherwise the status of
// cdiInqContents.
static long
stream_create_vlist(stream_t *streamptr, CdiQuery *query)
{
  const int vlistID = vlistCreate();
  if (vlistID < 0) return CDI_ELIMIT;

  cdiVlistMakeInternal(vlistID);
  streamptr->vlistID = vlistID;

  if (query != NULL) streamptr->query = query;

  const long status = cdiInqContents(streamptr);
  if (status < 0) return status;

  vlist_t *vlist = vlist_to_pointer(streamptr->vlistID);
  vlist->ntsteps = streamptr->ntsteps;
  cdiVlistMakeImmutable(vlistID);

  return status;
}

/*
 * Open a stream under a given or new resource handle.
 *
 * filename  name or URI of the dataset; must not be NULL.
 * filemode  single mode character passed to the open backend.
 * filetype  CDI_FILETYPE_* constant; must not be negative.
 * resH      resource handle to reuse, or CDI_UNDEFID to allocate a new one.
 *
 * Returns the stream ID, or a negative CDI error code: CDI_EINVAL for invalid
 * arguments, CDI_ELIMIT when the stream has no valid handle, otherwise the
 * error reported by the open backend.
 */
int
streamOpenID(const char *filename, char filemode, int filetype, int resH)
{
  if (CDI_Debug) Message("Open %s mode %c file %s", strfiletype(filetype), filemode, filename ? filename : "(NUL)");

  if (!filename || filetype < 0) return CDI_EINVAL;

  stream_t *streamptr = stream_new_entry(resH);
  int streamID = CDI_ESYSTEM;

#ifndef HAVE_NC4HDF5_THREADSAFE
  if (CDI_Threadsafe)
    {
#ifdef HAVE_LIBPTHREAD
      if (filetype == CDI_FILETYPE_NC4 || filetype == CDI_FILETYPE_NC4C || filetype == CDI_FILETYPE_NCZARR)
        streamptr->lockIO = true;
#else
      static bool lwarn = true;
      if (lwarn)
        {
          lwarn = false;
          Warning("CDI threadsafe failed, pthread support not compiled in!");
        }
#endif
    }
#endif

  // Keep the lock decision in a local: streamptr is freed on the failure path
  // below and must not be read afterwards.
  const bool lockIO = streamptr->lockIO;
  if (lockIO) CDI_IO_LOCK();

  int (*streamOpenDelegate)(const char *filename, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
      = (int (*)(const char *, char, int, stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_OPEN_BACKEND).func;

  int fileID = streamOpenDelegate(filename, filemode, filetype, streamptr, 1);
  if (fileID < 0)
    {
      // Opening failed: report the backend's error and drop the entry again.
      streamID = fileID;
      if (streamptr->record) Free(streamptr->record);
      reshRemove(streamptr->self, &streamOps);
      Free(streamptr);
    }
  else if (streamptr->self < 0)
    {
      // Fall through to the common exit so the I/O lock is released.
      streamID = CDI_ELIMIT;
    }
  else
    {
      streamID = streamptr->self;

      streamptr->filemode = filemode;
      streamptr->filename = strdup(filename);
      streamptr->fileID = fileID;
    }

  if (lockIO) CDI_IO_UNLOCK();

  return streamID;
}

/*
 * Wrap an already opened NetCDF dataset (identified by ncidp) in a CDI stream.
 *
 * ncidp  NetCDF ID of the open dataset.
 * mode   stream mode character; for 'r' the contents are read into a vlist.
 *
 * Returns the stream ID, or a negative CDI error code. Without HAVE_NETCDF4
 * the function always returns CDI_ESYSTEM and creates no stream entry.
 */
int
streamOpenNCMem(int ncidp, char mode)
{
  cdiInitialize();
  int streamID = CDI_ESYSTEM;

#ifdef HAVE_NETCDF4
  stream_t *streamptr = stream_new_entry(CDI_UNDEFID);
  streamID = streamptr->self;
  if (streamID < 0) return CDI_ELIMIT;

  // Query the path length first so the buffer is always large enough.
  size_t pathLength = 0;
  if (nc_inq_path(ncidp, &pathLength, NULL) != NC_NOERR) pathLength = 0;

  // calloc keeps the buffer NUL-terminated even if the second inquiry fails.
  char *path = (char *) calloc(pathLength + 1, sizeof(char));
  if (path == NULL)
    {
      reshRemove(streamptr->self, &streamOps);
      Free(streamptr);
      return CDI_ESYSTEM;
    }
  (void) nc_inq_path(ncidp, NULL, path);

  streamptr->filename = strdup(path);
  free(path);  // the stream keeps its own copy
  streamptr->filemode = mode;
  streamptr->filetype = CDI_FILETYPE_NC4;
  streamptr->fileID = ncidp;

  if (mode == 'r')
    {
      long status = stream_create_vlist(streamptr, NULL);
      if (status < 0)
        {
          streamID = (int) status;
          if (streamptr->record) Free(streamptr->record);
          reshRemove(streamptr->self, &streamOps);
        }
    }
#else
  (void) ncidp;
  (void) mode;
#endif

  return streamID;
}
// Open a stream with a new resource handle. filemode must be a string of
// exactly one character; it is lower-cased before being passed on.
// Returns the result of streamOpenID, or CDI_EINVAL for an invalid filemode.
static int
streamOpen(const char *filename, const char *filemode, int filetype)
{
  if (!filemode || strlen(filemode) != 1) return CDI_EINVAL;

  // <ctype.h> functions require a value representable as unsigned char (or EOF).
  const char mode = (char) tolower((unsigned char) filemode[0]);
  return streamOpenID(filename, mode, filetype, CDI_UNDEFID);
}

static int
streamOpenA(const char *filename, const char *filemode, int filetype)
{
  if (CDI_Debug) Message("Open %s file (mode=%c); filename: %s", strfiletype(filetype), (int) *filemode, filename);
  if (CDI_Debug) printf("streamOpenA: %s\n", filename);  // seg fault without this line on thunder/squall with "cdo cat x y"

  if (!filename || !filemode || filetype < 0) return CDI_EINVAL;

  stream_t *streamptr = stream_new_entry(CDI_UNDEFID);
  int fileID = CDI_UNDEFID;

  {
    int (*streamOpenDelegate)(const char *filename, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
        = (int (*)(const char *, char, int, stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_OPEN_BACKEND).func;

    fileID = streamOpenDelegate(filename, 'r', filetype, streamptr, 1);
  }

  if (fileID == CDI_UNDEFID || fileID == CDI_ELIBNAVAIL || fileID == CDI_ESYSTEM) return fileID;

  int streamID = streamptr->self;

  streamptr->filemode = tolower(*filemode);
  streamptr->filename = strdup(filename);
  streamptr->fileID = fileID;

  streamptr->vlistID = vlistCreate();
  cdiVlistMakeInternal(streamptr->vlistID);
  // cdiReadByteorder(streamID);
  long status = cdiInqContents(streamptr);
  if (status < 0) return (int) status;
  vlist_t *vlistptr = vlist_to_pointer(streamptr->vlistID);
  vlistptr->ntsteps = streamInqNumSteps(streamID);

  // Needed for NetCDF4
  for (int varID = 0; varID < vlistptr->nvars; ++varID) streamptr->vars[varID].defmiss = true;

  if (str_is_equal(filemode, "r")) cdiVlistMakeImmutable(streamptr->vlistID);

  {
    void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted)
        = (void (*)(stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_CLOSE_BACKEND).func;

    streamCloseDelegate(streamptr, 0);
  }

  switch (filetype)
    {
#if defined(HAVE_LIBGRIB) && (defined(HAVE_LIBCGRIBEX) || defined(HAVE_LIBGRIB_API))
    case CDI_FILETYPE_GRB:
#ifdef HAVE_LIBGRIB_API
    case CDI_FILETYPE_GRB2:
#endif
      {
        fileID = gribOpen(filename, filemode);
        if (fileID != CDI_UNDEFID) gribContainersNew(streamptr);
        break;
      }
#endif
#ifdef HAVE_LIBSERVICE
    case CDI_FILETYPE_SRV:
      {
        fileID = fileOpen(filename, filemode);
        break;
      }
#endif
#ifdef HAVE_LIBEXTRA
    case CDI_FILETYPE_EXT:
      {
        fileID = fileOpen(filename, filemode);
        break;
      }
#endif
#ifdef HAVE_LIBIEG
    case CDI_FILETYPE_IEG:
      {
        fileID = fileOpen(filename, filemode);
        break;
      }
#endif
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NC:
    case CDI_FILETYPE_NC2:
    case CDI_FILETYPE_NC5:
      {
        fileID = cdfOpen(filename, filemode, filetype);
        streamptr->ncmode = 2;
        break;
      }
    case CDI_FILETYPE_NC4:
    case CDI_FILETYPE_NC4C:
      {
        fileID = cdf4Open(filename, filemode, &filetype);
        streamptr->ncmode = 2;
        break;
      }
    case CDI_FILETYPE_NCZARR:
      {
        Warning("%s not available in append mode!", strfiletype(filetype));
        return CDI_ELIBNAVAIL;
      }
#endif
    default:
      {
        if (CDI_Debug) Message("%s support not compiled in!", strfiletype(filetype));
        return CDI_ELIBNAVAIL;
      }
    }

  if (fileID == CDI_UNDEFID)
    streamID = CDI_UNDEFID;
  else
    streamptr->fileID = fileID;

  return streamID;
}

/*
@Function  streamOpenRead
@Title     Open a dataset for reading

@Prototype int streamOpenRead(const char *path)
@Parameter
    @Item  path  The name of the dataset to be read.

@Description
The function @func{streamOpenRead} opens an existing dataset for reading.

@Result
Upon successful completion @func{streamOpenRead} returns an identifier to the
open stream. Otherwise, a negative number with the error status is returned.

@Errors
@List
   @Item  CDI_ESYSTEM     Operating system error.
   @Item  CDI_EINVAL      Invalid argument.
   @Item  CDI_EUFTYPE     Unsupported file type.
   @Item  CDI_ELIBNAVAIL  Library support not compiled in.
@EndList

@Example
Here is an example using @func{streamOpenRead} to open an existing NetCDF
file named @func{foo.nc} for reading:

@Source
#include "cdi.h"
   ...
int streamID;
   ...
streamID = streamOpenRead("foo.nc");
if ( streamID < 0 ) handle_error(streamID);
   ...
@EndSource
@EndFunction
*/
int
streamOpenRead(const char *filename)
{
  cdiInitialize();

  // Detect the file type (and byte order) before opening the stream.
  int byteorder = 0;
  const int filetype = cdiGetFiletype(filename, &byteorder);
  if (filetype < 0) return filetype;

  const int streamID = streamOpen(filename, "r", filetype);
  if (streamID < 0) return streamID;

  stream_t *streamptr = stream_to_pointer(streamID);
  streamptr->byteorder = byteorder;

  const long status = stream_create_vlist(streamptr, NULL);
  if (status >= 0) return streamID;

  // Reading the contents failed: release the record and unregister the stream.
  if (streamptr->record) Free(streamptr->record);
  reshRemove(streamptr->self, &streamOps);

  return (int) status;
}

// Open an existing dataset for reading with a query attached to the stream.
// Only NetCDF-based file types and GRIB2 are accepted; other types yield
// CDI_EQNAVAIL. Returns the stream ID or a negative CDI error code.
int
streamOpenReadQuery(const char *filename, CdiQuery *query)
{
  cdiInitialize();

  int byteorder = 0;
  const int filetype = cdiGetFiletype(filename, &byteorder);
  if (filetype < 0) return filetype;

  if (cdiBaseFiletype(filetype) != CDI_FILETYPE_NETCDF && filetype != CDI_FILETYPE_GRB2) return CDI_EQNAVAIL;

  const int streamID = streamOpen(filename, "r", filetype);
  if (streamID < 0) return streamID;

  stream_t *streamptr = stream_to_pointer(streamID);
  streamptr->byteorder = byteorder;

  const long status = stream_create_vlist(streamptr, query);
  if (status >= 0) return streamID;

  // Reading the contents failed: release the record and unregister the stream.
  if (streamptr->record) Free(streamptr->record);
  reshRemove(streamptr->self, &streamOps);

  return (int) status;
}

int
streamOpenAppend(const char *filename)
{
  cdiInitialize();

  int byteorder = 0;
  int filetype = cdiGetFiletype(filename, &byteorder);
  if (filetype < 0) return filetype;

  int streamID = streamOpenA(filename, "a", filetype);
  if (streamID >= 0)
    {
      stream_t *streamptr = stream_to_pointer(streamID);
      streamptr->byteorder = byteorder;
    }

  return streamID;
}

/*
@Function  streamOpenWrite
@Title     Create a new dataset

@Prototype int streamOpenWrite(const char *path, int filetype)
@Parameter
    @Item  path      The name of the new dataset.
    @Item  filetype  The type of the file format, one of the set of predefined CDI file format types.
                     The valid CDI file format types are @func{CDI_FILETYPE_GRB}, @func{CDI_FILETYPE_GRB2}, @func{CDI_FILETYPE_NC},
                     @func{CDI_FILETYPE_NC2}, @func{CDI_FILETYPE_NC4}, @func{CDI_FILETYPE_NC4C}, @func{CDI_FILETYPE_NC5},
                     @func{CDI_FILETYPE_NCZARR}, @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} and @func{CDI_FILETYPE_IEG}.

@Description
The function @func{streamOpenWrite} creates a new dataset.
@Result
Upon successful completion @func{streamOpenWrite} returns an identifier to the
open stream. Otherwise, a negative number with the error status is returned.

@Errors
@List
   @Item  CDI_ESYSTEM     Operating system error.
   @Item  CDI_EINVAL      Invalid argument.
   @Item  CDI_EUFTYPE     Unsupported file type.
   @Item  CDI_ELIBNAVAIL  Library support not compiled in.
@EndList

@Example
Here is an example using @func{streamOpenWrite} to create a new NetCDF file named @func{foo.nc} for writing:

@Source
#include "cdi.h"
   ...
int streamID;
   ...
streamID = streamOpenWrite("foo.nc", CDI_FILETYPE_NC);
if ( streamID < 0 ) handle_error(streamID);
   ...
@EndSource
@EndFunction
*/
int
streamOpenWrite(const char *filename, int filetype)
{
  cdiInitialize();

  // Opening in "w" mode is delegated to the common open helper.
  const int streamID = streamOpen(filename, "w", filetype);
  return streamID;
}

// Reset every field of a stream entry to its initial value.
static void
streamDefaultValue(stream_t *streamptr)
{
  // Identity, access and file state
  streamptr->self = CDI_UNDEFID;
  streamptr->accesstype = CDI_UNDEFID;
  streamptr->accessmode = 0;
  streamptr->filetype = CDI_FILETYPE_UNDEF;
  streamptr->byteorder = CDI_UNDEFID;
  streamptr->fileID = 0;
  streamptr->filemode = 0;
  streamptr->numvals = 0;
  streamptr->filename = NULL;
  streamptr->record = NULL;
  streamptr->query = NULL;

  // Variables and records
  streamptr->varsAllocated = 0;
  streamptr->nrecs = 0;
  streamptr->nvars = 0;
  streamptr->vars = NULL;
  streamptr->ncmode = 0;

  // Timesteps
  streamptr->curTsID = CDI_UNDEFID;
  streamptr->rtsteps = 0;
  streamptr->ntsteps = CDI_UNDEFID;
  streamptr->maxSteps = CDI_UNDEFID;
  streamptr->tsteps = NULL;
  streamptr->tstepsTableSize = 0;
  streamptr->tstepsNextID = 0;

  // Vlist, attributes and settings taken from the global defaults
  streamptr->vlistID = CDI_UNDEFID;
  streamptr->globalatts = 0;
  streamptr->localatts = 0;
  streamptr->unreduced = cdiDataUnreduced;
  streamptr->have_missval = cdiHaveMissval;
  streamptr->comptype = CDI_COMPRESS_NONE;
  streamptr->complevel = 0;
  streamptr->shuffle = 0;
  streamptr->sortname = (cdiSortName > 0);
  streamptr->lockIO = CDI_Lock_IO;
  // netcdf4/HDF5 filter
  streamptr->filterSpec = NULL;

  basetimeInit(&streamptr->basetime);

#ifdef HAVE_LIBNETCDF
  streamptr->nc_complex_float_id = CDI_UNDEFID;
  streamptr->nc_complex_double_id = CDI_UNDEFID;

  for (int i = 0; i < MAX_ZAXES_PS; i++)
    {
      streamptr->zaxisID[i] = CDI_UNDEFID;
      streamptr->nczvarID[i] = CDI_UNDEFID;
    }

  for (int i = 0; i < MAX_GRIDS_PS; i++)
    {
      streamptr->ncgrid[i].start = CDI_UNDEFID;
      streamptr->ncgrid[i].count = CDI_UNDEFID;
      streamptr->ncgrid[i].gridID = CDI_UNDEFID;
      for (size_t j = 0; j < CDF_SIZE_ncIDs; ++j) streamptr->ncgrid[i].ncIDs[j] = CDI_UNDEFID;
    }

  streamptr->ncNumDims = 0;
  for (int i = 0; i < MAX_DIMS_PS; i++)
    {
      streamptr->ncDimID[i] = CDI_UNDEFID;
      streamptr->ncDimLen[i] = 0;
    }

  streamptr->vct.ilev = 0;
  streamptr->vct.mlev = 0;
  streamptr->vct.ilevID = CDI_UNDEFID;
  streamptr->vct.mlevID = CDI_UNDEFID;

  streamptr->chunkSizeTdim = 0;
  streamptr->chunkSizeZdim = 0;
#endif
  streamptr->maxGlobalRecs = CDI_UNDEFID;

  streamptr->gribContainers = NULL;

  // Asynchronous worker state
  streamptr->numWorker = 0;
  streamptr->nextGlobalRecId = 0;
  streamptr->cachedTsID = -1;
  streamptr->jobs = NULL;
  streamptr->jobManager = NULL;

  streamptr->protocolData = NULL;

#ifdef HAVE_LIBFDB5
  streamptr->fdbNumItems = 0;
  streamptr->fdbKeyValueList = NULL;
#endif
}

static stream_t *
stream_new_entry(int resH)
{
  cdiInitialize(); /* ***************** make MT version !!! */

  stream_t *streamptr = (stream_t *) Malloc(sizeof(stream_t));
  streamDefaultValue(streamptr);

  if (resH == CDI_UNDEFID)
    streamptr->self = reshPut(streamptr, &streamOps);
  else
    {
      streamptr->self = resH;
      reshReplace(resH, streamptr, &streamOps);
    }

  return streamptr;
}

/*
 * Default close routine for a stream.
 *
 * Releases the filter specification, then dispatches on the stream protocol:
 * ACROSS and FDB streams release their protocol data and return immediately,
 * FILE/OTHER streams continue with the file-type specific close below.
 *
 * streamptr               stream to close
 * recordBufIsToBeDeleted  if nonzero, the format specific record object
 *                         (and the GRIB containers for GRIB streams) are
 *                         deleted in addition to closing the file
 */
void
cdiStreamCloseDefaultDelegate(stream_t *streamptr, int recordBufIsToBeDeleted)
{
  int fileID = streamptr->fileID;
  int filetype = streamptr->filetype;

  // filterSpec is released with plain free(); it is allocated with strdup() in streamDefFilter()
  if (streamptr->filterSpec)
    {
      free(streamptr->filterSpec);
      streamptr->filterSpec = NULL;
    }

  switch (streamptr->protocol)
    {
    case CDI_PROTOCOL_ACROSS:
#ifdef HAVE_ACROSS
      // NOTE(review): this tests for a nonzero fileID, whereas an unopened file is
      // checked against CDI_UNDEFID further down - confirm this is intended.
      if (fileID) across_disconnect(fileID);
      if (streamptr->protocolData)
        {
          Free(((across_info_t *) streamptr->protocolData)->expid);
          Free(streamptr->protocolData);
          streamptr->protocolData = NULL;
        }
#endif
      return;

    case CDI_PROTOCOL_FDB:
#ifdef HAVE_LIBFDB5
      if (streamptr->protocolData) check_fdb_error(fdb_delete_handle(streamptr->protocolData));
      streamptr->protocolData = NULL;
#endif
      return;

    case CDI_PROTOCOL_OTHER:
    case CDI_PROTOCOL_FILE:
      // handled below;
      break;
    }

  // Nothing to close if the file was never opened
  if (fileID == CDI_UNDEFID)
    {
      Warning("File %s not open!", streamptr->filename);
      return;
    }

  // Close the file with the routine matching its base file type
  switch (cdiBaseFiletype(filetype))
    {
#if defined(HAVE_LIBGRIB) && (defined(HAVE_LIBCGRIBEX) || defined(HAVE_LIBGRIB_API))
    case CDI_FILETYPE_GRIB:
      if (filetype == CDI_FILETYPE_GRB)
        {
          gribClose(fileID);
          if (recordBufIsToBeDeleted) gribContainersDelete(streamptr);
#ifdef HAVE_LIBCGRIBEX
          if (recordBufIsToBeDeleted) cgribexDelete(streamptr->record->objectp);
#endif
        }
      else if (filetype == CDI_FILETYPE_GRB2)
        {
          gribClose(fileID);
          if (recordBufIsToBeDeleted) gribContainersDelete(streamptr);
        }
      break;
#endif
#ifdef HAVE_LIBSERVICE
    case CDI_FILETYPE_SRV:
      {
        fileClose(fileID);
        if (recordBufIsToBeDeleted) srvDelete(streamptr->record->objectp);
        break;
      }
#endif
#ifdef HAVE_LIBEXTRA
    case CDI_FILETYPE_EXT:
      {
        fileClose(fileID);
        if (recordBufIsToBeDeleted) extDelete(streamptr->record->objectp);
        break;
      }
#endif
#ifdef HAVE_LIBIEG
    case CDI_FILETYPE_IEG:
      {
        fileClose(fileID);
        if (recordBufIsToBeDeleted) iegDelete(streamptr->record->objectp);
        break;
      }
#endif
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NETCDF:
      {
        cdfClose(fileID);
        // With ntsteps == 0 only the record tables of timestep 0 are released here
        if (streamptr->ntsteps == 0 && streamptr->tsteps != NULL)
          {
            if (streamptr->tsteps[0].records)
              {
                Free(streamptr->tsteps[0].records);
                streamptr->tsteps[0].records = NULL;
              }
            if (streamptr->tsteps[0].recIDs)
              {
                Free(streamptr->tsteps[0].recIDs);
                streamptr->tsteps[0].recIDs = NULL;
              }
          }
        break;
      }
#endif
    default:
      {
        // Reached for every file type whose support was not compiled in
        Error("%s support not compiled in (fileID = %d)!", strfiletype(filetype), fileID);
        break;
      }
    }
}

// Release the recordID and lindex arrays of one level table entry and
// reset both pointers to NULL.
static void
deallocate_sleveltable_t(sleveltable_t *entry)
{
  if (entry->recordID != NULL)
    {
      Free(entry->recordID);
    }
  entry->recordID = NULL;

  if (entry->lindex != NULL)
    {
      Free(entry->lindex);
    }
  entry->lindex = NULL;
}

/*
 * Close a stream through the given delegate and release all memory owned by
 * the stream object, including the stream object itself.
 *
 * streamptr            stream to destroy; must not be NULL
 * streamCloseDelegate  routine that closes the underlying file; it is only
 *                      called when the stream has a defined file type and is
 *                      always invoked with recordBufIsToBeDeleted = 1
 */
static void
streamDestroyViaDelegate(stream_t *streamptr, void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted))
{

  xassert(streamptr);
  // The delegate runs first, while streamptr->record is still allocated
  if (streamptr->filetype != CDI_FILETYPE_UNDEF) streamCloseDelegate(streamptr, 1);

  if (streamptr->record)
    {
      if (streamptr->record->buffer) Free(streamptr->record->buffer);
      Free(streamptr->record);
      streamptr->record = NULL;
    }

  streamptr->filetype = CDI_FILETYPE_UNDEF;
  if (streamptr->filename)
    {
      Free(streamptr->filename);
      streamptr->filename = NULL;
    }

  // Covers delegates that did not release the filter specification themselves
  if (streamptr->filterSpec) free(streamptr->filterSpec);

  if (streamptr->vars)
    {
      for (int index = 0; index < streamptr->nvars; index++)
        {
          sleveltable_t *pslev = streamptr->vars[index].recordTable;
          // A negative subtypeSize is treated as "no subtype entries"
          unsigned nsub = streamptr->vars[index].subtypeSize >= 0 ? (unsigned) streamptr->vars[index].subtypeSize : 0U;
          for (size_t isub = 0; isub < nsub; isub++) deallocate_sleveltable_t(pslev + isub);
          if (pslev) Free(pslev);
        }
      Free(streamptr->vars);
      streamptr->vars = NULL;
    }

  if (streamptr->tsteps)
    {
      // All timestep entries below tstepsNextID are released
      int maxSteps = streamptr->tstepsNextID;
      for (int index = 0; index < maxSteps; ++index)
        {
          tsteps_t *tstep = &(streamptr->tsteps[index]);
          if (tstep->records) Free(tstep->records);
          if (tstep->recIDs) Free(tstep->recIDs);
          taxisDestroyKernel(&(tstep->taxis));
        }

      Free(streamptr->tsteps);
      streamptr->tsteps = NULL;
    }

#ifdef HAVE_LIBFDB5
  if (streamptr->fdbKeyValueList)
    {
      cdi_fdb_delete_kvlist(streamptr->fdbNumItems, streamptr->fdbKeyValueList);
      streamptr->fdbNumItems = 0;
      streamptr->fdbKeyValueList = NULL;
    }
#endif

  int vlistID = streamptr->vlistID;
  if (vlistID != -1)
    {
      // The time axis of the vlist is destroyed here only for streams not opened in mode 'w'
      int taxisID = (streamptr->filemode != 'w') ? vlistInqTaxis(vlistID) : -1;
      if (taxisID != -1) taxisDestroy(taxisID);
      void (*mycdiVlistDestroy_)(int, bool) = (void (*)(int, bool)) namespaceSwitchGet(NSSWITCH_VLIST_DESTROY_).func;
      mycdiVlistDestroy_(vlistID, true);
    }

  // NOTE(review): jobs is freed before the job manager is finalized - confirm that
  // AsyncWorker_finalize() does not access the jobs array anymore.
  if (streamptr->jobs) Free(streamptr->jobs);
  if (streamptr->jobManager) AsyncWorker_finalize((AsyncManager *) streamptr->jobManager);

  Free(streamptr);
}


// Destroy a stream using the close routine registered in the active
// namespace under NSSWITCH_STREAM_CLOSE_BACKEND.
static void
streamDestroy(stream_t *streamptr)
{
  typedef void (*streamCloseFunc)(stream_t *, int);

  streamCloseFunc closeBackend = (streamCloseFunc) namespaceSwitchGet(NSSWITCH_STREAM_CLOSE_BACKEND).func;
  streamDestroyViaDelegate(streamptr, closeBackend);
}

static void
streamDestroyP(void *streamptr)
{
  streamDestroy((stream_t *) streamptr);
}

/*
@Function  streamClose
@Title     Close an open dataset

@Prototype  void streamClose(int streamID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.

@Description
The function @func{streamClose} closes an open dataset.

@EndFunction
*/
void
streamClose(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);

  bool lockIO = streamptr->lockIO;
  if (lockIO) CDI_IO_LOCK();

  if (CDI_Debug) Message("streamID = %d filename = %s", streamID, streamptr->filename);
  streamDestroy(streamptr);
  reshRemove(streamID, &streamOps);
  if (CDI_Debug) Message("Removed stream %d from stream list", streamID);

  if (lockIO) CDI_IO_UNLOCK();
}

/*
 * Single image implementation of streamSync().
 *
 * Issues a warning and does nothing when the file is not open, when no vlist
 * is defined for the stream, or when the vlist has no variables.  Otherwise,
 * for streams in mode 'w' or 'a', netCDF files are synchronized with
 * cdf_sync() (only while ncmode == 2) and all other file types are flushed
 * with fileFlush().
 *
 * The number of variables is queried only after vlistID has been validated,
 * so an undefined vlist produces the warning instead of being handed to
 * vlistNvars().
 */
void
cdiStreamSync_(stream_t *streamptr)
{
  int fileID = streamptr->fileID;
  int vlistID = streamptr->vlistID;

  if (fileID == CDI_UNDEFID)
    {
      Warning("File %s not open!", streamptr->filename);
      return;
    }

  if (vlistID == CDI_UNDEFID)
    {
      Warning("Vlist undefined for file %s!", streamptr->filename);
      return;
    }

  if (vlistNvars(vlistID) == 0)
    {
      Warning("No variables defined!");
      return;
    }

  // Only streams opened for writing or appending have something to flush
  if (streamptr->filemode != 'w' && streamptr->filemode != 'a') return;

  switch (cdiBaseFiletype(streamptr->filetype))
    {
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NETCDF:
      {
        void cdf_sync(int ncid);
        if (streamptr->ncmode == 2) cdf_sync(fileID);
        break;
      }
#endif
    default:
      {
        fileFlush(fileID);
        break;
      }
    }
}

/*
@Function  streamSync
@Title     Synchronize an Open Dataset to Disk

@Prototype  void streamSync(int streamID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenWrite}.

@Description
The function @func{streamSync} offers a way to synchronize the disk copy of a dataset with in-memory buffers.

@EndFunction
*/
void
streamSync(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);

  void (*myStreamSync_)(stream_t *streamptr) = (void (*)(stream_t *)) namespaceSwitchGet(NSSWITCH_STREAM_SYNC).func;
  myStreamSync_(streamptr);
}

/*
 * Define timestep tsID for a stream.
 *
 * For tsID > 0 a new timestep entry is created, which must receive exactly
 * the index tsID.  The time axis of the vlist (if any) is copied into the
 * timestep, the stream's current timestep and timestep count are updated,
 * netCDF streams with a time dimension get their timestep defined through the
 * namespace hook, and finally the records of the timestep are created.
 *
 * Returns the updated number of timesteps (tsID + 1).
 */
int
cdiStreamDefTimestep_(stream_t *streamptr, int tsID)
{
  stream_check_ptr(__func__, streamptr);

  if (CDI_Debug) Message("streamID = %d  tsID = %d", streamptr->self, tsID);

  const int vlistID = streamptr->vlistID;
  if (vlistID == CDI_UNDEFID)
    {
      Error("Must not call streamDefTimestep for stream (ID=%d) with (not yet) defined vlist", streamptr->self);
    }

  if (tsID > 0)
    {
      const int createdTsID = tstepsNewEntry(streamptr);
      if (createdTsID != tsID) Error("Internal problem: tsID = %d newtsID = %d", tsID, createdTsID);
    }

  const int taxisID = vlistInqTaxis(vlistID);
  if (taxisID != CDI_UNDEFID)
    {
      ptaxisCopy(&streamptr->tsteps[tsID].taxis, taxisPtr(taxisID));
    }

  streamptr->curTsID = tsID;
  streamptr->ntsteps = tsID + 1;

#ifdef HAVE_LIBNETCDF
  const int hasTimeDim = vlistHasTime(vlistID);
  if (hasTimeDim && cdiBaseFiletype(streamptr->filetype) == CDI_FILETYPE_NETCDF)
    {
      /* usually points to cdfDefTimestep in serial mode but
       * to cdiPioCdfDefTimestep on servers and to a null-op on
       * clients in client/server mode */
      typedef void (*cdfDefTimestepFunc)(stream_t *, int, size_t);
      cdfDefTimestepFunc defineCdfTimestep = (cdfDefTimestepFunc) namespaceSwitchGet(NSSWITCH_CDF_DEF_TIMESTEP).func;
      defineCdfTimestep(streamptr, tsID, 1);
    }
#endif

  cdi_create_records(streamptr, tsID);

  return (int) streamptr->ntsteps;
}

/*
@Function  streamDefTimestep
@Title     Define a timestep

@Prototype int streamDefTimestep(int streamID, int tsID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenWrite}.
    @Item  tsID      Timestep identifier.

@Description
The function @func{streamDefTimestep} defines a timestep of a stream by the identifier tsID.
The identifier tsID is the timestep index starting at 0 for the first timestep.
Before calling this function the functions @func{taxisDefVdate} and @func{taxisDefVtime} should be used
to define the timestamp for this timestep. All calls to write the data refer to this timestep.

@Result
@func{streamDefTimestep} returns the number of expected records of the timestep.

@EndFunction
*/
int
streamDefTimestep(int streamID, int tsID)
{
  stream_t *streamptr = stream_to_pointer(streamID);

  if (streamptr->lockIO) CDI_IO_LOCK();

  int (*myStreamDefTimestep_)(stream_t *streamptr, int tsID)
      = (int (*)(stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_DEF_TIMESTEP_).func;
  int status = myStreamDefTimestep_(streamptr, tsID);

  if (streamptr->lockIO) CDI_IO_UNLOCK();

  return status;
}

// Return the index of the stream's current timestep (its curTsID member).
int
streamInqCurTimestepID(int streamID)
{
  const stream_t *stream = stream_to_pointer(streamID);
  const int currentTsID = stream->curTsID;
  return currentTsID;
}

/*
@Function  streamInqTimestep
@Title     Get timestep information

@Prototype int streamInqTimestep(int streamID, int tsID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.
    @Item  tsID      Timestep identifier.

@Description
The function @func{streamInqTimestep} sets the next timestep to the identifier tsID.
The identifier tsID is the timestep index starting at 0 for the first timestep.
After a call to this function the functions @func{taxisInqVdate} and @func{taxisInqVtime} can be used
to read the timestamp for this timestep. All calls to read the data refer to this timestep.

@Result
@func{streamInqTimestep} returns the number of records of the timestep or 0, if the end of the file is reached.

@EndFunction
*/
int
streamInqTimestep(int streamID, int tsID)
{
  int nrecs = 0;
  stream_t *streamptr = stream_to_pointer(streamID);
  int vlistID = streamptr->vlistID;

  if (tsID < streamptr->ntsteps) streamptr->tsteps[tsID].curRecID = CDI_UNDEFID;  // fix for netCDF
  // tsID below rtsteps: answer from the information already stored in tsteps[tsID]
  if (tsID < streamptr->rtsteps)
    {
      streamptr->curTsID = tsID;
      nrecs = streamptr->tsteps[tsID].nrecs;
      streamptr->tsteps[tsID].curRecID = CDI_UNDEFID;
      int taxisID = vlistInqTaxis(vlistID);
      if (taxisID == -1) Error("Timestep undefined for fileID = %d", streamID);
      // publish the timestamp of this timestep through the time axis of the vlist
      ptaxisCopy(taxisPtr(taxisID), &streamptr->tsteps[tsID].taxis);

      return nrecs;
    }

  // The number of timesteps is known and tsID lies beyond it: report end of file
  if (tsID >= streamptr->ntsteps && streamptr->ntsteps != CDI_UNDEFID) return 0;

  int filetype = streamptr->filetype;

  if (CDI_Debug) Message("streamID = %d  tsID = %d  filetype = %d", streamID, tsID, filetype);

  if (streamptr->lockIO) CDI_IO_LOCK();

  // Let the format specific routine determine the timestep
  switch (cdiBaseFiletype(filetype))
    {
#ifdef HAVE_LIBGRIB
    case CDI_FILETYPE_GRIB:
      {
        switch (streamptr->protocol)
          {
          case CDI_PROTOCOL_FDB: nrecs = fdbInqTimestep(streamptr, tsID); break;

          case CDI_PROTOCOL_ACROSS:  // TODO read from ACROSS
          case CDI_PROTOCOL_OTHER:
          case CDI_PROTOCOL_FILE: nrecs = grbInqTimestep(streamptr, tsID); break;
          }
        break;
      }
#endif
#ifdef HAVE_LIBSERVICE
    case CDI_FILETYPE_SRV:
      {
        nrecs = srvInqTimestep(streamptr, tsID);
        break;
      }
#endif
#ifdef HAVE_LIBEXTRA
    case CDI_FILETYPE_EXT:
      {
        nrecs = extInqTimestep(streamptr, tsID);
        break;
      }
#endif
#ifdef HAVE_LIBIEG
    case CDI_FILETYPE_IEG:
      {
        nrecs = iegInqTimestep(streamptr, tsID);
        break;
      }
#endif
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NETCDF:
      {
        nrecs = cdfInqTimestep(streamptr, tsID);
        break;
      }
#endif
    default:
      {
        Error("%s support not compiled in!", strfiletype(filetype));
        break;
      }
    }

  if (streamptr->lockIO) CDI_IO_UNLOCK();

  int taxisID = vlistInqTaxis(vlistID);
  if (taxisID == -1) Error("Timestep undefined for fileID = %d", streamID);

  // publish the timestamp of this timestep through the time axis of the vlist
  ptaxisCopy(taxisPtr(taxisID), &streamptr->tsteps[tsID].taxis);

  return nrecs;
}

// This function is used in CDO!
// Return the stream's numvals counter.
SizeType
streamNvals(int streamID)
{
  const stream_t *stream = stream_to_pointer(streamID);
  const SizeType numValues = stream->numvals;
  return numValues;
}

/*
@Function  streamDefVlist
@Title     Define the variable list

@Prototype void streamDefVlist(int streamID, int vlistID)
@Parameter
    @Item  streamID Stream ID, from a previous call to @fref{streamOpenWrite}.
    @Item  vlistID  Variable list ID, from a previous call to @fref{vlistCreate}.

@Description
The function @func{streamDefVlist} defines the variable list of a stream.

To safeguard against errors by modifying the wrong vlist object,
this function makes the passed vlist object immutable.
All further vlist changes have to use the vlist object returned by streamInqVlist().

@EndFunction
*/
void
streamDefVlist(int streamID, int vlistID)
{
  typedef void (*defVlistFunc)(int, int);

  // Dispatch to the implementation registered in the active namespace.
  defVlistFunc defineVlist = (defVlistFunc) namespaceSwitchGet(NSSWITCH_STREAM_DEF_VLIST_).func;
  defineVlist(streamID, vlistID);
}

// The single image implementation of streamDefVlist
void
cdiStreamDefVlist_(int streamID, int vlistID)
{
  stream_t *stream = stream_to_pointer(streamID);

  // A stream accepts a vlist only once.
  if (stream->vlistID != CDI_UNDEFID)
    {
      Warning("vlist already defined for %s!", stream->filename);
      return;
    }

  if (stream->lockIO) CDI_IO_LOCK();

  // The stream is set up with an internal duplicate; the vlist passed in is made immutable.
  const int internalVlistID = vlistDuplicate(vlistID);
  cdiVlistMakeInternal(internalVlistID);
  cdiVlistMakeImmutable(vlistID);
  cdiStreamSetupVlist(stream, internalVlistID);

  if (stream->lockIO) CDI_IO_UNLOCK();
}

/*
@Function  streamInqVlist
@Title     Get the variable list

@Prototype int streamInqVlist(int streamID)
@Parameter
    @Item  streamID  Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.

@Description
The function @func{streamInqVlist} returns the variable list of a stream.

@Result
@func{streamInqVlist} returns an identifier to the variable list.

@EndFunction
*/
int
streamInqVlist(int streamID)
{
  const stream_t *stream = stream_to_pointer(streamID);
  const int vlistID = stream->vlistID;
  return vlistID;
}

void
streamDefShuffle(int streamID, int shuffle)
{
  stream_t *s = stream_to_pointer(streamID);
  if (s->shuffle != shuffle)
    {
      s->shuffle = shuffle;
      reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
    }
}

void
streamDefFilter(int streamID, const char *filterSpec)
{
  stream_t *s = stream_to_pointer(streamID);
  if (filterSpec)
    {
      // if (s->filterSpec) Error("Filter spec already defined!");
      if (s->filterSpec == NULL)
        {
          s->filterSpec = strdup(filterSpec);
          reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
        }
    }
}

void
streamDefCompType(int streamID, int comptype)
{
  stream_t *s = stream_to_pointer(streamID);
  if (s->comptype != comptype)
    {
      s->comptype = comptype;
      reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
    }
}

void
streamDefCompLevel(int streamID, int complevel)
{
  stream_t *s = stream_to_pointer(streamID);
  if (s->complevel != complevel)
    {
      s->complevel = complevel;
      reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
    }
}

// Return the compression type stored in the stream.
int
streamInqCompType(int streamID)
{
  const stream_t *stream = stream_to_pointer(streamID);
  const int compType = stream->comptype;
  return compType;
}

// Return the compression level stored in the stream.
int
streamInqCompLevel(int streamID)
{
  const stream_t *stream = stream_to_pointer(streamID);
  const int compLevel = stream->complevel;
  return compLevel;
}

int
streamInqFileID(int streamID)
{
  stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
  return s->fileID;
}

void
cdiDefAccesstype(int streamID, int type)
{
  stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);

  if (s->accesstype == CDI_UNDEFID)
    {
      s->accesstype = type;
    }
  else if (s->accesstype != type)
    Error("Changing access type from %s not allowed!", s->accesstype == TYPE_REC ? "REC to VAR" : "VAR to REC");
}

int
cdiInqAccesstype(int streamID)
{
  stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
  return s->accesstype;
}

// Transmission code callback of streamOps: returns STREAM for every stream.
static int
streamTxCode(void *s)
{
  (void) s;  // the stream object itself is not needed
  return STREAM;
}

// Set up a stream for the given vlist through the routine registered in the
// active namespace under NSSWITCH_STREAM_SETUP_VLIST.
void
cdiStreamSetupVlist(stream_t *s, int vlistID)
{
  typedef void (*setupVlistFunc)(stream_t *, int);

  setupVlistFunc setupVlist = (setupVlistFunc) namespaceSwitchGet(NSSWITCH_STREAM_SETUP_VLIST).func;
  setupVlist(s, vlistID);
}
/*
 * Single image implementation of cdiStreamSetupVlist().
 *
 * Stores vlistID in the stream and creates one stream variable per vlist
 * variable.  For streams opened in mode 'w', timestep 0 is created, the time
 * axis is prepared (an absolute axis is created if the vlist has a time
 * dimension but no time axis) and the format specific setup is run.
 */
void
cdiStreamSetupVlist_(stream_t *streamptr, int vlistID)
{
  streamptr->vlistID = vlistID;

  const int numVars = vlistNvars(vlistID);
  for (int varID = 0; varID < numVars; ++varID)
    {
      const int gridID = vlistInqVarGrid(vlistID, varID);
      const int zaxisID = vlistInqVarZaxis(vlistID, varID);
      const int tilesetID = vlistInqVarSubtype(vlistID, varID);
      stream_new_var(streamptr, gridID, zaxisID, tilesetID);
      if (streamptr->have_missval) vlistDefVarMissval(vlistID, varID, vlistInqVarMissval(vlistID, varID));
    }

  // Everything below is needed for streams opened in mode 'w' only
  if (streamptr->filemode != 'w') return;

  tstepsNewEntry(streamptr);  // timestep 0

  const int vlistIDw = streamptr->vlistID;
  if (vlistHasTime(vlistIDw))
    {
      int taxisID = vlistInqTaxis(vlistIDw);
      if (taxisID == CDI_UNDEFID)
        {
          Warning("taxisID undefined for fileID = %d! Using absolute time axis.", streamptr->self);
          taxisID = taxisCreate(TAXIS_ABSOLUTE);
          vlistDefTaxis(vlistIDw, taxisID);
        }

#ifdef HAVE_LIBNETCDF
      // A relative time axis in a netCDF file gets a reference date if it has none yet
      if (taxisInqType(taxisID) == TAXIS_RELATIVE && cdiBaseFiletype(streamptr->filetype) == CDI_FILETYPE_NETCDF)
        {
          const taxis_t *taxisptr = taxisPtr(taxisID);
          if (cdiDateTime_isNull(taxisptr->rDateTime))
            {
              int refDate = taxisInqVdate(taxisID);
              if (refDate == 0) refDate = 10101;
              taxisDefRdate(taxisID, refDate);
            }
        }
#endif
      ptaxisCopy(&streamptr->tsteps[0].taxis, taxisPtr(taxisID));
    }

  switch (cdiBaseFiletype(streamptr->filetype))
    {
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NETCDF:
      {
        /* calls cdfDefCoordinateVars in serial mode but
         * cdiPioClientStreamNOP (i.e. nothing) on client ranks
         * and cdiPioServerCdfDefVars on server ranks in parallel mode*/
        typedef void (*cdfSetupFunc)(stream_t *);
        cdfSetupFunc setupCdfStream = (cdfSetupFunc) namespaceSwitchGet(NSSWITCH_CDF_STREAM_SETUP).func;
        setupCdfStream(streamptr);
      }
      break;
#endif
#ifdef HAVE_LIBGRIB
    case CDI_FILETYPE_GRIB: gribContainersNew(streamptr); break;
#endif
    default:;
    }
}

// Fill IDs with the resource handles of type streamOps; numIDs and IDs are
// passed unchanged to reshGetResHListOfType().
void
cdiStreamGetIndexList(unsigned numIDs, int *IDs)
{
  reshGetResHListOfType(numIDs, IDs, &streamOps);
}

int
streamInqNvars(int streamID)
{
  stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
  return s->nvars;
}

/*
 * Comparison callback of streamOps.
 *
 * Two streams are regarded as equal when file type, byte order, compression
 * type, compression level and file name agree.  A missing file name only
 * matches another missing file name.
 *
 * Returns 0 if the streams are equal and -1 if they differ.
 */
static int
streamCompareP(void *streamptr1, void *streamptr2)
{
  stream_t *s1 = (stream_t *) streamptr1;
  stream_t *s2 = (stream_t *) streamptr2;
  enum
  {
    differ = -1,
    equal = 0,
  };

  xassert(s1);
  xassert(s2);

  if (s1->filetype != s2->filetype) return differ;
  if (s1->byteorder != s2->byteorder) return differ;
  if (s1->comptype != s2->comptype) return differ;
  if (s1->complevel != s2->complevel) return differ;

  // The names are compared as strings only when both are set, so that a
  // NULL pointer is never handed to str_is_equal().
  if (s1->filename && s2->filename)
    {
      if (!str_is_equal(s1->filename, s2->filename)) return differ;
    }
  else if (s1->filename || s2->filename)
    return differ;

  return equal;
}

/*
 * Print callback of streamOps: writes the members of a stream to fp,
 * one "name = value" line per member.  Does nothing for a NULL stream.
 */
void
streamPrintP(void *streamptr, FILE *fp)
{
  stream_t *sp = (stream_t *) streamptr;

  if (!sp) return;

  // Note: both the "streamID" header line and "self" print sp->self, and the
  // "ztype"/"zlevel" lines print sp->comptype/sp->complevel.
  fprintf(fp,
          "#\n"
          "# streamID %d\n"
          "#\n"
          "self          = %d\n"
          "accesstype    = %d\n"
          "accessmode    = %d\n"
          "filetype      = %d\n"
          "byteorder     = %d\n"
          "fileID        = %d\n"
          "filemode      = %d\n"
          "filename      = %s\n"
          "nrecs         = %d\n"
          "nvars         = %d\n"
          "varsAllocated = %d\n"
          "curTsID       = %d\n"
          "rtsteps       = %d\n"
          "ntsteps       = %ld\n"
          "tstepsTableSize= %d\n"
          "tstepsNextID  = %d\n"
          "ncmode        = %d\n"
          "vlistID       = %d\n"
          "globalatts    = %d\n"
          "localatts     = %d\n"
          "unreduced     = %d\n"
          "sortname      = %d\n"
          "have_missval  = %d\n"
          "ztype         = %d\n"
          "zlevel        = %d\n",
          sp->self, sp->self, sp->accesstype, sp->accessmode, sp->filetype, sp->byteorder, sp->fileID, sp->filemode, sp->filename,
          sp->nrecs, sp->nvars, sp->varsAllocated, sp->curTsID, sp->rtsteps, sp->ntsteps, sp->tstepsTableSize, sp->tstepsNextID,
          sp->ncmode, sp->vlistID, sp->globalatts, sp->localatts, sp->unreduced, sp->sortname, sp->have_missval, sp->comptype,
          sp->complevel);
}

// Number of int values in the serialized stream header that is written by
// streamPack() and read back by streamUnpack().
enum
{
  streamNint = 10,
};

static int
streamGetPackSize(void *voidP, void *context)
{
  stream_t *streamP = (stream_t *) voidP;
  int packBufferSize = serializeGetSize(streamNint, CDI_DATATYPE_INT, context) + serializeGetSize(2, CDI_DATATYPE_UINT32, context)
                       + serializeGetSize((int) strlen(streamP->filename) + 1, CDI_DATATYPE_TXT, context)
                       + serializeGetSize(1, CDI_DATATYPE_FLT64, context);
  return packBufferSize;
}

static void
streamPack(void *streamptr, void *packBuffer, int packBufferSize, int *packBufferPos, void *context)
{
  stream_t *streamP = (stream_t *) streamptr;
  int intBuffer[streamNint];

  intBuffer[0] = streamP->self;
  intBuffer[1] = streamP->filetype;
  intBuffer[2] = (int) strlen(streamP->filename) + 1;
  intBuffer[3] = streamP->vlistID;
  intBuffer[4] = streamP->byteorder;
  intBuffer[5] = streamP->comptype;
  intBuffer[6] = streamP->complevel;
  intBuffer[7] = streamP->unreduced;
  intBuffer[8] = streamP->sortname;
  intBuffer[9] = streamP->have_missval;

  serializePack(intBuffer, streamNint, CDI_DATATYPE_INT, packBuffer, packBufferSize, packBufferPos, context);
  uint32_t d = cdiCheckSum(CDI_DATATYPE_INT, streamNint, intBuffer);
  serializePack(&d, 1, CDI_DATATYPE_UINT32, packBuffer, packBufferSize, packBufferPos, context);

  serializePack(&CDI_Default_Missval, 1, CDI_DATATYPE_FLT64, packBuffer, packBufferSize, packBufferPos, context);
  serializePack(streamP->filename, intBuffer[2], CDI_DATATYPE_TXT, packBuffer, packBufferSize, packBufferPos, context);
  d = cdiCheckSum(CDI_DATATYPE_TXT, intBuffer[2], streamP->filename);
  serializePack(&d, 1, CDI_DATATYPE_UINT32, packBuffer, packBufferSize, packBufferPos, context);
}

/*
 * Recreate a stream from its serialized form (the counterpart of streamPack).
 *
 * Reads the int header, CDI_Default_Missval and the file name from
 * unpackBuffer, verifies both check sums, opens the stream for writing under
 * the ID adapted from originNamespace and restores byte order, compression
 * settings and the unreduced/sortname/have_missval flags.
 *
 * The file name is unpacked into a heap buffer of exactly the transmitted
 * size, so names of any length are handled without overrunning a fixed size
 * array.
 *
 * Returns the stream ID together with the vlist ID found in the header.
 */
struct streamAssoc
streamUnpack(char *unpackBuffer, int unpackBufferSize, int *unpackBufferPos, int originNamespace, void *context)
{
  int intBuffer[streamNint];
  uint32_t d;

  serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, intBuffer, streamNint, CDI_DATATYPE_INT, context);
  serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &d, 1, CDI_DATATYPE_UINT32, context);
  xassert(cdiCheckSum(CDI_DATATYPE_INT, streamNint, intBuffer) == d);

  serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &CDI_Default_Missval, 1, CDI_DATATYPE_FLT64, context);

  // intBuffer[2] is the length of the file name including the terminating NUL
  const int filenameSize = intBuffer[2];
  xassert(filenameSize > 0);
  char *filename = (char *) Malloc((size_t) filenameSize);

  serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, filename, filenameSize, CDI_DATATYPE_TXT, context);
  serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &d, 1, CDI_DATATYPE_UINT32, context);
  xassert(d == cdiCheckSum(CDI_DATATYPE_TXT, filenameSize, filename));
  int targetStreamID = namespaceAdaptKey(intBuffer[0], originNamespace),
      streamID = streamOpenID(filename, 'w', intBuffer[1], targetStreamID);
  xassert(streamID >= 0 && targetStreamID == streamID);
  // The temporary name buffer is not used beyond this point
  Free(filename);

  streamDefByteorder(streamID, intBuffer[4]);
  streamDefCompType(streamID, intBuffer[5]);
  streamDefCompLevel(streamID, intBuffer[6]);
  stream_t *streamptr = stream_to_pointer(streamID);
  streamptr->unreduced = intBuffer[7];
  streamptr->sortname = intBuffer[8];
  streamptr->have_missval = intBuffer[9];
  struct streamAssoc retval = { streamID, intBuffer[3] };
  return retval;
}


/*
 * Close delegate used by streamCloseNCMem().
 * This function does not really close the memio; that has to be done
 * outside of CDI in order to access the memory buffer.  Only the record
 * tables of timestep 0 are released here, and only when ntsteps is 0.
 * The recordBufIsToBeDeleted argument is not used.
 */
void
freePtrAfterNCMem(stream_t *streamptr, int recordBufIsToBeDeleted)
{
  (void) recordBufIsToBeDeleted;

  if (streamptr->fileID == CDI_UNDEFID)
    {
      Warning("File %s not open!", streamptr->filename);
      return;
    }

  if (streamptr->ntsteps != 0 || streamptr->tsteps == NULL) return;

  tsteps_t *firstStep = &streamptr->tsteps[0];

  if (firstStep->records != NULL)
    {
      Free(firstStep->records);
      firstStep->records = NULL;
    }

  if (firstStep->recIDs != NULL)
    {
      Free(firstStep->recIDs);
      firstStep->recIDs = NULL;
    }
}


void
streamCloseNCMem(int streamID)
{
  stream_t *streamptr = stream_to_pointer(streamID);

  bool lockIO = streamptr->lockIO;
  if (lockIO) CDI_IO_LOCK();

  if (CDI_Debug) Message("streamID = %d filename = %s", streamID, streamptr->filename);
  streamDestroyViaDelegate(streamptr,freePtrAfterNCMem);
  reshRemove(streamID, &streamOps);
  if (CDI_Debug) Message("Removed stream %d from stream list", streamID);

  if (lockIO) CDI_IO_UNLOCK();
}

// Convenience wrapper: calls streamOpenNCMem() with mode 'r' and returns its result.
int
streamOpenReadNCMem(int ncidp)
{
  return streamOpenNCMem(ncidp, 'r');
}
// Convenience wrapper: calls streamOpenNCMem() with mode 'w' and returns its result.
int
streamOpenWriteNCMem(int ncidp)
{
  return streamOpenNCMem(ncidp, 'w');
}

/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */