Commit 1d6b3463 authored by Thomas Jahns
Browse files

Change RDMA window content to use explicit ordering.

* This way all header elements can later be accessed directly.
parent 573af921
......@@ -29,6 +29,7 @@ static struct rdmaWin
MPI_Win win;
int postSet, refuseFuncCall;
MPI_Group ioGroup;
int dictSize, dictUsed;
} *txWin = NULL;
......@@ -328,42 +329,51 @@ void modelWinCleanup ( void )
xdebug("%s", "START");
if (txWin != NULL)
for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
{
if (txWin[collID].postSet)
xmpi(MPI_Win_wait(txWin[collID].win));
xmpi(MPI_Win_free(&txWin[collID].win));
xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
xmpi(MPI_Group_free(&txWin[collID].ioGroup));
}
if (txWin) free(txWin);
{
for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
{
if (txWin[collID].postSet)
xmpi(MPI_Win_wait(txWin[collID].win));
xmpi(MPI_Win_free(&txWin[collID].win));
xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
xmpi(MPI_Group_free(&txWin[collID].ioGroup));
}
free(txWin);
}
xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
}
/************************************************************************/
/* Per-collector scratch counter used by modelWinDefBufferSizes while it
 * determines how many header entries each transfer window needs. */
struct collDesc
{
/* Header records counted for one collector: one per variable routed to it,
 * three per stream on the root rank, plus one more added when the total is
 * stored as the window's dictSize. */
int numRecords;
};
static void
modelWinDefBufferSizes(void)
{
int collID, nstreams, * streamIndexList, streamNo, vlistID, nvars, varID;
int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
int collIDchunk = 0, sumWinBufferSize = 0;
int nProcsColl = commInqNProcsColl ();
int rankGlob = commInqRankGlob ();
int rankModel = commInqRankModel ();
int root = commInqRootGlob ();
struct collDesc *collIndex;
xdebug("%s", "START");
xassert(txWin != NULL);
nstreams = reshCountType ( &streamOps );
streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
for ( streamNo = 0; streamNo < nstreams; streamNo++ )
{
// space required for data
vlistID = streamInqVlist ( streamIndexList[streamNo] );
// memory required for data
int streamID = streamIndexList[streamNo];
int vlistID = streamInqVlist(streamID);
nvars = vlistNvars ( vlistID );
for ( varID = 0; varID < nvars; varID++ )
{
......@@ -371,28 +381,36 @@ modelWinDefBufferSizes(void)
collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
collIDchunk = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
++(collIndex[collID].numRecords);
txWin[collID].size += collIDchunk * sizeof (double) +
winBufferOverheadChunk * sizeof (int);
sizeof (union winHeaderEntry);
}
// space required for the 3 function calls streamOpen, streamDefVlist, streamClose
// memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
// once per stream and timestep for all collprocs only on the modelproc root
if ( rankGlob == root )
for ( collID = 0; collID < nProcsColl; collID++ )
txWin[collID].size += 3 * winBufferOverheadFuncCall * sizeof (int)
+ 5 * sizeof (int) + MAXDATAFILENAME;
{
collIndex[collID].numRecords += 3;
txWin[collID].size +=
3 * sizeof (union winHeaderEntry)
+ MAXDATAFILENAME;
}
}
free ( streamIndexList );
for ( collID = 0; collID < nProcsColl; collID++ )
for (collID = 0; collID < nProcsColl; ++collID)
{
txWin[collID].size += winBufferOverhead * sizeof (int);
int numRecords = ++(collIndex[collID].numRecords);
txWin[collID].dictSize = numRecords;
txWin[collID].dictUsed = 1;
txWin[collID].size += sizeof (union winHeaderEntry);
sumWinBufferSize += txWin[collID].size;
}
free(collIndex);
free ( streamIndexList );
xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
(size_t)MAXWINBUFFERSIZE);
xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
/* xprintArray("txWin.size", txWin, nProcsColl, DATATYPE_INT); */
xdebug("%s", "RETURN");
}
......@@ -412,8 +430,10 @@ static
txWin[collID].size >= 0 &&
txWin[collID].size <= MAXWINBUFFERSIZE);
memset(txWin[collID].buffer, 0, txWin[collID].size);
txWin[collID].head = txWin[collID].buffer;
txWin[collID].head = txWin[collID].buffer
+ txWin[collID].dictSize * sizeof (union winHeaderEntry);
txWin[collID].refuseFuncCall = 0;
txWin[collID].dictUsed = 1;
}
......@@ -439,7 +459,8 @@ void modelWinCreate ( void )
xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
&txWin[collID].buffer));
xassert ( txWin[collID].buffer != NULL );
txWin[collID].head = txWin[collID].buffer;
txWin[collID].head = txWin[collID].buffer
+ txWin[collID].dictSize * sizeof (union winHeaderEntry);
xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
MPI_INFO_NULL, commInqCommsIO(collID),
&txWin[collID].win));
......@@ -453,33 +474,33 @@ void modelWinCreate ( void )
/************************************************************************/
static void
modelWinBufferPutAtEnd(const char * caller,
int collID, const void * argBuffer, size_t size)
modelWinEnqueue(int collID,
union winHeaderEntry header, const void *data, size_t size)
{
/*
xdebug ( "collID=%d, size=%d, newBufferHead=%d, oldBufferSize=%d",
collID, size, txWin[collID].head - txWin[collID].buffer + size,
txWin[collID].size );
*/
if ( txWin == NULL ||
argBuffer == NULL ||
size < 0 ||
collID < 0 ||
collID >= commInqNProcsColl () ||
txWin[collID].head - txWin[collID].buffer + size > txWin[collID].size)
xabort("caller: %s", caller);
memcpy ( txWin[collID].head, argBuffer, size );
txWin[collID].head += size;
union winHeaderEntry *winDict
= (union winHeaderEntry *)txWin[collID].buffer;
int nextEntry = txWin[collID].dictUsed;
if (header.dataRecord.streamID > 0)
{
header.dataRecord.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
else if (header.funcCall.funcID == STREAMOPEN)
{
header.funcCall.funcArgs.newFile.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
winDict[nextEntry] = header;
++(txWin[collID].dictUsed);
}
/************************************************************************/
void pioBufferData ( const int streamID, const int varID, const double *data, int nmiss )
{
int chunk, vlistID, collID = CDI_UNDEFID;
int tokenSep = SEPARATOR, tokenData = DATATOKEN;
size_t size;
int rankModel = commInqRankModel ();
vlistID = streamInqVlist ( streamID );
......@@ -497,15 +518,9 @@ void pioBufferData ( const int streamID, const int varID, const double *data, in
modelWinFlushBuffer ( collID );
}
size = chunk * sizeof ( double ) + winBufferOverheadChunk * sizeof ( int );
xassert(txWin[collID].head - txWin[collID].buffer + size < txWin[collID].size);
modelWinBufferPutAtEnd ( __func__, collID, &tokenData, sizeof ( tokenData ));
modelWinBufferPutAtEnd ( __func__, collID, &streamID , sizeof ( streamID ));
modelWinBufferPutAtEnd ( __func__, collID, &varID , sizeof ( varID ));
modelWinBufferPutAtEnd ( __func__, collID, data , chunk * sizeof ( double ));
modelWinBufferPutAtEnd ( __func__, collID, &nmiss , sizeof ( nmiss ));
modelWinBufferPutAtEnd ( __func__, collID, &tokenSep , sizeof ( tokenSep ));
union winHeaderEntry header
= { .dataRecord = { streamID, varID, -1, nmiss } };
modelWinEnqueue(collID, header, data, chunk * sizeof (data[0]));
txWin[collID].refuseFuncCall = 1;
}
......@@ -518,8 +533,6 @@ void pioBufferFuncCall(int funcID, int argc, ... )
int rankGlob = commInqRankGlob ();
int root = commInqRootGlob ();
int collID, nProcsColl = commInqNProcsColl ();
int tokenSep = SEPARATOR, tokenFuncCall = FUNCCALL;
size_t size = 0;
xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
......@@ -543,9 +556,7 @@ void pioBufferFuncCall(int funcID, int argc, ... )
for ( collID = 0; collID < nProcsColl; collID++ )
{
size = ( winBufferOverheadFuncCall + 1 ) * sizeof ( int );
xassert(txWin[collID].head - txWin[collID].buffer + size <
txWin[collID].size);
xassert(txWin[collID].dictSize > txWin[collID].dictUsed);
if (txWin[collID].postSet)
{
......@@ -556,14 +567,11 @@ void pioBufferFuncCall(int funcID, int argc, ... )
xassert(txWin[collID].refuseFuncCall == 0);
modelWinBufferPutAtEnd ( __func__, collID, &tokenFuncCall,
sizeof ( tokenFuncCall));
modelWinBufferPutAtEnd ( __func__, collID, &funcID,
sizeof ( funcID ));
modelWinBufferPutAtEnd ( __func__, collID, &streamID,
sizeof ( streamID ));
modelWinBufferPutAtEnd ( __func__, collID, &tokenSep,
sizeof ( tokenSep ));
union winHeaderEntry header
= { .funcCall = { .funcID = STREAMCLOSE,
.funcArgs.streamChange = { streamID } } };
modelWinEnqueue(collID, header, 0, 0);
}
xdebug ( "WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d",
funcMap[(-1 - funcID)], streamID );
......@@ -584,10 +592,7 @@ void pioBufferFuncCall(int funcID, int argc, ... )
for ( collID = 0; collID < nProcsColl; collID++ )
{
size = ( winBufferOverheadFuncCall + 2 ) * sizeof ( int ) +
MAXDATAFILENAME;
xassert(txWin[collID].head - txWin[collID].buffer + size <
txWin[collID].size);
xassert(txWin[collID].dictSize > txWin[collID].dictUsed);
if (txWin[collID].postSet)
{
......@@ -595,18 +600,13 @@ void pioBufferFuncCall(int funcID, int argc, ... )
txWin[collID].postSet = 0;
modelWinFlushBuffer ( collID );
}
modelWinBufferPutAtEnd ( __func__, collID, &tokenFuncCall,
sizeof ( tokenFuncCall));
modelWinBufferPutAtEnd ( __func__, collID, &funcID,
sizeof ( funcID ));
modelWinBufferPutAtEnd ( __func__, collID, &filenamesz,
sizeof ( filenamesz ));
modelWinBufferPutAtEnd ( __func__, collID, filename,
filenamesz );
modelWinBufferPutAtEnd ( __func__, collID, &filetype,
sizeof ( filetype ));
modelWinBufferPutAtEnd ( __func__, collID, &tokenSep,
sizeof ( tokenSep ));
union winHeaderEntry header
= { .funcCall = { .funcID = STREAMOPEN,
.funcArgs.newFile
= { .fnamelen = filenamesz,
.filetype = filetype } } };
modelWinEnqueue(collID, header, filename, filenamesz + 1);
}
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu,"
......@@ -624,9 +624,7 @@ void pioBufferFuncCall(int funcID, int argc, ... )
for ( collID = 0; collID < nProcsColl; collID++ )
{
size = ( winBufferOverheadFuncCall + 2 ) * sizeof ( int );
xassert(txWin[collID].head - txWin[collID].buffer + size <
txWin[collID].size);
xassert(txWin[collID].dictSize > txWin[collID].dictUsed);
if (txWin[collID].postSet)
{
......@@ -634,16 +632,13 @@ void pioBufferFuncCall(int funcID, int argc, ... )
txWin[collID].postSet = 0;
modelWinFlushBuffer ( collID );
}
modelWinBufferPutAtEnd ( __func__, collID, &tokenFuncCall,
sizeof ( tokenFuncCall));
modelWinBufferPutAtEnd ( __func__, collID, &funcID,
sizeof ( funcID ));
modelWinBufferPutAtEnd ( __func__, collID, &streamID,
sizeof ( streamID ));
modelWinBufferPutAtEnd ( __func__, collID, &vlistID,
sizeof ( streamID ));
modelWinBufferPutAtEnd ( __func__, collID, &tokenSep,
sizeof ( tokenSep ));
union winHeaderEntry header
= { .funcCall = { .funcID = STREAMDEFVLIST,
.funcArgs.streamChange = { streamID,
vlistID } } };
modelWinEnqueue(collID, header, 0, 0);
}
xdebug ( "WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d,"
......@@ -727,7 +722,8 @@ MPI_Comm pioInit_c ( MPI_Comm commGlob, int nProcsIO, int IOMode,
sizeGlob = commInqSizeGlob ();
if ( nProcsIO <= 0 || nProcsIO > sizeGlob - 1 )
xabort ( "DISTRIBUTION OF TASKS ON PROCS IS NOT VALID." );
xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
"nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);
commDefNProcsIO ( nProcsIO );
commDefIOMode ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
......@@ -869,7 +865,7 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
{
#ifdef USE_MPI
int collID, buffer[timestepSize], iAssert = 0;
int tokenEnd = END;
/* int tokenEnd = END; */
int rankGlob = commInqRankGlob ();
int nProcsColl = commInqNProcsColl ();
int nProcsModel = commInqNProcsModel ();
......@@ -900,8 +896,17 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
txWin[collID].postSet = 0;
modelWinFlushBuffer ( collID );
}
modelWinBufferPutAtEnd ( __func__, collID, &tokenEnd,
sizeof ( tokenEnd ));
/*
* modelWinBufferPutAtEnd ( __func__, collID, &tokenEnd,
* sizeof ( tokenEnd ));
*/
union winHeaderEntry header
= { .headerSize = { .sizeID = HEADERSIZEMARKER,
.numHeaderEntries = txWin[collID].dictUsed } };
union winHeaderEntry *winDict
= (union winHeaderEntry *)txWin[collID].buffer;
winDict[0] = header;
xmpi(MPI_Win_post(txWin[collID].ioGroup, iAssert, txWin[collID].win));
txWin[collID].postSet = 1;
}
......
......@@ -34,20 +34,57 @@ enum
/* Sizing constants for the window buffers and the timestep message.
 * NOTE(review): the uses of the first three constants visible in this change
 * (each multiplied by sizeof (int)) are being replaced by
 * sizeof (union winHeaderEntry) -- confirm whether they are still needed. */
enum
{
/* ints accounted for once per window */
winBufferOverhead = 1,
/* ints accounted for per data chunk */
winBufferOverheadChunk = 5,
/* ints accounted for per buffered function call */
winBufferOverheadFuncCall = 3,
/* element count of the int buffer declared in pioWriteTimestep */
timestepSize = 3
};
#define MAXWINBUFFERSIZE ((size_t)512 * 1024 * 1024)
enum { nFuncs = 3, STREAMOPEN = -1, STREAMDEFVLIST = -2, STREAMCLOSE = -3 };
/* Identifiers stored in the first int of a window header entry.
 * Function IDs are negative and index funcMap via (-1 - funcID); data
 * records are recognised by a positive stream ID in the same position. */
enum
{
  /* number of remotely executed stream functions */
  nFuncs = 3,

  STREAMOPEN = -1,
  STREAMDEFVLIST = -2,
  STREAMCLOSE = -3,

  /* first value below the function ID range; tags header entry 0, which
   * carries the number of used header entries */
  HEADERSIZEMARKER = -(nFuncs + 1)
};
/* MAXDATAFILENAME: bytes reserved in a window for a stream file name; the
 * name length is asserted to be below this, leaving room for the NUL.
 * MINFUNCID / MAXFUNCID: inclusive bounds used to validate function IDs. */
enum { MAXDATAFILENAME = 256, MINFUNCID = STREAMCLOSE, MAXFUNCID = STREAMOPEN };
extern char * funcMap[nFuncs];
void rpcUnpackResources ( char *, int, MPI_Comm );
/* Content of header entry 0 of a window: a marker plus the entry count. */
struct headerSize
{
  /* set to HEADERSIZEMARKER by the writer and asserted by the reader */
  int sizeID;
  /* number of header entries in use, including this one */
  int numHeaderEntries;
};
/* Header entry describing one chunk of variable data stored in a window.
 * Field order matters: pioBufferData initialises this positionally. */
struct dataRecord
{
  /* stream the chunk belongs to; shares its position with funcCall.funcID */
  int streamID;
  /* variable within the stream's vlist */
  int varID;
  /* byte offset of the payload from the start of the window buffer,
   * filled in when the record is enqueued */
  int offset;
  /* missing-value count handed to pioBufferData */
  int nmiss;
};
/* Header entry describing one buffered stream function call. */
struct funcCallDesc
{
  /* STREAMOPEN, STREAMDEFVLIST or STREAMCLOSE; selects the funcArgs member */
  int funcID;
  union
  {
    /* arguments of STREAMDEFVLIST; STREAMCLOSE sets streamID only */
    struct
    {
      int streamID;
      int vlistID;
    } streamChange;
    /* arguments of STREAMOPEN */
    struct
    {
      /* length of the file name, excluding the terminating NUL */
      int fnamelen;
      /* byte offset of the file name from the start of the window buffer */
      int offset;
      int filetype;
    } newFile;
  } funcArgs;
};
/* One slot of the header dictionary at the start of a window buffer; the
 * buffer is cast to an array of these by both writer and reader.
 * The first int of every member overlaps: the enqueue code reads
 * dataRecord.streamID and funcCall.funcID from the same entry to tell a data
 * record (positive stream ID) from a function call. */
union winHeaderEntry
{
/* slot 0 only: marker and number of used entries */
struct headerSize headerSize;
/* descriptor of a buffered data chunk */
struct dataRecord dataRecord;
/* descriptor of a buffered stream function call */
struct funcCallDesc funcCall;
};
#endif
#endif
......
......@@ -28,6 +28,7 @@ static struct
{
size_t size;
unsigned char *buffer, *head;
int currentRecord;
} *rxWin = NULL;
static MPI_Win getWin = MPI_WIN_NULL;
......@@ -87,24 +88,24 @@ static
decoChunk = vlistInqVarDecoChunk ( vlistID, varID, modelID );
xassert ( decoChunk > 0 );
rxWin[modelID].size += decoChunk * sizeof (double)
+ winBufferOverheadChunk * sizeof (int);
+ sizeof (union winHeaderEntry);
}
}
}
}
// space required for the 3 function calls streamOpen, streamDefVlist, streamClose
// once per stream and timestep for all collprocs only on the modelproc root
rxWin[root].size += 3 * winBufferOverheadFuncCall * sizeof (int)
+ 5 * sizeof (int) + MAXDATAFILENAME;
rxWin[root].size += 3 * sizeof (union winHeaderEntry)
+ MAXDATAFILENAME;
}
free ( streamIndexList );
for ( modelID = 0; modelID < nProcsModel; modelID++ )
{
rxWin[modelID].size += winBufferOverhead * sizeof (int);
/* account for size header */
rxWin[modelID].size += sizeof (union winHeaderEntry);
sumGetBufferSizes += rxWin[modelID].size;
}
xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
/* xprintArray ( "getBufferSize", getBufferSize, nProcsModel, DATATYPE_INT ); */
}
/************************************************************************/
......@@ -127,7 +128,6 @@ static
rxWin = xmalloc(nProcsModel * sizeof (rxWin[0]));
collDefBufferSizes ();
/* xprintArray ( "getBufferSizes", getBufferSize, nProcsModel, DATATYPE_INT ); */
for ( modelID = 0; modelID < nProcsModel; modelID++ )
{
......@@ -140,95 +140,54 @@ static
/************************************************************************/
static
void getBufferGetFromEnd ( const char * caller, int line,
int ID, void * argBuffer, size_t size )
static void
readFuncCall(struct funcCallDesc *header)
{
if (rxWin == NULL ||
argBuffer == NULL ||
size < 0 ||
ID < 0 ||
ID >= commInqNProcsModel () ||
rxWin[ID].head - rxWin[ID].buffer + size > rxWin[ID].size)
xabort("caller: %s, line %d, ID = %d, nProcsModel=%d,"
" size = %lu, rxWin[%d].head = %ld, rxWin[%d].size = %lu",
caller, line, ID, (unsigned long)size, ID,
commInqNProcsModel(), rxWin[ID].head - rxWin[ID].buffer,
ID, (unsigned long)rxWin[ID].size);
memcpy ( argBuffer, rxWin[ID].head, size );
rxWin[ID].head += size;
}
/************************************************************************/
static
void readFuncCall ( void )
{
int funcID, tokenID;
int root = commInqRootGlob ();
int funcID = header->funcID;
getBufferGetFromEnd ( __func__, __LINE__,
root, &funcID, sizeof ( funcID ));
xassert ( funcID >= MINFUNCID && funcID <= MAXFUNCID );
xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
switch ( funcID )
{
case STREAMCLOSE:
{
int streamID;
getBufferGetFromEnd ( __func__, __LINE__,
root, &streamID, sizeof ( streamID ));
streamClose ( streamID );
xdebug ( "READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" closed stream",
funcMap[(-1 - funcID)], streamID );
int streamID = header->funcArgs.streamChange.streamID;
streamClose(streamID);
xdebug("READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" closed stream",
funcMap[(-1 - funcID)], streamID);
}
break;
case STREAMOPEN:
{
char *filename;
size_t filenamesz;
int filetype, streamID;
getBufferGetFromEnd ( __func__, __LINE__,
root, &filenamesz, sizeof ( filenamesz ));
size_t filenamesz = header->funcArgs.newFile.fnamelen;
xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
filename = xmalloc(filenamesz + 1);
getBufferGetFromEnd ( __func__, __LINE__,
root, filename, filenamesz );
filename[filenamesz] = '\0';
getBufferGetFromEnd ( __func__, __LINE__,
root, &filetype, sizeof ( filetype ));
const char *filename
= (const char *)(rxWin[root].buffer
+ header->funcArgs.newFile.offset);
xassert(filename[filenamesz] == '\0');
int filetype = header->funcArgs.newFile.filetype;
xassert ( filetype >= MINFILETYPE && filetype <= MAXFILETYPE );
streamID = streamOpenWrite ( filename, filetype );
int streamID = streamOpenWrite(filename, filetype);
xdebug("READ FUNCTION CALL FROM WIN: %s, filenamesz=%zu,"
" filename=%s, filetype=%d, OPENED STREAM %d",
funcMap[(-1 - funcID)], filenamesz, filename,
filetype, streamID);
free(filename);
}
break;
break;
case STREAMDEFVLIST:
{
int streamID, vlistID;
getBufferGetFromEnd ( __func__, __LINE__,
root, &streamID, sizeof ( vlistID ));
getBufferGetFromEnd ( __func__, __LINE__,
root, &vlistID, sizeof ( vlistID ));
streamDefVlist ( streamID, vlistID );
xdebug ( "READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" vlistID=%d, called streamDefVlist ().",
funcMap[(-1 - funcID)], streamID, vlistID );
int streamID = header->funcArgs.streamChange.streamID;
int vlistID = header->funcArgs.streamChange.vlistID;
streamDefVlist(streamID, vlistID);
xdebug("READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" vlistID=%d, called streamDefVlist ().",
funcMap[(-1 - funcID)], streamID, vlistID);
}
break;
default:
xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
}
getBufferGetFromEnd ( __func__, __LINE__,
root, &tokenID, sizeof ( tokenID ));
xassert ( tokenID == SEPARATOR );
}
/************************************************************************/
......@@ -240,26 +199,28 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
double * data = NULL, * dataHead = NULL;
int streamID = CDI_UNDEFID, streamIDNew = CDI_UNDEFID;
int varID, vlistID = CDI_UNDEFID, taxisID;
int size, chunk;
int tokenID, tokenID2;
int size;
int nmiss = 0;
char text[1024];
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
int numFuncCalls = 0;
xdebug("%s", "START");
getBufferGetFromEnd ( __func__, __LINE__,
root, &tokenID, sizeof ( tokenID ));
while ( tokenID != END )
union winHeaderEntry *winDict
= (union winHeaderEntry *)rxWin[root].buffer;
xassert(winDict[0].headerSize.sizeID == HEADERSIZEMARKER);
int numHeaderEntries = winDict[0].headerSize.numHeaderEntries;
int headerIdx;
for (headerIdx = 1; headerIdx < numHeaderEntries; ++headerIdx)
{
switch ( tokenID )