Commit d15e3544 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Virtualize stream functions to allow dynamic insertion of cdiPIO functions.

parent 50bc4659
......@@ -353,6 +353,24 @@ struct streamAssoc
streamUnpack(char * unpackBuffer, int unpackBufferSize,
int * unpackBufferPos, int nspTarget, void *context);
int
cdiStreamOpenDefaultDelegate(const char *filename, const char *filemode,
int filetype, Record **record
#if USE_MPI && ! HAVE_PARALLEL_NC4 && defined HAVE_LIBNETCDF
, int *rankp
#endif
);
void
cdiStreamDefVlist_(int streamID, int vlistID);
void
cdiStreamWriteVar_(int streamID, int varID, int memtype, const void *data,
int nmiss);
void
cdiStreamwriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2], const void *data, int nmiss);
void
cdiStreamCloseDefaultDelegate(stream_t *streamptr);
char *cdiUnitNamePtr(int cdi_unit);
......
......@@ -9,6 +9,7 @@
#include "error.h"
#include "cdf_int.h"
#include "file.h"
#include "cdi_int.h"
static int nNamespaces = 1;
static int activeNamespace = 0;
......@@ -19,14 +20,19 @@ static int activeNamespace = 0;
#define CDI_NETCDF_SWITCHES
#endif
#define defaultSwitches { \
cdiAbortC_serial, \
serializeGetSizeInCore, \
serializePackInCore, \
serializeUnpackInCore, \
fileOpen_serial, \
fileClose_serial, \
CDI_NETCDF_SWITCHES \
#define defaultSwitches { \
cdiAbortC_serial, \
serializeGetSizeInCore, \
serializePackInCore, \
serializeUnpackInCore, \
fileOpen_serial, \
fileClose_serial, \
cdiStreamOpenDefaultDelegate, \
cdiStreamDefVlist_, \
cdiStreamWriteVar_, \
cdiStreamwriteVarChunk_, \
cdiStreamCloseDefaultDelegate, \
CDI_NETCDF_SWITCHES \
}
struct namespace
......
......@@ -27,6 +27,11 @@ enum namespaceSwitch
NSSWITCH_SERIALIZE_UNPACK,
NSSWITCH_FILE_OPEN,
NSSWITCH_FILE_CLOSE,
NSSWITCH_STREAM_OPEN_BACKEND,
NSSWITCH_STREAM_DEF_VLIST_,
NSSWITCH_STREAM_WRITE_VAR_,
NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
NSSWITCH_STREAM_CLOSE_BACKEND,
#ifdef HAVE_LIBNETCDF
NSSWITCH_NC__CREATE,
NSSWITCH_CDF_DEF_VAR,
......
......@@ -5,12 +5,19 @@
#ifdef USE_MPI
#include <ctype.h>
#include <yaxt.h>
#include "file.h"
#include "cdi_int.h"
#include "namespace.h"
#include "pio.h"
#include "cdi.h"
#include "pio_comm.h"
#include "pio_impl.h"
#include "pio_interface.h"
#include "pio_rpc.h"
#include "pio_util.h"
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
......@@ -134,6 +141,121 @@ int pioFileOpen(const char *filename, const char *mode)
/***************************************************************/
int
cdiPioStreamOpen(const char *filename, const char *filemode,
int filetype, Record **record)
{
if ( tolower ( * filemode ) == 'w' )
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMOPEN, 2, filename, filetype);
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
else
Error("cdiPIO read support not implemented");
return 1;
}
void
cdiPioStreamDefVlist_(int streamID, int vlistID)
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMDEFVLIST, 2, streamID, vlistID);
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
break;
default:
xabort ( "INTERNAL ERROR" );
}
cdiStreamDefVlist_(streamID, vlistID);
}
/***************************************************************/
void
cdiPioStreamWriteVar_(int streamID, int varID, int memtype, const void *data,
int nmiss)
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
xabort ( "DEFINITION STAGE: PARALLEL WRITING NOT POSSIBLE." );
break;
case STAGE_TIMELOOP:
pioBufferData ( streamID, varID, data, nmiss );
return;
case STAGE_CLEANUP:
xabort ( "CLEANUP STAGE: PARALLEL WRITING NOT POSSIBLE." );
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
/***************************************************************/
void
cdiPioStreamWriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2], const void *data, int nmiss)
{
int vlistID = streamInqVlist(streamID);
int size = vlistInqVarSize(vlistID, varID),
varShape[3];
unsigned ndims = (unsigned)cdiPioQueryVarDims(varShape, vlistID, varID);
Xt_int varShapeXt[3], chunkShape[3] = { 1, 1, 1 }, origin[3] = { 0, 0, 0 };
/* FIXME: verify xt_int ranges are good enough */
for (unsigned i = 0; i < 3; ++i)
varShapeXt[i] = varShape[i];
for (unsigned i = 0; i < ndims; ++i)
chunkShape[i] = rect[i][1] - rect[i][0] + 1;
int varSize = varShape[0] * varShape[1] * varShape[2];
xassert(varSize == size);
Xt_idxlist chunkDesc
= xt_idxsection_new(0, ndims, varShapeXt, chunkShape, origin);
pioBufferPartData(streamID, varID, data, nmiss, chunkDesc);
xt_idxlist_delete(chunkDesc);
}
void
cdiPioStreamClose(stream_t *streamptr)
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMCLOSE, 1, streamptr->self);
break;
case STAGE_CLEANUP:
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
/***************************************************************/
void backendInit ( void )
{
int IOMode = commInqIOMode ();
......
......@@ -9,12 +9,23 @@
#include <stdlib.h>
#include <mpi.h>
#include "cdi_int.h"
void backendCleanup ( void );
void backendInit ( void );
void backendFinalize ( void );
int pioFileOpen(const char *filename, const char *mode);
int pioFileClose ( int );
size_t pioFileWrite ( int, int, const void*, size_t );
int cdiPioStreamOpen(const char *filename, const char *filemode,
int filetype, Record **record);
void cdiPioStreamDefVlist_(int streamID, int vlistID);
void cdiPioStreamWriteVar_(int streamID, int varID, int memtype,
const void *data, int nmiss);
void cdiPioStreamWriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2], const void *data,
int nmiss);
void cdiPioStreamClose(stream_t *streamptr);
#else
typedef int MPI_Comm;
......
......@@ -831,6 +831,12 @@ pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
int callerCDINamespace = namespaceGetActive();
pioNamespaceSetActive(pioNamespace_);
serializeSetMPI();
namespaceSwitchSet(NSSWITCH_STREAM_OPEN_BACKEND, cdiPioStreamOpen);
namespaceSwitchSet(NSSWITCH_STREAM_DEF_VLIST_, cdiPioStreamDefVlist_);
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_, cdiPioStreamWriteVar_);
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
cdiPioStreamWriteVarChunk_);
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND, cdiPioStreamClose);
pioNamespaceSetActive(callerCDINamespace);
}
......
......@@ -657,125 +657,130 @@ extern int cdiPioNextOpenRank();
extern int cdiPioSerialOpenFileMap(int streamID);
#endif
int streamOpen(const char *filename, const char *filemode, int filetype)
{
int fileID = CDI_UNDEFID;
int streamID = CDI_ESYSTEM;
int status;
Record *record = NULL;
stream_t *streamptr = NULL;
int hasLocalFile = namespaceHasLocalFile ( namespaceGetActive ());
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4) && defined (HAVE_LIBNETCDF)
int rank;
int cdiStreamOpenDefaultDelegate(const char *filename, const char *filemode,
int filetype, Record **record
#if USE_MPI && ! HAVE_PARALLEL_NC4 && defined HAVE_LIBNETCDF
, int *rankp
#endif
if ( CDI_Debug )
Message("Open %s mode %c file %s", strfiletype(filetype), (int) *filemode, filename);
if ( ! filename || ! filemode || filetype < 0 ) return (CDI_EINVAL);
if ( hasLocalFile )
)
{
int fileID;
switch (filetype)
{
switch (filetype)
{
#if defined (HAVE_LIBGRIB)
case FILETYPE_GRB:
case FILETYPE_GRB2:
{
fileID = gribOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
record = (Record *) malloc(sizeof(Record));
record->buffer = NULL;
break;
}
case FILETYPE_GRB:
case FILETYPE_GRB2:
{
fileID = gribOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
*record = (Record *) malloc(sizeof(Record));
(*record)->buffer = NULL;
break;
}
#endif
#if defined (HAVE_LIBSERVICE)
case FILETYPE_SRV:
{
fileID = fileOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
record = (Record *) malloc(sizeof(Record));
record->buffer = NULL;
record->srvp = srvNew();
break;
}
case FILETYPE_SRV:
{
fileID = fileOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
(*record) = (Record *) malloc(sizeof(Record));
(*record)->buffer = NULL;
(*record)->srvp = srvNew();
break;
}
#endif
#if defined (HAVE_LIBEXTRA)
case FILETYPE_EXT:
{
fileID = fileOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
record = (Record *) malloc(sizeof(Record));
record->buffer = NULL;
record->extp = extNew();
break;
}
case FILETYPE_EXT:
{
fileID = fileOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
(*record) = (Record *) malloc(sizeof(Record));
(*record)->buffer = NULL;
(*record)->extp = extNew();
break;
}
#endif
#if defined (HAVE_LIBIEG)
case FILETYPE_IEG:
{
fileID = fileOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
record = (Record *) malloc(sizeof(Record));
record->buffer = NULL;
record->iegp = iegNew();
break;
}
case FILETYPE_IEG:
{
fileID = fileOpen(filename, filemode);
if ( fileID < 0 ) fileID = CDI_ESYSTEM;
(*record) = (Record *) malloc(sizeof(Record));
(*record)->buffer = NULL;
(*record)->iegp = iegNew();
break;
}
#endif
#if defined (HAVE_LIBNETCDF)
case FILETYPE_NC:
{
fileID = cdfOpen(filename, filemode);
break;
}
case FILETYPE_NC2:
{
fileID = cdfOpen64(filename, filemode);
break;
}
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
case FILETYPE_NC:
{
fileID = cdfOpen(filename, filemode);
break;
}
case FILETYPE_NC2:
{
fileID = cdfOpen64(filename, filemode);
break;
}
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
if (commInqIOMode() == PIO_NONE
|| commInqRankColl() == (rank = cdiPioNextOpenRank()))
int rank;
if (commInqIOMode() == PIO_NONE
|| commInqRankColl() == (rank = cdiPioNextOpenRank()))
#endif
fileID = cdf4Open(filename, filemode, &filetype);
fileID = cdf4Open(filename, filemode, &filetype);
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
if (commInqIOMode() != PIO_NONE)
xmpi(MPI_Bcast(&fileID, 1, MPI_INT, rank, commInqCommColl()));
#endif
break;
}
if (commInqIOMode() != PIO_NONE)
xmpi(MPI_Bcast(&fileID, 1, MPI_INT, rank, commInqCommColl()));
*rankp = rank;
#endif
default:
{
if ( CDI_Debug ) Message("%s support not compiled in!", strfiletype(filetype));
return (CDI_ELIBNAVAIL);
}
}
}
#ifdef USE_MPI
else if ( tolower ( * filemode ) == 'w' )
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMOPEN, 2, filename, filetype);
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
#endif
default:
{
if ( CDI_Debug ) Message("%s support not compiled in!", strfiletype(filetype));
return (CDI_ELIBNAVAIL);
}
}
return fileID;
}
int streamOpen(const char *filename, const char *filemode, int filetype)
{
int fileID = CDI_UNDEFID;
int streamID = CDI_ESYSTEM;
int status;
Record *record = NULL;
stream_t *streamptr = NULL;
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4) && defined (HAVE_LIBNETCDF)
int rank;
#endif
if ( fileID < 0 && hasLocalFile )
if ( CDI_Debug )
Message("Open %s mode %c file %s", strfiletype(filetype), (int) *filemode, filename);
if ( ! filename || ! filemode || filetype < 0 ) return (CDI_EINVAL);
{
int (*streamOpenDelegate)(const char *filename, const char *filemode,
int filetype, Record **record
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4) && defined (HAVE_LIBNETCDF)
, int *rankp
#endif
)
= namespaceSwitchGet(NSSWITCH_STREAM_OPEN_BACKEND);
fileID = streamOpenDelegate(filename, filemode, filetype, &record
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4) && defined (HAVE_LIBNETCDF)
, &rank
#endif
);
}
if (fileID < 0)
{
streamID = fileID;
}
......@@ -1202,6 +1207,74 @@ int streamOpenWrite(const char *filename, int filetype)
return (streamOpen(filename, "w", filetype));
}
void
cdiStreamCloseDefaultDelegate(stream_t *streamptr)
{
int fileID = streamptr->fileID;
int filetype = streamptr->filetype;
if ( fileID == CDI_UNDEFID )
Warning("File %s not open!", streamptr->filename);
else
switch (filetype)
{
#if defined (HAVE_LIBGRIB)
case FILETYPE_GRB:
case FILETYPE_GRB2:
{
gribClose(fileID);
gribContainersDelete(streamptr);
break;
}
#endif
#if defined (HAVE_LIBSERVICE)
case FILETYPE_SRV:
{
fileClose(fileID);
srvDelete(streamptr->record->srvp);
break;
}
#endif
#if defined (HAVE_LIBEXTRA)
case FILETYPE_EXT:
{
fileClose(fileID);
extDelete(streamptr->record->extp);
break;
}
#endif
#if defined (HAVE_LIBIEG)
case FILETYPE_IEG:
{
fileClose(fileID);
iegDelete(streamptr->record->iegp);
break;
}
#endif
#if defined (HAVE_LIBNETCDF)
case FILETYPE_NC:
case FILETYPE_NC2:
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamptr->self))))
#endif
cdfClose(fileID);
break;
}
#endif
default:
{
Error("%s support not compiled in!", strfiletype(filetype));
break;
}
}
}
/*
@Function streamClose
@Title Close an open dataset
......@@ -1217,8 +1290,6 @@ The function @func{streamClose} closes an open dataset.
*/
void streamClose(int streamID)
{
int filetype;
int fileID;
int index;
int vlistID;
stream_t *streamptr;
......@@ -1230,91 +1301,12 @@ void streamClose(int streamID)
if ( CDI_Debug )
Message("streamID = %d filename = %s", streamID, streamptr->filename);
fileID = streamptr->fileID;
filetype = streamptr->filetype;
vlistID = streamptr->vlistID;
if ( namespaceHasLocalFile ( namespaceGetActive ()))
{
if ( fileID == CDI_UNDEFID )
Warning("File %s not open!", streamptr->filename);
else
switch (filetype)
{
#if defined (HAVE_LIBGRIB)
case FILETYPE_GRB:
case FILETYPE_GRB2:
{
gribClose(fileID);
gribContainersDelete(streamptr);
break;
}
#endif
#if defined (HAVE_LIBSERVICE)
case FILETYPE_SRV:
{
fileClose(fileID);
srvDelete(streamptr->record->srvp);
break;
}
#endif
#if defined (HAVE_LIBEXTRA)
case FILETYPE_EXT:
{
fileClose(fileID);
extDelete(streamptr->record->extp);
break;
}
#endif
#if defined (HAVE_LIBIEG)
case FILETYPE_IEG:
{
fileClose(fileID);
iegDelete(streamptr->record->iegp);
break;
}
#endif
#if defined (HAVE_LIBNETCDF)
case FILETYPE_NC:
case FILETYPE_NC2:
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfClose(fileID);