Commit 22759cd0 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Use pack function argument to serialize RPC function arguments to RDMA window.

parent 52a1f1e3
......@@ -45,7 +45,9 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
.specific.funcArgs.newFile
= { .fnamelen = (int)filename_len,
.filetype = filetype } };
pioBufferFuncCall(header, filename, filename_len + 1, nullPackFunc);
pioBufferFuncCall(header,
&(struct memCpyDataDesc){filename,
filename_len + 1}, 0, memcpyPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu,"
" filename=%s, filetype=%d",
funcMap[(-1 - STREAMOPEN)], filename_len + 1, filename,
......@@ -178,31 +180,31 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
}
}
static void
cdiPioTaxisPackWrap(void *data, void *buf, int size, int *pos,
void *context)
{
int taxisID = (int)(intptr_t)data;
reshPackResource(taxisID, &taxisOps, buf, size, pos, context);
}
static int
cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
{
struct winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
int taxisID, buf_size, position;
char *buf;
MPI_Comm commCalc;
int taxisID;
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
position = 0;
taxisID = vlistInqTaxis(streamptr->vlistID);
header = (struct winHeaderEntry){
.id = STREAMDEFTIMESTEP,
.specific.funcArgs.streamNewTimestep = { streamptr->self, tsID } };
commCalc = commInqCommCalc();
buf_size = reshResourceGetPackSize(taxisID, &taxisOps, &commCalc);
buf = xmalloc((size_t)buf_size);
reshPackResource(taxisID, &taxisOps, buf, buf_size, &position,
&commCalc);
pioBufferFuncCall(header, buf, buf_size, nullPackFunc);
free(buf);
xassert(sizeof (void *) >= sizeof (int));
pioBufferFuncCall(header, (void *)(intptr_t)taxisID, 0, cdiPioTaxisPackWrap);
break;
case STAGE_CLEANUP:
break;
......
......@@ -534,20 +534,12 @@ modelWinEnqueue(int collID,
else
{
targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
if (header.id == STREAMOPEN)
{
header.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
else if (header.id == STREAMDEFTIMESTEP)
{
header.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
int position = header.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
MPI_Comm comm = commInqCommsIO(collID);
packFunc((void *)data, txWin[collID].buffer, (int)txWin[collID].size,
&position, &comm);
txWin[collID].head = txWin[collID].buffer + position;
}
winDict[targetEntry] = header;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment