Commit b5a7d823 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Separate RPC and data record header entries in RDMA communication.

parent 9d1d0a30
......@@ -29,7 +29,7 @@ static struct rdmaWin
MPI_Win win;
int postSet, refuseFuncCall;
MPI_Group ioGroup;
int dictSize, dictUsed;
int dictSize, dictDataUsed, dictRPCUsed;
} *txWin = NULL;
......@@ -348,7 +348,7 @@ void modelWinCleanup ( void )
struct collDesc
{
int numRecords;
int numDataRecords, numRPCRecords;
};
static void
......@@ -381,7 +381,7 @@ modelWinDefBufferSizes(void)
collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
collIDchunk = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
++(collIndex[collID].numRecords);
++(collIndex[collID].numDataRecords);
txWin[collID].size += collIDchunk * sizeof (double) +
sizeof (union winHeaderEntry);
}
......@@ -391,7 +391,7 @@ modelWinDefBufferSizes(void)
if ( rankGlob == root )
for ( collID = 0; collID < nProcsColl; collID++ )
{
collIndex[collID].numRecords += 3;
collIndex[collID].numRPCRecords += 3;
txWin[collID].size +=
3 * sizeof (union winHeaderEntry)
+ MAXDATAFILENAME;
......@@ -399,9 +399,11 @@ modelWinDefBufferSizes(void)
}
for (collID = 0; collID < nProcsColl; ++collID)
{
int numRecords = ++(collIndex[collID].numRecords);
int numRecords = 1 + collIndex[collID].numDataRecords
+ collIndex[collID].numRPCRecords;
txWin[collID].dictSize = numRecords;
txWin[collID].dictUsed = 1;
txWin[collID].dictDataUsed = 1;
txWin[collID].dictRPCUsed = 0;
txWin[collID].size += sizeof (union winHeaderEntry);
sumWinBufferSize += txWin[collID].size;
}
......@@ -433,7 +435,8 @@ static
txWin[collID].head = txWin[collID].buffer
+ txWin[collID].dictSize * sizeof (union winHeaderEntry);
txWin[collID].refuseFuncCall = 0;
txWin[collID].dictUsed = 1;
txWin[collID].dictDataUsed = 1;
txWin[collID].dictRPCUsed = 0;
}
......@@ -479,23 +482,27 @@ modelWinEnqueue(int collID,
{
union winHeaderEntry *winDict
= (union winHeaderEntry *)txWin[collID].buffer;
int nextEntry = txWin[collID].dictUsed;
int targetEntry;
if (header.dataRecord.streamID > 0)
{
targetEntry = (txWin[collID].dictDataUsed)++;
header.dataRecord.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
else if (header.funcCall.funcID == STREAMOPEN)
else
{
header.funcCall.funcArgs.newFile.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
if (header.funcCall.funcID == STREAMOPEN)
{
header.funcCall.funcArgs.newFile.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
}
winDict[nextEntry] = header;
++(txWin[collID].dictUsed);
winDict[targetEntry] = header;
}
void pioBufferData ( const int streamID, const int varID, const double *data, int nmiss )
......@@ -591,7 +598,8 @@ void pioBufferFuncCall(int funcID, int argc, ... )
for (collID = 0; collID < nProcsColl; ++collID)
{
xassert(txWin[collID].dictSize > txWin[collID].dictUsed);
xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
> txWin[collID].dictDataUsed);
if (txWin[collID].postSet)
{
xmpi(MPI_Win_wait(txWin[collID].win));
......@@ -870,7 +878,8 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
*/
union winHeaderEntry header
= { .headerSize = { .sizeID = HEADERSIZEMARKER,
.numHeaderEntries = txWin[collID].dictUsed } };
.numDataEntries = txWin[collID].dictDataUsed,
.numRPCEntries = txWin[collID].dictRPCUsed } };
union winHeaderEntry *winDict
= (union winHeaderEntry *)txWin[collID].buffer;
winDict[0] = header;
......
......@@ -54,7 +54,7 @@ void rpcUnpackResources ( char *, int, MPI_Comm );
struct headerSize
{
int sizeID, numHeaderEntries;
int sizeID, numDataEntries, numRPCEntries;
};
struct dataRecord
......
......@@ -28,7 +28,7 @@ static struct
{
size_t size;
unsigned char *buffer, *head;
int currentRecord;
int currentRecord, dictSize;
} *rxWin = NULL;
static MPI_Win getWin = MPI_WIN_NULL;
......@@ -89,6 +89,7 @@ static
xassert ( decoChunk > 0 );
rxWin[modelID].size += decoChunk * sizeof (double)
+ sizeof (union winHeaderEntry);
++(rxWin[modelID].dictSize);
}
}
}
......@@ -96,12 +97,14 @@ static
// once per stream and timestep for all collprocs only on the modelproc root
rxWin[root].size += 3 * sizeof (union winHeaderEntry)
+ MAXDATAFILENAME;
rxWin[root].dictSize += 3;
}
free ( streamIndexList );
for ( modelID = 0; modelID < nProcsModel; modelID++ )
{
/* account for size header */
rxWin[modelID].dictSize += 1;
rxWin[modelID].size += sizeof (union winHeaderEntry);
sumGetBufferSizes += rxWin[modelID].size;
}
......@@ -126,7 +129,7 @@ static
xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));
rxWin = xmalloc(nProcsModel * sizeof (rxWin[0]));
rxWin = xcalloc(nProcsModel, sizeof (rxWin[0]));
collDefBufferSizes ();
for ( modelID = 0; modelID < nProcsModel; modelID++ )
......@@ -205,71 +208,73 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
char text[1024];
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
int numFuncCalls = 0;
xdebug("%s", "START");
union winHeaderEntry *winDict
= (union winHeaderEntry *)rxWin[root].buffer;
xassert(winDict[0].headerSize.sizeID == HEADERSIZEMARKER);
int numHeaderEntries = winDict[0].headerSize.numHeaderEntries;
int headerIdx;
for (headerIdx = 1; headerIdx < numHeaderEntries; ++headerIdx)
{
union winHeaderEntry *header
= winDict + headerIdx;
if (header->dataRecord.streamID > 0)
{
streamIDNew = header->dataRecord.streamID;
if ( streamIDNew != streamID )
{
streamID = streamIDNew;
vlistID = streamInqVlist ( streamID );
taxisID = vlistInqTaxis ( vlistID );
taxisDefVdate ( taxisID, vdate );
taxisDefVtime ( taxisID, vtime );
streamDefTimestep ( streamID, tsID );
}
varID = header->dataRecord.varID;
size = vlistInqVarSize ( vlistID, varID );
data = xmalloc ( size * sizeof ( double ));
dataHead = data;
for ( modelID = 0; modelID < nProcsModel; modelID++ )
{
struct dataRecord *dataHeader;
if ( modelID != root )
{
dataHeader = (struct dataRecord *)rxWin[modelID].buffer
- numFuncCalls + headerIdx;
xassert(dataHeader->streamID == streamID
&& dataHeader->varID == varID);
}
else
dataHeader = &(winDict[headerIdx].dataRecord);
int chunk = vlistInqVarDecoChunk(vlistID, varID, modelID);
memcpy(dataHead, rxWin[modelID].buffer
+ dataHeader->offset, chunk * sizeof (data[0]));
dataHead += chunk;
nmiss = dataHeader->nmiss;
}
streamWriteVar ( streamID, varID, data, nmiss );
if ( ddebug > 2 )
{
sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
xprintArray ( text, data, size, DATATYPE_FLT );
}
free ( data );
}
else if (header->funcCall.funcID >= MINFUNCID
&& header->funcCall.funcID <= MAXFUNCID)
{
struct funcCallDesc *header = &(winDict[headerIdx].funcCall);
++numFuncCalls;
readFuncCall(header);
}
else
xabort("Unexpected header item!");
}
{
int dictSize = rxWin[root].dictSize,
firstNonRPCEntry = dictSize - winDict[0].headerSize.numRPCEntries - 1,
headerIdx,
numFuncCalls = 0;
for (headerIdx = dictSize - 1;
headerIdx > firstNonRPCEntry;
--headerIdx)
{
struct funcCallDesc *header
= &(winDict[headerIdx].funcCall);
xassert(header->funcID >= MINFUNCID
&& header->funcID <= MAXFUNCID);
++numFuncCalls;
readFuncCall(header);
}
xassert(numFuncCalls == winDict[0].headerSize.numRPCEntries);
}
{
int numDataEntries = winDict[0].headerSize.numDataEntries;
int headerIdx;
for (headerIdx = 1; headerIdx < numDataEntries; ++headerIdx)
{
union winHeaderEntry *header
= winDict + headerIdx;
xassert(header->dataRecord.streamID > 0);
streamIDNew = header->dataRecord.streamID;
if (streamIDNew != streamID)
{
streamID = streamIDNew;
vlistID = streamInqVlist(streamID);
taxisID = vlistInqTaxis(vlistID);
taxisDefVdate(taxisID, vdate);
taxisDefVtime(taxisID, vtime);
streamDefTimestep(streamID, tsID);
}
varID = header->dataRecord.varID;
size = vlistInqVarSize(vlistID, varID);
data = xmalloc(size * sizeof (data[0]));
dataHead = data;
for (modelID = 0; modelID < nProcsModel; modelID++)
{
struct dataRecord *dataHeader
= (struct dataRecord *)rxWin[modelID].buffer + headerIdx;
xassert(dataHeader->streamID == streamID
&& dataHeader->varID == varID);
int chunk = vlistInqVarDecoChunk(vlistID, varID, modelID);
memcpy(dataHead, rxWin[modelID].buffer
+ dataHeader->offset, chunk * sizeof (data[0]));
dataHead += chunk;
nmiss = dataHeader->nmiss;
}
streamWriteVar(streamID, varID, data, nmiss);
if ( ddebug > 2 )
{
sprintf(text, "streamID=%d, var[%d], size=%d",
streamID, varID, size);
xprintArray(text, data, size, DATATYPE_FLT);
}
free(data);
}
}
xdebug("%s", "RETURN");
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment