diff --git a/src/pio_server.c b/src/pio_server.c index 9ddfd8c28cf69d42ed4c5838430e73ec9ba434ee..b581edf0723ec7daf11046b3eee8072c70e1841b 100644 --- a/src/pio_server.c +++ b/src/pio_server.c @@ -1269,6 +1269,47 @@ aggBufFlush(int streamID, int fileID, aggBuf.used = 0; } +struct passDict { + int varID; + unsigned recordStart, recordEnd; +}; + +static size_t +buildPassVarDict(size_t collSize, const struct passPlan *passes, + const struct recordWrite *restrict writtenRecords, + size_t *dictSize, struct passDict **dict) +{ + unsigned base = passes[0].recordAggStart; + size_t numRecordsInPass = passes[collSize - 1].recordAggEnd + - base + 1; + size_t maxVarsInPass = (size_t)(passes[collSize - 1].varEnd + - passes[0].varStart + 1); + struct passDict *varsInPass = *dict; + size_t oldSize = *dictSize, newSize = szmin(numRecordsInPass, maxVarsInPass); + if (oldSize < newSize) + { + *dictSize = newSize; + varsInPass = *dict = Realloc(varsInPass, newSize * sizeof (*varsInPass)); + } + /* establish variables involved in this pass */ + size_t numVarsInPass = 1; + varsInPass[0].recordStart = base; + int lastSeenVarID = varsInPass[0].varID = writtenRecords[base].varID; + for (size_t i = 1; i < numRecordsInPass; ++i) + if (lastSeenVarID != writtenRecords[base + i].varID) + { + varsInPass[numVarsInPass - 1].recordEnd = (unsigned)(base + i - 1); + varsInPass[numVarsInPass].varID + = lastSeenVarID = writtenRecords[base + i].varID; + varsInPass[numVarsInPass].recordStart = (unsigned)(base + i); + ++numVarsInPass; + } + varsInPass[numVarsInPass - 1].recordEnd + = (unsigned)(base + numRecordsInPass - 1); + return numVarsInPass; +} + + static void writeGribStream(size_t streamIdx, struct streamMapping *mapping, @@ -1297,38 +1338,15 @@ writeGribStream(size_t streamIdx, ? handleRedistCache(streamIdx, mapping, (size_t)numPasses, vlistID, collComm) : false; struct cacheRedist *restrict retained = rxWin[streamIdx].retained; - struct { - int varID; - unsigned recordStart, recordEnd; - } *varsInPass = NULL; + struct passDict *varsInPass = NULL; + size_t varsInPassSize = 0; MPI_Aint *displ = NULL; const struct winHeaderEntry *winDict = (wHECast)clientBuf[0].mem; for (size_t pass = 0; pass < numPasses; ++pass) { - unsigned base = passes[pass][0].recordAggStart; - size_t numRecordsInPass = passes[pass][collSize - 1].recordAggEnd - - base + 1; - size_t maxVarsInPass = (size_t)(passes[pass][collSize - 1].varEnd - - passes[pass][0].varStart + 1); - varsInPass - = Realloc(varsInPass, sizeof (*varsInPass) - * szmin(numRecordsInPass, maxVarsInPass)); - /* establish variables involved in this pass */ - size_t numVarsInPass = 1; - varsInPass[0].recordStart = base; - int lastSeenVarID = - varsInPass[0].varID = writtenRecords[base].varID; - for (size_t i = 1; i < numRecordsInPass; ++i) - if (lastSeenVarID != writtenRecords[base + i].varID) - { - varsInPass[numVarsInPass - 1].recordEnd = (unsigned)(base + i - 1); - varsInPass[numVarsInPass].varID - = lastSeenVarID = writtenRecords[base + i].varID; - varsInPass[numVarsInPass].recordStart = (unsigned)(base + i); - ++numVarsInPass; - } - varsInPass[numVarsInPass - 1].recordEnd - = (unsigned)(base + numRecordsInPass - 1); + size_t numVarsInPass + = buildPassVarDict(collSize, passes[pass], writtenRecords, + &varsInPassSize, &varsInPass); varRedists = Realloc(varRedists, numVarsInPass * sizeof (*varRedists)); size_t myRecordStart = passes[pass][collRank].recordAggStart, myRecordEnd = passes[pass][collRank].recordAggEnd;