Commit b06d9ca9 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Move complexity of header constrution to users of pioBufferFuncCall.

parent bc399216
......@@ -21,6 +21,8 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
int filetype, stream_t *streamptr,
int recordBufIsToBeCreated)
{
union winHeaderEntry header;
size_t filename_len;
if ( tolower ( * filemode ) == 'w' )
{
statusCode nspStatus = namespaceInqResStatus ();
......@@ -29,7 +31,18 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMOPEN, 2, filename, filetype);
filename_len = strlen(filename);
xassert(filename_len > 0 && filename_len < MAXDATAFILENAME);
header.funcCall
= (struct funcCallDesc){
.funcID = STREAMOPEN,
.funcArgs.newFile = { .fnamelen = (int)filename_len,
.filetype = filetype } };
pioBufferFuncCall(header, filename, filename_len + 1);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu,"
" filename=%s, filetype=%d",
funcMap[(-1 - STREAMOPEN)], filename_len + 1, filename,
filetype);
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
......@@ -46,13 +59,20 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
{
union winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMDEFVLIST, 2, streamID, vlistID);
header.funcCall
= (struct funcCallDesc){
.funcID = STREAMDEFVLIST,
.funcArgs.streamChange = { streamID, vlistID } };
pioBufferFuncCall(header, NULL, 0);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d,"
" vlistID=%d", funcMap[(-1 - STREAMDEFVLIST)], streamID, vlistID);
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
......@@ -130,13 +150,20 @@ cdiPioClientStreamNOP(stream_t *streamptr)
static void
cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
union winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall(STREAMCLOSE, 1, streamptr->self);
header.funcCall
= (struct funcCallDesc){
.funcID = STREAMCLOSE,
.funcArgs.streamChange = { streamptr->self, CDI_UNDEFID } };
pioBufferFuncCall(header, NULL, 0);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d",
funcMap[-1 - STREAMCLOSE], streamptr->self);
break;
case STAGE_CLEANUP:
break;
......
......@@ -574,67 +574,20 @@ pioBufferPartData(int streamID, int varID, const double *data,
/************************************************************************/
void pioBufferFuncCall(int funcID, int argc, ... )
void pioBufferFuncCall(union winHeaderEntry header,
const void *data, size_t data_len)
{
va_list ap;
int rankGlob = commInqRankGlob ();
int root = commInqRootGlob ();
int collID, nProcsColl = commInqNProcsColl ();
int funcID = header.funcCall.funcID;
xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
if ( rankGlob != root ) return;
xassert (argc >= 1 &&
argc <= 2 &&
txWin != NULL);
va_start ( ap, argc );
union winHeaderEntry header;
char * filename = NULL;
size_t filenamesz = 0;
int streamID = 0, vlistID = 0, filetype = 0;
switch ( funcID )
{
case STREAMCLOSE:
xassert ( argc == 1 );
case STREAMDEFVLIST:
{
streamID = va_arg(ap, int);
vlistID = CDI_UNDEFID;
if (funcID == STREAMDEFVLIST)
{
xassert ( argc == 2 );
vlistID = va_arg(ap, int);
}
struct funcCallDesc f = { .funcID = funcID,
.funcArgs.streamChange = { streamID,
vlistID } };
header.funcCall = f;
}
break;
case STREAMOPEN:
{
xassert ( argc == 2 );
filename = va_arg(ap, char *);
filenamesz = strlen(filename);
xassert ( filenamesz > 0 &&
filenamesz < MAXDATAFILENAME );
filetype = va_arg(ap, int);
struct funcCallDesc f = { .funcID = STREAMOPEN,
.funcArgs.newFile
= { .fnamelen = filenamesz,
.filetype = filetype } };
header.funcCall = f;
++filenamesz;
}
break;
default:
xabort("FUNCTION NOT MAPPED!");
}
xassert(txWin != NULL);
for (collID = 0; collID < nProcsColl; ++collID)
{
......@@ -647,27 +600,9 @@ void pioBufferFuncCall(int funcID, int argc, ... )
modelWinFlushBuffer ( collID );
}
xassert(txWin[collID].refuseFuncCall == 0);
modelWinEnqueue(collID, header, filename, filenamesz);
}
switch ( funcID )
{
case STREAMCLOSE:
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d",
funcMap[(-1 - funcID)], streamID);
break;
case STREAMDEFVLIST:
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d,"
" vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
break;
case STREAMOPEN:
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu,"
" filename=%s, filetype=%d",
funcMap[(-1 - funcID)], filenamesz, filename, filetype);
break;
modelWinEnqueue(collID, header, data, data_len);
}
va_end(ap);
xdebug("%s", "RETURN");
}
......
......@@ -10,11 +10,14 @@
#include <mpi.h>
#include <yaxt.h>
#include "pio_rpc.h"
void
pioBufferPartData(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc);
void pioBufferData (int, int, const double *, int );
void pioBufferFuncCall(int, int, ... );
void pioBufferFuncCall(union winHeaderEntry header,
const void *data, size_t data_len);
extern float cdiPIOpartInflate_;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment