Commit 3f497ca4 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Make algorithm for intersection computation configurable.

* Use distributed directory for intersection computation by default.
parent 7e6327c5
......@@ -132,4 +132,16 @@ void cdiPioConfSetRedistCache(int confResH, int doCache);
* be cached? */
int cdiPioConfGetRedistCache(int confResH);
/* cdiPioConfSetXmapNew: set method to compute part intersections,
* defaults to xt_xmap_dist_dir_new */
void cdiPioConfSetXmapNew(int confResH,
Xt_xmap (*xmap_new)(Xt_idxlist src_idxlist,
Xt_idxlist dst_idxlist,
MPI_Comm comm));
/* cdiPioConfSetXmapNew: get method to compute part intersections */
Xt_xmap (*cdiPioConfGetXmapNew(int confResH))(Xt_idxlist src_idxlist,
Xt_idxlist dst_idxlist,
MPI_Comm comm);
#endif
......@@ -2,6 +2,8 @@
#include <stdlib.h>
#include <string.h>
#include <yaxt.h>
#include "dmemory.h"
#include "error.h"
#include "resource_handle.h"
......@@ -162,6 +164,7 @@ int cdiPioConfCreate(void)
conf->largePageAlign = false;
conf->cacheRedists = false;
conf->recordAggBufLimMB = 128;
conf->xmap_new = xt_xmap_dist_dir_new;
int resH = reshPut(conf, &cdiPioConfOps);
/* configuration objects are never forwarded */
reshSetStatus(resH, &cdiPioConfOps,
......@@ -266,3 +269,15 @@ int cdiPioConfGetRecordAggBufLim(int confResH)
return (int)conf->recordAggBufLimMB;
}
void cdiPioConfSetXmapNew(int confResH, xmap_new_func_ptr xmap_new)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
conf->xmap_new = xmap_new;
}
xmap_new_func_ptr cdiPioConfGetXmapNew(int confResH)
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
return conf->xmap_new;
}
......@@ -4,6 +4,8 @@
#include <stdbool.h>
#include <yaxt.h>
/*
* declare data structures and functions to manipulate/query CDI-PIO
* configuration object.
......@@ -11,6 +13,10 @@
#include "resource_handle.h"
typedef Xt_xmap (*xmap_new_func_ptr)(Xt_idxlist src_idxlist,
Xt_idxlist dst_idxlist,
MPI_Comm comm);
/*
* cdiPioConf is meant to be internal to the library and not to be
* used directly if possible, CDI-PIO users should rely on the
......@@ -20,8 +26,9 @@ struct cdiPioConf {
int IOMode;
int clientServerRole;
float partInflate;
void (*postCommSetupActions)(void);
unsigned recordAggBufLimMB;
void (*postCommSetupActions)(void);
xmap_new_func_ptr xmap_new;
bool largePageAlign;
bool cacheRedists;
};
......@@ -68,5 +75,8 @@ void cdiPioConfSetRecordAggBufLim(int confResH, int lim_mb);
int cdiPioConfGetRecordAggBufLim(int confResH);
void cdiPioConfSetXmapNew(int confResH, xmap_new_func_ptr xmap_new);
xmap_new_func_ptr cdiPioConfGetXmapNew(int confResH);
#endif
......@@ -207,7 +207,8 @@ static Xt_redist
buildVarRedist(int headerIdx, size_t streamIdx,
/* index list representing the data elements gathered on
* this rank */
Xt_idxlist dstList)
Xt_idxlist dstList,
const struct cdiPioConf *conf)
{
const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
const struct winHeaderEntry *winDict
......@@ -256,7 +257,7 @@ buildVarRedist(int headerIdx, size_t streamIdx,
for (size_t clientIdx = 0; clientIdx < (size_t)numClients_; ++clientIdx)
xt_idxlist_delete(part[clientIdx]);
Free(part);
Xt_xmap gatherXmap = xt_xmap_all2all_new(srcList, dstList, collComm);
Xt_xmap gatherXmap = conf->xmap_new(srcList, dstList, collComm);
xt_idxlist_delete(srcList);
struct Xt_offset_ext gatherExt
= { .start = 0,
......@@ -276,10 +277,11 @@ gatherArray(int headerIdx, size_t streamIdx,
double *gatherBuf,
/* index list representing the data elements gathered on
* this rank */
Xt_idxlist dstList)
Xt_idxlist dstList,
const struct cdiPioConf *conf)
{
Xt_redist gatherRedist = buildVarRedist(headerIdx, streamIdx,
dstList);
dstList, conf);
xt_redist_s_exchange1(gatherRedist,
rxWin[streamIdx].clientBuf[0].mem, gatherBuf);
xt_redist_delete(gatherRedist);
......@@ -389,7 +391,8 @@ myVarPart(struct PPM_extent varShape[3], struct xyzDims collGrid,
static void
writeNetCDFStream(size_t streamIdx,
struct streamMapping *mapping,
double **data_, int *currentDataBufSize)
double **data_, int *currentDataBufSize,
const struct cdiPioConf *conf)
{
int nvars = mapping->numVars;
int *restrict varMap = mapping->varMap;
......@@ -443,7 +446,7 @@ writeNetCDFStream(size_t streamIdx,
/* transpose data into write deco */
{
int headerIdx = varMap[varID];
gatherArray(headerIdx, streamIdx, data, preWriteChunk);
gatherArray(headerIdx, streamIdx, data, preWriteChunk, conf);
xt_idxlist_delete(preWriteChunk);
}
/* count missing values if appropriate */
......@@ -516,7 +519,8 @@ cdiPioServerCdfDefVars(stream_t *streamptr)
static void
writeNetCDFStream(size_t streamIdx,
struct streamMapping *mapping,
double **data_, int *currentDataBufSize)
double **data_, int *currentDataBufSize,
const struct cdiPioConf *conf)
{
int nvars = mapping->numVars;
int *restrict varMap = mapping->varMap,
......@@ -546,7 +550,7 @@ writeNetCDFStream(size_t streamIdx,
}
double *restrict data = *data_;
int headerIdx = varMap[varID];
gatherArray(headerIdx, streamIdx, data, dstList);
gatherArray(headerIdx, streamIdx, data, dstList, conf);
if (writerRank == collRank)
{
int nmiss = countVarChunkMissingVals(vlistID, varID,
......@@ -1002,7 +1006,7 @@ writeGribStream(size_t streamIdx,
displ[numVarsInPass + varIdx + 1]
= (MPI_Aint)(sizeof (double) * myAggSize);
varRedists[varIdx] = buildVarRedist(mapping->varMap[varID],
streamIdx, dstList);
streamIdx, dstList, conf);
xt_idxlist_delete(dstList);
}
/* merge all redists for current pass */
......@@ -1129,7 +1133,7 @@ readGetBuffers(size_t streamIdx, const struct cdiPioConf *conf)
case FILETYPE_NC:
case FILETYPE_NC2:
case FILETYPE_NC4:
writeNetCDFStream(streamIdx, map, &data, &currentDataBufSize);
writeNetCDFStream(streamIdx, map, &data, &currentDataBufSize, conf);
break;
#endif
default:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment