Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 77
17
Compare changes
  • Side-by-side
  • Inline
+ 77
17
@@ -217,20 +217,13 @@ resizeVarGatherBuf(size_t size, double **buf, size_t *bufSize)
/* Cast helper with deliberately unbalanced parentheses: written at the use
 * site as `(wHECast) ptr`, it expands to
 * `(struct winHeaderEntry *)(void *) ptr`, i.e. a cast through void * to a
 * window header entry pointer. */
#define wHECast struct winHeaderEntry *)(void *
static Xt_redist
buildVarRedist(int headerIdx, size_t streamIdx,
/* index list representing the data elements gathered on
* this rank */
Xt_idxlist dstList, const struct cdiPioConf *conf)
static Xt_xmap
buildVarXmap(struct Xt_offset_ext *restrict partExts, const struct clientBuf *restrict clientBuf, size_t headerIdx,
Xt_idxlist dstList, MPI_Comm pioInterComm, MPI_Comm collComm, int streamID, int varID, const struct cdiPioConf *conf)
{
const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
const struct winHeaderEntry *winDict = (wHECast) clientBuf[0].mem;
int streamID = openStreams.entries[streamIdx];
int varID = winDict[headerIdx].specific.dataRecord.varID;
struct Xt_offset_ext *partExts = Malloc((size_t) numClients_ * sizeof(partExts[0]));
Xt_idxlist *part = Malloc((size_t) numClients_ * sizeof(part[0]));
MPI_Comm pioInterComm = cdiPioInqInterComm(), collComm = commInqCommColl();
for (size_t clientIdx = 0; clientIdx < (size_t) numClients_; ++clientIdx)
size_t numClients = (size_t) numClients_;
Xt_idxlist *part = Malloc(numClients * sizeof(part[0]));
for (size_t clientIdx = 0; clientIdx < numClients; ++clientIdx)
{
unsigned char *clientMem = clientBuf[clientIdx].mem;
struct dataRecord *dataHeader = &((wHECast) clientMem)[headerIdx].specific.dataRecord;
@@ -248,8 +241,8 @@ buildVarRedist(int headerIdx, size_t streamIdx,
partExts[clientIdx].size = (int) partSize;
partExts[clientIdx].stride = 1;
}
Xt_idxlist srcList = xt_idxlist_collection_new(part, numClients_);
for (size_t clientIdx = 0; clientIdx < (size_t) numClients_; ++clientIdx) xt_idxlist_delete(part[clientIdx]);
Xt_idxlist srcList = xt_idxlist_collection_new(part, (int) numClients);
for (size_t clientIdx = 0; clientIdx < numClients; ++clientIdx) xt_idxlist_delete(part[clientIdx]);
Free(part);
if (conf->stripify)
{
@@ -259,9 +252,76 @@ buildVarRedist(int headerIdx, size_t streamIdx,
}
Xt_xmap gatherXmap = conf->xmap_new(srcList, dstList, collComm);
xt_idxlist_delete(srcList);
return gatherXmap;
}
/*
 * Build the redistribution used to gather the data of one variable from
 * all client buffers of a stream onto this rank.
 *
 * headerIdx  index of the variable's data record header in each client's
 *            window dictionary; the entry at headerIdx + 1 is asserted to
 *            be the matching part descriptor when xmap caching is on
 * streamIdx  index into rxWin / openStreams selecting the stream
 * dstList    index list representing the data elements gathered on this rank
 * conf       configuration; conf->cacheXmaps selects whether exchange maps
 *            are looked up in / added to XmapCache
 *
 * Returns the redist created by xt_redist_p2p_ext_new.  When caching is
 * enabled the xmap is not deleted here (NOTE(review): presumably the cache
 * keeps it -- confirm against cdiPioXmapCacheAdd); otherwise the xmap is
 * deleted before returning.
 */
static Xt_redist
buildVarRedist(int headerIdx, size_t streamIdx,
               /* index list representing the data elements gathered on
                * this rank */
               Xt_idxlist dstList, const struct cdiPioConf *conf)
{
  const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
  const struct winHeaderEntry *winDict = (wHECast) clientBuf[0].mem;
  int streamID = openStreams.entries[streamIdx];
  int varID = winDict[headerIdx].specific.dataRecord.varID;
  size_t numClients = (size_t) numClients_;
  struct Xt_offset_ext *partExts = Malloc(numClients * sizeof(partExts[0]));
  MPI_Comm pioInterComm = cdiPioInqInterComm(), collComm = commInqCommColl();
  Xt_xmap gatherXmap;
  struct Xt_offset_ext gatherExt = { .start = 0, .size = xt_idxlist_get_num_indices(dstList), .stride = 1 };
  Xt_uid *restrict uids = NULL;
  int *restrict partSizes = NULL;
  bool cacheXmaps = conf->cacheXmaps;
  if (cacheXmaps)
    {
      {
        /* One allocation holds both key arrays: numClients + 1 uids,
         * padded up to a multiple of sizeof(int), followed by
         * numClients + 1 part sizes.  Entry 0 of each array belongs to
         * dstList, entries 1..numClients to the clients. */
        size_t uidBytes = sizeof(*uids) * (numClients + 1), partSizesBytes = sizeof(*partSizes) * (numClients + 1),
               partSizeAlign = sizeof(*partSizes),
               uidBytesRoundUp = (uidBytes + partSizeAlign - 1) / partSizeAlign * partSizeAlign;
        uids = Malloc(uidBytesRoundUp + partSizesBytes);
        partSizes = (int *) (void *) ((unsigned char *) uids + uidBytesRoundUp);
      }
      uids[0] = xt_idxlist_get_uid(dstList);
      for (size_t clientIdx = 0; clientIdx < numClients; ++clientIdx)
        {
          unsigned char *clientMem = clientBuf[clientIdx].mem;
          struct winHeaderEntry *partHeader = ((wHECast) clientMem) + headerIdx + 1;
          xassert(partHeader->id == PARTDESCMARKER);
          uids[clientIdx + 1] = unpackXTUID(partHeader->specific.partDesc.packedUID);
        }
      /* partSizes is read below without being written in this function,
       * so the lookup is expected to fill it on a hit -- TODO confirm
       * against cdiPioXmapCacheLookup. */
      if ((gatherXmap = cdiPioXmapCacheLookup(XmapCache, uids, partSizes)))
        {
          /* Cache hit: only the per-client source extents still need to be
           * derived from the current buffer layout. */
          for (size_t clientIdx = 0; clientIdx < numClients; ++clientIdx)
            {
              unsigned char *clientMem = clientBuf[clientIdx].mem;
              struct dataRecord *dataHeader = &((wHECast) clientMem)[headerIdx].specific.dataRecord;
              int position = ((wHECast) clientMem)[headerIdx + 1].offset;
              xassert(namespaceAdaptKey2(((wHECast) clientMem)[headerIdx].id) == streamID && dataHeader->varID == varID
                      && position > 0
                      && ((size_t) position >= sizeof(struct winHeaderEntry) * (size_t) clientBuf[clientIdx].dictSize)
                      && ((size_t) position < clientBuf[clientIdx].size));
              /* byte offset of this client's data relative to the start of
               * the first client's buffer */
              size_t charOfs = (size_t) ((clientMem + ((wHECast) clientMem)[headerIdx].offset) - clientBuf[0].mem);
              int partSize = partSizes[clientIdx + 1];
              xassert(charOfs % sizeof(double) == 0 && charOfs / sizeof(double) + (size_t) partSize <= INT_MAX);
              int elemOfs = (int) (charOfs / sizeof(double));
              partExts[clientIdx].start = elemOfs;
              partExts[clientIdx].size = partSize;
              partExts[clientIdx].stride = 1;
            }
          goto finishXmapCaching;
        }
    }
  /* No cached xmap (or caching disabled): build it; partExts is passed in
   * and its sizes are read back below. */
  gatherXmap = buildVarXmap(partExts, clientBuf, (size_t) headerIdx, dstList, pioInterComm, collComm, streamID, varID, conf);
  if (cacheXmaps)
    {
      partSizes[0] = gatherExt.size;
      for (size_t i = 0; i < numClients; ++i) partSizes[i + 1] = partExts[i].size;
      cdiPioXmapCacheAdd(XmapCache, uids, partSizes, gatherXmap);
    /* the cache-hit path jumps here to share the release of the key buffer */
    finishXmapCaching:
      /* partSizes lives in the same allocation as uids */
      Free(uids);
    }
  Xt_redist varRedist = xt_redist_p2p_ext_new(gatherXmap, (int) numClients, partExts, 1, &gatherExt, MPI_DOUBLE);
  if (!cacheXmaps) xt_xmap_delete(gatherXmap);
  Free(partExts);
  return varRedist;
}
Loading