Commit 4c17a0cd authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Make RDMA window header entries structs.

* This allows to have the offset entry available for all types.
parent a25a79ba
......@@ -23,7 +23,7 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
int filetype, stream_t *streamptr,
int recordBufIsToBeCreated)
{
union winHeaderEntry header;
struct winHeaderEntry header;
size_t filename_len;
if ( tolower ( * filemode ) == 'w' )
{
......@@ -35,7 +35,7 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
case STAGE_TIMELOOP:
filename_len = strlen(filename);
xassert(filename_len > 0 && filename_len < MAXDATAFILENAME);
header.funcCall
header.specific.funcCall
= (struct funcCallDesc){
.funcID = STREAMOPEN,
.funcArgs.newFile = { .fnamelen = (int)filename_len,
......@@ -61,14 +61,14 @@ cdiPioClientStreamOpen(const char *filename, const char *filemode,
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
{
union winHeaderEntry header;
struct winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
header.funcCall
header.specific.funcCall
= (struct funcCallDesc){
.funcID = STREAMDEFVLIST,
.funcArgs.streamChange = { streamID, vlistID } };
......@@ -152,14 +152,14 @@ cdiPioClientStreamNOP(stream_t *streamptr)
static void
cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
union winHeaderEntry header;
struct winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
header.funcCall
header.specific.funcCall
= (struct funcCallDesc){
.funcID = STREAMCLOSE,
.funcArgs.streamChange = { streamptr->self, CDI_UNDEFID } };
......@@ -177,7 +177,7 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
static int
cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
{
union winHeaderEntry header;
struct winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
int taxisID, buf_size, position;
char *buf;
......@@ -189,7 +189,7 @@ cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
case STAGE_TIMELOOP:
position = 0;
taxisID = vlistInqTaxis(streamptr->vlistID);
header.funcCall
header.specific.funcCall
= (struct funcCallDesc){
.funcID = STREAMDEFTIMESTEP,
.funcArgs.streamNewTimestep = { streamptr->self, tsID } };
......
......@@ -372,7 +372,7 @@ modelWinDefBufferSizes(void)
+ sizeof (double) - 1
/* one header for data record, one for corresponding part
* descriptor*/
+ 2 * sizeof (union winHeaderEntry)
+ 2 * sizeof (struct winHeaderEntry)
/* FIXME: heuristic for size of packed Xt_idxlist */
+ sizeof (Xt_int) * collIDchunk * 3;
}
......@@ -385,7 +385,7 @@ modelWinDefBufferSizes(void)
{
collIndex[collID].numRPCRecords += numRPCFuncs;
txWin[collID].size +=
numRPCFuncs * sizeof (union winHeaderEntry)
numRPCFuncs * sizeof (struct winHeaderEntry)
+ MAXDATAFILENAME
/* data part of streamDefTimestep */
+ (2 * CDI_MAX_NAME + sizeof (taxis_t));
......@@ -399,7 +399,7 @@ modelWinDefBufferSizes(void)
txWin[collID].dictDataUsed = 1;
txWin[collID].dictRPCUsed = 0;
/* account for size header */
txWin[collID].size += sizeof (union winHeaderEntry);
txWin[collID].size += sizeof (struct winHeaderEntry);
txWin[collID].size = roundUpToMultiple(txWin[collID].size,
PIO_WIN_ALIGN);
sumWinBufferSize += txWin[collID].size;
......@@ -430,7 +430,7 @@ static
txWin[collID].size <= MAXWINBUFFERSIZE);
memset(txWin[collID].buffer, 0, txWin[collID].size);
txWin[collID].head = txWin[collID].buffer
+ txWin[collID].dictSize * sizeof (union winHeaderEntry);
+ txWin[collID].dictSize * sizeof (struct winHeaderEntry);
txWin[collID].refuseFuncCall = 0;
txWin[collID].dictDataUsed = 1;
txWin[collID].dictRPCUsed = 0;
......@@ -460,7 +460,7 @@ void modelWinCreate ( void )
&txWin[collID].buffer));
xassert ( txWin[collID].buffer != NULL );
txWin[collID].head = txWin[collID].buffer
+ txWin[collID].dictSize * sizeof (union winHeaderEntry);
+ txWin[collID].dictSize * sizeof (struct winHeaderEntry);
xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
MPI_INFO_NULL, commInqCommsIO(collID),
&txWin[collID].win));
......@@ -475,33 +475,33 @@ void modelWinCreate ( void )
static void
modelWinEnqueue(int collID,
union winHeaderEntry header, const void *data, size_t size)
struct winHeaderEntry header, const void *data, size_t size)
{
union winHeaderEntry *winDict
= (union winHeaderEntry *)txWin[collID].buffer;
struct winHeaderEntry *winDict
= (struct winHeaderEntry *)txWin[collID].buffer;
int targetEntry;
if (header.dataRecord.streamID > 0)
if (header.specific.dataRecord.streamID > 0)
{
targetEntry = (txWin[collID].dictDataUsed)++;
int offset = header.dataRecord.offset
int offset = header.offset
= (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
sizeof (double));
memcpy(txWin[collID].buffer + offset, data, size);
txWin[collID].head = txWin[collID].buffer + offset + size;
}
else if (header.partDesc.partDescMarker == PARTDESCMARKER)
else if (header.specific.partDesc.partDescMarker == PARTDESCMARKER)
{
targetEntry = (txWin[collID].dictDataUsed)++;
Xt_uid uid = header.partDesc.uid;
Xt_uid uid = header.specific.partDesc.uid;
int offset = -1;
/* search if same uid entry has already been enqueued */
for (int entry = 2; entry < targetEntry; entry += 2)
{
xassert(winDict[entry].partDesc.partDescMarker
xassert(winDict[entry].specific.partDesc.partDescMarker
== PARTDESCMARKER);
if (winDict[entry].partDesc.uid == uid)
if (winDict[entry].specific.partDesc.uid == uid)
{
offset = winDict[entry].partDesc.offset;
offset = winDict[entry].offset;
break;
}
}
......@@ -511,7 +511,7 @@ modelWinEnqueue(int collID,
* current position */
int position = 0;
MPI_Comm comm = commInqCommsIO(collID);
header.partDesc.offset
header.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
size_t remaining_size = txWin[collID].size
......@@ -523,21 +523,21 @@ modelWinEnqueue(int collID,
}
else
/* duplicate entries are copied only once per timestep */
header.partDesc.offset = offset;
header.offset = offset;
}
else
{
targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
if (header.funcCall.funcID == STREAMOPEN)
if (header.specific.funcCall.funcID == STREAMOPEN)
{
header.funcCall.funcArgs.newFile.offset
header.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
else if (header.funcCall.funcID == STREAMDEFTIMESTEP)
else if (header.specific.funcCall.funcID == STREAMDEFTIMESTEP)
{
header.funcCall.funcArgs.streamNewTimestep.offset
header.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
......@@ -568,14 +568,14 @@ pioBufferPartData(int streamID, int varID, const double *data,
Xt_int chunk = xt_idxlist_get_num_indices(partDesc);
xassert(chunk <= INT_MAX);
union winHeaderEntry dataHeader
= { .dataRecord = { streamID, varID, -1, nmiss } };
struct winHeaderEntry dataHeader
= { .specific.dataRecord = { streamID, varID, nmiss }, .offset = -1 };
modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0]));
{
union winHeaderEntry partHeader
= { .partDesc = { .partDescMarker = PARTDESCMARKER,
.uid = xt_idxlist_get_uid(partDesc),
.offset = 0 } };
struct winHeaderEntry partHeader
= { .specific.partDesc = { .partDescMarker = PARTDESCMARKER,
.uid = xt_idxlist_get_uid(partDesc) },
.offset = 0 };
modelWinEnqueue(collID, partHeader, partDesc, 0);
}
......@@ -584,13 +584,13 @@ pioBufferPartData(int streamID, int varID, const double *data,
/************************************************************************/
void pioBufferFuncCall(union winHeaderEntry header,
void pioBufferFuncCall(struct winHeaderEntry header,
const void *data, size_t data_len)
{
int rankGlob = commInqRankGlob ();
int root = commInqRootGlob ();
int collID, nProcsColl = commInqNProcsColl ();
int funcID = header.funcCall.funcID;
int funcID = header.specific.funcCall.funcID;
xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
......@@ -827,12 +827,13 @@ void pioWriteTimestep()
txWin[collID].postSet = 0;
modelWinFlushBuffer ( collID );
}
union winHeaderEntry header
= { .headerSize = { .sizeID = HEADERSIZEMARKER,
.numDataEntries = txWin[collID].dictDataUsed,
.numRPCEntries = txWin[collID].dictRPCUsed } };
union winHeaderEntry *winDict
= (union winHeaderEntry *)txWin[collID].buffer;
struct winHeaderEntry header
= { .specific.headerSize
= { .sizeID = HEADERSIZEMARKER,
.numDataEntries = txWin[collID].dictDataUsed,
.numRPCEntries = txWin[collID].dictRPCUsed } };
struct winHeaderEntry *winDict
= (struct winHeaderEntry *)txWin[collID].buffer;
winDict[0] = header;
xmpi(MPI_Win_post(txWin[collID].ioGroup, iAssert, txWin[collID].win));
......
......@@ -16,8 +16,8 @@ void
pioBufferPartData(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc);
void pioBufferData (int, int, const double *, int );
void pioBufferFuncCall(union winHeaderEntry header,
const void *data, size_t data_len);
void pioBufferFuncCall(struct winHeaderEntry header,
const void *data, size_t dataSize);
extern float cdiPIOpartInflate_;
......
......@@ -42,13 +42,13 @@ struct headerSize
struct dataRecord
{
int streamID, varID, offset, nmiss;
int streamID, varID, nmiss;
};
struct funcCallDesc
{
int funcID;
union
union funcArgs
{
struct
{
......@@ -56,11 +56,11 @@ struct funcCallDesc
} streamChange;
struct
{
int streamID, tsID, offset;
int streamID, tsID;
} streamNewTimestep;
struct
{
int fnamelen, offset, filetype;
int fnamelen, filetype;
} newFile;
} funcArgs;
};
......@@ -69,16 +69,20 @@ struct funcCallDesc
* partDescMarker == PARTDESCMARKER, always. */
struct partDescRecord
{
int partDescMarker, offset;
int partDescMarker;
Xt_uid uid;
};
union winHeaderEntry
struct winHeaderEntry
{
struct headerSize headerSize;
struct dataRecord dataRecord;
struct funcCallDesc funcCall;
struct partDescRecord partDesc;
union
{
struct headerSize headerSize;
struct dataRecord dataRecord;
struct funcCallDesc funcCall;
struct partDescRecord partDesc;
} specific;
int offset;
};
/* round size to next multiple of factor */
......
......@@ -118,7 +118,7 @@ collDefBufferSizes()
+ sizeof (double) - 1
/* one header for data record, one for
* corresponding part descriptor*/
+ 2 * sizeof (union winHeaderEntry)
+ 2 * sizeof (struct winHeaderEntry)
/* FIXME: heuristic for size of packed Xt_idxlist */
+ sizeof (Xt_int) * decoChunk * 3;
rxWin[modelID].dictSize += 2;
......@@ -127,7 +127,7 @@ collDefBufferSizes()
}
// space required for the 3 function calls streamOpen, streamDefVlist, streamClose
// once per stream and timestep for all collprocs only on the modelproc root
rxWin[root].size += numRPCFuncs * sizeof (union winHeaderEntry)
rxWin[root].size += numRPCFuncs * sizeof (struct winHeaderEntry)
/* serialized filename */
+ MAXDATAFILENAME
/* data part of streamDefTimestep */
......@@ -140,7 +140,7 @@ collDefBufferSizes()
{
/* account for size header */
rxWin[modelID].dictSize += 1;
rxWin[modelID].size += sizeof (union winHeaderEntry);
rxWin[modelID].size += sizeof (struct winHeaderEntry);
rxWin[modelID].size = roundUpToMultiple(rxWin[modelID].size,
PIO_WIN_ALIGN);
sumGetBufferSizes += (size_t)rxWin[modelID].size;
......@@ -183,10 +183,11 @@ static
/************************************************************************/
static void
readFuncCall(struct funcCallDesc *header)
readFuncCall(struct winHeaderEntry *header)
{
int root = commInqRootGlob ();
int funcID = header->funcID;
int funcID = header->specific.funcCall.funcID;
union funcArgs *funcArgs = &(header->specific.funcCall.funcArgs);
xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
switch ( funcID )
......@@ -194,7 +195,7 @@ readFuncCall(struct funcCallDesc *header)
case STREAMCLOSE:
{
int streamID
= namespaceAdaptKey2(header->funcArgs.streamChange.streamID);
= namespaceAdaptKey2(funcArgs->streamChange.streamID);
streamClose(streamID);
xdebug("READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" closed stream",
......@@ -203,13 +204,12 @@ readFuncCall(struct funcCallDesc *header)
break;
case STREAMOPEN:
{
size_t filenamesz = header->funcArgs.newFile.fnamelen;
size_t filenamesz = funcArgs->newFile.fnamelen;
xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
const char *filename
= (const char *)(rxWin[root].buffer
+ header->funcArgs.newFile.offset);
= (const char *)(rxWin[root].buffer + header->offset);
xassert(filename[filenamesz] == '\0');
int filetype = header->funcArgs.newFile.filetype;
int filetype = funcArgs->newFile.filetype;
int streamID = streamOpenWrite(filename, filetype);
xassert(streamID != CDI_ELIBNAVAIL);
xdebug("READ FUNCTION CALL FROM WIN: %s, filenamesz=%zu,"
......@@ -221,8 +221,8 @@ readFuncCall(struct funcCallDesc *header)
case STREAMDEFVLIST:
{
int streamID
= namespaceAdaptKey2(header->funcArgs.streamChange.streamID);
int vlistID = namespaceAdaptKey2(header->funcArgs.streamChange.vlistID);
= namespaceAdaptKey2(funcArgs->streamChange.streamID);
int vlistID = namespaceAdaptKey2(funcArgs->streamChange.vlistID);
streamDefVlist(streamID, vlistID);
xdebug("READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" vlistID=%d, called streamDefVlist ().",
......@@ -232,14 +232,14 @@ readFuncCall(struct funcCallDesc *header)
case STREAMDEFTIMESTEP:
{
MPI_Comm commCalc = commInqCommCalc ();
int streamID = header->funcArgs.streamNewTimestep.streamID;
int streamID = funcArgs->streamNewTimestep.streamID;
int nspTarget = namespaceResHDecode(streamID).nsp;
streamID = namespaceAdaptKey2(streamID);
int tsID
= header->funcArgs.streamNewTimestep.tsID;
= funcArgs->streamNewTimestep.tsID;
int oldTaxisID
= vlistInqTaxis(streamInqVlist(streamID));
int position = header->funcArgs.streamNewTimestep.offset;
int position = header->offset;
int changedTaxisID
= taxisUnpack((char *)rxWin[root].buffer, (int)rxWin[root].size,
&position, nspTarget, &commCalc, 0);
......@@ -270,10 +270,10 @@ gatherArray(int root, int nProcsModel, int headerIdx,
int vlistID,
double *gatherBuf, int *nmiss)
{
union winHeaderEntry *winDict
= (union winHeaderEntry *)rxWin[root].buffer;
int streamID = winDict[headerIdx].dataRecord.streamID;
int varID = winDict[headerIdx].dataRecord.varID;
struct winHeaderEntry *winDict
= (struct winHeaderEntry *)rxWin[root].buffer;
int streamID = winDict[headerIdx].specific.dataRecord.streamID;
int varID = winDict[headerIdx].specific.dataRecord.varID;
int varShape[3] = { 0, 0, 0 };
cdiPioQueryVarDims(varShape, vlistID, varID);
Xt_int varShapeXt[3];
......@@ -290,24 +290,27 @@ gatherArray(int root, int nProcsModel, int headerIdx,
for (int modelID = 0; modelID < nProcsModel; modelID++)
{
struct dataRecord *dataHeader
= &((union winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx].dataRecord;
= &((struct winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx].specific.dataRecord;
struct partDescRecord *partHeader
= &((union winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx + 1].partDesc;
int position = partHeader->offset;
= &((struct winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx + 1].specific.partDesc;
int position =
((struct winHeaderEntry *)rxWin[modelID].buffer)[headerIdx + 1].offset;
xassert(namespaceAdaptKey2(dataHeader->streamID) == streamID
&& dataHeader->varID == varID
&& partHeader->partDescMarker == PARTDESCMARKER
&& position > 0
&& ((size_t)position
>= sizeof (union winHeaderEntry) * rxWin[modelID].dictSize)
>= sizeof (struct winHeaderEntry) * rxWin[modelID].dictSize)
&& ((size_t)position < rxWin[modelID].size));
part[modelID] = xt_idxlist_unpack(rxWin[modelID].buffer,
(int)rxWin[modelID].size,
&position, commCalc);
Xt_int partSize = xt_idxlist_get_num_indices(part[modelID]);
size_t charOfs = (rxWin[modelID].buffer + dataHeader->offset)
size_t charOfs = (rxWin[modelID].buffer
+ ((struct winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx].offset)
- rxWin[0].buffer;
xassert(charOfs % sizeof (double) == 0
&& charOfs / sizeof (double) + partSize <= INT_MAX);
......@@ -527,7 +530,7 @@ streamIsInList(struct streamMapping *streamMap, int numStreamIDs,
}
static struct streamMap
buildStreamMap(union winHeaderEntry *winDict)
buildStreamMap(struct winHeaderEntry *winDict)
{
int streamIDOld = CDI_UNDEFID;
int oldStreamIdx = CDI_UNDEFID;
......@@ -535,14 +538,14 @@ buildStreamMap(union winHeaderEntry *winDict)
int sizeStreamMap = 16;
struct streamMapping *streamMap
= xmalloc(sizeStreamMap * sizeof (streamMap[0]));
int numDataEntries = winDict[0].headerSize.numDataEntries;
int numDataEntries = winDict[0].specific.headerSize.numDataEntries;
int numStreamIDs = 0;
/* find streams written on this process */
for (int headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
{
int streamID
= winDict[headerIdx].dataRecord.streamID
= namespaceAdaptKey2(winDict[headerIdx].dataRecord.streamID);
= winDict[headerIdx].specific.dataRecord.streamID
= namespaceAdaptKey2(winDict[headerIdx].specific.dataRecord.streamID);
xassert(streamID > 0);
if (streamID != streamIDOld)
{
......@@ -563,7 +566,7 @@ buildStreamMap(union winHeaderEntry *winDict)
if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
|| filetype == FILETYPE_NC4)
{
int varID = winDict[headerIdx].dataRecord.varID;
int varID = winDict[headerIdx].specific.dataRecord.varID;
streamMap[oldStreamIdx].varMap[varID] = headerIdx;
}
}
......@@ -607,12 +610,12 @@ static void readGetBuffers()
#endif
xdebug("%s", "START");
union winHeaderEntry *winDict
= (union winHeaderEntry *)rxWin[root].buffer;
xassert(winDict[0].headerSize.sizeID == HEADERSIZEMARKER);
struct winHeaderEntry *winDict
= (struct winHeaderEntry *)rxWin[root].buffer;
xassert(winDict[0].specific.headerSize.sizeID == HEADERSIZEMARKER);
{
int dictSize = rxWin[root].dictSize,
firstNonRPCEntry = dictSize - winDict[0].headerSize.numRPCEntries - 1,
firstNonRPCEntry = dictSize - winDict[0].specific.headerSize.numRPCEntries - 1,
headerIdx,
numFuncCalls = 0;
for (headerIdx = dictSize - 1;
......@@ -620,13 +623,13 @@ static void readGetBuffers()
--headerIdx)
{
struct funcCallDesc *header
= &(winDict[headerIdx].funcCall);
= &(winDict[headerIdx].specific.funcCall);
xassert(header->funcID >= MINFUNCID
&& header->funcID <= MAXFUNCID);
++numFuncCalls;
readFuncCall(header);
readFuncCall(winDict + headerIdx);
}
xassert(numFuncCalls == winDict[0].headerSize.numRPCEntries);
xassert(numFuncCalls == winDict[0].specific.headerSize.numRPCEntries);
}
/* build list of streams, data was transferred for */
{
......@@ -657,9 +660,9 @@ static void readGetBuffers()
for (headerIdx = map.entries[streamIdx].firstHeaderIdx;
headerIdx <= lastHeaderIdx;
headerIdx += 2)
if (streamID == winDict[headerIdx].dataRecord.streamID)
if (streamID == winDict[headerIdx].specific.dataRecord.streamID)
{
int varID = winDict[headerIdx].dataRecord.varID;
int varID = winDict[headerIdx].specific.dataRecord.varID;
int size = vlistInqVarSize(vlistID, varID);
int nmiss;
resizeVarGatherBuf(vlistID, varID, &data,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment