Commit 825ec2fd authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Iterate over vars by stream.

* Instead of vars in order dictated by client streamWriteVar.
* This allows for differential handling of different stream types.
parent ebd05b24
......@@ -198,13 +198,6 @@ readFuncCall(struct funcCallDesc *header)
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
int modelID;
double * data = NULL, * dataHead = NULL;
int streamID = CDI_UNDEFID, streamIDNew = CDI_UNDEFID;
int varID, vlistID = CDI_UNDEFID, taxisID;
int size;
int nmiss = 0;
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
xdebug("%s", "START");
......@@ -230,50 +223,90 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
}
xassert(numFuncCalls == winDict[0].headerSize.numRPCEntries);
}
/* build list of streams, data was transferred for */
{
int numDataEntries = winDict[0].headerSize.numDataEntries;
int headerIdx;
int streamIdx;
struct {
int streamID;
int firstHeaderIdx, lastHeaderIdx;
} *streamMap;
int numStreamIDs = 0, sizeStreamMap = 16;
streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
int streamIDOld = CDI_UNDEFID;
int headerIdx, oldStreamIdx = CDI_UNDEFID;
for (headerIdx = 1; headerIdx < numDataEntries; ++headerIdx)
{
union winHeaderEntry *header
= winDict + headerIdx;
xassert(header->dataRecord.streamID > 0);
streamIDNew = header->dataRecord.streamID;
if (streamIDNew != streamID)
{
streamID = streamIDNew;
vlistID = streamInqVlist(streamID);
taxisID = vlistInqTaxis(vlistID);
taxisDefVdate(taxisID, vdate);
taxisDefVtime(taxisID, vtime);
streamDefTimestep(streamID, tsID);
}
varID = header->dataRecord.varID;
size = vlistInqVarSize(vlistID, varID);
data = xmalloc(size * sizeof (data[0]));
dataHead = data;
for (modelID = 0; modelID < nProcsModel; modelID++)
int streamID = winDict[headerIdx].dataRecord.streamID;
xassert(streamID > 0);
if (streamID != streamIDOld)
{
struct dataRecord *dataHeader
= (struct dataRecord *)rxWin[modelID].buffer + headerIdx;
xassert(dataHeader->streamID == streamID
&& dataHeader->varID == varID);
int chunk = vlistInqVarDecoChunk(vlistID, varID, modelID);
memcpy(dataHead, rxWin[modelID].buffer
+ dataHeader->offset, chunk * sizeof (data[0]));
dataHead += chunk;
nmiss = dataHeader->nmiss;
}
streamWriteVar(streamID, varID, data, nmiss);
if ( ddebug > 2 )
{
char text[1024];
sprintf(text, "streamID=%d, var[%d], size=%d",
streamID, varID, size);
xprintArray(text, data, size, DATATYPE_FLT);
int i;
for (i = numStreamIDs - 1; i >= 0; --i)
if ((streamIDOld = streamMap[i].streamID) == streamID)
goto streamIDInventorized;
if (numStreamIDs < sizeStreamMap) ; else
streamMap = xrealloc(streamMap,
(sizeStreamMap *= 2)
* sizeof (streamMap[0]));
streamMap[numStreamIDs].streamID = streamID;
streamMap[numStreamIDs].firstHeaderIdx = headerIdx;
oldStreamIdx = numStreamIDs;
streamIDOld = streamID;
++numStreamIDs;
}
free(data);
streamIDInventorized:
streamMap[oldStreamIdx].lastHeaderIdx = headerIdx;
}
double * data = NULL;
int currentDataBufSize = 0;
for (streamIdx = 0; streamIdx < numStreamIDs; ++streamIdx)
{
int streamID = streamMap[streamIdx].streamID;
int vlistID = streamInqVlist(streamID);
int taxisID = vlistInqTaxis(vlistID);
taxisDefVdate(taxisID, vdate);
taxisDefVtime(taxisID, vtime);
streamDefTimestep(streamID, tsID);
int headerIdx, lastHeaderIdx = streamMap[streamIdx].lastHeaderIdx;
for (headerIdx = streamMap[streamIdx].firstHeaderIdx;
headerIdx <= lastHeaderIdx;
++headerIdx)
if (streamID == winDict[headerIdx].dataRecord.streamID)
{
double * dataHead = NULL;
int varID = winDict[headerIdx].dataRecord.varID;
int size = vlistInqVarSize(vlistID, varID);
int nmiss = 0, modelID;
if (size <= currentDataBufSize) ; else
data = xrealloc(data, (currentDataBufSize = size)
* sizeof (data[0]));
dataHead = data;
for (modelID = 0; modelID < nProcsModel; modelID++)
{
struct dataRecord *dataHeader
= (struct dataRecord *)rxWin[modelID].buffer + headerIdx;
xassert(dataHeader->streamID == streamID
&& dataHeader->varID == varID);
int chunk = vlistInqVarDecoChunk(vlistID, varID, modelID);
memcpy(dataHead, rxWin[modelID].buffer
+ dataHeader->offset, chunk * sizeof (data[0]));
dataHead += chunk;
nmiss = dataHeader->nmiss;
}
streamWriteVar(streamID, varID, data, nmiss);
if ( ddebug > 2 )
{
char text[1024];
sprintf(text, "streamID=%d, var[%d], size=%d",
streamID, varID, size);
xprintArray(text, data, size, DATATYPE_FLT);
}
}
}
free(streamMap);
free(data);
}
xdebug("%s", "RETURN");
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment