Commit e209230c authored by Thomas Jahns, committed by Sergey Kosukhin

Add alternative sequence of RMA and output.

parent 95e441cb
Part of merge requests !34 (Version 2.2.0) and !13 (Consolidation with CDI-PIO (develop))
@@ -263,6 +263,19 @@ void cdiPioConfSetStripeConversion(int confResH, int doStripify);
* into stripes before being passed to the xmap constructor? */
int cdiPioConfGetStripeConversion(int confResH);
/* cdiPioConfSetBatchedRMA: (de-)activate batched transfer of data
* for all streams before any data is written to disk: if
* doBatchedRMA is set to 0, each stream's data is written as soon as
* it has been retrieved from the client ranks; any other value makes
* all RMA transfers complete before any output is started.
* doBatchedRMA == 0 implies less memory use on the server ranks, at
* the cost of spreading the disturbance of the clients over a longer
* period of time */
void cdiPioConfSetBatchedRMA(int confResH, int doBatchedRMA);
/* cdiPioConfGetBatchedRMA: query if batched RMA is active, see
* cdiPioConfSetBatchedRMA */
int cdiPioConfGetBatchedRMA(int confResH);
/* cdiPioDistGridCreate: create a grid data structure where the
* per-coordinate data is distributed over the client tasks
* chunk_decomposition specifies how to distribute the data of each of
......
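A possible application-side use of the new configuration calls, as a hedged sketch: it assumes that cdiPioConfCreate() returns the configuration resource handle expected by the setter and getter, as the reshPut call in this commit suggests.

static int setupPioConf(void)
{
  int confResH = cdiPioConfCreate();
  /* 0: write each stream as soon as its RMA transfer has completed,
   * trading lower server-side memory use for output that is spread
   * over the transfer phase; any nonzero value (the default) keeps
   * the batched behaviour of finishing all transfers first */
  cdiPioConfSetBatchedRMA(confResH, 0);
  if (cdiPioConfGetBatchedRMA(confResH))
    { /* would still be batched; not expected after the call above */ }
  return confResH;
}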
@@ -181,6 +181,7 @@ cdiPioConfCreate(void)
conf->aioQueueDepth = 4U;
conf->xmap_new = xt_xmap_dist_dir_new;
conf->stripify = true;
conf->batchedRMA = true;
int resH = reshPut(conf, &cdiPioConfOps);
/* configuration objects are never forwarded */
reshSetStatus(resH, &cdiPioConfOps, reshGetStatus(resH, &cdiPioConfOps) & ~RESH_SYNC_BIT);
@@ -412,6 +413,20 @@ cdiPioConfGetStripeConversion(int confResH)
return conf->stripify;
}
void
cdiPioConfSetBatchedRMA(int confResH, int doBatchedRMA)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
conf->batchedRMA = doBatchedRMA;
}
int
cdiPioConfGetBatchedRMA(int confResH)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
return conf->batchedRMA;
}
/*
* Local Variables:
* c-file-style: "Java"
......
@@ -38,6 +38,7 @@ struct cdiPioConf
bool largePageAlign;
bool cacheRedists, cacheXmaps;
bool stripify;
bool batchedRMA;
};
extern const resOps cdiPioConfOps;
@@ -102,4 +103,8 @@ void cdiPioConfSetStripeConversion(int confResH, int doConversion);
int cdiPioConfGetStripeConversion(int confResH);
void cdiPioConfSetBatchedRMA(int confResH, int doBatchedRMA);
int cdiPioConfGetBatchedRMA(int confResH);
#endif
@@ -76,6 +76,9 @@ struct cacheRedist
#endif
};
static void *sharedClientBuf = NULL;
static size_t sharedClientBufSize = 0;
static struct
{
MPI_Win getWin;
@@ -135,11 +138,12 @@ static int numPioPrimes;
/************************************************************************/
static void
cdiPioServerStreamWinDestroy(size_t streamIdx)
cdiPioServerStreamWinDestroy(size_t streamIdx, const struct cdiPioConf *conf)
{
if (rxWin[streamIdx].getWin != MPI_WIN_NULL)
{
Free(rxWin[streamIdx].clientBuf[0].mem);
if (conf->batchedRMA) Free(rxWin[streamIdx].clientBuf[0].mem);
rxWin[streamIdx].clientBuf[0].mem = NULL;
xmpi(MPI_Win_free(&rxWin[streamIdx].getWin));
}
}
@@ -163,7 +167,38 @@ setupClientRanks(void)
}
static void
cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info, MPI_Comm collClientIntraComm, struct clientBufSize *bufSizes)
createClientStreamBuf(size_t streamIdx, size_t streamBufferSize, const struct cdiPioConf *conf)
{
if (conf->batchedRMA)
rxWin[streamIdx].clientBuf[0].mem = Malloc(streamBufferSize);
else
{
if (streamBufferSize > sharedClientBufSize)
{
intptr_t oldmem = (intptr_t) sharedClientBuf;
sharedClientBuf = Realloc(sharedClientBuf, streamBufferSize);
sharedClientBufSize = streamBufferSize;
if (oldmem != (intptr_t) sharedClientBuf)
{
size_t numStreams = openStreams.size;
for (size_t j = 0; j < numStreams; ++j)
if (rxWin[j].getWin != MPI_WIN_NULL)
{
rxWin[j].clientBuf[0].mem = sharedClientBuf;
for (size_t i = 1; i < (size_t) numClients_; ++i)
rxWin[j].clientBuf[i].mem = rxWin[j].clientBuf[i - 1].mem + rxWin[j].clientBuf[i - 1].size;
}
}
}
rxWin[streamIdx].clientBuf[0].mem = sharedClientBuf;
}
for (size_t i = 1; i < (size_t) numClients_; ++i)
rxWin[streamIdx].clientBuf[i].mem = rxWin[streamIdx].clientBuf[i - 1].mem + rxWin[streamIdx].clientBuf[i - 1].size;
}
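The pointer arithmetic above lays every client's sub-buffer out back to back in one allocation. The following standalone sketch (hypothetical client sizes, plain malloc/free instead of the library's Malloc/Realloc wrappers) reproduces that layout:

#include <stdio.h>
#include <stdlib.h>

struct clientBuf { size_t size; unsigned char *mem; };

int main(void)
{
  enum { numClients = 3 };
  struct clientBuf clientBuf[numClients] = { { 64, NULL }, { 128, NULL }, { 32, NULL } };
  size_t streamBufferSize = 0;
  for (size_t i = 0; i < numClients; ++i) streamBufferSize += clientBuf[i].size;
  /* one contiguous allocation serves all clients of the stream ... */
  clientBuf[0].mem = malloc(streamBufferSize);
  /* ... and client i starts right where client i - 1 ends */
  for (size_t i = 1; i < numClients; ++i)
    clientBuf[i].mem = clientBuf[i - 1].mem + clientBuf[i - 1].size;
  for (size_t i = 0; i < numClients; ++i)
    printf("client %zu: offset %td, size %zu\n", i,
           clientBuf[i].mem - clientBuf[0].mem, clientBuf[i].size);
  free(clientBuf[0].mem);
  return 0;
}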
static void
cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info, MPI_Comm collClientIntraComm, struct clientBufSize *bufSizes,
const struct cdiPioConf *conf)
{
xmpi(MPI_Win_create(MPI_BOTTOM, 0, 1, no_locks_info, collClientIntraComm, &rxWin[streamIdx].getWin));
size_t streamBufferSize = 0;
@@ -172,9 +207,7 @@ cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info, MPI_Comm c
streamBufferSize += (rxWin[streamIdx].clientBuf[i].size = bufSizes[i].bufSize);
rxWin[streamIdx].clientBuf[i].dictSize = bufSizes[i].numDataRecords + bufSizes[i].numRPCRecords;
}
rxWin[streamIdx].clientBuf[0].mem = Malloc(streamBufferSize);
for (size_t i = 1; i < (size_t) numClients_; ++i)
rxWin[streamIdx].clientBuf[i].mem = rxWin[streamIdx].clientBuf[i - 1].mem + bufSizes[i - 1].bufSize;
createClientStreamBuf(streamIdx, streamBufferSize, conf);
}
/************************************************************************/
@@ -1282,10 +1315,11 @@ getTimeStepData(int *streamActivity, const struct cdiPioConf *conf)
xmpi(MPI_Get(rxWin[streamIdx].clientBuf[i].mem, (int) rxWin[streamIdx].clientBuf[i].size, MPI_UNSIGNED_CHAR,
clientRanks_[i], 0, (int) rxWin[streamIdx].clientBuf[i].size, MPI_UNSIGNED_CHAR, rxWin[streamIdx].getWin));
xmpi(MPI_Win_complete(rxWin[streamIdx].getWin));
if (!conf->batchedRMA) readGetBuffers(streamIdx, conf);
}
for (size_t streamIdx = 0; streamIdx < openStreams.size; ++streamIdx)
if (streamActivity[streamIdx]) readGetBuffers(streamIdx, conf);
if (conf->batchedRMA)
for (size_t streamIdx = 0; streamIdx < openStreams.size; ++streamIdx)
if (streamActivity[streamIdx]) readGetBuffers(streamIdx, conf);
xdebug("%s", "RETURN");
}
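For clarity, a standalone sketch (with hypothetical stub functions, not CDI-PIO code) of the two orderings that conf->batchedRMA now selects in getTimeStepData:

#include <stdbool.h>
#include <stdio.h>

enum { numStreams = 3 };

/* stands in for MPI_Win_start/MPI_Get/MPI_Win_complete on a stream's window */
static void fetchStream(int s) { printf("RMA fetch of stream %d\n", s); }
/* stands in for readGetBuffers */
static void writeStream(int s) { printf("output of stream %d\n", s); }

static void timeStep(bool batchedRMA)
{
  if (batchedRMA)
    {
      /* batched (default): fetch everything first, then write everything,
       * which requires a private buffer per stream */
      for (int s = 0; s < numStreams; ++s) fetchStream(s);
      for (int s = 0; s < numStreams; ++s) writeStream(s);
    }
  else
    {
      /* unbatched: write each stream right after its fetch completes,
       * so one shared buffer can be reused across streams */
      for (int s = 0; s < numStreams; ++s)
        {
          fetchStream(s);
          writeStream(s);
        }
    }
}

int main(void)
{
  timeStep(true);
  timeStep(false);
  return 0;
}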
@@ -1357,6 +1391,7 @@ cdiPioServerStreamOpen(const char *filename, char filemode, int filetype, stream
else if (oldNumStreams < numStreams)
for (size_t i = oldNumStreams; i < numStreams; ++i) rxWin[i].clientBuf = newClientBufs + i * (size_t) numClients_;
rxWin[streamIdx].getWin = MPI_WIN_NULL;
rxWin[streamIdx].clientBuf[0].mem = NULL;
rxWin[streamIdx].prevLayout = NULL;
rxWin[streamIdx].retained = NULL;
rxWin[streamIdx].numRetained = 0;
@@ -1444,7 +1479,7 @@ cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
destructRetained(rxWin[streamIdx].retained, rxWin[streamIdx].numRetained);
Free(rxWin[streamIdx].retained);
Free(rxWin[streamIdx].prevLayout);
cdiPioServerStreamWinDestroy(streamIdx);
cdiPioServerStreamWinDestroy(streamIdx, conf);
removeID(&openStreams, streamID);
}
void (*streamCloseCallBack)(int streamID) = (void (*)(int)) conf->callbacks[CDIPIO_CALLBACK_POSTSTREAMCLOSE];
@@ -1562,7 +1597,7 @@ cdiPioRecvStreamDefVlist(void *buffer, int size, int *pos, MPI_Comm pioInterComm
bufSizes[0] = computeClientStreamBufSize(serverStreamID, &collectorData);
collectorData.sendRPCData = 0;
for (size_t i = 1; i < (size_t) numClients_; ++i) bufSizes[i] = computeClientStreamBufSize(serverStreamID, &collectorData);
cdiPioServerStreamWinCreate(streamIdx, no_locks_info, cdiPioInqCollClientIntraComm(), bufSizes);
cdiPioServerStreamWinCreate(streamIdx, no_locks_info, cdiPioInqCollClientIntraComm(), bufSizes, conf);
xmpi(MPI_Info_free(&no_locks_info));
size_t maxNumStreamWrites = (size_t) getMaxNumStreamWrites(streamptr),
......