Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 39
22
Compare changes
  • Side-by-side
  • Inline
+ 39
22
@@ -1093,6 +1093,42 @@ aggBufFlush(int streamID, int fileID, size_t (*cdiPioFileWrite)(int, const void
aggBuf.used = 0;
}
struct passDict
{
int varID;
unsigned recordStart, recordEnd;
};
static size_t
buildPassVarDict(size_t collSize, const struct passPlan *passes, const struct recordWrite *restrict writtenRecords,
size_t *dictSize, struct passDict **dict)
{
unsigned base = passes[0].recordAggStart;
size_t numRecordsInPass = passes[collSize - 1].recordAggEnd - base + 1;
size_t maxVarsInPass = (size_t) (passes[collSize - 1].varEnd - passes[0].varStart + 1);
struct passDict *varsInPass = *dict;
size_t oldSize = *dictSize, newSize = szmin(numRecordsInPass, maxVarsInPass);
if (oldSize < newSize)
{
*dictSize = newSize;
varsInPass = *dict = Realloc(varsInPass, newSize * sizeof(*varsInPass));
}
/* establish variables involved in this pass */
size_t numVarsInPass = 1;
varsInPass[0].recordStart = base;
int lastSeenVarID = varsInPass[0].varID = writtenRecords[base].varID;
for (size_t i = 1; i < numRecordsInPass; ++i)
if (lastSeenVarID != writtenRecords[base + i].varID)
{
varsInPass[numVarsInPass - 1].recordEnd = (unsigned) (base + i - 1);
varsInPass[numVarsInPass].varID = lastSeenVarID = writtenRecords[base + i].varID;
varsInPass[numVarsInPass].recordStart = (unsigned) (base + i);
++numVarsInPass;
}
varsInPass[numVarsInPass - 1].recordEnd = (unsigned) (base + numRecordsInPass - 1);
return numVarsInPass;
}
static void
writeGribStream(size_t streamIdx, struct streamMapping *mapping, void **data_, size_t *currentDataBufSize,
const struct cdiPioConf *conf)
@@ -1115,32 +1151,13 @@ writeGribStream(size_t streamIdx, struct streamMapping *mapping, void **data_, s
bool reuseRedists
= conf->cacheRedists != 0 ? handleRedistCache(streamIdx, mapping, (size_t) numPasses, vlistID, collComm) : false;
struct cacheRedist *restrict retained = rxWin[streamIdx].retained;
struct
{
int varID;
unsigned recordStart, recordEnd;
} *varsInPass = NULL;
struct passDict *varsInPass = NULL;
size_t varsInPassSize = 0;
MPI_Aint *displ = NULL;
const struct winHeaderEntry *winDict = (wHECast) clientBuf[0].mem;
for (size_t pass = 0; pass < numPasses; ++pass)
{
unsigned base = passes[pass][0].recordAggStart;
size_t numRecordsInPass = passes[pass][collSize - 1].recordAggEnd - base + 1;
size_t maxVarsInPass = (size_t) (passes[pass][collSize - 1].varEnd - passes[pass][0].varStart + 1);
varsInPass = Realloc(varsInPass, sizeof(*varsInPass) * szmin(numRecordsInPass, maxVarsInPass));
/* establish variables involved in this pass */
size_t numVarsInPass = 1;
varsInPass[0].recordStart = base;
int lastSeenVarID = varsInPass[0].varID = writtenRecords[base].varID;
for (size_t i = 1; i < numRecordsInPass; ++i)
if (lastSeenVarID != writtenRecords[base + i].varID)
{
varsInPass[numVarsInPass - 1].recordEnd = (unsigned) (base + i - 1);
varsInPass[numVarsInPass].varID = lastSeenVarID = writtenRecords[base + i].varID;
varsInPass[numVarsInPass].recordStart = (unsigned) (base + i);
++numVarsInPass;
}
varsInPass[numVarsInPass - 1].recordEnd = (unsigned) (base + numRecordsInPass - 1);
size_t numVarsInPass = buildPassVarDict(collSize, passes[pass], writtenRecords, &varsInPassSize, &varsInPass);
varRedists = Realloc(varRedists, numVarsInPass * sizeof(*varRedists));
size_t myRecordStart = passes[pass][collRank].recordAggStart, myRecordEnd = passes[pass][collRank].recordAggEnd;
size_t myAggSize = 0;
Loading