
Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file  +14  -15
    Split function. · 84f33ed4
    Thomas Jahns authored
    * The server buffer creation does not involve the clients,
      hence it is better to call MPI_Win_create early.
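
The reordering leans on a basic MPI property, sketched below in a minimal self-contained program (plain C, not CDI-PIO code; everything in it is illustrative): MPI_Win_create is collective over its communicator, but each rank attaches only rank-local memory, so a rank that exposes nothing, as the server does for getWin via MPI_BOTTOM and size 0, can enter the collective before any of its local buffers exist.

    /* Minimal sketch, assuming only standard MPI: create a zero-size
     * window with the "no_locks" hint, collectively over the
     * communicator, then do purely rank-local work afterwards. */
    #include <mpi.h>

    int main(int argc, char **argv)
    {
      MPI_Init(&argc, &argv);

      /* "no_locks" promises that passive-target locking is never used on
       * this window, so the MPI library may skip lock-related setup. */
      MPI_Info no_locks_info;
      MPI_Info_create(&no_locks_info);
      MPI_Info_set(no_locks_info, "no_locks", "true");

      /* Collective over all ranks, but this rank exposes no memory, so
       * the call does not depend on buffers that are yet to be sized. */
      MPI_Win win;
      MPI_Win_create(MPI_BOTTOM, 0, 1, no_locks_info, MPI_COMM_WORLD, &win);
      MPI_Info_free(&no_locks_info);

      /* Rank-local work (e.g. sizing and allocating stream buffers) can
       * now proceed without making the other ranks wait on it. */

      MPI_Win_free(&win);
      MPI_Finalize();
      return 0;
    }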
@@ -168,8 +168,14 @@ setupClientRanks(void)
 }
 
 static void
-createClientStreamBuf(size_t streamIdx, size_t streamBufferSize, const struct cdiPioConf *conf)
+createClientStreamBuf(size_t streamIdx, struct clientBufSize *bufSizes, const struct cdiPioConf *conf)
 {
+  size_t streamBufferSize = 0;
+  for (size_t i = 0; i < (size_t) numClients_; ++i)
+    {
+      streamBufferSize += (rxWin[streamIdx].clientBuf[i].size = bufSizes[i].bufSize);
+      rxWin[streamIdx].clientBuf[i].dictSize = bufSizes[i].numDataRecords + bufSizes[i].numRPCRecords;
+    }
   if (conf->batchedRMA)
     rxWin[streamIdx].clientBuf[0].mem = Malloc(streamBufferSize);
   else
@@ -198,17 +204,13 @@ createClientStreamBuf(size_t streamIdx, size_t streamBufferSize, const struct cd
 }
 
 static void
-cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info, MPI_Comm collClientIntraComm, struct clientBufSize *bufSizes,
-                            const struct cdiPioConf *conf)
+cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Comm collClientIntraComm)
 {
+  MPI_Info no_locks_info;
+  xmpi(MPI_Info_create(&no_locks_info));
+  xmpi(MPI_Info_set(no_locks_info, "no_locks", "true"));
   xmpi(MPI_Win_create(MPI_BOTTOM, 0, 1, no_locks_info, collClientIntraComm, &rxWin[streamIdx].getWin));
-  size_t streamBufferSize = 0;
-  for (size_t i = 0; i < (size_t) numClients_; ++i)
-    {
-      streamBufferSize += (rxWin[streamIdx].clientBuf[i].size = bufSizes[i].bufSize);
-      rxWin[streamIdx].clientBuf[i].dictSize = bufSizes[i].numDataRecords + bufSizes[i].numRPCRecords;
-    }
-  createClientStreamBuf(streamIdx, streamBufferSize, conf);
+  xmpi(MPI_Info_free(&no_locks_info));
 }
 
 /************************************************************************/
@@ -1599,10 +1601,8 @@ cdiPioRecvStreamDefVlist(void *buffer, int size, int *pos, MPI_Comm pioInterComm
     }
 #endif
   cdiStreamSetupVlist(streamptr, serverVlistID);
-  MPI_Info no_locks_info;
-  xmpi(MPI_Info_create(&no_locks_info));
-  xmpi(MPI_Info_set(no_locks_info, "no_locks", "true"));
   size_t streamIdx = indexOfID(&openStreams, serverStreamID);
+  cdiPioServerStreamWinCreate(streamIdx, cdiPioInqCollClientIntraComm());
   int numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl();
   struct collSpec collectorData = {
     .numClients = numClients,
@@ -1654,8 +1654,7 @@
       rxWin[streamIdx].clientDeco.lists = &partDescPreset[0][0];
       rxWin[streamIdx].clientDeco.uids = &partDescUID[0][0];
     }
-  cdiPioServerStreamWinCreate(streamIdx, no_locks_info, cdiPioInqCollClientIntraComm(), bufSizes, conf);
-  xmpi(MPI_Info_free(&no_locks_info));
+  createClientStreamBuf(streamIdx, bufSizes, conf);
   size_t maxNumStreamWrites = (size_t) getMaxNumStreamWrites(streamptr),
          currentDstIdxlistCacheSize = cdiPioIdxlistCacheGetSize(DstIdxlistCache);
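
For reference, a self-contained sketch of the accounting that the patch moves into createClientStreamBuf (plain C; the struct and variable names only mimic the patch, and carving the batched allocation into per-client slices is an assumption about code outside this hunk): sum the per-client buffer sizes, then, in the batched-RMA case, serve all clients from one contiguous allocation.

    #include <stdio.h>
    #include <stdlib.h>

    /* Illustrative stand-in for the per-stream client buffer records. */
    struct clientBuf { size_t size; unsigned char *mem; };

    int main(void)
    {
      size_t bufSize[] = { 64, 128, 32 };   /* per-client sizes */
      size_t numClients = sizeof bufSize / sizeof bufSize[0];
      struct clientBuf clientBuf[3];

      /* Same pattern as the patch: record each client's size while
       * accumulating the total for the stream. */
      size_t streamBufferSize = 0;
      for (size_t i = 0; i < numClients; ++i)
        streamBufferSize += (clientBuf[i].size = bufSize[i]);

      /* Batched-RMA case: one allocation for all clients; slicing it
       * into per-client pieces is assumed, not shown in the hunk. */
      clientBuf[0].mem = malloc(streamBufferSize);
      for (size_t i = 1; i < numClients; ++i)
        clientBuf[i].mem = clientBuf[i - 1].mem + clientBuf[i - 1].size;

      for (size_t i = 0; i < numClients; ++i)
        printf("client %zu: %zu bytes at offset %td\n", i, clientBuf[i].size,
               clientBuf[i].mem - clientBuf[0].mem);

      free(clientBuf[0].mem);
      return 0;
    }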