Commit 7cc363cf authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Move PIO client-side streamWriteVarChunk and streamClose to pio_client.c.

parent 05c7b052
......@@ -169,50 +169,6 @@ cdiPioStreamOpen(const char *filename, const char *filemode,
return 1;
}
/***************************************************************/
void
cdiPioStreamWriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2], const void *data, int nmiss)
{
int vlistID = streamInqVlist(streamID);
int size = vlistInqVarSize(vlistID, varID),
varShape[3];
unsigned ndims = (unsigned)cdiPioQueryVarDims(varShape, vlistID, varID);
Xt_int varShapeXt[3], chunkShape[3] = { 1, 1, 1 }, origin[3] = { 0, 0, 0 };
/* FIXME: verify xt_int ranges are good enough */
for (unsigned i = 0; i < 3; ++i)
varShapeXt[i] = varShape[i];
for (unsigned i = 0; i < ndims; ++i)
chunkShape[i] = rect[i][1] - rect[i][0] + 1;
int varSize = varShape[0] * varShape[1] * varShape[2];
xassert(varSize == size);
Xt_idxlist chunkDesc
= xt_idxsection_new(0, ndims, varShapeXt, chunkShape, origin);
pioBufferPartData(streamID, varID, data, nmiss, chunkDesc);
xt_idxlist_delete(chunkDesc);
}
void
cdiPioStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMCLOSE, 1, streamptr->self);
break;
case STAGE_CLEANUP:
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
/***************************************************************/
void backendInit ( void )
......
......@@ -21,11 +21,6 @@ size_t cdiPioFileWrite(int fileID, const void *restrict buffer, size_t len,
int cdiPioStreamOpen(const char *filename, const char *filemode,
int filetype, stream_t *streamptr,
int recordBufIsToBeCreated);
void cdiPioStreamWriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2], const void *data,
int nmiss);
void cdiPioStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted);
#else
typedef int MPI_Comm;
#endif
......
......@@ -3,12 +3,15 @@
#endif
#ifdef USE_MPI
#include <yaxt.h>
#include "namespace.h"
#include "pio.h"
#include "pio_client.h"
#include "pio_comm.h"
#include "pio_interface.h"
#include "pio_rpc.h"
#include "pio_util.h"
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
......@@ -38,6 +41,29 @@ cdiPioClientStreamWriteVar_(int streamID, int varID, int memtype,
" use streamWriteVarPart!");
}
static void
cdiPioClientStreamWriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2],
const void *data, int nmiss)
{
int vlistID = streamInqVlist(streamID);
int size = vlistInqVarSize(vlistID, varID),
varShape[3];
unsigned ndims = (unsigned)cdiPioQueryVarDims(varShape, vlistID, varID);
Xt_int varShapeXt[3], chunkShape[3] = { 1, 1, 1 }, origin[3] = { 0, 0, 0 };
/* FIXME: verify xt_int ranges are good enough */
for (unsigned i = 0; i < 3; ++i)
varShapeXt[i] = varShape[i];
for (unsigned i = 0; i < ndims; ++i)
chunkShape[i] = rect[i][1] - rect[i][0] + 1;
int varSize = varShape[0] * varShape[1] * varShape[2];
xassert(varSize == size);
Xt_idxlist chunkDesc
= xt_idxsection_new(0, ndims, varShapeXt, chunkShape, origin);
pioBufferPartData(streamID, varID, data, nmiss, chunkDesc);
xt_idxlist_delete(chunkDesc);
}
#if defined HAVE_LIBNETCDF
static void
cdiPioCdfDefTimestepNOP(stream_t *streamptr, int tsID)
......@@ -45,6 +71,25 @@ cdiPioCdfDefTimestepNOP(stream_t *streamptr, int tsID)
}
#endif
static void
cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMCLOSE, 1, streamptr->self);
break;
case STAGE_CLEANUP:
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
void
cdiPioClientSetup(int *pioNamespace_, int *pioNamespace)
{
......@@ -58,8 +103,8 @@ cdiPioClientSetup(int *pioNamespace_, int *pioNamespace)
namespaceSwitchSet(NSSWITCH_STREAM_DEF_VLIST_, cdiPioClientStreamDefVlist_);
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_, cdiPioClientStreamWriteVar_);
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
cdiPioStreamWriteVarChunk_);
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND, cdiPioStreamClose);
cdiPioClientStreamWriteVarChunk_);
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND, cdiPioClientStreamClose);
#ifdef HAVE_LIBNETCDF
namespaceSwitchSet(NSSWITCH_CDF_DEF_TIMESTEP, cdiPioCdfDefTimestepNOP);
#endif
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment