Commit f37b2a7a authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Add caching of YAXT objects for PIO GRIB writing.

parent 31e787c6
......@@ -115,5 +115,17 @@ void cdiPioConfSetLargePageAlign(int confResH, int largePageAlign);
* large pages instead of normal pages? */
int cdiPioConfGetLargePageAlign(int confResH);
/* cdiPioConfSetRedistCache: set doCache to anything non-zero if data
* for internal data exchanges is to be cached. This makes sense when
* the data passed via streamWriteVarPart or streamWriteScatteredVarPart
* is always decomposed statically using the same partitioning
* description objects and the sequence of calls to streamWriteVarPart
* or streamWriteScatteredVarPart for a stream matches the sequence
* of the previous sequence (divided by pioWriteTimestep) */
void cdiPioConfSetRedistCache(int confResH, int doCache);
/* cdiPioConfSetRedistCache: will data for internal data exchanges
* be cached? */
int cdiPioConfGetRedistCache(int confResH);
#endif
......@@ -159,3 +159,12 @@
! (INTEGER confResH)
EXTERNAL cdiPioConfGetLargePageAlign
! cdiPioConfSetRedistCache
! (INTEGER confResH,
! INTEGER doCache)
EXTERNAL cdiPioConfSetRedistCache
INTEGER cdiPioConfGetRedistCache
! (INTEGER confResH)
EXTERNAL cdiPioConfGetRedistCache
......@@ -86,5 +86,7 @@ FCALLSCSUB2 (cdiPioConfSetPostCommSetupActions, CDIPIOCONFSETPOSTCOMMSETUPACTION
FCALLSCSUB2 (cdiPioConfSetLargePageAlign, CDIPIOCONFSETLARGEPAGEALIGN, cdipioconfsetlargepagealign, INT, INT)
FCALLSCFUN1 (INT, cdiPioConfGetLargePageAlign, CDIPIOCONFGETLARGEPAGEALIGN, cdipioconfgetlargepagealign, INT)
FCALLSCSUB2 (cdiPioConfSetRedistCache, CDIPIOCONFSETREDISTCACHE, cdipioconfsetredistcache, INT, INT)
FCALLSCFUN1 (INT, cdiPioConfGetRedistCache, CDIPIOCONFGETREDISTCACHE, cdipioconfgetredistcache, INT)
#endif
......@@ -158,6 +158,7 @@ int cdiPioConfCreate(void)
conf->partInflate = 1.1f;
conf->postCommSetupActions = cdiPioNoPostCommSetup;
conf->largePageAlign = false;
conf->cacheRedists = false;
int resH = reshPut(conf, &cdiPioConfOps);
/* configuration objects are never forwarded */
reshSetStatus(resH, &cdiPioConfOps,
......@@ -235,3 +236,15 @@ int cdiPioConfGetLargePageAlign(int confResH)
return conf->largePageAlign;
}
void cdiPioConfSetRedistCache(int confResH, int doCache)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
conf->cacheRedists = doCache;
}
int cdiPioConfGetRedistCache(int confResH)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
return conf->cacheRedists;
}
......@@ -22,6 +22,7 @@ struct cdiPioConf {
float partInflate;
void (*postCommSetupActions)(void);
bool largePageAlign;
bool cacheRedists;
};
extern const resOps cdiPioConfOps;
......@@ -58,4 +59,9 @@ int cdiPioConfGetLargePageAlign(int confResH);
void cdiPioConfSetLargePageAlign(int confResH, int largePageAlign);
void cdiPioConfSetRedistCache(int confResH, int doCache);
int cdiPioConfGetRedistCache(int confResH);
#endif
......@@ -513,7 +513,7 @@ cdiPioInit(MPI_Comm commGlob, int confResH, int *pioNamespace)
cdiPioFileWritingInit(conf);
if (commInqRankColl() >= 0)
{
cdiPioCollectorMessageLoop();
cdiPioCollectorMessageLoop(conf);
cdiPioFileWritingFinalize();
}
cdiPioCommFinalize();
......
......@@ -26,6 +26,7 @@
#include "taxis.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_conf.h"
#include "pio_id_set.h"
#include "pio_interface.h"
#include "pio_rpc.h"
......@@ -47,6 +48,17 @@ struct clientBuf
int dictSize;
};
struct streamMemLayout
{
Xt_uid varPartIdxListUID;
size_t offset;
};
struct cacheRedist {
Xt_redist redist;
int sliceSize;
};
static struct
{
MPI_Win getWin;
......@@ -54,6 +66,10 @@ static struct
#if defined HAVE_LIBNETCDF && ! defined HAVE_PARALLEL_NC4
int ownerRank;
#endif
/* put data for description of last layout from RMA GET here */
struct streamMemLayout *prevLayout;
size_t numRetained;
struct cacheRedist *retained;
} *rxWin;
static struct idList openStreams;
......@@ -74,6 +90,7 @@ struct streamMapping {
/* nMiss[i] = missing values were provided for variable i */
int *hasMissing;
int numWrittenRecords;
struct streamMemLayout *layout;
struct recordWrite writtenRecords[];
};
......@@ -541,9 +558,44 @@ writeNetCDFStream(size_t streamIdx,
#endif
static inline struct winHeaderEntry *
winDictEntry(size_t streamIdx, size_t client, size_t entry)
{
return ((struct winHeaderEntry *)rxWin[streamIdx].clientBuf[client].mem)
+ entry;
}
static struct streamMemLayout *
getLayout(size_t streamIdx)
{
int streamID = openStreams.entries[streamIdx];
size_t numClients = (size_t)numClients_;
int vlistID = streamInqVlist(streamID);
size_t numVars = (size_t)vlistNvars(vlistID);
struct streamMemLayout (*layout)[numVars]
= Calloc(numClients * numVars, sizeof (layout[0]));
size_t numDataEntries
= (size_t)(winDictEntry(streamIdx, 0, 0)->specific.headerSize.numDataEntries);
for (size_t client = 0; client < numClients; ++client)
for (size_t headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
{
xassert(namespaceAdaptKey2(winDictEntry(streamIdx, client,
headerIdx)->id) == streamID);
struct winHeaderEntry *varHeader
= winDictEntry(streamIdx, client, headerIdx);
size_t varID = (size_t)varHeader[0].specific.dataRecord.varID;
size_t offset = (size_t)varHeader[0].offset;
Xt_uid uid = varHeader[1].specific.partDesc.uid;
layout[client][varID] = (struct streamMemLayout){
.varPartIdxListUID = uid, .offset = offset };
}
return *layout;
}
/* build inventory of written variables for stream */
static struct streamMapping *
streamMappingNew(size_t streamIdx, struct winHeaderEntry *winDict)
streamMappingNew(size_t streamIdx, struct winHeaderEntry *winDict,
const struct cdiPioConf *conf)
{
int streamID = openStreams.entries[streamIdx];
int numDataEntries = winDict[0].specific.headerSize.numDataEntries;
......@@ -562,7 +614,7 @@ streamMappingNew(size_t streamIdx, struct winHeaderEntry *winDict)
int varID = winDict[headerIdx].specific.dataRecord.varID;
/* ensure a variable has not been enqueued twice */
/* FIXME: this could better be ensured on client */
xassert(varMap[varID] == 0);
xassert(varID < numVars && varID >= 0 && varMap[varID] == 0);
varMap[varID] = headerIdx;
hasMissing[varID] += winDict[headerIdx].specific.dataRecord.nmiss;
}
......@@ -601,20 +653,78 @@ streamMappingNew(size_t streamIdx, struct winHeaderEntry *winDict)
result->numVars = numVars;
result->numWrittenRecords = (int)numWrittenRecords;
Free(varMap);
result->layout = conf->cacheRedists ? getLayout(streamIdx) : NULL;
return result;
}
static void
streamMappingDelete(struct streamMapping **mapping)
{
Free((*mapping)->layout);
Free(*mapping);
*mapping = NULL;
}
static inline void
destructRetained(struct cacheRedist *restrict retained, size_t numRetained)
{
for (size_t i = 0; i < (size_t)numRetained; ++i)
xt_redist_delete(retained[i].redist);
}
static inline bool
handleRedistCache(size_t streamIdx,
struct streamMapping *restrict mapping,
size_t numPasses, int vlistID, MPI_Comm collComm)
{
bool reuseRedists = false;
if (!rxWin[streamIdx].retained)
{
rxWin[streamIdx].retained
= Malloc(numPasses * sizeof (*rxWin[streamIdx].retained));
rxWin[streamIdx].numRetained = numPasses;
rxWin[streamIdx].prevLayout = mapping->layout;
mapping->layout = NULL;
}
else
{
size_t numClients = (size_t)numClients_,
numVars = (size_t)vlistNvars(vlistID);
reuseRedists
= !memcmp(mapping->layout, rxWin[streamIdx].prevLayout,
numClients * numVars
* sizeof (mapping->layout[0]));
if (!reuseRedists)
{
Free(rxWin[streamIdx].prevLayout);
rxWin[streamIdx].prevLayout = mapping->layout;
mapping->layout = NULL;
}
{
int temp = reuseRedists;
xmpi(MPI_Allreduce(MPI_IN_PLACE, &temp, 1, MPI_INT, MPI_LAND,
collComm));
reuseRedists = temp;
}
if (!reuseRedists)
{
destructRetained(rxWin[streamIdx].retained,
rxWin[streamIdx].numRetained);
rxWin[streamIdx].retained
= Realloc(rxWin[streamIdx].retained,
numPasses * sizeof (*rxWin[streamIdx].retained));
rxWin[streamIdx].numRetained = numPasses;
}
}
return reuseRedists;
}
static void
writeGribStream(size_t streamIdx,
struct streamMapping *mapping,
double **data_, int *currentDataBufSize)
double **data_, int *currentDataBufSize,
const struct cdiPioConf *conf)
{
const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
int streamID = openStreams.entries[streamIdx];
......@@ -635,6 +745,10 @@ writeGribStream(size_t streamIdx,
if (collRank >= numRecords - (numPasses - 1) * collSize)
cdiPioFileWrite = (size_t (*)(int, const void *restrict, size_t, int))
namespaceSwitchGet(NSSWITCH_FILE_WRITE).func;
bool reuseRedists = conf->cacheRedists != 0
? handleRedistCache(streamIdx, mapping, (size_t)numPasses, vlistID, collComm)
: false;
struct cacheRedist *restrict retained = rxWin[streamIdx].retained;
for (int pass = 0, base = 0; pass < numPasses; ++pass, base += collSize)
{
unsigned numVarsInPass = 1;
......@@ -642,49 +756,62 @@ writeGribStream(size_t streamIdx,
int passSize = (pass < numPasses - 1) ? collSize : numRecords - (numPasses - 1) * collSize;
/* establish variables involved in this pass */
for (int i = 1; i < passSize; ++i)
if (varIDsOfPass[numVarsInPass - 1]
!= writtenRecords[base + i].varID)
if (varIDsOfPass[numVarsInPass - 1] != writtenRecords[base + i].varID)
varIDsOfPass[numVarsInPass++] = writtenRecords[base + i].varID;
int mySliceSize = 0;
/* build redists for all variables involved in current write pass */
for (unsigned varIdx = 0; varIdx < numVarsInPass; ++varIdx)
/* build or fetch from cache redists for all variables involved in current write pass */
Xt_redist compositePassRedist;
if (reuseRedists)
{
int varID = varIDsOfPass[varIdx];
Xt_idxlist dstList;
/* is this process writing part of this variable? */
if (collRank < passSize
&& writtenRecords[base + collRank].varID == varID)
{
int myLevel = writtenRecords[base + collRank].level;
dstList
= buildVarSlicesIdxList(vlistID, varID, myLevel, 1);
mySliceSize = xt_idxlist_get_num_indices(dstList);
}
else
dstList = xt_idxempty_new();
varRedists[varIdx] = buildVarRedist(mapping->varMap[varID],
streamIdx, dstList);
xt_idxlist_delete(dstList);
compositePassRedist = retained[pass].redist;
mySliceSize = retained[pass].sliceSize;
}
/* merge all redists for current pass */
Xt_redist compositePassRedist;
if (numVarsInPass > 1)
else
{
compositePassRedist
= xt_redist_collection_static_new(varRedists, (int)numVarsInPass,
displ, displ, collComm);
/* free individual redists */
for (unsigned varIdx = 0; varIdx < numVarsInPass; ++varIdx)
xt_redist_delete(varRedists[varIdx]);
{
int varID = varIDsOfPass[varIdx];
Xt_idxlist dstList;
/* is this process writing part of this variable? */
if (collRank < passSize
&& writtenRecords[base + collRank].varID == varID)
{
int myLevel = writtenRecords[base + collRank].level;
dstList
= buildVarSlicesIdxList(vlistID, varID, myLevel, 1);
mySliceSize = xt_idxlist_get_num_indices(dstList);
}
else
dstList = xt_idxempty_new();
varRedists[varIdx] = buildVarRedist(mapping->varMap[varID],
streamIdx, dstList);
xt_idxlist_delete(dstList);
}
/* merge all redists for current pass */
if (numVarsInPass > 1)
{
compositePassRedist
= xt_redist_collection_static_new(varRedists, (int)numVarsInPass,
displ, displ, collComm);
/* free individual redists */
for (unsigned varIdx = 0; varIdx < numVarsInPass; ++varIdx)
xt_redist_delete(varRedists[varIdx]);
}
else
compositePassRedist = varRedists[0];
if (conf->cacheRedists)
{
retained[pass].redist = compositePassRedist;
retained[pass].sliceSize = mySliceSize;
}
}
else
compositePassRedist = varRedists[0];
/* resize gather buffer if needed */
resizeVarGatherBuf(mySliceSize, data_, currentDataBufSize);
/* execute composite redist */
xt_redist_s_exchange1(compositePassRedist, clientBuf[0].mem, *data_);
/* delete composite redist */
xt_redist_delete(compositePassRedist);
if (!conf->cacheRedists)
xt_redist_delete(compositePassRedist);
/* write level (if having one) */
if (collRank < passSize)
{
......@@ -702,7 +829,8 @@ writeGribStream(size_t streamIdx,
}
}
static void readGetBuffers(size_t streamIdx)
static void
readGetBuffers(size_t streamIdx, const struct cdiPioConf *conf)
{
int streamID = openStreams.entries[streamIdx];
xdebug("%s", "START");
......@@ -728,7 +856,9 @@ static void readGetBuffers(size_t streamIdx)
}
/* build list of streams, data was transferred for */
{
struct streamMapping *map = streamMappingNew(streamIdx, winDict);
struct streamMapping *map = streamMappingNew(streamIdx, winDict, conf);
/* TODO: build list of rma buffer layout here to check if caching
* can be done */
double *data = NULL;
int currentDataBufSize = 0;
int filetype = streamInqFiletype(streamID);
......@@ -737,7 +867,7 @@ static void readGetBuffers(size_t streamIdx)
{
case FILETYPE_GRB:
case FILETYPE_GRB2:
writeGribStream(streamIdx, map, &data, &currentDataBufSize);
writeGribStream(streamIdx, map, &data, &currentDataBufSize, conf);
break;
#ifdef HAVE_NETCDF4
case FILETYPE_NC:
......@@ -774,8 +904,8 @@ void clearModelWinBuffer(size_t streamIdx)
/************************************************************************/
static
void getTimeStepData(int *streamActivity)
static void
getTimeStepData(int *streamActivity, const struct cdiPioConf *conf)
{
MPI_Group clientGroup = cdiPioInqRemoteGroup();
......@@ -799,7 +929,7 @@ void getTimeStepData(int *streamActivity)
for (size_t streamIdx = 0; streamIdx < openStreams.size; ++streamIdx)
if (streamActivity[streamIdx])
readGetBuffers(streamIdx);
readGetBuffers(streamIdx, conf);
xdebug("%s", "RETURN");
}
......@@ -857,6 +987,9 @@ cdiPioServerStreamOpen(const char *filename, char filemode,
for (size_t i = oldNumStreams; i < numStreams; ++i)
rxWin[i].clientBuf = newClientBufs + i * (size_t)numClients_;
rxWin[streamIdx].getWin = MPI_WIN_NULL;
rxWin[streamIdx].prevLayout = NULL;
rxWin[streamIdx].retained = NULL;
rxWin[streamIdx].numRetained = 0;
#if defined HAVE_LIBNETCDF && ! defined HAVE_PARALLEL_NC4
rxWin[streamIdx].ownerRank = rank;
#endif
......@@ -894,6 +1027,9 @@ cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
}
int streamID = streamptr->self;
size_t streamIdx = indexOfID(&openStreams, streamID);
destructRetained(rxWin[streamIdx].retained,
rxWin[streamIdx].numRetained);
Free(rxWin[streamIdx].retained);
cdiPioServerStreamWinDestroy(streamIdx);
removeID(&openStreams, streamID);
}
......@@ -996,7 +1132,7 @@ cdiPioRecvStreamDefVlist(void *buffer, int size, int *pos,
* @brief is encapsulated in CDI library and run on I/O PEs.
*/
void cdiPioCollectorMessageLoop(void)
void cdiPioCollectorMessageLoop(const struct cdiPioConf *conf)
{
MPI_Status status;
......@@ -1088,7 +1224,7 @@ void cdiPioCollectorMessageLoop(void)
MPI_INT, source, tag, pioInterComm, &status));
xdebug("RECEIVED MESSAGE WITH TAG \"WRITETS\": source=%d",
source);
getTimeStepData(streamActivity);
getTimeStepData(streamActivity, conf);
}
break;
......
......@@ -7,7 +7,9 @@
#include <mpi.h>
void cdiPioCollectorMessageLoop(void);
#include "pio_conf.h"
void cdiPioCollectorMessageLoop(const struct cdiPioConf *conf);
#endif
/*
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment