Commit b26e89c0 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Extend pioBufferFuncCall with pack function argument.

parent b29440f6
...@@ -18,6 +18,11 @@ ...@@ -18,6 +18,11 @@
#include "pio_util.h" #include "pio_util.h"
#include "pio_serialize.h" #include "pio_serialize.h"
void nullPackFunc(void *obj, void *buf, int size, int *pos, void *context)
{
}
static int static int
cdiPioClientStreamOpen(const char *filename, const char *filemode, cdiPioClientStreamOpen(const char *filename, const char *filemode,
int filetype, stream_t *streamptr, int filetype, stream_t *streamptr,
...@@ -40,7 +45,7 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode, ...@@ -40,7 +45,7 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
.specific.funcArgs.newFile .specific.funcArgs.newFile
= { .fnamelen = (int)filename_len, = { .fnamelen = (int)filename_len,
.filetype = filetype } }; .filetype = filetype } };
pioBufferFuncCall(header, filename, filename_len + 1); pioBufferFuncCall(header, filename, filename_len + 1, nullPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu," xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu,"
" filename=%s, filetype=%d", " filename=%s, filetype=%d",
funcMap[(-1 - STREAMOPEN)], filename_len + 1, filename, funcMap[(-1 - STREAMOPEN)], filename_len + 1, filename,
...@@ -71,7 +76,7 @@ cdiPioClientStreamDefVlist_(int streamID, int vlistID) ...@@ -71,7 +76,7 @@ cdiPioClientStreamDefVlist_(int streamID, int vlistID)
header = (struct winHeaderEntry){ header = (struct winHeaderEntry){
.id = STREAMDEFVLIST, .id = STREAMDEFVLIST,
.specific.funcArgs.streamChange = { streamID, vlistID } }; .specific.funcArgs.streamChange = { streamID, vlistID } };
pioBufferFuncCall(header, NULL, 0); pioBufferFuncCall(header, NULL, 0, nullPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d," xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d,"
" vlistID=%d", funcMap[(-1 - STREAMDEFVLIST)], streamID, vlistID); " vlistID=%d", funcMap[(-1 - STREAMDEFVLIST)], streamID, vlistID);
break; break;
...@@ -162,7 +167,7 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted) ...@@ -162,7 +167,7 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
.id = STREAMCLOSE, .id = STREAMCLOSE,
.specific.funcArgs.streamChange .specific.funcArgs.streamChange
= { streamptr->self, CDI_UNDEFID } }; = { streamptr->self, CDI_UNDEFID } };
pioBufferFuncCall(header, NULL, 0); pioBufferFuncCall(header, NULL, 0, nullPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d", xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d",
funcMap[-1 - STREAMCLOSE], streamptr->self); funcMap[-1 - STREAMCLOSE], streamptr->self);
break; break;
...@@ -196,7 +201,7 @@ cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID) ...@@ -196,7 +201,7 @@ cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
buf = xmalloc((size_t)buf_size); buf = xmalloc((size_t)buf_size);
reshPackResource(taxisID, &taxisOps, buf, buf_size, &position, reshPackResource(taxisID, &taxisOps, buf, buf_size, &position,
&commCalc); &commCalc);
pioBufferFuncCall(header, buf, buf_size); pioBufferFuncCall(header, buf, buf_size, nullPackFunc);
free(buf); free(buf);
break; break;
case STAGE_CLEANUP: case STAGE_CLEANUP:
......
...@@ -64,6 +64,15 @@ static int cmp ( const void * va, const void * vb ) ...@@ -64,6 +64,15 @@ static int cmp ( const void * va, const void * vb )
return (( **a < **b ) - ( **a > **b )); return (( **a < **b ) - ( **a > **b ));
} }
void memcpyPackFunc(void *dataDesc, void *buf, int size, int *pos,
void *context)
{
struct memCpyDataDesc *p = dataDesc;
xassert(size >= *pos && (size_t)(size - *pos) >= p->obj_size);
memcpy((unsigned char *)buf + *pos, p->obj, p->obj_size);
*pos += (int)p->obj_size;
}
/****************************************************/ /****************************************************/
static void static void
...@@ -475,7 +484,8 @@ void modelWinCreate ( void ) ...@@ -475,7 +484,8 @@ void modelWinCreate ( void )
static void static void
modelWinEnqueue(int collID, modelWinEnqueue(int collID,
struct winHeaderEntry header, const void *data, size_t size) struct winHeaderEntry header, const void *data, size_t size,
valPackFunc packFunc)
{ {
struct winHeaderEntry *winDict struct winHeaderEntry *winDict
= (struct winHeaderEntry *)txWin[collID].buffer; = (struct winHeaderEntry *)txWin[collID].buffer;
...@@ -569,13 +579,14 @@ pioBufferPartData(int streamID, int varID, const double *data, ...@@ -569,13 +579,14 @@ pioBufferPartData(int streamID, int varID, const double *data,
struct winHeaderEntry dataHeader struct winHeaderEntry dataHeader
= { .id = streamID, .specific.dataRecord = { varID, nmiss }, .offset = -1 }; = { .id = streamID, .specific.dataRecord = { varID, nmiss }, .offset = -1 };
modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0])); modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0]),
memcpyPackFunc);
{ {
struct winHeaderEntry partHeader struct winHeaderEntry partHeader
= { .id = PARTDESCMARKER, = { .id = PARTDESCMARKER,
.specific.partDesc = { .uid = xt_idxlist_get_uid(partDesc) }, .specific.partDesc = { .uid = xt_idxlist_get_uid(partDesc) },
.offset = 0 }; .offset = 0 };
modelWinEnqueue(collID, partHeader, partDesc, 0); modelWinEnqueue(collID, partHeader, partDesc, 0, memcpyPackFunc);
} }
txWin[collID].refuseFuncCall = 1; txWin[collID].refuseFuncCall = 1;
...@@ -584,7 +595,8 @@ pioBufferPartData(int streamID, int varID, const double *data, ...@@ -584,7 +595,8 @@ pioBufferPartData(int streamID, int varID, const double *data,
/************************************************************************/ /************************************************************************/
void pioBufferFuncCall(struct winHeaderEntry header, void pioBufferFuncCall(struct winHeaderEntry header,
const void *data, size_t data_len) const void *data, size_t data_len,
valPackFunc dataPackFunc)
{ {
int rankGlob = commInqRankGlob (); int rankGlob = commInqRankGlob ();
int root = commInqRootGlob (); int root = commInqRootGlob ();
...@@ -609,7 +621,7 @@ void pioBufferFuncCall(struct winHeaderEntry header, ...@@ -609,7 +621,7 @@ void pioBufferFuncCall(struct winHeaderEntry header,
xassert(txWin[collID].dictRPCUsed + txWin[collID].dictDataUsed xassert(txWin[collID].dictRPCUsed + txWin[collID].dictDataUsed
< txWin[collID].dictSize); < txWin[collID].dictSize);
xassert(txWin[collID].refuseFuncCall == 0); xassert(txWin[collID].refuseFuncCall == 0);
modelWinEnqueue(collID, header, data, data_len); modelWinEnqueue(collID, header, data, data_len, dataPackFunc);
} }
xdebug("%s", "RETURN"); xdebug("%s", "RETURN");
......
...@@ -10,13 +10,24 @@ ...@@ -10,13 +10,24 @@
#include <mpi.h> #include <mpi.h>
#include <yaxt.h> #include <yaxt.h>
#include "resource_handle.h"
#include "pio_rpc.h" #include "pio_rpc.h"
void void
pioBufferPartData(int streamID, int varID, const double *data, pioBufferPartData(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc); int nmiss, Xt_idxlist partDesc);
void pioBufferFuncCall(struct winHeaderEntry header, void pioBufferFuncCall(struct winHeaderEntry header,
const void *data, size_t dataSize); const void *data, size_t dataSize,
valPackFunc dataPackFunc);
struct memCpyDataDesc
{
const void *obj;
size_t obj_size;
};
void memcpyPackFunc(void *dataDesc, void *buf, int size, int *pos, void *context);
extern float cdiPIOpartInflate_; extern float cdiPIOpartInflate_;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment