-
Uwe Schulzweida authoredUwe Schulzweida authored
stream.c 57.09 KiB
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#ifdef HAVE_LIBFDB5
#include "cdi_fdb.h"
#endif
#include <sys/stat.h> // struct stat
#include <ctype.h>
#include <string.h>
#include "binary.h"
#include "cdi.h"
#include "cdi_across.h"
#include "cdi_int.h"
#include "cdi_cksum.h"
#include "cdf.h"
#include "dmemory.h"
#include "error.h"
#include "stream_cgribex.h"
#include "stream_grb.h"
#include "stream_cdf.h"
#include "stream_srv.h"
#include "stream_ext.h"
#include "stream_ieg.h"
#include "file.h"
#include "cgribex.h"
#include "gribapi.h"
#include "vlist.h"
#include "serialize.h"
#include "resource_handle.h"
#include "resource_unpack.h"
#include "namespace.h"
#include "async_worker.h"
static stream_t *stream_new_entry(int resH);
static int streamCompareP(void *streamptr1, void *streamptr2);
static void streamDestroyP(void *streamptr);
static void streamPrintP(void *streamptr, FILE *fp);
static int streamGetPackSize(void *streamptr, void *context);
static void streamPack(void *streamptr, void *buff, int size, int *position, void *context);
static int streamTxCode(void *streamptr);
const resOps streamOps = { streamCompareP, streamDestroyP, streamPrintP, streamGetPackSize, streamPack, streamTxCode };
static int
getByteorder(int byteswap)
{
int byteorder = -1;
switch (HOST_ENDIANNESS)
{
case CDI_BIGENDIAN: byteorder = byteswap ? CDI_LITTLEENDIAN : CDI_BIGENDIAN; break;
case CDI_LITTLEENDIAN: byteorder = byteswap ? CDI_BIGENDIAN : CDI_LITTLEENDIAN; break;
/* FIXME: does not currently adjust for PDP endianness */
case CDI_PDPENDIAN:
default: Error("unhandled endianness");
}
return byteorder;
}
// used also in CDO
int
cdiGetFiletype(const char *uri, int *byteorder)
{
// clang-format off
int filetype = CDI_EUFTYPE;
int swap = 0;
int version;
long recpos;
const char *filename;
int protocol = cdiGetProtocol(uri, &filename);
switch (protocol)
{
case CDI_PROTOCOL_ACROSS: return CDI_FILETYPE_GRB2;
case CDI_PROTOCOL_FDB: return CDI_FILETYPE_GRB2;
case CDI_PROTOCOL_OTHER: return CDI_FILETYPE_NC4; // support for NetCDF remote types and ESDM
case CDI_PROTOCOL_FILE:
// handled below;
break;
}
int fileID = fileOpen(filename, "r");
if (fileID == CDI_UNDEFID) return CDI_ESYSTEM;
else if (fileID == -2) return CDI_ETMOF;
char buffer[8];
if (fileRead(fileID, buffer, 8) != 8)
{
struct stat buf;
if (stat(filename, &buf) == 0)
{
if (buf.st_size == 0) return CDI_EISEMPTY;
if (buf.st_mode & S_IFDIR) return CDI_EISDIR;
}
return CDI_EUFTYPE;
}
fileRewind(fileID);
if (memcmp(buffer, "GRIB", 4) == 0)
{
version = buffer[7];
if (version <= 1) filetype = CDI_FILETYPE_GRB;
else if (version == 2) filetype = CDI_FILETYPE_GRB2;
}
else if (memcmp(buffer, "CDF\001", 4) == 0) { filetype = CDI_FILETYPE_NC; }
else if (memcmp(buffer, "CDF\002", 4) == 0) { filetype = CDI_FILETYPE_NC2; }
else if (memcmp(buffer, "CDF\005", 4) == 0) { filetype = CDI_FILETYPE_NC5; }
else if (memcmp(buffer + 1, "HDF", 3) == 0) { filetype = CDI_FILETYPE_NC4; }
#ifdef HAVE_LIBSERVICE
else if (srvCheckFiletype(fileID, &swap)) { filetype = CDI_FILETYPE_SRV; }
#endif
#ifdef HAVE_LIBEXTRA
else if (extCheckFiletype(fileID, &swap)) { filetype = CDI_FILETYPE_EXT; }
#endif
#ifdef HAVE_LIBIEG
else if (iegCheckFiletype(fileID, &swap)) { filetype = CDI_FILETYPE_IEG; }
#endif
#ifdef HAVE_LIBGRIB
else if (gribCheckSeek(fileID, &recpos, &version) == 0)
{
if (version <= 1) filetype = CDI_FILETYPE_GRB;
else if (version == 2) filetype = CDI_FILETYPE_GRB2;
}
#endif
// clang-format on
if (CDI_Debug && filetype != CDI_EUFTYPE) Message("found %s file = %s", strfiletype(filetype), filename);
fileClose(fileID);
*byteorder = getByteorder(swap);
return filetype;
}
/*
@Function streamInqFiletype
@Title Get the filetype
@Prototype int streamInqFiletype(int streamID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.
@Description
The function @func{streamInqFiletype} returns the filetype of a stream.
@Result
@func{streamInqFiletype} returns the type of the file format,
one of the set of predefined CDI file format types.
The valid CDI file format types are @func{CDI_FILETYPE_GRB}, @func{CDI_FILETYPE_GRB2}, @func{CDI_FILETYPE_NC},
@func{CDI_FILETYPE_NC2}, @func{CDI_FILETYPE_NC4}, @func{CDI_FILETYPE_NC4C}, @func{CDI_FILETYPE_NC5},
@func{CDI_FILETYPE_NCZARR}, @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} and @func{CDI_FILETYPE_IEG}.
@EndFunction
*/
int
streamInqFiletype(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
return streamptr->filetype;
}
int
getByteswap(int byteorder)
{
int byteswap = -1;
switch (byteorder)
{
case CDI_BIGENDIAN:
case CDI_LITTLEENDIAN:
case CDI_PDPENDIAN: byteswap = (HOST_ENDIANNESS != byteorder); break;
case -1: break;
default: Error("unexpected byteorder %d query!", byteorder);
}
return byteswap;
}
void
streamDefMaxSteps(int streamID, int maxSteps)
{
if (maxSteps >= 0)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->maxSteps = maxSteps;
}
}
int
streamInqNumSteps(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
long ntsteps = streamptr->ntsteps;
if (ntsteps == (long) CDI_UNDEFID)
{
int tsID = 0;
while (streamInqTimestep(streamID, tsID++)) ntsteps = streamptr->ntsteps;
}
return (int) ntsteps;
}
static long
get_max_global_recs(stream_t *streamptr)
{
long maxGlobalRecs = -1;
const tsteps_t *tsteps = streamptr->tsteps;
if (tsteps)
{
maxGlobalRecs = tsteps[0].nrecs;
long numSteps = streamptr->ntsteps;
if (numSteps > 1) maxGlobalRecs += tsteps[1].nrecs * (numSteps - 1);
}
return maxGlobalRecs;
}
void
streamDefNumWorker(int streamID, int numWorker)
{
if (numWorker > 0)
{
stream_t *streamptr = stream_to_pointer(streamID);
int filetype = streamptr->filetype;
if (streamptr->filemode == 'r')
{
if (cdiBaseFiletype(filetype) == CDI_FILETYPE_GRIB)
{
(void) streamInqNumSteps(streamID);
streamptr->maxGlobalRecs = get_max_global_recs(streamptr);
}
#ifdef HAVE_LIBNETCDF
else if (filetype == CDI_FILETYPE_NCZARR || (CDI_Test && cdiBaseFiletype(filetype) == CDI_FILETYPE_NETCDF))
{
streamptr->maxGlobalRecs = get_max_global_recs(streamptr);
if (CDI_Test) Message("numWorker=%d", numWorker);
if (CDI_Test) Message("maxGlobalRecs=%ld", streamptr->maxGlobalRecs);
if (streamptr->maxGlobalRecs == -1) xabort("Internal error: number of timesteps missing!");
if (streamptr->maxGlobalRecs == 1) numWorker = 0;
if (numWorker > streamptr->maxGlobalRecs) numWorker = (int) streamptr->maxGlobalRecs;
if (streamptr->chunkSizeTdim > 1 && numWorker > streamptr->nvars) numWorker = streamptr->nvars;
if (streamptr->chunkSizeZdim > 1) numWorker = 0;
if (CDI_Test) Message("chunkSizeTdim=%d chunkSizeZdim=%d", streamptr->chunkSizeTdim, streamptr->chunkSizeZdim);
}
#endif
else
{
numWorker = 0;
}
streamptr->numWorker = numWorker;
if (CDI_Debug || CDI_Test) Message("Number of asynchronous worker: %d", numWorker);
}
}
}
/*
@Function streamDefByteorder
@Title Define the byte order
@Prototype void streamDefByteorder(int streamID, int byteorder)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenWrite}.
@Item byteorder The byte order of a dataset, one of the CDI constants @func{CDI_BIGENDIAN} and
@func{CDI_LITTLEENDIAN}.
@Description
The function @func{streamDefByteorder} defines the byte order of a binary dataset
with the file format type @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} or @func{CDI_FILETYPE_IEG}.
@EndFunction
*/
void
streamDefByteorder(int streamID, int byteorder)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->byteorder = byteorder;
int filetype = streamptr->filetype;
switch (filetype)
{
#ifdef HAVE_LIBSERVICE
case CDI_FILETYPE_SRV:
{
srvrec_t *srvp = (srvrec_t *) streamptr->record->objectp;
srvp->byteswap = getByteswap(byteorder);
break;
}
#endif
#ifdef HAVE_LIBEXTRA
case CDI_FILETYPE_EXT:
{
extrec_t *extp = (extrec_t *) streamptr->record->objectp;
extp->byteswap = getByteswap(byteorder);
break;
}
#endif
#ifdef HAVE_LIBIEG
case CDI_FILETYPE_IEG:
{
iegrec_t *iegp = (iegrec_t *) streamptr->record->objectp;
iegp->byteswap = getByteswap(byteorder);
break;
}
#endif
}
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
/*
@Function streamInqByteorder
@Title Get the byte order
@Prototype int streamInqByteorder(int streamID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.
@Description
The function @func{streamInqByteorder} returns the byte order of a binary dataset
with the file format type @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} or @func{CDI_FILETYPE_IEG}.
@Result
@func{streamInqByteorder} returns the type of the byte order.
The valid CDI byte order types are @func{CDI_BIGENDIAN} and @func{CDI_LITTLEENDIAN}
@EndFunction
*/
int
streamInqByteorder(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
return streamptr->byteorder;
}
const char *
streamFilesuffix(int filetype)
{
static const char noSuffix[] = "";
static const char ncSuffix[] = ".nc";
static const char grbSuffix[] = ".grb";
static const char srvSuffix[] = ".srv";
static const char extSuffix[] = ".ext";
static const char iegSuffix[] = ".ieg";
switch (cdiBaseFiletype(filetype))
{
case CDI_FILETYPE_GRIB: return grbSuffix;
case CDI_FILETYPE_SRV: return srvSuffix;
case CDI_FILETYPE_EXT: return extSuffix;
case CDI_FILETYPE_IEG: return iegSuffix;
case CDI_FILETYPE_NETCDF: return ncSuffix;
default: return noSuffix;
}
}
const char *
streamFilename(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
return streamptr->filename;
}
static long
cdiInqContents(stream_t *streamptr)
{
if (streamptr->lockIO) CDI_IO_LOCK();
long status = 0;
int filetype = streamptr->filetype;
switch (cdiBaseFiletype(filetype))
{
#ifdef HAVE_LIBGRIB
case CDI_FILETYPE_GRIB:
{
switch (streamptr->protocol)
{
#ifdef HAVE_LIBFDB5
case CDI_PROTOCOL_FDB: status = fdbInqContents(streamptr); break;
#endif
case CDI_PROTOCOL_ACROSS: // TODO read from ACROSS
case CDI_PROTOCOL_OTHER:
case CDI_PROTOCOL_FILE: status = grbInqContents(streamptr); break;
}
break;
}
#endif
#ifdef HAVE_LIBSERVICE
case CDI_FILETYPE_SRV: status = srvInqContents(streamptr); break;
#endif
#ifdef HAVE_LIBEXTRA
case CDI_FILETYPE_EXT: status = extInqContents(streamptr); break;
#endif
#ifdef HAVE_LIBIEG
case CDI_FILETYPE_IEG: status = iegInqContents(streamptr); break;
#endif
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NETCDF: status = cdfInqContents(streamptr); break;
#endif
default:
{
status = CDI_ELIBNAVAIL;
if (CDI_Debug) Message("%s support not compiled in!", strfiletype(filetype));
}
}
if (streamptr->lockIO) CDI_IO_UNLOCK();
if (status == 0)
{
int taxisID = vlistInqTaxis(streamptr->vlistID);
if (taxisID != CDI_UNDEFID)
{
taxis_t *taxisptr1 = &streamptr->tsteps[0].taxis;
taxis_t *taxisptr2 = taxisPtr(taxisID);
ptaxisCopy(taxisptr2, taxisptr1);
}
}
return status;
}
int
cdiGetProtocol(const char *uri, const char **filename)
{
const char *pos = strstr(uri, "://");
if (pos == NULL)
{
*filename = uri;
return CDI_PROTOCOL_FILE;
}
size_t protocollen = (size_t) (pos - uri);
*filename = pos + 3;
// if (strncmp(uri, "file", protocollen) == 0) return CDI_PROTOCOL_FILE; // file is already used in NetCDF
if (strncmp(uri, "fdb", protocollen) == 0) return CDI_PROTOCOL_FDB;
if (strncmp(uri, "across", protocollen) == 0) return CDI_PROTOCOL_ACROSS;
*filename = uri;
return CDI_PROTOCOL_OTHER;
}
int
cdiStreamOpenDefaultDelegate(const char *uri, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
{
int fileID;
char temp[2] = { filemode, 0 };
const char *filename;
streamptr->protocol = cdiGetProtocol(uri, &filename);
switch (streamptr->protocol)
{
case CDI_PROTOCOL_ACROSS:
#if defined(HAVE_ACROSS) && defined(HAVE_LIBGRIB_API)
if (filetype != CDI_FILETYPE_GRB2)
{
Warning("ACROSS needs to be used with GRIB2");
return CDI_EUFTYPE;
}
fileID = across_connect(filename, filemode, streamptr);
if (fileID >= 0)
{
streamptr->filetype = filetype;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
}
}
return fileID;
#else
#ifdef HAVE_ACROSS
Warning("ecCodes support not compiled in (Needed for ACROSS)!");
#else
Warning("ACROSS support not compiled in!");
#endif
return CDI_ELIBNAVAIL;
#endif
case CDI_PROTOCOL_FDB:
#if defined(HAVE_LIBFDB5) && defined(HAVE_LIBGRIB_API)
if (filetype != CDI_FILETYPE_GRB && filetype != CDI_FILETYPE_GRB2)
{
Warning("FDB5 needs to be used with GRIB or GRIB2");
return CDI_EUFTYPE;
}
check_fdb_error(fdb_initialise());
check_fdb_error(fdb_new_handle((fdb_handle_t **) &(streamptr->protocolData)));
streamptr->filetype = filetype;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
}
return 88;
#else // !(defined(HAVE_LIBFDB5) && defined(HAVE_LIBGRIB_API))
#ifdef HAVE_LIBFDB5
Warning("ecCodes support not compiled in (Needed for FDB5)!");
#else
Warning("FDB5 support not compiled in!");
#endif
return CDI_ELIBNAVAIL;
#endif
case CDI_PROTOCOL_OTHER:
case CDI_PROTOCOL_FILE:
// handled below;
break;
}
switch (filetype)
{
#if defined(HAVE_LIBGRIB) && (defined(HAVE_LIBCGRIBEX) || defined(HAVE_LIBGRIB_API))
case CDI_FILETYPE_GRB:
{
fileID = gribOpen(filename, temp);
if (fileID < 0) return CDI_ESYSTEM;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
#ifdef HAVE_LIBCGRIBEX
streamptr->record->objectp = cgribexNew();
#else
streamptr->record->objectp = NULL;
#endif
}
break;
}
#ifdef HAVE_LIBGRIB_API
case CDI_FILETYPE_GRB2:
{
fileID = gribOpen(filename, temp);
if (fileID < 0) return CDI_ESYSTEM;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
}
break;
}
#endif
#endif
#ifdef HAVE_LIBSERVICE
case CDI_FILETYPE_SRV:
{
fileID = fileOpen(filename, temp);
if (fileID < 0) return CDI_ESYSTEM;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
streamptr->record->objectp = srvNew();
}
break;
}
#endif
#ifdef HAVE_LIBEXTRA
case CDI_FILETYPE_EXT:
{
fileID = fileOpen(filename, temp);
if (fileID < 0) return CDI_ESYSTEM;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
streamptr->record->objectp = extNew();
}
break;
}
#endif
#ifdef HAVE_LIBIEG
case CDI_FILETYPE_IEG:
{
fileID = fileOpen(filename, temp);
if (fileID < 0) return CDI_ESYSTEM;
if (recordBufIsToBeCreated)
{
streamptr->record = (Record *) Malloc(sizeof(Record));
streamptr->record->buffer = NULL;
streamptr->record->objectp = iegNew();
}
break;
}
#endif
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NC:
case CDI_FILETYPE_NC2:
case CDI_FILETYPE_NC5:
{
fileID = cdfOpen(filename, temp, filetype);
break;
}
case CDI_FILETYPE_NC4:
case CDI_FILETYPE_NC4C:
case CDI_FILETYPE_NCZARR:
{
fileID = cdf4Open(filename, temp, &filetype);
break;
}
#endif
default:
{
if (CDI_Debug) Message("%s support not compiled in!", strfiletype(filetype));
return CDI_ELIBNAVAIL;
}
}
streamptr->filetype = filetype;
return fileID;
}
static long
stream_create_vlist(stream_t *streamptr, CdiQuery *query)
{
int vlistID = vlistCreate();
if (vlistID < 0) return CDI_ELIMIT;
cdiVlistMakeInternal(vlistID);
streamptr->vlistID = vlistID;
if (query) streamptr->query = query;
long status = cdiInqContents(streamptr);
if (status >= 0)
{
vlist_t *vlistptr = vlist_to_pointer(streamptr->vlistID);
vlistptr->ntsteps = streamptr->ntsteps;
cdiVlistMakeImmutable(vlistID);
}
return status;
}
int
streamOpenID(const char *filename, char filemode, int filetype, int resH)
{
if (CDI_Debug) Message("Open %s mode %c file %s", strfiletype(filetype), filemode, filename ? filename : "(NUL)");
if (!filename || filetype < 0) return CDI_EINVAL;
stream_t *streamptr = stream_new_entry(resH);
int streamID = CDI_ESYSTEM;
#ifndef HAVE_NC4HDF5_THREADSAFE
if (CDI_Threadsafe)
{
#ifdef HAVE_LIBPTHREAD
if (filetype == CDI_FILETYPE_NC4 || filetype == CDI_FILETYPE_NC4C || filetype == CDI_FILETYPE_NCZARR)
streamptr->lockIO = true;
#else
static bool lwarn = true;
if (lwarn)
{
lwarn = false;
Warning("CDI threadsafe failed, pthread support not compiled in!");
}
#endif
}
#endif
if (streamptr->lockIO) CDI_IO_LOCK();
int (*streamOpenDelegate)(const char *filename, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
= (int (*)(const char *, char, int, stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_OPEN_BACKEND).func;
int fileID = streamOpenDelegate(filename, filemode, filetype, streamptr, 1);
if (fileID < 0)
{
streamID = fileID;
if (streamptr->record) Free(streamptr->record);
reshRemove(streamptr->self, &streamOps);
Free(streamptr);
}
else
{
streamID = streamptr->self;
if (streamID < 0) return CDI_ELIMIT;
streamptr->filemode = filemode;
streamptr->filename = strdup(filename);
streamptr->fileID = fileID;
}
if (streamptr->lockIO) CDI_IO_UNLOCK();
return streamID;
}
int
streamOpenNCMem(int ncidp, char mode)
{
cdiInitialize();
stream_t *streamptr = stream_new_entry(CDI_UNDEFID);
int streamID = CDI_ESYSTEM;
#ifdef HAVE_NETCDF4
streamID = streamptr->self;
if (streamID < 0) return CDI_ELIMIT;
size_t max_file_path_length = 128;
size_t actual_length = 0;
char *filename = (char *) malloc(sizeof(char) * max_file_path_length);
nc_inq_path(ncidp, &actual_length, filename);
streamptr->filename = strdup(filename);
streamptr->filemode = mode;
streamptr->filetype = CDI_FILETYPE_NC4;
streamptr->fileID = ncidp;
if (mode == 'r')
{
int status = stream_create_vlist(streamptr, NULL);
if (status < 0)
{
streamID = status;
if (streamptr->record) Free(streamptr->record);
reshRemove(streamptr->self, &streamOps);
}
}
#endif
return streamID;
}
static int
streamOpen(const char *filename, const char *filemode, int filetype)
{
if (!filemode || strlen(filemode) != 1) return CDI_EINVAL;
return streamOpenID(filename, (char) tolower(filemode[0]), filetype, CDI_UNDEFID);
}
static int
streamOpenA(const char *filename, const char *filemode, int filetype)
{
if (CDI_Debug) Message("Open %s file (mode=%c); filename: %s", strfiletype(filetype), (int) *filemode, filename);
if (CDI_Debug) printf("streamOpenA: %s\n", filename); // seg fault without this line on thunder/squall with "cdo cat x y"
if (!filename || !filemode || filetype < 0) return CDI_EINVAL;
stream_t *streamptr = stream_new_entry(CDI_UNDEFID);
int fileID = CDI_UNDEFID;
{
int (*streamOpenDelegate)(const char *filename, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
= (int (*)(const char *, char, int, stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_OPEN_BACKEND).func;
fileID = streamOpenDelegate(filename, 'r', filetype, streamptr, 1);
}
if (fileID == CDI_UNDEFID || fileID == CDI_ELIBNAVAIL || fileID == CDI_ESYSTEM) return fileID;
int streamID = streamptr->self;
streamptr->filemode = tolower(*filemode);
streamptr->filename = strdup(filename);
streamptr->fileID = fileID;
streamptr->vlistID = vlistCreate();
cdiVlistMakeInternal(streamptr->vlistID);
// cdiReadByteorder(streamID);
long status = cdiInqContents(streamptr);
if (status < 0) return (int) status;
vlist_t *vlistptr = vlist_to_pointer(streamptr->vlistID);
vlistptr->ntsteps = streamInqNumSteps(streamID);
// Needed for NetCDF4
for (int varID = 0; varID < vlistptr->nvars; ++varID) streamptr->vars[varID].defmiss = true;
if (str_is_equal(filemode, "r")) cdiVlistMakeImmutable(streamptr->vlistID);
{
void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted)
= (void (*)(stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_CLOSE_BACKEND).func;
streamCloseDelegate(streamptr, 0);
}
switch (filetype)
{
#if defined(HAVE_LIBGRIB) && (defined(HAVE_LIBCGRIBEX) || defined(HAVE_LIBGRIB_API))
case CDI_FILETYPE_GRB:
#ifdef HAVE_LIBGRIB_API
case CDI_FILETYPE_GRB2:
#endif
{
fileID = gribOpen(filename, filemode);
if (fileID != CDI_UNDEFID) gribContainersNew(streamptr);
break;
}
#endif
#ifdef HAVE_LIBSERVICE
case CDI_FILETYPE_SRV:
{
fileID = fileOpen(filename, filemode);
break;
}
#endif
#ifdef HAVE_LIBEXTRA
case CDI_FILETYPE_EXT:
{
fileID = fileOpen(filename, filemode);
break;
}
#endif
#ifdef HAVE_LIBIEG
case CDI_FILETYPE_IEG:
{
fileID = fileOpen(filename, filemode);
break;
}
#endif
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NC:
case CDI_FILETYPE_NC2:
case CDI_FILETYPE_NC5:
{
fileID = cdfOpen(filename, filemode, filetype);
streamptr->ncmode = 2;
break;
}
case CDI_FILETYPE_NC4:
case CDI_FILETYPE_NC4C:
{
fileID = cdf4Open(filename, filemode, &filetype);
streamptr->ncmode = 2;
break;
}
case CDI_FILETYPE_NCZARR:
{
Warning("%s not available in append mode!", strfiletype(filetype));
return CDI_ELIBNAVAIL;
}
#endif
default:
{
if (CDI_Debug) Message("%s support not compiled in!", strfiletype(filetype));
return CDI_ELIBNAVAIL;
}
}
if (fileID == CDI_UNDEFID)
streamID = CDI_UNDEFID;
else
streamptr->fileID = fileID;
return streamID;
}
/*
@Function streamOpenRead
@Title Open a dataset for reading
@Prototype int streamOpenRead(const char *path)
@Parameter
@Item path The name of the dataset to be read.
@Description
The function @func{streamOpenRead} opens an existing dataset for reading.
@Result
Upon successful completion @func{streamOpenRead} returns an identifier to the
open stream. Otherwise, a negative number with the error status is returned.
@Errors
@List
@Item CDI_ESYSTEM Operating system error.
@Item CDI_EINVAL Invalid argument.
@Item CDI_EUFILETYPE Unsupported file type.
@Item CDI_ELIBNAVAIL Library support not compiled in.
@EndList
@Example
Here is an example using @func{streamOpenRead} to open an existing NetCDF
file named @func{foo.nc} for reading:
@Source
#include "cdi.h"
...
int streamID;
...
streamID = streamOpenRead("foo.nc");
if ( streamID < 0 ) handle_error(streamID);
...
@EndSource
@EndFunction
*/
int
streamOpenRead(const char *filename)
{
cdiInitialize();
int byteorder = 0;
int filetype = cdiGetFiletype(filename, &byteorder);
if (filetype < 0) return filetype;
int streamID = streamOpen(filename, "r", filetype);
if (streamID >= 0)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->byteorder = byteorder;
long status = stream_create_vlist(streamptr, NULL);
if (status < 0)
{
streamID = (int) status;
if (streamptr->record) Free(streamptr->record);
reshRemove(streamptr->self, &streamOps);
}
}
return streamID;
}
int
streamOpenReadQuery(const char *filename, CdiQuery *query)
{
cdiInitialize();
int byteorder = 0;
int filetype = cdiGetFiletype(filename, &byteorder);
if (filetype < 0) return filetype;
if (cdiBaseFiletype(filetype) != CDI_FILETYPE_NETCDF && filetype != CDI_FILETYPE_GRB2) return CDI_EQNAVAIL;
int streamID = streamOpen(filename, "r", filetype);
if (streamID >= 0)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->byteorder = byteorder;
long status = stream_create_vlist(streamptr, query);
if (status < 0)
{
streamID = (int) status;
if (streamptr->record) Free(streamptr->record);
reshRemove(streamptr->self, &streamOps);
}
}
return streamID;
}
int
streamOpenAppend(const char *filename)
{
cdiInitialize();
int byteorder = 0;
int filetype = cdiGetFiletype(filename, &byteorder);
if (filetype < 0) return filetype;
int streamID = streamOpenA(filename, "a", filetype);
if (streamID >= 0)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->byteorder = byteorder;
}
return streamID;
}
/*
@Function streamOpenWrite
@Title Create a new dataset
@Prototype int streamOpenWrite(const char *path, int filetype)
@Parameter
@Item path The name of the new dataset.
@Item filetype The type of the file format, one of the set of predefined CDI file format types.
The valid CDI file format types are @func{CDI_FILETYPE_GRB}, @func{CDI_FILETYPE_GRB2}, @func{CDI_FILETYPE_NC},
@func{CDI_FILETYPE_NC2}, @func{CDI_FILETYPE_NC4}, @func{CDI_FILETYPE_NC4C}, @func{CDI_FILETYPE_NC5},
@func{CDI_FILETYPE_NCZARR}, @func{CDI_FILETYPE_SRV}, @func{CDI_FILETYPE_EXT} and @func{CDI_FILETYPE_IEG}.
@Description
The function @func{streamOpenWrite} creates a new datset.
@Result
Upon successful completion @func{streamOpenWrite} returns an identifier to the
open stream. Otherwise, a negative number with the error status is returned.
@Errors
@List
@Item CDI_ESYSTEM Operating system error.
@Item CDI_EINVAL Invalid argument.
@Item CDI_EUFILETYPE Unsupported file type.
@Item CDI_ELIBNAVAIL Library support not compiled in.
@EndList
@Example
Here is an example using @func{streamOpenWrite} to create a new NetCDF file named @func{foo.nc} for writing:
@Source
#include "cdi.h"
...
int streamID;
...
streamID = streamOpenWrite("foo.nc", CDI_FILETYPE_NC);
if ( streamID < 0 ) handle_error(streamID);
...
@EndSource
@EndFunction
*/
int
streamOpenWrite(const char *filename, int filetype)
{
cdiInitialize();
return streamOpen(filename, "w", filetype);
}
static void
streamDefaultValue(stream_t *streamptr)
{
streamptr->self = CDI_UNDEFID;
streamptr->accesstype = CDI_UNDEFID;
streamptr->accessmode = 0;
streamptr->filetype = CDI_FILETYPE_UNDEF;
streamptr->byteorder = CDI_UNDEFID;
streamptr->fileID = 0;
streamptr->filemode = 0;
streamptr->numvals = 0;
streamptr->filename = NULL;
streamptr->record = NULL;
streamptr->query = NULL;
streamptr->varsAllocated = 0;
streamptr->nrecs = 0;
streamptr->nvars = 0;
streamptr->vars = NULL;
streamptr->ncmode = 0;
streamptr->curTsID = CDI_UNDEFID;
streamptr->rtsteps = 0;
streamptr->ntsteps = CDI_UNDEFID;
streamptr->maxSteps = CDI_UNDEFID;
streamptr->tsteps = NULL;
streamptr->tstepsTableSize = 0;
streamptr->tstepsNextID = 0;
streamptr->vlistID = CDI_UNDEFID;
streamptr->globalatts = 0;
streamptr->localatts = 0;
streamptr->unreduced = cdiDataUnreduced;
streamptr->have_missval = cdiHaveMissval;
streamptr->comptype = CDI_COMPRESS_NONE;
streamptr->complevel = 0;
streamptr->shuffle = 0;
streamptr->sortname = (cdiSortName > 0);
streamptr->lockIO = CDI_Lock_IO;
// netcdf4/HDF5 filter
streamptr->filterSpec = NULL;
basetimeInit(&streamptr->basetime);
#ifdef HAVE_LIBNETCDF
streamptr->nc_complex_float_id = CDI_UNDEFID;
streamptr->nc_complex_double_id = CDI_UNDEFID;
for (int i = 0; i < MAX_ZAXES_PS; i++) streamptr->zaxisID[i] = CDI_UNDEFID;
for (int i = 0; i < MAX_ZAXES_PS; i++) streamptr->nczvarID[i] = CDI_UNDEFID;
for (int i = 0; i < MAX_GRIDS_PS; i++)
{
streamptr->ncgrid[i].start = CDI_UNDEFID;
streamptr->ncgrid[i].count = CDI_UNDEFID;
streamptr->ncgrid[i].gridID = CDI_UNDEFID;
for (size_t j = 0; j < CDF_SIZE_ncIDs; ++j) streamptr->ncgrid[i].ncIDs[j] = CDI_UNDEFID;
}
streamptr->ncNumDims = 0;
for (int i = 0; i < MAX_DIMS_PS; i++) streamptr->ncDimID[i] = CDI_UNDEFID;
for (int i = 0; i < MAX_DIMS_PS; i++) streamptr->ncDimLen[i] = 0;
streamptr->vct.ilev = 0;
streamptr->vct.mlev = 0;
streamptr->vct.ilevID = CDI_UNDEFID;
streamptr->vct.mlevID = CDI_UNDEFID;
streamptr->chunkSizeTdim = 0;
streamptr->chunkSizeZdim = 0;
#endif
streamptr->maxGlobalRecs = CDI_UNDEFID;
streamptr->gribContainers = NULL;
streamptr->numWorker = 0;
streamptr->nextGlobalRecId = 0;
streamptr->cachedTsID = -1;
streamptr->jobs = NULL;
streamptr->jobManager = NULL;
streamptr->protocolData = NULL;
#ifdef HAVE_LIBFDB5
streamptr->fdbNumItems = 0;
streamptr->fdbKeyValueList = NULL;
#endif
}
static stream_t *
stream_new_entry(int resH)
{
cdiInitialize(); /* ***************** make MT version !!! */
stream_t *streamptr = (stream_t *) Malloc(sizeof(stream_t));
streamDefaultValue(streamptr);
if (resH == CDI_UNDEFID)
streamptr->self = reshPut(streamptr, &streamOps);
else
{
streamptr->self = resH;
reshReplace(resH, streamptr, &streamOps);
}
return streamptr;
}
void
cdiStreamCloseDefaultDelegate(stream_t *streamptr, int recordBufIsToBeDeleted)
{
int fileID = streamptr->fileID;
int filetype = streamptr->filetype;
if (streamptr->filterSpec)
{
free(streamptr->filterSpec);
streamptr->filterSpec = NULL;
}
switch (streamptr->protocol)
{
case CDI_PROTOCOL_ACROSS:
#ifdef HAVE_ACROSS
if (fileID) across_disconnect(fileID);
if (streamptr->protocolData)
{
Free(((across_info_t *) streamptr->protocolData)->expid);
Free(streamptr->protocolData);
streamptr->protocolData = NULL;
}
#endif
return;
case CDI_PROTOCOL_FDB:
#ifdef HAVE_LIBFDB5
if (streamptr->protocolData) check_fdb_error(fdb_delete_handle(streamptr->protocolData));
streamptr->protocolData = NULL;
#endif
return;
case CDI_PROTOCOL_OTHER:
case CDI_PROTOCOL_FILE:
// handled below;
break;
}
if (fileID == CDI_UNDEFID)
{
Warning("File %s not open!", streamptr->filename);
return;
}
switch (cdiBaseFiletype(filetype))
{
#if defined(HAVE_LIBGRIB) && (defined(HAVE_LIBCGRIBEX) || defined(HAVE_LIBGRIB_API))
case CDI_FILETYPE_GRIB:
if (filetype == CDI_FILETYPE_GRB)
{
gribClose(fileID);
if (recordBufIsToBeDeleted) gribContainersDelete(streamptr);
#ifdef HAVE_LIBCGRIBEX
if (recordBufIsToBeDeleted) cgribexDelete(streamptr->record->objectp);
#endif
}
else if (filetype == CDI_FILETYPE_GRB2)
{
gribClose(fileID);
if (recordBufIsToBeDeleted) gribContainersDelete(streamptr);
}
break;
#endif
#ifdef HAVE_LIBSERVICE
case CDI_FILETYPE_SRV:
{
fileClose(fileID);
if (recordBufIsToBeDeleted) srvDelete(streamptr->record->objectp);
break;
}
#endif
#ifdef HAVE_LIBEXTRA
case CDI_FILETYPE_EXT:
{
fileClose(fileID);
if (recordBufIsToBeDeleted) extDelete(streamptr->record->objectp);
break;
}
#endif
#ifdef HAVE_LIBIEG
case CDI_FILETYPE_IEG:
{
fileClose(fileID);
if (recordBufIsToBeDeleted) iegDelete(streamptr->record->objectp);
break;
}
#endif
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NETCDF:
{
cdfClose(fileID);
if (streamptr->ntsteps == 0 && streamptr->tsteps != NULL)
{
if (streamptr->tsteps[0].records)
{
Free(streamptr->tsteps[0].records);
streamptr->tsteps[0].records = NULL;
}
if (streamptr->tsteps[0].recIDs)
{
Free(streamptr->tsteps[0].recIDs);
streamptr->tsteps[0].recIDs = NULL;
}
}
break;
}
#endif
default:
{
Error("%s support not compiled in (fileID = %d)!", strfiletype(filetype), fileID);
break;
}
}
}
static void
deallocate_sleveltable_t(sleveltable_t *entry)
{
if (entry->recordID) Free(entry->recordID);
if (entry->lindex) Free(entry->lindex);
entry->recordID = NULL;
entry->lindex = NULL;
}
static void
streamDestroyViaDelegate(stream_t *streamptr, void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted))
{
xassert(streamptr);
if (streamptr->filetype != CDI_FILETYPE_UNDEF) streamCloseDelegate(streamptr, 1);
if (streamptr->record)
{
if (streamptr->record->buffer) Free(streamptr->record->buffer);
Free(streamptr->record);
streamptr->record = NULL;
}
streamptr->filetype = CDI_FILETYPE_UNDEF;
if (streamptr->filename)
{
Free(streamptr->filename);
streamptr->filename = NULL;
}
if (streamptr->filterSpec) free(streamptr->filterSpec);
if (streamptr->vars)
{
for (int index = 0; index < streamptr->nvars; index++)
{
sleveltable_t *pslev = streamptr->vars[index].recordTable;
unsigned nsub = streamptr->vars[index].subtypeSize >= 0 ? (unsigned) streamptr->vars[index].subtypeSize : 0U;
for (size_t isub = 0; isub < nsub; isub++) deallocate_sleveltable_t(pslev + isub);
if (pslev) Free(pslev);
}
Free(streamptr->vars);
streamptr->vars = NULL;
}
if (streamptr->tsteps)
{
int maxSteps = streamptr->tstepsNextID;
for (int index = 0; index < maxSteps; ++index)
{
tsteps_t *tstep = &(streamptr->tsteps[index]);
if (tstep->records) Free(tstep->records);
if (tstep->recIDs) Free(tstep->recIDs);
taxisDestroyKernel(&(tstep->taxis));
}
Free(streamptr->tsteps);
streamptr->tsteps = NULL;
}
#ifdef HAVE_LIBFDB5
if (streamptr->fdbKeyValueList)
{
cdi_fdb_delete_kvlist(streamptr->fdbNumItems, streamptr->fdbKeyValueList);
streamptr->fdbNumItems = 0;
streamptr->fdbKeyValueList = NULL;
}
#endif
int vlistID = streamptr->vlistID;
if (vlistID != -1)
{
int taxisID = (streamptr->filemode != 'w') ? vlistInqTaxis(vlistID) : -1;
if (taxisID != -1) taxisDestroy(taxisID);
void (*mycdiVlistDestroy_)(int, bool) = (void (*)(int, bool)) namespaceSwitchGet(NSSWITCH_VLIST_DESTROY_).func;
mycdiVlistDestroy_(vlistID, true);
}
if (streamptr->jobs) Free(streamptr->jobs);
if (streamptr->jobManager) AsyncWorker_finalize((AsyncManager *) streamptr->jobManager);
Free(streamptr);
}
static void
streamDestroy(stream_t *streamptr)
{
void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted)
= (void (*)(stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_CLOSE_BACKEND).func;
streamDestroyViaDelegate(streamptr, streamCloseDelegate);
}
static void
streamDestroyP(void *streamptr)
{
streamDestroy((stream_t *) streamptr);
}
/*
@Function streamClose
@Title Close an open dataset
@Prototype void streamClose(int streamID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.
@Description
The function @func{streamClose} closes an open dataset.
@EndFunction
*/
void
streamClose(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
bool lockIO = streamptr->lockIO;
if (lockIO) CDI_IO_LOCK();
if (CDI_Debug) Message("streamID = %d filename = %s", streamID, streamptr->filename);
streamDestroy(streamptr);
reshRemove(streamID, &streamOps);
if (CDI_Debug) Message("Removed stream %d from stream list", streamID);
if (lockIO) CDI_IO_UNLOCK();
}
void
cdiStreamSync_(stream_t *streamptr)
{
int fileID = streamptr->fileID;
int filetype = streamptr->filetype;
int vlistID = streamptr->vlistID;
int nvars = vlistNvars(vlistID);
if (fileID == CDI_UNDEFID)
Warning("File %s not open!", streamptr->filename);
else if (vlistID == CDI_UNDEFID)
Warning("Vlist undefined for file %s!", streamptr->filename);
else if (nvars == 0)
Warning("No variables defined!");
else
{
if (streamptr->filemode == 'w' || streamptr->filemode == 'a')
{
switch (cdiBaseFiletype(filetype))
{
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NETCDF:
{
void cdf_sync(int ncid);
if (streamptr->ncmode == 2) cdf_sync(fileID);
break;
}
#endif
default:
{
fileFlush(fileID);
break;
}
}
}
}
}
/*
@Function streamSync
@Title Synchronize an Open Dataset to Disk
@Prototype void streamSync(int streamID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenWrite}.
@Description
The function @func{streamSync} offers a way to synchronize the disk copy of a dataset with in-memory buffers.
@EndFunction
*/
void
streamSync(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
void (*myStreamSync_)(stream_t *streamptr) = (void (*)(stream_t *)) namespaceSwitchGet(NSSWITCH_STREAM_SYNC).func;
myStreamSync_(streamptr);
}
int
cdiStreamDefTimestep_(stream_t *streamptr, int tsID)
{
stream_check_ptr(__func__, streamptr);
if (CDI_Debug) Message("streamID = %d tsID = %d", streamptr->self, tsID);
int vlistID = streamptr->vlistID;
if (vlistID == CDI_UNDEFID)
Error("Must not call streamDefTimestep for stream (ID=%d) with (not yet) defined vlist", streamptr->self);
if (tsID > 0)
{
int newtsID = tstepsNewEntry(streamptr);
if (tsID != newtsID) Error("Internal problem: tsID = %d newtsID = %d", tsID, newtsID);
}
int taxisID = vlistInqTaxis(vlistID);
if (taxisID != CDI_UNDEFID) ptaxisCopy(&streamptr->tsteps[tsID].taxis, taxisPtr(taxisID));
streamptr->curTsID = tsID;
streamptr->ntsteps = tsID + 1;
#ifdef HAVE_LIBNETCDF
int timeIsVarying = vlistHasTime(vlistID);
if (cdiBaseFiletype(streamptr->filetype) == CDI_FILETYPE_NETCDF && timeIsVarying)
{
/* usually points to cdfDefTimestep in serial mode but
* to cdiPioCdfDefTimestep on servers and to a null-op on
* clients in client/server mode */
void (*myCdfDefTimestep)(stream_t *streamptr, int tsID, size_t)
= (void (*)(stream_t *, int, size_t)) namespaceSwitchGet(NSSWITCH_CDF_DEF_TIMESTEP).func;
myCdfDefTimestep(streamptr, tsID, 1);
}
#endif
cdi_create_records(streamptr, tsID);
return (int) streamptr->ntsteps;
}
/*
@Function streamDefTimestep
@Title Define a timestep
@Prototype int streamDefTimestep(int streamID, int tsID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenWrite}.
@Item tsID Timestep identifier.
@Description
The function @func{streamDefTimestep} defines a timestep of a stream by the identifier tsID.
The identifier tsID is the timestep index starting at 0 for the first timestep.
Before calling this function the functions @func{taxisDefVdate} and @func{taxisDefVtime} should be used
to define the timestamp for this timestep. All calls to write the data refer to this timestep.
@Result
@func{streamDefTimestep} returns the number of expected records of the timestep.
@EndFunction
*/
int
streamDefTimestep(int streamID, int tsID)
{
stream_t *streamptr = stream_to_pointer(streamID);
if (streamptr->lockIO) CDI_IO_LOCK();
int (*myStreamDefTimestep_)(stream_t *streamptr, int tsID)
= (int (*)(stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_DEF_TIMESTEP_).func;
int status = myStreamDefTimestep_(streamptr, tsID);
if (streamptr->lockIO) CDI_IO_UNLOCK();
return status;
}
int
streamInqCurTimestepID(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
return streamptr->curTsID;
}
/*
@Function streamInqTimestep
@Title Get timestep information
@Prototype int streamInqTimestep(int streamID, int tsID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.
@Item tsID Timestep identifier.
@Description
The function @func{streamInqTimestep} sets the next timestep to the identifier tsID.
The identifier tsID is the timestep index starting at 0 for the first timestep.
After a call to this function the functions @func{taxisInqVdate} and @func{taxisInqVtime} can be used
to read the timestamp for this timestep. All calls to read the data refer to this timestep.
@Result
@func{streamInqTimestep} returns the number of records of the timestep or 0, if the end of the file is reached.
@EndFunction
*/
int
streamInqTimestep(int streamID, int tsID)
{
int nrecs = 0;
stream_t *streamptr = stream_to_pointer(streamID);
int vlistID = streamptr->vlistID;
if (tsID < streamptr->ntsteps) streamptr->tsteps[tsID].curRecID = CDI_UNDEFID; // fix for netCDF
if (tsID < streamptr->rtsteps)
{
streamptr->curTsID = tsID;
nrecs = streamptr->tsteps[tsID].nrecs;
streamptr->tsteps[tsID].curRecID = CDI_UNDEFID;
int taxisID = vlistInqTaxis(vlistID);
if (taxisID == -1) Error("Timestep undefined for fileID = %d", streamID);
ptaxisCopy(taxisPtr(taxisID), &streamptr->tsteps[tsID].taxis);
return nrecs;
}
if (tsID >= streamptr->ntsteps && streamptr->ntsteps != CDI_UNDEFID) return 0;
int filetype = streamptr->filetype;
if (CDI_Debug) Message("streamID = %d tsID = %d filetype = %d", streamID, tsID, filetype);
if (streamptr->lockIO) CDI_IO_LOCK();
switch (cdiBaseFiletype(filetype))
{
#ifdef HAVE_LIBGRIB
case CDI_FILETYPE_GRIB:
{
switch (streamptr->protocol)
{
case CDI_PROTOCOL_FDB: nrecs = fdbInqTimestep(streamptr, tsID); break;
case CDI_PROTOCOL_ACROSS: // TODO read from ACROSS
case CDI_PROTOCOL_OTHER:
case CDI_PROTOCOL_FILE: nrecs = grbInqTimestep(streamptr, tsID); break;
}
break;
}
#endif
#ifdef HAVE_LIBSERVICE
case CDI_FILETYPE_SRV:
{
nrecs = srvInqTimestep(streamptr, tsID);
break;
}
#endif
#ifdef HAVE_LIBEXTRA
case CDI_FILETYPE_EXT:
{
nrecs = extInqTimestep(streamptr, tsID);
break;
}
#endif
#ifdef HAVE_LIBIEG
case CDI_FILETYPE_IEG:
{
nrecs = iegInqTimestep(streamptr, tsID);
break;
}
#endif
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NETCDF:
{
nrecs = cdfInqTimestep(streamptr, tsID);
break;
}
#endif
default:
{
Error("%s support not compiled in!", strfiletype(filetype));
break;
}
}
if (streamptr->lockIO) CDI_IO_UNLOCK();
int taxisID = vlistInqTaxis(vlistID);
if (taxisID == -1) Error("Timestep undefined for fileID = %d", streamID);
ptaxisCopy(taxisPtr(taxisID), &streamptr->tsteps[tsID].taxis);
return nrecs;
}
// This function is used in CDO!
SizeType
streamNvals(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
return streamptr->numvals;
}
/*
@Function streamDefVlist
@Title Define the variable list
@Prototype void streamDefVlist(int streamID, int vlistID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenWrite}.
@Item vlistID Variable list ID, from a previous call to @fref{vlistCreate}.
@Description
The function @func{streamDefVlist} defines the variable list of a stream.
To safeguard against errors by modifying the wrong vlist object,
this function makes the passed vlist object immutable.
All further vlist changes have to use the vlist object returned by streamInqVlist().
@EndFunction
*/
void
streamDefVlist(int streamID, int vlistID)
{
void (*myStreamDefVlist)(int streamID, int vlistID) = (void (*)(int, int)) namespaceSwitchGet(NSSWITCH_STREAM_DEF_VLIST_).func;
myStreamDefVlist(streamID, vlistID);
}
// The single image implementation of streamDefVlist
void
cdiStreamDefVlist_(int streamID, int vlistID)
{
stream_t *streamptr = stream_to_pointer(streamID);
if (streamptr->vlistID == CDI_UNDEFID)
{
if (streamptr->lockIO) CDI_IO_LOCK();
int vlistCopy = vlistDuplicate(vlistID);
cdiVlistMakeInternal(vlistCopy);
cdiVlistMakeImmutable(vlistID);
cdiStreamSetupVlist(streamptr, vlistCopy);
if (streamptr->lockIO) CDI_IO_UNLOCK();
}
else
Warning("vlist already defined for %s!", streamptr->filename);
}
/*
@Function streamInqVlist
@Title Get the variable list
@Prototype int streamInqVlist(int streamID)
@Parameter
@Item streamID Stream ID, from a previous call to @fref{streamOpenRead} or @fref{streamOpenWrite}.
@Description
The function @func{streamInqVlist} returns the variable list of a stream.
@Result
@func{streamInqVlist} returns an identifier to the variable list.
@EndFunction
*/
int
streamInqVlist(int streamID)
{
stream_t *s = stream_to_pointer(streamID);
return s->vlistID;
}
void
streamDefShuffle(int streamID, int shuffle)
{
stream_t *s = stream_to_pointer(streamID);
if (s->shuffle != shuffle)
{
s->shuffle = shuffle;
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
}
void
streamDefFilter(int streamID, const char *filterSpec)
{
stream_t *s = stream_to_pointer(streamID);
if (filterSpec)
{
// if (s->filterSpec) Error("Filter spec already defined!");
if (s->filterSpec == NULL)
{
s->filterSpec = strdup(filterSpec);
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
}
}
void
streamDefCompType(int streamID, int comptype)
{
stream_t *s = stream_to_pointer(streamID);
if (s->comptype != comptype)
{
s->comptype = comptype;
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
}
void
streamDefCompLevel(int streamID, int complevel)
{
stream_t *s = stream_to_pointer(streamID);
if (s->complevel != complevel)
{
s->complevel = complevel;
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
}
int
streamInqCompType(int streamID)
{
stream_t *s = stream_to_pointer(streamID);
return s->comptype;
}
int
streamInqCompLevel(int streamID)
{
stream_t *s = stream_to_pointer(streamID);
return s->complevel;
}
int
streamInqFileID(int streamID)
{
stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
return s->fileID;
}
void
cdiDefAccesstype(int streamID, int type)
{
stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
if (s->accesstype == CDI_UNDEFID)
{
s->accesstype = type;
}
else if (s->accesstype != type)
Error("Changing access type from %s not allowed!", s->accesstype == TYPE_REC ? "REC to VAR" : "VAR to REC");
}
int
cdiInqAccesstype(int streamID)
{
stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
return s->accesstype;
}
static int
streamTxCode(void *s)
{
(void) s;
return STREAM;
}
void
cdiStreamSetupVlist(stream_t *s, int vlistID)
{
void (*myStreamSetupVlist)(stream_t *s, int vlistID)
= (void (*)(stream_t *, int)) namespaceSwitchGet(NSSWITCH_STREAM_SETUP_VLIST).func;
myStreamSetupVlist(s, vlistID);
}
void
cdiStreamSetupVlist_(stream_t *streamptr, int vlistID)
{
streamptr->vlistID = vlistID;
int nvars = vlistNvars(vlistID);
for (int varID = 0; varID < nvars; ++varID)
{
int gridID = vlistInqVarGrid(vlistID, varID);
int zaxisID = vlistInqVarZaxis(vlistID, varID);
int tilesetID = vlistInqVarSubtype(vlistID, varID);
stream_new_var(streamptr, gridID, zaxisID, tilesetID);
if (streamptr->have_missval) vlistDefVarMissval(vlistID, varID, vlistInqVarMissval(vlistID, varID));
}
if (streamptr->filemode == 'w')
{
tstepsNewEntry(streamptr); // timestep 0
int vlistIDw = streamptr->vlistID;
int timeIsVarying = vlistHasTime(vlistIDw);
if (timeIsVarying)
{
int taxisID = vlistInqTaxis(vlistIDw);
if (taxisID == CDI_UNDEFID)
{
Warning("taxisID undefined for fileID = %d! Using absolute time axis.", streamptr->self);
taxisID = taxisCreate(TAXIS_ABSOLUTE);
vlistDefTaxis(vlistIDw, taxisID);
}
#ifdef HAVE_LIBNETCDF
if (taxisInqType(taxisID) == TAXIS_RELATIVE)
if (cdiBaseFiletype(streamptr->filetype) == CDI_FILETYPE_NETCDF)
{
const taxis_t *taxisptr = taxisPtr(taxisID);
if (cdiDateTime_isNull(taxisptr->rDateTime))
{
int vdate = taxisInqVdate(taxisID);
if (vdate == 0) vdate = 10101;
taxisDefRdate(taxisID, vdate);
}
}
#endif
ptaxisCopy(&streamptr->tsteps[0].taxis, taxisPtr(taxisID));
}
switch (cdiBaseFiletype(streamptr->filetype))
{
#ifdef HAVE_LIBNETCDF
case CDI_FILETYPE_NETCDF:
{
/* calls cdfDefCoordinateVars in serial mode but
* cdiPioClientStreamNOP (i.e. nothing) on client ranks
* and cdiPioServerCdfDefVars on server ranks in parallel mode*/
void (*myCdfDefVars)(stream_t *streamptr) = (void (*)(stream_t *)) namespaceSwitchGet(NSSWITCH_CDF_STREAM_SETUP).func;
myCdfDefVars(streamptr);
}
break;
#endif
#ifdef HAVE_LIBGRIB
case CDI_FILETYPE_GRIB: gribContainersNew(streamptr); break;
#endif
default:;
}
}
}
void
cdiStreamGetIndexList(unsigned numIDs, int *IDs)
{
reshGetResHListOfType(numIDs, IDs, &streamOps);
}
int
streamInqNvars(int streamID)
{
stream_t *s = (stream_t *) reshGetVal(streamID, &streamOps);
return s->nvars;
}
static int
streamCompareP(void *streamptr1, void *streamptr2)
{
stream_t *s1 = (stream_t *) streamptr1;
stream_t *s2 = (stream_t *) streamptr2;
enum
{
differ = -1,
equal = 0,
};
xassert(s1);
xassert(s2);
if (s1->filetype != s2->filetype) return differ;
if (s1->byteorder != s2->byteorder) return differ;
if (s1->comptype != s2->comptype) return differ;
if (s1->complevel != s2->complevel) return differ;
if (s1->filename)
{
if (!str_is_equal(s1->filename, s2->filename)) return differ;
}
else if (s2->filename)
return differ;
return equal;
}
void
streamPrintP(void *streamptr, FILE *fp)
{
stream_t *sp = (stream_t *) streamptr;
if (!sp) return;
fprintf(fp,
"#\n"
"# streamID %d\n"
"#\n"
"self = %d\n"
"accesstype = %d\n"
"accessmode = %d\n"
"filetype = %d\n"
"byteorder = %d\n"
"fileID = %d\n"
"filemode = %d\n"
"filename = %s\n"
"nrecs = %d\n"
"nvars = %d\n"
"varsAllocated = %d\n"
"curTsID = %d\n"
"rtsteps = %d\n"
"ntsteps = %ld\n"
"tstepsTableSize= %d\n"
"tstepsNextID = %d\n"
"ncmode = %d\n"
"vlistID = %d\n"
"globalatts = %d\n"
"localatts = %d\n"
"unreduced = %d\n"
"sortname = %d\n"
"have_missval = %d\n"
"ztype = %d\n"
"zlevel = %d\n",
sp->self, sp->self, sp->accesstype, sp->accessmode, sp->filetype, sp->byteorder, sp->fileID, sp->filemode, sp->filename,
sp->nrecs, sp->nvars, sp->varsAllocated, sp->curTsID, sp->rtsteps, sp->ntsteps, sp->tstepsTableSize, sp->tstepsNextID,
sp->ncmode, sp->vlistID, sp->globalatts, sp->localatts, sp->unreduced, sp->sortname, sp->have_missval, sp->comptype,
sp->complevel);
}
enum
{
streamNint = 10,
};
static int
streamGetPackSize(void *voidP, void *context)
{
stream_t *streamP = (stream_t *) voidP;
int packBufferSize = serializeGetSize(streamNint, CDI_DATATYPE_INT, context) + serializeGetSize(2, CDI_DATATYPE_UINT32, context)
+ serializeGetSize((int) strlen(streamP->filename) + 1, CDI_DATATYPE_TXT, context)
+ serializeGetSize(1, CDI_DATATYPE_FLT64, context);
return packBufferSize;
}
static void
streamPack(void *streamptr, void *packBuffer, int packBufferSize, int *packBufferPos, void *context)
{
stream_t *streamP = (stream_t *) streamptr;
int intBuffer[streamNint];
intBuffer[0] = streamP->self;
intBuffer[1] = streamP->filetype;
intBuffer[2] = (int) strlen(streamP->filename) + 1;
intBuffer[3] = streamP->vlistID;
intBuffer[4] = streamP->byteorder;
intBuffer[5] = streamP->comptype;
intBuffer[6] = streamP->complevel;
intBuffer[7] = streamP->unreduced;
intBuffer[8] = streamP->sortname;
intBuffer[9] = streamP->have_missval;
serializePack(intBuffer, streamNint, CDI_DATATYPE_INT, packBuffer, packBufferSize, packBufferPos, context);
uint32_t d = cdiCheckSum(CDI_DATATYPE_INT, streamNint, intBuffer);
serializePack(&d, 1, CDI_DATATYPE_UINT32, packBuffer, packBufferSize, packBufferPos, context);
serializePack(&CDI_Default_Missval, 1, CDI_DATATYPE_FLT64, packBuffer, packBufferSize, packBufferPos, context);
serializePack(streamP->filename, intBuffer[2], CDI_DATATYPE_TXT, packBuffer, packBufferSize, packBufferPos, context);
d = cdiCheckSum(CDI_DATATYPE_TXT, intBuffer[2], streamP->filename);
serializePack(&d, 1, CDI_DATATYPE_UINT32, packBuffer, packBufferSize, packBufferPos, context);
}
struct streamAssoc
streamUnpack(char *unpackBuffer, int unpackBufferSize, int *unpackBufferPos, int originNamespace, void *context)
{
int intBuffer[streamNint];
uint32_t d;
char filename[CDI_MAX_NAME];
serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, intBuffer, streamNint, CDI_DATATYPE_INT, context);
serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &d, 1, CDI_DATATYPE_UINT32, context);
xassert(cdiCheckSum(CDI_DATATYPE_INT, streamNint, intBuffer) == d);
serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &CDI_Default_Missval, 1, CDI_DATATYPE_FLT64, context);
serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &filename, intBuffer[2], CDI_DATATYPE_TXT, context);
serializeUnpack(unpackBuffer, unpackBufferSize, unpackBufferPos, &d, 1, CDI_DATATYPE_UINT32, context);
xassert(d == cdiCheckSum(CDI_DATATYPE_TXT, intBuffer[2], filename));
int targetStreamID = namespaceAdaptKey(intBuffer[0], originNamespace),
streamID = streamOpenID(filename, 'w', intBuffer[1], targetStreamID);
xassert(streamID >= 0 && targetStreamID == streamID);
streamDefByteorder(streamID, intBuffer[4]);
streamDefCompType(streamID, intBuffer[5]);
streamDefCompLevel(streamID, intBuffer[6]);
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->unreduced = intBuffer[7];
streamptr->sortname = intBuffer[8];
streamptr->have_missval = intBuffer[9];
struct streamAssoc retval = { streamID, intBuffer[3] };
return retval;
}
/* *
* This function does not really close the memio,
* this has to be done outside cdi to access the memory buffer*/
void
freePtrAfterNCMem(stream_t *streamptr, int recordBufIsToBeDeleted)
{
int fileID = streamptr->fileID;
if (fileID == CDI_UNDEFID)
{
Warning("File %s not open!", streamptr->filename);
return;
}
if (streamptr->ntsteps == 0 && streamptr->tsteps != NULL)
{
if (streamptr->tsteps[0].records)
{
Free(streamptr->tsteps[0].records);
streamptr->tsteps[0].records = NULL;
}
if (streamptr->tsteps[0].recIDs)
{
Free(streamptr->tsteps[0].recIDs);
streamptr->tsteps[0].recIDs = NULL;
}
}
}
void
streamCloseNCMem(int streamID)
{
stream_t *streamptr = stream_to_pointer(streamID);
bool lockIO = streamptr->lockIO;
if (lockIO) CDI_IO_LOCK();
if (CDI_Debug) Message("streamID = %d filename = %s", streamID, streamptr->filename);
streamDestroyViaDelegate(streamptr,freePtrAfterNCMem);
reshRemove(streamID, &streamOps);
if (CDI_Debug) Message("Removed stream %d from stream list", streamID);
if (lockIO) CDI_IO_UNLOCK();
}
int
streamOpenReadNCMem(int ncidp)
{
return streamOpenNCMem(ncidp, 'r');
}
int
streamOpenWriteNCMem(int ncidp)
{
return streamOpenNCMem(ncidp, 'w');
}
/*
* Local Variables:
* c-file-style: "Java"
* c-basic-offset: 2
* indent-tabs-mode: nil
* show-trailing-whitespace: t
* require-trailing-newline: t
* End:
*/