Commit a42e80f0 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Also handle streams not written to by collector.

parent 8e9221af
......@@ -489,6 +489,7 @@ inventorizeStream(struct streamMapping *streamMap, int numStreamIDs,
}
streamMap[numStreamIDs].streamID = streamID;
streamMap[numStreamIDs].firstHeaderIdx = headerIdx;
streamMap[numStreamIDs].lastHeaderIdx = headerIdx;
streamMap[numStreamIDs].numVars = -1;
int filetype = streamInqFiletype(streamID);
streamMap[numStreamIDs].filetype = filetype;
......@@ -507,6 +508,16 @@ inventorizeStream(struct streamMapping *streamMap, int numStreamIDs,
return numStreamIDs + 1;
}
static inline int
streamIsInList(struct streamMapping *streamMap, int numStreamIDs,
int streamIDQuery)
{
int p = 0;
for (int i = 0; i < numStreamIDs; ++i)
p |= streamMap[i].streamID == streamIDQuery;
return p;
}
static struct streamMap
buildStreamMap(union winHeaderEntry *winDict)
{
......@@ -514,9 +525,11 @@ buildStreamMap(union winHeaderEntry *winDict)
int oldStreamIdx = CDI_UNDEFID;
int filetype = CDI_UNDEFID;
int sizeStreamMap = 16;
struct streamMapping *streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
struct streamMapping *streamMap
= xmalloc(sizeStreamMap * sizeof (streamMap[0]));
int numDataEntries = winDict[0].headerSize.numDataEntries;
int numStreamIDs = 0;
/* find streams written on this process */
for (int headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
{
int streamID
......@@ -546,6 +559,28 @@ buildStreamMap(union winHeaderEntry *winDict)
streamMap[oldStreamIdx].varMap[varID] = headerIdx;
}
}
/* join with list of streams written to in total */
{
int *streamIDs, *streamIsWritten;
int numTotalStreamIDs = streamSize();
streamIDs = xmalloc(2 * sizeof (streamIDs[0]) * (size_t)numTotalStreamIDs);
streamGetIndexList(numTotalStreamIDs, streamIDs);
streamIsWritten = streamIDs + numTotalStreamIDs;
for (int i = 0; i < numTotalStreamIDs; ++i)
streamIsWritten[i] = streamIsInList(streamMap, numStreamIDs,
streamIDs[i]);
/* Find what streams are written to at all on any process */
xmpi(MPI_Allreduce(MPI_IN_PLACE, streamIsWritten, numTotalStreamIDs,
MPI_INT, MPI_BOR, commInqCommColl()));
/* append streams written to on other tasks to mapping */
for (int i = 0; i < numTotalStreamIDs; ++i)
if (streamIsWritten[i] && !streamIsInList(streamMap, numStreamIDs,
streamIDs[i]))
numStreamIDs = inventorizeStream(streamMap, numStreamIDs,
&sizeStreamMap, streamIDs[i], -1);
free(streamIDs);
}
return (struct streamMap){ .entries = streamMap, .numEntries = numStreamIDs };
}
......@@ -602,6 +637,8 @@ static void readGetBuffers()
case FILETYPE_GRB2:
{
int headerIdx, lastHeaderIdx = map.entries[streamIdx].lastHeaderIdx;
if (lastHeaderIdx < 0)
break;
for (headerIdx = map.entries[streamIdx].firstHeaderIdx;
headerIdx <= lastHeaderIdx;
headerIdx += 2)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment