Skip to content
Snippets Groups Projects
Commit 00b1070d authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

Extract function.

parent 11134e9d
No related branches found
No related tags found
No related merge requests found
......@@ -1269,6 +1269,47 @@ aggBufFlush(int streamID, int fileID,
aggBuf.used = 0;
}
struct passDict {
int varID;
unsigned recordStart, recordEnd;
};
static size_t
buildPassVarDict(size_t collSize, const struct passPlan *passes,
const struct recordWrite *restrict writtenRecords,
size_t *dictSize, struct passDict **dict)
{
unsigned base = passes[0].recordAggStart;
size_t numRecordsInPass = passes[collSize - 1].recordAggEnd
- base + 1;
size_t maxVarsInPass = (size_t)(passes[collSize - 1].varEnd
- passes[0].varStart + 1);
struct passDict *varsInPass = *dict;
size_t oldSize = *dictSize, newSize = szmin(numRecordsInPass, maxVarsInPass);
if (oldSize < newSize)
{
*dictSize = newSize;
varsInPass = *dict = Realloc(varsInPass, newSize * sizeof (*varsInPass));
}
/* establish variables involved in this pass */
size_t numVarsInPass = 1;
varsInPass[0].recordStart = base;
int lastSeenVarID = varsInPass[0].varID = writtenRecords[base].varID;
for (size_t i = 1; i < numRecordsInPass; ++i)
if (lastSeenVarID != writtenRecords[base + i].varID)
{
varsInPass[numVarsInPass - 1].recordEnd = (unsigned)(base + i - 1);
varsInPass[numVarsInPass].varID
= lastSeenVarID = writtenRecords[base + i].varID;
varsInPass[numVarsInPass].recordStart = (unsigned)(base + i);
++numVarsInPass;
}
varsInPass[numVarsInPass - 1].recordEnd
= (unsigned)(base + numRecordsInPass - 1);
return numVarsInPass;
}
static void
writeGribStream(size_t streamIdx,
struct streamMapping *mapping,
......@@ -1297,38 +1338,15 @@ writeGribStream(size_t streamIdx,
? handleRedistCache(streamIdx, mapping, (size_t)numPasses, vlistID, collComm)
: false;
struct cacheRedist *restrict retained = rxWin[streamIdx].retained;
struct {
int varID;
unsigned recordStart, recordEnd;
} *varsInPass = NULL;
struct passDict *varsInPass = NULL;
size_t varsInPassSize = 0;
MPI_Aint *displ = NULL;
const struct winHeaderEntry *winDict = (wHECast)clientBuf[0].mem;
for (size_t pass = 0; pass < numPasses; ++pass)
{
unsigned base = passes[pass][0].recordAggStart;
size_t numRecordsInPass = passes[pass][collSize - 1].recordAggEnd
- base + 1;
size_t maxVarsInPass = (size_t)(passes[pass][collSize - 1].varEnd
- passes[pass][0].varStart + 1);
varsInPass
= Realloc(varsInPass, sizeof (*varsInPass)
* szmin(numRecordsInPass, maxVarsInPass));
/* establish variables involved in this pass */
size_t numVarsInPass = 1;
varsInPass[0].recordStart = base;
int lastSeenVarID =
varsInPass[0].varID = writtenRecords[base].varID;
for (size_t i = 1; i < numRecordsInPass; ++i)
if (lastSeenVarID != writtenRecords[base + i].varID)
{
varsInPass[numVarsInPass - 1].recordEnd = (unsigned)(base + i - 1);
varsInPass[numVarsInPass].varID
= lastSeenVarID = writtenRecords[base + i].varID;
varsInPass[numVarsInPass].recordStart = (unsigned)(base + i);
++numVarsInPass;
}
varsInPass[numVarsInPass - 1].recordEnd
= (unsigned)(base + numRecordsInPass - 1);
size_t numVarsInPass
= buildPassVarDict(collSize, passes[pass], writtenRecords,
&varsInPassSize, &varsInPass);
varRedists = Realloc(varRedists, numVarsInPass * sizeof (*varRedists));
size_t myRecordStart = passes[pass][collRank].recordAggStart,
myRecordEnd = passes[pass][collRank].recordAggEnd;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment