Skip to content
Snippets Groups Projects
Commit 4087a97b authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Add injection of xmaps into cache.

* For statically decomposed streams, the xmaps can be precomputed under the
  assumption that all elements are written.
parent 76f9bebc
No related branches found
No related tags found
No related merge requests found
......@@ -473,6 +473,236 @@ allocUIDLookup(size_t numClients, Xt_uid *restrict *uids, int *restrict *partSiz
*partSizes = (int *) (void *) ((unsigned char *) *uids + uidBytes + uidBytesRoundUp);
}
/* Forward declaration: needed here because
 * createOutDstListNetCDFSerialFunnel calls this function before its
 * definition appears in this file. */
static Xt_idxlist buildVarSlicesIdxList(int vlistID, int varID, int startLvl, int numLvl, const struct cdiPioConf *conf);
/*
 * Ensure XmapCache holds an exchange map from the client parts of one
 * variable to the destination list outPartList.
 *
 * outPartList   destination index list on this process
 * numClients    number of entries in partDescUIDs and partDesc
 * partDescUIDs  UIDs of the client part lists
 * partDesc      the client part lists themselves
 * partSizes     scratch array, numClients + 1 entries are accessed
 * uids          scratch array, numClients + 1 entries are accessed
 * collComm      communicator handed to the xmap constructor
 * conf          supplies the stripify switch and the xmap constructor
 *
 * Slot 0 of uids/partSizes always refers to outPartList, slot i + 1 to
 * client part i.
 */
static void
buildDecoPresetXmapsCommon(Xt_idxlist outPartList, size_t numClients, const Xt_uid *partDescUIDs, Xt_idxlist *partDesc,
                           int *restrict partSizes, Xt_uid *uids, MPI_Comm collComm, const struct cdiPioConf *conf)
{
  /* assemble the lookup key: destination first, then every client part */
  uids[0] = xt_idxlist_get_uid(outPartList);
  for (size_t i = 0; i < numClients; ++i) uids[i + 1] = partDescUIDs[i];

  Xt_xmap cachedXmap = cdiPioXmapCacheLookup(XmapCache, uids, partSizes);
  /* nothing to do if an xmap for this key is already cached */
  if (cachedXmap) return;

  /* source side is the concatenation of all client parts */
  Xt_idxlist srcList = xt_idxlist_collection_new(partDesc, (int) numClients);
  if (conf->stripify)
    {
      /* replace the collection by its stripe representation */
      Xt_idxlist collection = srcList;
      srcList = xt_idxstripes_from_idxlist_new(collection);
      xt_idxlist_delete(collection);
    }
  Xt_xmap newXmap = conf->xmap_new(srcList, outPartList, collComm);
  xt_idxlist_delete(srcList);

  /* record the index counts stored alongside the new cache entry */
  partSizes[0] = xt_idxlist_get_num_indices(outPartList);
  for (size_t i = 0; i < numClients; ++i) partSizes[i + 1] = xt_idxlist_get_num_indices(partDesc[i]);

  cdiPioXmapCacheAdd(XmapCache, uids, partSizes, newXmap);
}
#ifdef HAVE_LIBNETCDF
/* Callback type used by buildDecoPresetXmapsNetCDF to obtain the
 * destination index list of variable varID of vlistID for the calling
 * process. */
typedef Xt_idxlist (*outDstListConstruct)(int vlistID, int varID, const struct cdiPioConf *conf);
#ifdef HAVE_PARALLEL_NC4
/*
 * Destination list constructor for streams opened collectively:
 * returns the index list produced by cdiPioNetCDFParChunk for the
 * given variable.  The chunk extents computed alongside are not needed
 * here and are discarded.  conf is unused.
 *
 * NOTE(review): unlike the sibling constructors this function has
 * external linkage -- confirm whether that is intended.
 */
Xt_idxlist
createOutDstListNetCDFPar(int vlistID, int varID, const struct cdiPioConf *conf)
{
  (void) conf;
  Xt_idxlist chunkList;
  struct PPM_extent chunkExtents[3];
  cdiPioNetCDFParChunk(vlistID, varID, &chunkList, chunkExtents);
  return chunkList;
}
#endif
/*
 * Destination list constructor selected for the process whose
 * collector rank equals the stream owner: delegates to
 * buildVarSlicesIdxList with start level and level count both -1
 * (presumably meaning "all levels" -- confirm against
 * buildVarSlicesIdxList).
 */
static Xt_idxlist
createOutDstListNetCDFSerialFunnel(int vlistID, int varID, const struct cdiPioConf *conf)
{
  return buildVarSlicesIdxList(vlistID, varID, -1, -1, conf);
}
/*
 * Destination list constructor selected for processes that are not the
 * stream owner: always returns an empty index list.  All parameters
 * exist only to match the outDstListConstruct signature.
 */
static Xt_idxlist
createOutDstListNetCDFSerialAssist(int vlistID, int varID, const struct cdiPioConf *conf)
{
  (void) conf;
  (void) varID;
  (void) vlistID;
  Xt_idxlist nothingToReceive = xt_idxempty_new();
  return nothingToReceive;
}
/*
 * Pre-populate the xmap cache for a NetCDF stream: for every variable
 * of vlistID, obtain the local destination list from getOutDstList and
 * register the matching xmap via buildDecoPresetXmapsCommon.
 *
 * clientDeco.uids and clientDeco.lists are viewed as
 * [variable][client] tables with numClients_ columns.
 */
static void
buildDecoPresetXmapsNetCDF(int vlistID, struct partDescPreset clientDeco, MPI_Comm collComm, outDstListConstruct getOutDstList,
                           const struct cdiPioConf *conf)
{
  int nVars = vlistNvars(vlistID);
  size_t numClients = (size_t) numClients_;
  /* assume all variables are written in full */
  Xt_uid(*clientUIDs)[numClients] = (Xt_uid(*)[numClients]) clientDeco.uids;
  Xt_idxlist(*clientLists)[numClients] = (Xt_idxlist(*)[numClients]) clientDeco.lists;
  /* scratch key/size arrays, reused for every variable */
  Xt_uid *restrict uids = NULL;
  int *partSizes = NULL;
  allocUIDLookup(numClients, &uids, &partSizes);
  int varID = 0;
  while (varID < nVars)
    {
      Xt_idxlist dstList = getOutDstList(vlistID, varID, conf);
      buildDecoPresetXmapsCommon(dstList, numClients, clientUIDs[varID], clientLists[varID], partSizes, uids, collComm, conf);
      ++varID;
    }
  /* only uids is released here; partSizes is not freed separately */
  Free(uids);
}
#endif
/*
 * Build the inventory of written records for a hypothetical stream in
 * which every level of every variable of vlistID is written.
 *
 * The result is a single allocation: the header is followed by the
 * record array, then by varMap and numLvlsW (nVars entries each).
 * layout is left NULL.  conf is unused.
 */
static struct streamMapping *
streamMappingSpeculativeNew(int vlistID, struct partDescPreset clientDeco, const struct cdiPioConf *conf)
{
  (void) conf;
  int nVars = vlistNvars(vlistID);

  /* first pass: level count per variable and total number of records */
  size_t numRecords = 0;
  int *restrict lvlCounts = Malloc((size_t) nVars * sizeof(lvlCounts[0]));
  for (int varID = 0; varID < nVars; ++varID)
    {
      lvlCounts[varID] = zaxisInqSize(vlistInqVarZaxis(vlistID, varID));
      numRecords += (size_t) lvlCounts[varID];
    }

  /* header + records + (varMap, numLvlsW) in one block */
  struct streamMapping *result
      = Malloc(sizeof(*result) + numRecords * sizeof(result->writtenRecords[0]) + (size_t) nVars * 2 * sizeof(result->varMap[0]));
  struct recordWrite *restrict records = result->writtenRecords;
  result->varMap = (void *) (records + numRecords);
  result->numLvlsW = result->varMap + nVars;

  /* second pass: one record per (variable, level) pair, in that order */
  const int *conversion = clientDeco.conversion;
  size_t recIdx = 0;
  for (int varID = 0; varID < nVars; ++varID)
    {
      result->numLvlsW[varID] = lvlCounts[varID];
      size_t numLvl = (size_t) result->numLvlsW[varID];

      /* elements per record: product of the first two queried dims */
      int varShape[3];
      cdiPioQueryVarDims(varShape, vlistID, varID);
      size_t recordNumElem = (size_t) varShape[0] * (size_t) varShape[1];

      /* every variable is flagged with 1 in this speculative mapping */
      result->varMap[varID] = 1;

      size_t elemSize = cdiPioElemSizeInference((size_t) varID, conversion);
      size_t recordDataSize = recordNumElem * elemSize;
      for (size_t lvl = 0; lvl < numLvl; ++lvl, ++recIdx)
        records[recIdx] = (struct recordWrite){ .varID = varID, .level = (int) lvl, .dataSize = recordDataSize };
    }

  result->numVars = nVars;
  result->numWrittenRecords = (int) numRecords;
  Free(lvlCounts);
  result->layout = NULL;
  return result;
}
/* denote what will be aggregated at a single process */
struct passPlan
{
  /* record range of this process in the pass (read as start/end pair) */
  unsigned recordAggStart;
  unsigned recordAggEnd;
  /* variable range of this process in the pass */
  int varStart;
  int varEnd;
};
/* one entry per variable occurring in a pass, as filled in by
 * buildPassVarDict */
struct passDict
{
  /* variable this entry refers to */
  int varID;
  /* record range associated with the variable */
  unsigned recordStart;
  unsigned recordEnd;
};
/* Forward declarations: buildDecoPresetXmapsGrib uses both functions
 * before their definitions.
 *
 * buildPassVarDict: its return value is used by the caller as the
 * number of valid entries in *dict; *dictSize and *dict are passed by
 * address so the callee can update them.
 *
 * planPasses: its return value is used by the caller as the number of
 * passes; *passes_ receives a table with collSize entries per pass. */
static size_t buildPassVarDict(size_t collSize, const struct passPlan *passes, const struct recordWrite *restrict writtenRecords,
size_t *dictSize, struct passDict **dict);
static size_t planPasses(const struct streamMapping *mapping, const struct cdiPioConf *conf, size_t collSize,
struct passPlan (**passes_)[collSize]);
/* Return type of dstListFromRecordRange: an index list bundled with a
 * size (presumably the number of indices in list -- confirm against
 * dstListFromRecordRange). */
struct idxlistAndSize
{
Xt_idxlist list;
int listSize;
};
/* Forward declaration: buildDecoPresetXmapsGrib calls this function
 * before its definition to obtain the destination list of one variable
 * for a given pass. */
static struct idxlistAndSize dstListFromRecordRange(int vlistID, int varID, int myVarStart, int myVarEnd, size_t myRecordStart,
size_t myRecordEnd, const struct recordWrite *restrict writtenRecords,
size_t recordStart, size_t recordEnd, const struct cdiPioConf *conf);
/*
 * Pre-populate the xmap cache for a GRIB stream.
 *
 * A speculative mapping in which all variables are written in full is
 * planned into passes; for every variable of every pass the
 * destination list of this collector (collRank) is derived and the
 * corresponding xmap is registered via buildDecoPresetXmapsCommon.
 *
 * clientDeco.uids and clientDeco.lists are viewed as
 * [variable][client] tables with numClients_ columns.
 */
static void
buildDecoPresetXmapsGrib(int vlistID, struct partDescPreset clientDeco, MPI_Comm collComm, int collRank,
                         const struct cdiPioConf *conf)
{
  struct streamMapping *syntheticMapping = streamMappingSpeculativeNew(vlistID, clientDeco, conf);
  size_t collSize = (size_t) commInqSizeColl();
  struct passPlan(*passes)[collSize] = NULL;
  size_t numPasses = planPasses(syntheticMapping, conf, collSize, &passes);
  /* dictionary buffer, handed to buildPassVarDict for every pass */
  struct passDict *varsInPass = NULL;
  size_t varsInPassSize = 0;
  size_t numClients = (size_t) numClients_;
  /* assume all variables are written in full */
  Xt_uid(*clientUIDs)[numClients] = (Xt_uid(*)[numClients]) clientDeco.uids;
  Xt_idxlist(*clientLists)[numClients] = (Xt_idxlist(*)[numClients]) clientDeco.lists;
  Xt_uid *restrict uids = NULL;
  int *restrict partSizes = NULL;
  allocUIDLookup(numClients, &uids, &partSizes);
  struct recordWrite *restrict writtenRecords = syntheticMapping->writtenRecords;
  for (size_t pass = 0; pass < numPasses; ++pass)
    {
      /* share of this collector in the current pass */
      const struct passPlan *myPlan = &passes[pass][collRank];
      size_t myRecordStart = myPlan->recordAggStart;
      size_t myRecordEnd = myPlan->recordAggEnd;
      size_t numVarsInPass = buildPassVarDict(collSize, passes[pass], writtenRecords, &varsInPassSize, &varsInPass);
      int myVarStart = myPlan->varStart;
      int myVarEnd = myPlan->varEnd;
      for (size_t varIdx = 0; varIdx < numVarsInPass; ++varIdx)
        {
          const struct passDict *entry = &varsInPass[varIdx];
          int varID = entry->varID;
          struct idxlistAndSize dst = dstListFromRecordRange(vlistID, varID, myVarStart, myVarEnd, myRecordStart, myRecordEnd,
                                                             writtenRecords, entry->recordStart, entry->recordEnd, conf);
          buildDecoPresetXmapsCommon(dst.list, numClients, clientUIDs[varID], clientLists[varID], partSizes, uids, collComm,
                                     conf);
        }
    }
  Free(varsInPass);
  /* only uids is released here; partSizes is not freed separately */
  Free(uids);
  Free(syntheticMapping);
  Free(passes);
}
/*
 * Pre-populate the xmap cache for stream streamID, dispatching on the
 * stream's file type.
 *
 * streamID    stream whose file type and vlist are queried
 * clientDeco  preset client decomposition, forwarded unchanged
 * collComm    communicator forwarded to the xmap construction
 * conf        configuration, forwarded unchanged
 *
 * GRIB streams go through buildDecoPresetXmapsGrib.  NetCDF streams
 * (only when built with HAVE_LIBNETCDF) go through
 * buildDecoPresetXmapsNetCDF with a destination list constructor chosen
 * from the stream owner.  For any other file type nothing is done.
 */
static void
buildDecoPresetXmaps(int streamID, struct partDescPreset clientDeco, MPI_Comm collComm, const struct cdiPioConf *conf)
{
  int filetype = streamInqFiletype(streamID);
  int vlistID = streamInqVlist(streamID);
  switch (filetype)
    {
    case CDI_FILETYPE_GRB:
    case CDI_FILETYPE_GRB2:
      buildDecoPresetXmapsGrib(vlistID, clientDeco, collComm, commInqRankColl(), conf);
      break;
#ifdef HAVE_LIBNETCDF
    case CDI_FILETYPE_NC:
    case CDI_FILETYPE_NC2:
    case CDI_FILETYPE_NC4:
    case CDI_FILETYPE_NC4C:
      {
        int rankOpen = cdiPioStream2Owner(streamID);
        outDstListConstruct createOutList;
#ifdef HAVE_PARALLEL_NC4
        if (rankOpen == CDI_PIO_COLLECTIVE_OPEN)
          createOutList = createOutDstListNetCDFPar;
        else
#endif
          {
            /* only the owning collector gets the variable's slices,
             * every other collector gets an empty list */
            const int collRank = commInqRankColl();
            createOutList = rankOpen == collRank ? createOutDstListNetCDFSerialFunnel : createOutDstListNetCDFSerialAssist;
          }
        buildDecoPresetXmapsNetCDF(vlistID, clientDeco, collComm, createOutList, conf);
      }
      /* explicit break so that a case added below cannot be reached by
       * accidental fall-through */
      break;
#endif
    default:
      /* no xmaps are precomputed for other file types */
      break;
    }
}
static Xt_redist
buildVarRedist(int headerIdx, size_t streamIdx,
/* index list representing the data elements gathered on
......@@ -985,13 +1215,6 @@ streamMappingDelete(struct streamMapping **mapping)
*mapping = NULL;
}
/* denote what will be aggregated at a single process */
struct passPlan
{
/* record range and variable range of one process in a pass */
unsigned recordAggStart, recordAggEnd;
int varStart, varEnd;
};
/**
* @param[out] passes_ pointer to pointer to 2-dimensional array of
* records of dimensions $number of passes \cdot number of collectors$,
......@@ -1108,12 +1331,6 @@ aggBufFlush(int streamID, int fileID, size_t (*cdiPioFileWrite)(int, const void
aggBuf.used = 0;
}
/* one entry per variable occurring in a pass, as filled in by
 * buildPassVarDict */
struct passDict
{
int varID;
/* record range associated with varID */
unsigned recordStart, recordEnd;
};
static size_t
buildPassVarDict(size_t collSize, const struct passPlan *passes, const struct recordWrite *restrict writtenRecords,
size_t *dictSize, struct passDict **dict)
......@@ -1736,6 +1953,8 @@ cdiPioRecvStreamDefVlist(void *buffer, int size, int *pos, MPI_Comm pioInterComm
size_t currentXmapCacheSize = cdiPioXmapCacheGetSize(XmapCache);
if ((neededXmapCacheSize += maxNumStreamWrites) > currentXmapCacheSize)
XmapCache = cdiPioXmapCacheResize(XmapCache, neededXmapCacheSize);
if (tag == STREAM_DEF_DECOMPOSED_VLIST)
buildDecoPresetXmaps(serverStreamID, rxWin[streamIdx].clientDeco, commInqCommColl(), conf);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment