Commit 98b3f4c4 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Extract function building list of streams.

parent e9771252
......@@ -469,6 +469,76 @@ struct streamMapping {
int numVars, *varMap;
};
struct streamMap
{
struct streamMapping *entries;
int numEntries;
};
static struct streamMap
buildStreamMap(union winHeaderEntry *winDict)
{
int streamIDOld = CDI_UNDEFID;
int oldStreamIdx = CDI_UNDEFID;
int filetype = CDI_UNDEFID;
int sizeStreamMap = 16;
struct streamMapping *streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
int numDataEntries = winDict[0].headerSize.numDataEntries;
int numStreamIDs = 0;
for (int headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
{
int streamID
= winDict[headerIdx].dataRecord.streamID
= namespaceAdaptKey2(winDict[headerIdx].dataRecord.streamID);
xassert(streamID > 0);
if (streamID != streamIDOld)
{
for (int i = numStreamIDs - 1; i >= 0; --i)
if ((streamIDOld = streamMap[i].streamID) == streamID)
{
filetype = streamMap[i].filetype;
oldStreamIdx = i;
goto streamIDInventorized;
}
if (numStreamIDs < sizeStreamMap) ; else
streamMap = xrealloc(streamMap,
(sizeStreamMap *= 2)
* sizeof (streamMap[0]));
streamMap[numStreamIDs].streamID = streamID;
streamMap[numStreamIDs].firstHeaderIdx = headerIdx;
streamMap[numStreamIDs].numVars = -1;
oldStreamIdx = numStreamIDs;
streamIDOld = streamID;
filetype = streamInqFiletype(streamID);
streamMap[numStreamIDs].filetype = filetype;
if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
|| filetype == FILETYPE_NC4)
{
int vlistID = streamInqVlist(streamID);
int nvars = vlistNvars(vlistID);
streamMap[numStreamIDs].numVars = nvars;
streamMap[numStreamIDs].varMap
= xmalloc(sizeof (streamMap[numStreamIDs].varMap[0])
* nvars);
for (int i = 0; i < nvars; ++i)
streamMap[numStreamIDs].varMap[i] = -1;
}
++numStreamIDs;
}
streamIDInventorized:
streamMap[oldStreamIdx].lastHeaderIdx = headerIdx;
if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
|| filetype == FILETYPE_NC4)
{
int varID = winDict[headerIdx].dataRecord.varID;
streamMap[oldStreamIdx].varMap[varID] = headerIdx;
}
}
return (struct streamMap){ .entries = streamMap, .numEntries = numStreamIDs };
}
static void readGetBuffers()
{
int nProcsModel = commInqNProcsModel ();
......@@ -502,81 +572,25 @@ static void readGetBuffers()
}
/* build list of streams, data was transferred for */
{
int numDataEntries = winDict[0].headerSize.numDataEntries;
int streamIdx;
struct streamMapping *streamMap;
int numStreamIDs = 0, sizeStreamMap = 16;
streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
int streamIDOld = CDI_UNDEFID;
int oldStreamIdx = CDI_UNDEFID;
int filetype = CDI_UNDEFID;
for (int headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
{
int streamID
= winDict[headerIdx].dataRecord.streamID
= namespaceAdaptKey2(winDict[headerIdx].dataRecord.streamID);
xassert(streamID > 0);
if (streamID != streamIDOld)
{
for (int i = numStreamIDs - 1; i >= 0; --i)
if ((streamIDOld = streamMap[i].streamID) == streamID)
{
filetype = streamMap[i].filetype;
oldStreamIdx = i;
goto streamIDInventorized;
}
if (numStreamIDs < sizeStreamMap) ; else
streamMap = xrealloc(streamMap,
(sizeStreamMap *= 2)
* sizeof (streamMap[0]));
streamMap[numStreamIDs].streamID = streamID;
streamMap[numStreamIDs].firstHeaderIdx = headerIdx;
streamMap[numStreamIDs].numVars = -1;
oldStreamIdx = numStreamIDs;
streamIDOld = streamID;
filetype = streamInqFiletype(streamID);
streamMap[numStreamIDs].filetype = filetype;
if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
|| filetype == FILETYPE_NC4)
{
int vlistID = streamInqVlist(streamID);
int nvars = vlistNvars(vlistID);
streamMap[numStreamIDs].numVars = nvars;
streamMap[numStreamIDs].varMap
= xmalloc(sizeof (streamMap[numStreamIDs].varMap[0])
* nvars);
for (int i = 0; i < nvars; ++i)
streamMap[numStreamIDs].varMap[i] = -1;
}
++numStreamIDs;
}
streamIDInventorized:
streamMap[oldStreamIdx].lastHeaderIdx = headerIdx;
if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
|| filetype == FILETYPE_NC4)
{
int varID = winDict[headerIdx].dataRecord.varID;
streamMap[oldStreamIdx].varMap[varID] = headerIdx;
}
}
struct streamMap map = buildStreamMap(winDict);
double *data = NULL;
#if defined (HAVE_PARALLEL_NC4)
double *writeBuf = NULL;
#endif
int currentDataBufSize = 0;
for (streamIdx = 0; streamIdx < numStreamIDs; ++streamIdx)
for (int streamIdx = 0; streamIdx < map.numEntries; ++streamIdx)
{
int streamID = streamMap[streamIdx].streamID;
int streamID = map.entries[streamIdx].streamID;
int vlistID = streamInqVlist(streamID);
int fileType = streamMap[streamIdx].filetype;
int filetype = map.entries[streamIdx].filetype;
switch (fileType)
switch (filetype)
{
case FILETYPE_GRB:
case FILETYPE_GRB2:
{
int headerIdx, lastHeaderIdx = streamMap[streamIdx].lastHeaderIdx;
for (headerIdx = streamMap[streamIdx].firstHeaderIdx;
int headerIdx, lastHeaderIdx = map.entries[streamIdx].lastHeaderIdx;
for (headerIdx = map.entries[streamIdx].firstHeaderIdx;
headerIdx <= lastHeaderIdx;
headerIdx += 2)
if (streamID == winDict[headerIdx].dataRecord.streamID)
......@@ -606,8 +620,8 @@ static void readGetBuffers()
#ifdef HAVE_PARALLEL_NC4
/* HAVE_PARALLE_NC4 implies having ScalES-PPM and yaxt */
{
int nvars = streamMap[streamIdx].numVars;
int *varMap = streamMap[streamIdx].varMap;
int nvars = map.entries[streamIdx].numVars;
int *varMap = map.entries[streamIdx].varMap;
int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
for (int varID = 0; varID < nvars; ++varID)
varIsWritten[varID] = ((varMap[varID] != -1)
......@@ -704,8 +718,8 @@ static void readGetBuffers()
* which has data for which variable (var owner)
* three cases need to be distinguished */
{
int nvars = streamMap[streamIdx].numVars;
int *varMap = streamMap[streamIdx].varMap;
int nvars = map.entries[streamIdx].numVars;
int *varMap = map.entries[streamIdx].varMap;
int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
for (int varID = 0; varID < nvars; ++varID)
varIsWritten[varID] = ((varMap[varID] != -1)
......@@ -787,7 +801,7 @@ static void readGetBuffers()
xabort("unhandled filetype in parallel I/O.");
}
}
free(streamMap);
free(map.entries);
free(data);
}
xdebug("%s", "RETURN");
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment