Commit 3a10d966 authored by Thomas Jahns

Aggregate small records before writing to reduce synchronization among I/O servers.

parent 88c63df1
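The limit for the new aggregation buffer is exposed through a pair of configuration accessors added in this commit. A minimal usage sketch (the header name and surrounding setup are assumptions for illustration, not part of this commit):

/* sketch: assumes the cdiPioConf* accessors are visible via CDI's
 * public PIO header; error handling omitted */
#include "cdipio.h"

int confResH = cdiPioConfCreate();
cdiPioConfSetRecordAggBufLim(confResH, 64);         /* 64 MiB per collector */
int limMB = cdiPioConfGetRecordAggBufLim(confResH); /* reads back 64 */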
@@ -143,9 +143,11 @@ cdiPioConfPrintP(void *cdiPioConfPtr, FILE * fp)
"client/server = %s\n"
"part data imbalance = %f\n"
"aligning of block buffers to large pages is %sabled\n"
"record aggregation buffer size %zu\n"
"callback after setup of communication = %p\n",
cdiPioConfPtr, iomodeStr, CSRoleStr, conf->partInflate,
conf->largePageAlign ? "en" : "dis",
(size_t)conf->recordAggBufLimMB * 1024 * 1024,
(void *)conf->postCommSetupActions);
}
@@ -159,6 +161,7 @@ int cdiPioConfCreate(void)
conf->postCommSetupActions = cdiPioNoPostCommSetup;
conf->largePageAlign = false;
conf->cacheRedists = false;
conf->recordAggBufLimMB = 128;
int resH = reshPut(conf, &cdiPioConfOps);
/* configuration objects are never forwarded */
reshSetStatus(resH, &cdiPioConfOps,
@@ -248,3 +251,18 @@ int cdiPioConfGetRedistCache(int confResH)
return conf->cacheRedists;
}
void cdiPioConfSetRecordAggBufLim(int confResH, int lim_mb)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
if (lim_mb > 0)
conf->recordAggBufLimMB = (unsigned)lim_mb;
else
Error("unexpected negative buffer size value %d requested", lim_mb);
}
int cdiPioConfGetRecordAggBufLim(int confResH)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
return (int)conf->recordAggBufLimMB;
}
@@ -21,6 +21,7 @@ struct cdiPioConf {
int clientServerRole;
float partInflate;
void (*postCommSetupActions)(void);
unsigned recordAggBufLimMB;
bool largePageAlign;
bool cacheRedists;
};
@@ -63,5 +64,9 @@ void cdiPioConfSetRedistCache(int confResH, int doCache);
int cdiPioConfGetRedistCache(int confResH);
void cdiPioConfSetRecordAggBufLim(int confResH, int lim_mb);
int cdiPioConfGetRecordAggBufLim(int confResH);
#endif
@@ -70,13 +70,16 @@ static struct
struct streamMemLayout *prevLayout;
size_t numRetained;
struct cacheRedist *retained;
size_t aggBufSize, aggBufUsed;
void *aggBuf;
} *rxWin;
static struct idList openStreams;
static struct idList openStreams, openFiles;
struct recordWrite
{
int varID, level;
size_t dataSize;
};
struct streamMapping {
@@ -638,14 +641,27 @@ streamMappingNew(size_t streamIdx, struct winHeaderEntry *winDict,
result->hasMissing = result->varMap + 2 * numVars;
{
size_t j = (size_t)-1;
/* recordDataSize is initialized only to silence gcc; the loop
* logic ensures it is assigned before first use */
size_t recordDataSize = 0;
int lastVarID = -1;
for (int varID = 0; varID < numVars; ++varID)
{
size_t numLvl = (size_t)(result->numLvlsW[varID] = numLvlsW[varID]);
if (varID != lastVarID)
{
int varShape[3];
cdiPioQueryVarDims(varShape, vlistID, varID);
recordDataSize = (size_t)varShape[0] * (size_t)varShape[1]
* sizeof (double);
lastVarID = varID;
}
result->varMap[varID] = varMap[varID];
result->hasMissing[varID] = hasMissing_[varID];
for (size_t lvl = 0; lvl < numLvl; ++lvl)
result->writtenRecords[++j]
= (struct recordWrite){ .varID = varID, .level = (int)lvl };
= (struct recordWrite){ .varID = varID, .level = (int)lvl,
.dataSize = recordDataSize};
}
}
result->numVars = numVars;
@@ -717,6 +733,166 @@ handleRedistCache(size_t streamIdx,
return reuseRedists;
}
/* denotes what will be aggregated at a single process */
struct passPlan
{
unsigned recordAggStart, recordAggEnd;
int varStart, varEnd;
};
void
deco1D_CCP(size_t n, const size_t weightPfxSums[n],
size_t nparts, size_t separators[nparts + 1]);
/**
* @param[out] passes_ pointer to pointer to a 2-dimensional array of
* pass plans with dimensions (number of passes) x (number of collectors),
* where (*passes_)[pass][i] details the records written by collector
* rank \a i in pass \a pass
* @return number of passes
*/
static size_t
planPasses(size_t streamIdx, const struct streamMapping *mapping,
const struct cdiPioConf *conf, size_t collSize,
struct passPlan (**passes_)[collSize])
{
(void)streamIdx;
size_t numPasses = 0;
size_t recordAggBufLim = conf->recordAggBufLimMB * 1024 * 1024,
totalAggBufSpace = recordAggBufLim * collSize,
totalWritten = 0;
/* find total size of data written for the stream and build prefix sums */
size_t numWrittenRecords = (size_t)mapping->numWrittenRecords;
if (numWrittenRecords == 0)
return 0;
size_t *restrict recordDataSizePfxSums
= Malloc((numWrittenRecords + 1 + collSize + 1)
* sizeof (*recordDataSizePfxSums)),
*restrict recordSeparations = recordDataSizePfxSums + numWrittenRecords + 1;
const struct recordWrite *restrict writtenRecords
= mapping->writtenRecords;
recordDataSizePfxSums[0] = 0;
for (size_t i = 0; i < numWrittenRecords; ++i)
{
size_t recordDataSize = writtenRecords[i].dataSize;
recordDataSizePfxSums[i + 1]
= recordDataSizePfxSums[i] + recordDataSize;
totalWritten += recordDataSize;
}
/* todo: move this if into the loop so the last pass is handled there */
if (totalWritten < totalAggBufSpace)
{
/* don't fill some tasks up to the limit when a single pass
* suffices to write everything; compute a load-balanced
* distribution instead */
numPasses = 1;
struct passPlan *passes = Malloc(sizeof (*passes) * collSize);
deco1D_CCP(numWrittenRecords, recordDataSizePfxSums,
collSize, recordSeparations);
for (size_t rank = 0; rank < collSize; ++rank)
{
size_t startRecord = recordSeparations[rank],
lastRecord = recordSeparations[rank + 1] - 1;
passes[rank] = (struct passPlan){
.recordAggStart = (unsigned)startRecord,
.recordAggEnd = (unsigned)lastRecord,
.varStart = writtenRecords[startRecord].varID,
.varEnd = writtenRecords[lastRecord].varID,
};
}
*passes_ = (struct passPlan(*)[collSize])passes;
}
else
{
/* aggregate as many records on each task as fit into
* recordAggBufLim data bytes, but use at least one record,
* unless none remain */
size_t firstRecordOfPass = 0, curRecord;
struct passPlan (*passes)[collSize] = NULL;
do
{
size_t taskBegin = firstRecordOfPass;
curRecord = firstRecordOfPass - 1;
passes = Realloc(passes, sizeof (*passes) * (numPasses + 1));
for (size_t rank = 0; rank < collSize; ++rank)
{
size_t recordAggBufSize = 0;
while (curRecord + 1 < numWrittenRecords
&& ((recordAggBufSize
+ writtenRecords[curRecord + 1].dataSize)
< recordAggBufLim))
recordAggBufSize += writtenRecords[++curRecord].dataSize;
if (curRecord == taskBegin - 1
&& curRecord + 1 < numWrittenRecords)
++curRecord;
passes[numPasses][rank] = (struct passPlan){
.recordAggStart = (unsigned)taskBegin,
.recordAggEnd = (unsigned)curRecord,
.varStart = writtenRecords[taskBegin].varID,
.varEnd = writtenRecords[curRecord].varID,
};
taskBegin = curRecord + 1;
}
++numPasses, firstRecordOfPass = curRecord + 1;
}
while (curRecord + 1 < numWrittenRecords);
*passes_ = passes;
}
Free(recordDataSizePfxSums);
return numPasses;
}
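For orientation, a worked example with assumed numbers (not from this commit): with collSize = 4 collectors and the default recordAggBufLimMB = 128, totalAggBufSpace = 4 * 128 MiB = 512 MiB. A stream whose written records total 300 MiB takes the first branch and is handled in a single pass, with deco1D_CCP balancing the 300 MiB evenly over the 4 collectors. A stream totalling 1.2 GiB takes the greedy branch and needs at least ceil(1.2 GiB / 512 MiB) = 3 passes; more are possible because each task stops short of the 128 MiB limit at a record boundary.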
static inline unsigned
umax(unsigned a, unsigned b)
{
return a >= b ? a : b;
}
static inline unsigned
umin(unsigned a, unsigned b)
{
return a <= b ? a : b;
}
static inline size_t
szmin(size_t a, size_t b)
{
return a <= b ? a : b;
}
static inline size_t
szmax(size_t a, size_t b)
{
return a >= b ? a : b;
}
static size_t
aggBufAppend(int fileID, const void *restrict ptr, size_t size)
{
size_t fileIdx = indexOfID(&openFiles, fileID),
aggBufSize = rxWin[fileIdx].aggBufSize,
aggBufUsed = rxWin[fileIdx].aggBufUsed;
void *restrict aggBuf = rxWin[fileIdx].aggBuf;
if (aggBufUsed + size > aggBufSize)
rxWin[fileIdx].aggBuf = aggBuf
= Realloc(aggBuf, (rxWin[fileIdx].aggBufSize = aggBufUsed + size));
memcpy((unsigned char *)aggBuf + aggBufUsed, ptr, size);
rxWin[fileIdx].aggBufUsed = aggBufUsed + size;
return size;
}
static void
aggBufFlush(size_t streamIdx,
size_t (*cdiPioFileWrite)(int, const void *restrict, size_t, int))
{
int fileID = openFiles.entries[streamIdx];
int streamID = openStreams.entries[streamIdx];
cdiPioFileWrite(fileID, rxWin[streamIdx].aggBuf, rxWin[streamIdx].aggBufUsed,
streamInqCurTimestepID(streamID));
rxWin[streamIdx].aggBufUsed = 0;
}
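Together these two helpers realize the aggregation: writeGribStream below temporarily redirects the namespace file-write function to aggBufAppend, so the streamWriteVarSlice calls of one pass append their encoded records to rxWin[streamIdx].aggBuf instead of hitting the file, and aggBufFlush then issues a single large write per pass. A condensed sketch of the pattern (identifiers as used later in this commit):

/* redirect writes into the aggregation buffer */
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(aggBufAppend));
/* ... streamWriteVarSlice() now appends encoded records ... */
aggBufFlush(streamIdx, cdiPioFileWrite);  /* one real write per pass */
/* restore the real write function */
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(cdiPioFileWrite));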
static void
writeGribStream(size_t streamIdx,
@@ -727,60 +903,104 @@ writeGribStream(size_t streamIdx,
const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
int streamID = openStreams.entries[streamIdx];
int vlistID = streamInqVlist(streamID);
int numRecords = mapping->numWrittenRecords;
int fileID = streamInqFileID(streamID);
MPI_Comm collComm = commInqCommColl();
int collSize = commInqSizeColl();
int collRank = commInqRankColl();
int numPasses = (numRecords + collSize - 1)/collSize;
int varIDsOfPass[collSize];
Xt_redist varRedists[collSize];
MPI_Aint displ[collSize];
memset(displ, 0, sizeof (displ));
size_t collSize = (size_t)commInqSizeColl();
size_t collRank = (size_t)commInqRankColl();
struct passPlan (*passes)[collSize] = NULL;
size_t numPasses = planPasses(streamIdx, mapping, conf, collSize, &passes);
Xt_redist *varRedists = NULL;
struct recordWrite *restrict writtenRecords = mapping->writtenRecords;
size_t (*cdiPioFileWrite)(int fileID, const void *restrict buffer,
size_t len, int tsID) = 0;
/* will this task make a synchronization-only write call? */
if (collRank >= numRecords - (numPasses - 1) * collSize)
cdiPioFileWrite = (size_t (*)(int, const void *restrict, size_t, int))
namespaceSwitchGet(NSSWITCH_FILE_WRITE).func;
size_t len, int tsID)
= (size_t (*)(int, const void *restrict, size_t, int))
namespaceSwitchGet(NSSWITCH_FILE_WRITE).func;
bool reuseRedists = conf->cacheRedists != 0
? handleRedistCache(streamIdx, mapping, (size_t)numPasses, vlistID, collComm)
: false;
struct cacheRedist *restrict retained = rxWin[streamIdx].retained;
for (int pass = 0, base = 0; pass < numPasses; ++pass, base += collSize)
struct {
int varID;
unsigned recordStart, recordEnd;
} *varsInPass = NULL;
MPI_Aint *displ = NULL;
for (size_t pass = 0; pass < numPasses; ++pass)
{
unsigned numVarsInPass = 1;
varIDsOfPass[0] = writtenRecords[base].varID;
int passSize = (pass < numPasses - 1) ? collSize : numRecords - (numPasses - 1) * collSize;
unsigned base = passes[pass][0].recordAggStart;
size_t numRecordsInPass = passes[pass][collSize - 1].recordAggEnd
- base + 1;
size_t maxVarsInPass = (size_t)(passes[pass][collSize - 1].varEnd
- passes[pass][0].varStart + 1);
varsInPass
= Realloc(varsInPass, sizeof (*varsInPass)
* szmin(numRecordsInPass, maxVarsInPass));
/* establish variables involved in this pass */
for (int i = 1; i < passSize; ++i)
if (varIDsOfPass[numVarsInPass - 1] != writtenRecords[base + i].varID)
varIDsOfPass[numVarsInPass++] = writtenRecords[base + i].varID;
int mySliceSize = 0;
size_t numVarsInPass = 1;
varsInPass[0].recordStart = base;
int lastSeenVarID =
varsInPass[0].varID = writtenRecords[base].varID;
for (size_t i = 1; i < numRecordsInPass; ++i)
if (lastSeenVarID != writtenRecords[base + i].varID)
{
varsInPass[numVarsInPass - 1].recordEnd = (unsigned)(base + i - 1);
varsInPass[numVarsInPass].varID
= lastSeenVarID = writtenRecords[base + i].varID;
varsInPass[numVarsInPass].recordStart = (unsigned)(base + i);
++numVarsInPass;
}
varsInPass[numVarsInPass - 1].recordEnd
= (unsigned)(base + numRecordsInPass - 1);
varRedists = Realloc(varRedists, numVarsInPass * sizeof (*varRedists));
size_t myRecordStart = passes[pass][collRank].recordAggStart,
myRecordEnd = passes[pass][collRank].recordAggEnd;
size_t myAggSize = 0;
/* build redists for all variables involved in the current write pass, or fetch them from the cache */
Xt_redist compositePassRedist;
if (reuseRedists)
{
compositePassRedist = retained[pass].redist;
mySliceSize = retained[pass].sliceSize;
myAggSize = (size_t)retained[pass].sliceSize;
}
else
{
int myVarStart = passes[pass][collRank].varStart,
myVarEnd = passes[pass][collRank].varEnd;
displ = Realloc(displ, sizeof (*displ) * (numVarsInPass * 2 + 1));
memset(displ, 0, sizeof (*displ) * (numVarsInPass + 1));
for (unsigned varIdx = 0; varIdx < numVarsInPass; ++varIdx)
{
int varID = varIDsOfPass[varIdx];
int varID = varsInPass[varIdx].varID;
Xt_idxlist dstList;
/* is this process writing part of this variable? */
if (collRank < passSize
&& writtenRecords[base + collRank].varID == varID)
if (myRecordStart <= myRecordEnd
&& myVarStart <= varID && myVarEnd >= varID)
{
int myLevel = writtenRecords[base + collRank].level;
size_t myVarRecordStart
= writtenRecords[myRecordStart].varID == varID
? myRecordStart : varsInPass[varIdx].recordStart;
size_t myLevelStart
= (size_t)writtenRecords[myVarRecordStart].level;
size_t myVarRecordEnd
= writtenRecords[myRecordEnd].varID == varID
? myRecordEnd : (size_t)varsInPass[varIdx].recordEnd;
size_t myNumLevels
= (size_t)writtenRecords[myVarRecordEnd].level
- myLevelStart + 1;
dstList
= buildVarSlicesIdxList(vlistID, varID, myLevel, 1);
mySliceSize = xt_idxlist_get_num_indices(dstList);
= buildVarSlicesIdxList(vlistID, varID, (int)myLevelStart,
(int)myNumLevels);
size_t sliceSize = (size_t)xt_idxlist_get_num_indices(dstList);
assert(sliceSize * sizeof (double)
== (writtenRecords[myVarRecordStart].dataSize
* myNumLevels));
myAggSize += sliceSize;
}
else
dstList = xt_idxempty_new();
{
dstList = xt_idxempty_new();
}
displ[numVarsInPass + varIdx + 1]
= (MPI_Aint)(sizeof (double) * myAggSize);
varRedists[varIdx] = buildVarRedist(mapping->varMap[varID],
streamIdx, dstList);
xt_idxlist_delete(dstList);
@@ -789,10 +1009,12 @@ writeGribStream(size_t streamIdx,
if (numVarsInPass > 1)
{
compositePassRedist
= xt_redist_collection_static_new(varRedists, (int)numVarsInPass,
displ, displ, collComm);
= xt_redist_collection_static_new(varRedists,
(int)numVarsInPass,
displ, displ + numVarsInPass,
collComm);
/* free individual redists */
for (unsigned varIdx = 0; varIdx < numVarsInPass; ++varIdx)
for (size_t varIdx = 0; varIdx < numVarsInPass; ++varIdx)
xt_redist_delete(varRedists[varIdx]);
}
else
@@ -800,31 +1022,67 @@ writeGribStream(size_t streamIdx,
if (conf->cacheRedists)
{
retained[pass].redist = compositePassRedist;
retained[pass].sliceSize = mySliceSize;
retained[pass].sliceSize = (int)myAggSize;
}
}
/* resize gather buffer if needed */
resizeVarGatherBuf(mySliceSize, data_, currentDataBufSize);
resizeVarGatherBuf((int)myAggSize, data_, currentDataBufSize);
/* execute composite redist */
xt_redist_s_exchange1(compositePassRedist, clientBuf[0].mem, *data_);
/* delete composite redist */
if (!conf->cacheRedists)
xt_redist_delete(compositePassRedist);
/* write the level (if this task has one) */
if (collRank < passSize)
/* append the encoded data records from this pass to a buffer that is written out later */
/* todo: develop better heuristic for buffer size */
if (sizeof (double) * myAggSize > rxWin[streamIdx].aggBufSize)
{
int varID = writtenRecords[base + collRank].varID;
int level = writtenRecords[base + collRank].level;
int nmiss
= countVarChunkMissingVals(vlistID, varID, mapping,
mySliceSize, *data_);
streamWriteVarSlice(streamID, varID, level, *data_, nmiss);
Free(rxWin[streamIdx].aggBuf);
size_t aggBufSize = szmax((size_t)conf->recordAggBufLimMB
* (size_t)1024 * (size_t)1024,
sizeof (double) * myAggSize);
/* fall back to unaligned allocation if page-aligned allocation fails */
if (posix_memalign(&rxWin[streamIdx].aggBuf,
cdiPioGetPageSize(conf->largePageAlign),
aggBufSize) != 0)
rxWin[streamIdx].aggBuf = Malloc(aggBufSize);
}
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(aggBufAppend));
/* write records to aggregation buffer */
if (myRecordStart <= myRecordEnd)
{
size_t varIdx = (size_t)-1;
int varID = -1;
size_t base = 0;
const double *data = *data_;
for (size_t recordIdx = myRecordStart;
recordIdx <= myRecordEnd;
++recordIdx)
{
int level = writtenRecords[recordIdx].level;
int prevVarID = varID;
varID = writtenRecords[recordIdx].varID;
varIdx += varID != prevVarID;
size_t recordSize = writtenRecords[recordIdx].dataSize;
size_t nvals = recordSize / sizeof (double);
int nmiss
= countVarChunkMissingVals(vlistID, varID, mapping, (int)nvals,
data + base);
streamWriteVarSlice(streamID, varID, level, data + base, nmiss);
base += nvals;
}
aggBufFlush(streamIdx, cdiPioFileWrite);
}
else
/* write zero bytes to trigger synchronization code in fileWrite */
cdiPioFileWrite(streamInqFileID(streamID), NULL, 0,
cdiPioFileWrite(fileID, NULL, 0,
streamInqCurTimestepID(streamID));
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(cdiPioFileWrite));
}
Free(displ);
Free(varRedists);
Free(varsInPass);
Free(passes);
}
static void
@@ -972,6 +1230,8 @@ cdiPioServerStreamOpen(const char *filename, char filemode,
{
size_t oldNumStreams = openStreams.size;
size_t streamIdx = insertID(&openStreams, streamptr->self);
size_t fileIdx = insertID(&openFiles, fileID);
xassert(fileIdx == streamIdx);
size_t numStreams = openStreams.size;
struct clientBuf *oldClientBufs = rxWin ? rxWin[0].clientBuf : NULL;
rxWin = Realloc(rxWin, numStreams * sizeof (rxWin[0]));
@@ -988,6 +1248,9 @@ cdiPioServerStreamOpen(const char *filename, char filemode,
rxWin[streamIdx].prevLayout = NULL;
rxWin[streamIdx].retained = NULL;
rxWin[streamIdx].numRetained = 0;
rxWin[streamIdx].aggBufSize = 0;
rxWin[streamIdx].aggBufUsed = 0;
rxWin[streamIdx].aggBuf = NULL;
#if defined HAVE_LIBNETCDF && ! defined HAVE_PARALLEL_NC4
rxWin[streamIdx].ownerRank = rank;
#endif
@@ -1028,8 +1291,11 @@ cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
destructRetained(rxWin[streamIdx].retained,
rxWin[streamIdx].numRetained);
Free(rxWin[streamIdx].retained);
Free(rxWin[streamIdx].prevLayout);
Free(rxWin[streamIdx].aggBuf);
cdiPioServerStreamWinDestroy(streamIdx);
removeID(&openStreams, streamID);
removeID(&openFiles, fileID);
}
}
@@ -1179,6 +1445,7 @@ void cdiPioCollectorMessageLoop(const struct cdiPioConf *conf)
Free(rxWin);
}
idSetDestroy(&openStreams);
idSetDestroy(&openFiles);
Free(streamActivity);
Free(clientRanks_);
#ifdef HAVE_PARALLEL_NC4
@@ -240,11 +240,39 @@ cdiPioGetPageSize(bool largePageAlign)
}
void
deco1D_CCP(size_t nelems, const size_t *restrict weightPfxSums,
size_t nparts, size_t *restrict separators)
{
separators[0] = 0;
separators[nparts] = nelems;
size_t i = 0, k = 1;
size_t weightTotal = weightPfxSums[nelems];
while (k < nparts)
{
size_t target = k * weightTotal / nparts;
do {
++i;
} while (i < nelems && weightPfxSums[i] < target);
separators[k] = i;
++k;
--i;
}
/* todo: implement h2 and dp+ algorithms from
* A. Pınar, C. Aykanat
* Fast optimal load balancing algorithms for 1D partitioning
*/
}
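A toy check of the separator computation (values assumed for illustration): partitioning nelems = 4 records with weight prefix sums {0, 4, 5, 6, 10} into nparts = 2 parts sets the target for the single interior separator to weightTotal / 2 = 5; the scan stops at the first i with weightPfxSums[i] >= 5, so separators becomes {0, 2, 4} and both parts carry weight 5.

/* hypothetical call, not part of this commit */
size_t pfx[5] = { 0, 4, 5, 6, 10 };  /* record weights 4, 1, 1, 4 */
size_t sep[3];
deco1D_CCP(4, pfx, 2, sep);          /* yields sep == { 0, 2, 4 } */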
/****************************************************/
/*
* Local Variables:
* c-file-style: "Java"
* c-basic-offset: 2
* coding: utf-8
* indent-tabs-mode: nil
* show-trailing-whitespace: t
* require-trailing-newline: t