Skip to content
Snippets Groups Projects
Commit 34ecc346 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Use single instead of per-file buffer.

* This also makes removal of the open files dict possible.
parent 68785f2f
No related branches found
No related tags found
2 merge requests!34Version 2.2.0,!13Consolidation with CDI-PIO (develop)
......@@ -86,12 +86,16 @@ static struct
struct streamMemLayout *prevLayout;
size_t numRetained;
struct cacheRedist *retained;
size_t aggBufSize, aggBufUsed;
void *aggBuf;
} * rxWin;
/* map streamID's and fileID's to linear index into arrays */
static struct idList openStreams, openFiles;
static struct
{
size_t size, used;
void *mem;
} aggBuf;
/* map streamID's to linear index into arrays */
static struct idList openStreams;
static struct dstIdxlistCache *DstIdxlistCache;
static size_t neededDstIdxlistCacheSize;
......@@ -1017,22 +1021,20 @@ szmax(size_t a, size_t b)
static size_t
aggBufAppend(int fileID, const void *restrict ptr, size_t size)
{
size_t fileIdx = indexOfID(&openFiles, fileID), aggBufSize = rxWin[fileIdx].aggBufSize, aggBufUsed = rxWin[fileIdx].aggBufUsed;
void *restrict aggBuf = rxWin[fileIdx].aggBuf;
if (aggBufUsed + size > aggBufSize)
rxWin[fileIdx].aggBuf = aggBuf = Realloc(aggBuf, (rxWin[fileIdx].aggBufSize = aggBufUsed + size));
memcpy((unsigned char *) aggBuf + aggBufUsed, ptr, size);
rxWin[fileIdx].aggBufUsed = aggBufUsed + size;
(void) fileID;
size_t aggBufSize = aggBuf.size, aggBufUsed = aggBuf.used;
void *restrict buf = aggBuf.mem;
if (aggBufUsed + size > aggBufSize) aggBuf.mem = buf = Realloc(buf, (aggBuf.size = aggBufUsed + size));
memcpy((unsigned char *) buf + aggBufUsed, ptr, size);
aggBuf.used = aggBufUsed + size;
return size;
}
static void
aggBufFlush(size_t streamIdx, size_t (*cdiPioFileWrite)(int, const void *restrict, size_t, int))
aggBufFlush(int streamID, int fileID, size_t (*cdiPioFileWrite)(int, const void *restrict, size_t, int))
{
int fileID = openFiles.entries[streamIdx];
int streamID = openStreams.entries[streamIdx];
cdiPioFileWrite(fileID, rxWin[streamIdx].aggBuf, rxWin[streamIdx].aggBufUsed, streamInqCurTimestepID(streamID));
rxWin[streamIdx].aggBufUsed = 0;
cdiPioFileWrite(fileID, aggBuf.mem, aggBuf.used, streamInqCurTimestepID(streamID));
aggBuf.used = 0;
}
static void
......@@ -1148,15 +1150,15 @@ writeGribStream(size_t streamIdx, struct streamMapping *mapping, void **data_, s
/* append encoded data records from this pass to buffer written later */
/* todo: develop better heuristic for buffer size */
if (myAggSize > rxWin[streamIdx].aggBufSize)
if (myAggSize > aggBuf.size)
{
Free(rxWin[streamIdx].aggBuf);
Free(aggBuf.mem);
size_t aggBufSize = szmax((size_t) conf->recordAggBufLimMB * (size_t) 1024 * (size_t) 1024, myAggSize);
if (posix_memalign(&rxWin[streamIdx].aggBuf, cdiGetPageSize(conf->largePageAlign), aggBufSize) == 0)
if (posix_memalign(&aggBuf.mem, cdiGetPageSize(conf->largePageAlign), aggBufSize) == 0)
;
else
rxWin[streamIdx].aggBuf = Malloc(aggBufSize);
rxWin[streamIdx].aggBufSize = aggBufSize;
aggBuf.mem = Malloc(aggBufSize);
aggBuf.size = aggBufSize;
}
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(aggBufAppend));
/* write records to aggregation buffer */
......@@ -1184,7 +1186,7 @@ writeGribStream(size_t streamIdx, struct streamMapping *mapping, void **data_, s
streamWriteVarSliceF(streamID, varID, level, (const float *) (const void *) (data + recordDataOfs), nmiss);
recordDataOfs += recordSize;
}
aggBufFlush(streamIdx, cdiPioFileWrite);
aggBufFlush(streamID, fileID, cdiPioFileWrite);
}
else
/* write zero bytes to trigger synchronization code in fileWrite */
......@@ -1344,8 +1346,6 @@ cdiPioServerStreamOpen(const char *filename, char filemode, int filetype, stream
{
size_t oldNumStreams = openStreams.size;
size_t streamIdx = insertID(&openStreams, streamptr->self);
size_t fileIdx = insertID(&openFiles, fileID);
xassert(fileIdx == streamIdx);
size_t numStreams = openStreams.size;
struct clientBuf *oldClientBufs = rxWin ? rxWin[0].clientBuf : NULL;
rxWin = Realloc(rxWin, numStreams * sizeof(rxWin[0]));
......@@ -1359,9 +1359,6 @@ cdiPioServerStreamOpen(const char *filename, char filemode, int filetype, stream
rxWin[streamIdx].prevLayout = NULL;
rxWin[streamIdx].retained = NULL;
rxWin[streamIdx].numRetained = 0;
rxWin[streamIdx].aggBufSize = 0;
rxWin[streamIdx].aggBufUsed = 0;
rxWin[streamIdx].aggBuf = NULL;
#ifdef HAVE_LIBNETCDF
rxWin[streamIdx].ownerRank = rank;
#endif
......@@ -1446,10 +1443,8 @@ cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
destructRetained(rxWin[streamIdx].retained, rxWin[streamIdx].numRetained);
Free(rxWin[streamIdx].retained);
Free(rxWin[streamIdx].prevLayout);
Free(rxWin[streamIdx].aggBuf);
cdiPioServerStreamWinDestroy(streamIdx);
removeID(&openStreams, streamID);
removeID(&openFiles, fileID);
}
void (*streamCloseCallBack)(int streamID) = (void (*)(int)) conf->callbacks[CDIPIO_CALLBACK_POSTSTREAMCLOSE];
streamCloseCallBack(streamptr->self);
......@@ -1613,7 +1608,7 @@ cdiPioCollectorMessageLoop()
Free(rxWin);
}
idSetDestroy(&openStreams);
idSetDestroy(&openFiles);
Free(aggBuf.mem);
Free(streamActivity);
Free(clientRanks_);
#ifdef HAVE_PARALLEL_NC4
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment