Skip to content
Snippets Groups Projects
Commit 29f453ae authored by Thomas Jahns
Browse files

Extract function bodies for parallel netCDF writing.

parent 78b22629
No related branches found
No related tags found
No related merge requests found
......@@ -55,6 +55,26 @@ static struct
/* streams currently open on this process; openStreams.entries[streamIdx]
 * yields the stream ID (see its use in writeNetCDFStream below) */
static struct idList openStreams;

/* one written record: a single level of one variable */
struct recordWrite
{
  int varID, level;
};

/* Inventory of the variable data transferred for one stream in the
 * current timestep. */
struct streamMapping {
  int numVars;
  /* data entry varMap[i] contains data for variable i or -1 if no
   * data entry for i has been transferred */
  int *varMap;
  /* numLvlsW[i] number of levels written for variable i or 0 if
   * variable is not written to this timestep */
  int *numLvlsW;
  /* hasMissing[i] = missing values were provided for variable i */
  int *hasMissing;
  /* number of entries in writtenRecords[] */
  int numWrittenRecords;
  /* flexible array member: one entry per record written this timestep */
  struct recordWrite writtenRecords[];
};
#ifdef HAVE_PARALLEL_NC4
/* prime factorization of number of pio collectors */
static uint32_t *pioPrimes;
......@@ -373,6 +393,40 @@ xyzGridSize(struct xyzDims dims)
return dims.sizes[0] * dims.sizes[1] * dims.sizes[2];
}
/* Build a yaxt index section selecting numLvl levels of variable varID
 * starting at level startLvl; pass a negative startLvl and/or numLvl to
 * select all levels.  The section's axes are stored slowest-varying
 * first, i.e. reversed relative to the shape reported by
 * cdiPioQueryVarDims. */
static Xt_idxlist
buildVarSlicesIdxList(int vlistID, int varID, int startLvl, int numLvl)
{
  /* dimensions as reported by CDI (fastest-varying first) */
  int cdiShape[3] = { 0, 0, 0 };
  cdiPioQueryVarDims(cdiShape, vlistID, varID);
  /* global shape with axes reversed for the index section */
  Xt_int globalShape[3];
  for (unsigned axis = 0; axis < 3; ++axis)
    globalShape[2 - axis] = cdiShape[axis];
  /* local slice: whole variable unless a level range was requested */
  Xt_int origin[3] = { 0, 0, 0 };
  if (startLvl >= 0)
    origin[0] = (Xt_int)startLvl;
  int sliceShape[3];
  sliceShape[0] = (numLvl >= 0) ? numLvl : (int)globalShape[0];
  sliceShape[1] = cdiShape[1];
  sliceShape[2] = (int)globalShape[2];
  return xt_idxsection_new(0, 3, globalShape, sliceShape, origin);
}
/* Count occurrences of the variable's missing value in a chunk of
 * chunkLen doubles; returns 0 immediately when no missing values were
 * provided for varID (mapping->hasMissing[varID] == 0). */
static int
countVarChunkMissingVals(int vlistID, int varID,
                         struct streamMapping *mapping,
                         int chunkLen, const double *restrict data)
{
  if (!mapping->hasMissing[varID])
    return 0;
  double missval = vlistInqVarMissval(vlistID, varID);
  size_t len = (size_t)chunkLen;
  int count = 0;
  for (size_t i = 0; i < len; ++i)
    count += (data[i] == missval);
  return count;
}
#ifdef HAVE_PARALLEL_NC4
static void
queryVarBounds(struct PPM_extent varShape[3], int vlistID, int varID)
......@@ -429,6 +483,80 @@ myVarPart(struct PPM_extent varShape[3], struct xyzDims collGrid,
PPM_uniform_partition_nd(3, varShape, collGrid.sizes,
myCollGridCoord, myPart);
}
/* collective writing variant (parallel netCDF4): every collector rank
 * writes its own chunk of each variable via streamWriteVarChunk.
 *
 * streamIdx           index into openStreams for the stream to write
 * mapping             per-stream inventory of transferred variable data
 * data_               in/out pointer to the (growable) gather buffer
 * currentDataBufSize  in/out allocated size of *data_
 */
static void
writeNetCDFStream(size_t streamIdx,
                  struct streamMapping *mapping,
                  double **data_, int *currentDataBufSize)
{
  int nvars = mapping->numVars;
  int *restrict varMap = mapping->varMap;
  int streamID = openStreams.entries[streamIdx],
    vlistID = streamInqVlist(streamID);
  for (int varID = 0; varID < nvars; ++varID)
    /* only variables with at least one written level this timestep */
    if (mapping->numLvlsW[varID])
      {
        struct PPM_extent varShape[3];
        queryVarBounds(varShape, vlistID, varID);
        /* match a 3D process grid of collectors to the variable shape */
        struct xyzDims collGrid = varDimsCollGridMatch(varShape);
        xdebug("writing varID %d with dimensions: "
               "x=%d, y=%d, z=%d,\n"
               "found distribution with dimensions:"
               " x=%d, y=%d, z=%d.", varID,
               varShape[0].size, varShape[1].size, varShape[2].size,
               collGrid.sizes[0], collGrid.sizes[1],
               collGrid.sizes[2]);
        /* this rank's part of the variable */
        struct PPM_extent varChunk[3];
        myVarPart(varShape, collGrid, varChunk);
        /* convert extents to [start,end] pairs for streamWriteVarChunk */
        int myChunk[3][2];
        for (int i = 0; i < 3; ++i)
          {
            myChunk[i][0] = PPM_extent_start(varChunk[i]);
            myChunk[i][1] = PPM_extent_end(varChunk[i]);
          }
        xdebug("Writing chunk { { %d, %d }, { %d, %d },"
               " { %d, %d } }", myChunk[0][0], myChunk[0][1],
               myChunk[1][0], myChunk[1][1], myChunk[2][0],
               myChunk[2][1]);
        /* global shape with axes reversed for yaxt */
        Xt_int varSize[3];
        for (int i = 0; i < 3; ++i)
          varSize[2 - i] = varShape[i].size;
        Xt_idxlist preWriteChunk;
        /* prepare yaxt descriptor for write chunk */
        {
          Xt_int preWriteChunkStart[3];
          int preWriteChunkSize[3];
          for (int i = 0; i < 3; ++i)
            {
              preWriteChunkStart[2 - i] = (Xt_int)varChunk[i].first;
              preWriteChunkSize[2 - i] = (int)varChunk[i].size;
            }
          preWriteChunk = xt_idxsection_new(0, 3, varSize,
                                            preWriteChunkSize,
                                            preWriteChunkStart);
        }
        /* grow the shared gather buffer if this chunk needs more room */
        resizeVarGatherBuf(xt_idxlist_get_num_indices(preWriteChunk),
                           data_, currentDataBufSize);
        double *restrict data = *data_;
        /* transpose data into write deco */
        {
          int headerIdx = varMap[varID];
          gatherArray(headerIdx, streamIdx, data, preWriteChunk);
          xt_idxlist_delete(preWriteChunk);
        }
        /* count missing values if appropriate */
        int nmiss
          = countVarChunkMissingVals(vlistID, varID, mapping,
                                     PPM_extents_size(3, varChunk),
                                     data);
        /* write chunk */
        streamWriteVarChunk(streamID, varID,
                            (const int (*)[2])myChunk, data,
                            nmiss);
      }
}
#elif defined (HAVE_LIBNETCDF)
/* needed for writing when some files are only written to by a single process */
/* cdiOpenFileMap(fileID) gives the writer process */
......@@ -482,26 +610,51 @@ cdiPioServerCdfDefVars(stream_t *streamptr)
cdfDefVars(streamptr);
}
#endif
struct recordWrite
/* serial-netCDF writing variant: for each variable written this
 * timestep, gather its complete data onto the single rank that has the
 * file open and write it there with streamWriteVar.
 *
 * streamIdx           index into openStreams for the stream to write
 * mapping             per-stream inventory of transferred variable data
 * data_               in/out pointer to the (growable) gather buffer
 * currentDataBufSize  in/out allocated size of *data_
 *
 * NOTE(review): the diff residue "int varID, level; };" (left over from
 * the relocated struct recordWrite declaration) has been removed from
 * the function body; it did not belong here and broke the syntax. */
static void
writeNetCDFStream(size_t streamIdx,
                  struct streamMapping *mapping,
                  double **data_, int *currentDataBufSize)
{
  int nvars = mapping->numVars;
  int *restrict varMap = mapping->varMap,
    *restrict numLvlsW = mapping->numLvlsW;
  /* determine process which has stream open (writer) and
   * which has data for which variable (var owner)
   * three cases need to be distinguished */
  int streamID = openStreams.entries[streamIdx],
    vlistID = streamInqVlist(streamID);
  int writerRank = cdiPioSerialOpenFileMap(streamID);
  int collRank = commInqRankColl();
  for (int varID = 0; varID < nvars; ++varID)
    if (numLvlsW[varID])
      {
        int varSize;
        Xt_idxlist dstList;
        if (writerRank == collRank)
          {
            /* the writer receives the whole variable (-1, -1 selects
             * all levels) and must have room for it */
            dstList = buildVarSlicesIdxList(vlistID, varID, -1, -1);
            varSize = xt_idxlist_get_num_indices(dstList);
            resizeVarGatherBuf(varSize, data_, currentDataBufSize);
          }
        else
          {
            /* non-writer collectors contribute data but receive none */
            varSize = 0;
            dstList = xt_idxempty_new();
          }
        double *restrict data = *data_;
        int headerIdx = varMap[varID];
        gatherArray(headerIdx, streamIdx, data, dstList);
        if (writerRank == collRank)
          {
            int nmiss = countVarChunkMissingVals(vlistID, varID,
                                                 mapping, varSize, data);
            streamWriteVar(streamID, varID, data, nmiss);
          }
        xt_idxlist_delete(dstList);
      }
}
/* Inventory of the variable data transferred for one stream in the
 * current timestep. */
struct streamMapping {
  int numVars;
  /* data entry varMap[i] contains data for variable i or -1 if no
   * data entry for i has been transferred */
  int *varMap;
  /* numLvlsW[i] number of levels written for variable i or 0 if
   * variable is not written to this timestep */
  int *numLvlsW;
  /* hasMissing[i] = missing values were provided for variable i */
  int *hasMissing;
  /* number of entries in writtenRecords[] */
  int numWrittenRecords;
  /* flexible array member: one entry per record written this timestep */
  struct recordWrite writtenRecords[];
};
#endif
/* build inventory of written variables for stream */
static struct streamMapping *
......@@ -573,40 +726,6 @@ streamMappingDelete(struct streamMapping **mapping)
*mapping = NULL;
}
/* Build a yaxt index section selecting numLvl levels of variable varID
 * starting at level startLvl; pass negative startLvl/numLvl to select
 * the whole vertical range. */
static Xt_idxlist
buildVarSlicesIdxList(int vlistID, int varID, int startLvl, int numLvl)
{
  /* doubles as slice shape later */
  int varShape[3] = { 0, 0, 0 };
  cdiPioQueryVarDims(varShape, vlistID, varID);
  /* int varSize = varShape[0] * varShape[1] * varShape[2]; */
  Xt_int varShapeXt[3];
  Xt_int origin[3] = { 0, 0, 0 };
  /* reverse axis order for the index section */
  for (unsigned i = 0; i < 3; ++i)
    varShapeXt[2 - i] = varShape[i];
  if (startLvl >= 0) origin[0] = (Xt_int)startLvl;
  /* swap varShape[0] and varShape[2] so varShape matches the reversed
   * varShapeXt ordering (varShape[1] is already in place) */
  varShape[0] = varShapeXt[0];
  varShape[2] = varShapeXt[2];
  /* restrict the level axis when a level count was requested */
  if (numLvl >= 0) varShape[0] = numLvl;
  return xt_idxsection_new(0, 3, varShapeXt, varShape,
                           origin);
}
/* Count occurrences of the variable's missing value in a chunk of
 * chunkLen doubles; returns 0 when no missing values were provided for
 * varID (mapping->hasMissing[varID] == 0). */
static int
countVarChunkMissingVals(int vlistID, int varID,
                         struct streamMapping *mapping,
                         int chunkLen, const double *restrict data)
{
  int nmiss = 0;
  if (mapping->hasMissing[varID])
    {
      double missval = vlistInqVarMissval(vlistID, varID);
      for (size_t i = 0; i < (size_t)chunkLen; ++i)
        nmiss += (data[i] == missval);
    }
  return nmiss;
}
static void
writeGribStream(size_t streamIdx,
struct streamMapping *mapping,
......@@ -727,124 +846,19 @@ static void readGetBuffers(size_t streamIdx)
struct streamMapping *map = streamMappingNew(streamIdx, winDict);
double *data = NULL;
int currentDataBufSize = 0;
int vlistID = streamInqVlist(streamID);
int filetype = streamInqFiletype(streamID);
switch (filetype)
{
case FILETYPE_GRB:
case FILETYPE_GRB2:
writeGribStream(streamIdx, map,
&data, &currentDataBufSize);
writeGribStream(streamIdx, map, &data, &currentDataBufSize);
break;
#ifdef HAVE_NETCDF4
case FILETYPE_NC:
case FILETYPE_NC2:
case FILETYPE_NC4:
#ifdef HAVE_PARALLEL_NC4
/* HAVE_PARALLEL_NC4 implies having ScalES-PPM and yaxt */
{
int nvars = map->numVars;
int *restrict varMap = map->varMap;
for (int varID = 0; varID < nvars; ++varID)
if (map->numLvlsW[varID])
{
struct PPM_extent varShape[3];
queryVarBounds(varShape, vlistID, varID);
struct xyzDims collGrid = varDimsCollGridMatch(varShape);
xdebug("writing varID %d with dimensions: "
"x=%d, y=%d, z=%d,\n"
"found distribution with dimensions:"
" x=%d, y=%d, z=%d.", varID,
varShape[0].size, varShape[1].size, varShape[2].size,
collGrid.sizes[0], collGrid.sizes[1],
collGrid.sizes[2]);
struct PPM_extent varChunk[3];
myVarPart(varShape, collGrid, varChunk);
int myChunk[3][2];
for (int i = 0; i < 3; ++i)
{
myChunk[i][0] = PPM_extent_start(varChunk[i]);
myChunk[i][1] = PPM_extent_end(varChunk[i]);
}
xdebug("Writing chunk { { %d, %d }, { %d, %d },"
" { %d, %d } }", myChunk[0][0], myChunk[0][1],
myChunk[1][0], myChunk[1][1], myChunk[2][0],
myChunk[2][1]);
Xt_int varSize[3];
for (int i = 0; i < 3; ++i)
varSize[2 - i] = varShape[i].size;
Xt_idxlist preWriteChunk;
/* prepare yaxt descriptor for write chunk */
{
Xt_int preWriteChunkStart[3];
int preWriteChunkSize[3];
for (int i = 0; i < 3; ++i)
{
preWriteChunkStart[2 - i] = (Xt_int)varChunk[i].first;
preWriteChunkSize[2 - i] = (int)varChunk[i].size;
}
preWriteChunk = xt_idxsection_new(0, 3, varSize,
preWriteChunkSize,
preWriteChunkStart);
}
resizeVarGatherBuf(xt_idxlist_get_num_indices(preWriteChunk),
&data, &currentDataBufSize);
/* transpose data into write deco */
{
int headerIdx = varMap[varID];
gatherArray(headerIdx, streamIdx, data, preWriteChunk);
xt_idxlist_delete(preWriteChunk);
}
/* count missing values if appropriate */
int nmiss
= countVarChunkMissingVals(vlistID, varID, map,
PPM_extents_size(3, varChunk),
data);
/* write chunk */
streamWriteVarChunk(streamID, varID,
(const int (*)[2])myChunk, data,
nmiss);
}
}
#else
/* determine process which has stream open (writer) and
* which has data for which variable (var owner)
* three cases need to be distinguished */
{
int nvars = map->numVars;
int *restrict varMap = map->varMap,
*restrict numLvlsW = map->numLvlsW;
int writerRank = cdiPioSerialOpenFileMap(streamID);
int collRank = commInqRankColl();
for (int varID = 0; varID < nvars; ++varID)
if (numLvlsW[varID])
{
int varSize;
Xt_idxlist dstList;
if (writerRank == collRank)
{
dstList = buildVarSlicesIdxList(vlistID, varID, -1, -1);
varSize = xt_idxlist_get_num_indices(dstList);
resizeVarGatherBuf(varSize, &data, &currentDataBufSize);
}
else
{
varSize = 0;
dstList = xt_idxempty_new();
}
int headerIdx = varMap[varID];
gatherArray(headerIdx, streamIdx, data, dstList);
if (writerRank == collRank)
{
int nmiss = countVarChunkMissingVals(vlistID, varID,
map, varSize, data);
streamWriteVar(streamID, varID, data, nmiss);
}
xt_idxlist_delete(dstList);
}
}
#endif
writeNetCDFStream(streamIdx, map, &data, &currentDataBufSize);
break;
#endif
default:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment