Skip to content
Snippets Groups Projects
Commit 84d53075 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

Split function.

* The server buffer creation does not involve the clients,
  hence it is better to call MPI_Win_create early.
parent fb0161fc
No related branches found
No related tags found
No related merge requests found
......@@ -171,9 +171,17 @@ setupClientRanks(void)
}
static void
createClientStreamBuf(size_t streamIdx, size_t streamBufferSize,
createClientStreamBuf(size_t streamIdx, struct clientBufSize *bufSizes,
const struct cdiPioConf *conf)
{
size_t streamBufferSize = 0;
for (size_t i = 0; i < (size_t)numClients_; ++i)
{
streamBufferSize +=
(rxWin[streamIdx].clientBuf[i].size = bufSizes[i].bufSize);
rxWin[streamIdx].clientBuf[i].dictSize
= bufSizes[i].numDataRecords + bufSizes[i].numRPCRecords;
}
if (conf->batchedRMA)
rxWin[streamIdx].clientBuf[0].mem = Malloc(streamBufferSize);
else
......@@ -207,22 +215,15 @@ createClientStreamBuf(size_t streamIdx, size_t streamBufferSize,
static void
cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info,
MPI_Comm collClientIntraComm,
struct clientBufSize *bufSizes,
const struct cdiPioConf *conf)
cdiPioServerStreamWinCreate(size_t streamIdx,
MPI_Comm collClientIntraComm)
{
MPI_Info no_locks_info;
xmpi(MPI_Info_create(&no_locks_info));
xmpi(MPI_Info_set(no_locks_info, "no_locks", "true"));
xmpi(MPI_Win_create(MPI_BOTTOM, 0, 1, no_locks_info, collClientIntraComm,
&rxWin[streamIdx].getWin));
size_t streamBufferSize = 0;
for (size_t i = 0; i < (size_t)numClients_; ++i)
{
streamBufferSize +=
(rxWin[streamIdx].clientBuf[i].size = bufSizes[i].bufSize);
rxWin[streamIdx].clientBuf[i].dictSize
= bufSizes[i].numDataRecords + bufSizes[i].numRPCRecords;
}
createClientStreamBuf(streamIdx, streamBufferSize, conf);
xmpi(MPI_Info_free(&no_locks_info));
}
......@@ -1888,10 +1889,8 @@ cdiPioRecvStreamDefVlist(void *buffer, int size, int *pos,
}
#endif
cdiStreamSetupVlist(streamptr, serverVlistID);
MPI_Info no_locks_info;
xmpi(MPI_Info_create(&no_locks_info));
xmpi(MPI_Info_set(no_locks_info, "no_locks", "true"));
size_t streamIdx = indexOfID(&openStreams, serverStreamID);
cdiPioServerStreamWinCreate(streamIdx, cdiPioInqCollClientIntraComm());
int numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl();
struct collSpec collectorData = {
......@@ -1952,9 +1951,7 @@ cdiPioRecvStreamDefVlist(void *buffer, int size, int *pos,
rxWin[streamIdx].clientDeco.lists = &partDescPreset[0][0];
rxWin[streamIdx].clientDeco.uids = &partDescUID[0][0];
}
cdiPioServerStreamWinCreate(streamIdx, no_locks_info,
cdiPioInqCollClientIntraComm(), bufSizes, conf);
xmpi(MPI_Info_free(&no_locks_info));
createClientStreamBuf(streamIdx, bufSizes, conf);
size_t maxNumStreamWrites = (size_t)getMaxNumStreamWrites(streamptr),
currentDstIdxlistCacheSize = cdiPioIdxlistCacheGetSize(DstIdxlistCache);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment