Skip to content
Snippets Groups Projects
Commit 2725df03 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

Move insertion of stream into PIO server internal list into streamOpen caller.

parent 7af9c1cd
No related branches found
No related tags found
No related merge requests found
......@@ -103,7 +103,7 @@ struct serverBufferSetup
int numClientsOfServer;
struct collSpec collectorData;
struct clientBufSize *bufSizes;
size_t numStreams;
unsigned numStreams;
};
static enum cdiApplyRet
......@@ -111,20 +111,18 @@ serverStreamBufSizeIter(int id, void *res, void *data)
{
(void)res;
struct serverBufferSetup *setup = data;
size_t numStreams = setup->numStreams;
size_t numClientsOfServer = (size_t)setup->numClientsOfServer;
struct clientBufSize (*bufSizes)[numClientsOfServer]
= (struct clientBufSize (*)[numClientsOfServer])setup->bufSizes;
size_t streamIdx = indexOfID(&openStreams, id);
xassert(streamIdx < setup->numStreams);
setup->collectorData.sendRPCData = 1;
for (size_t i = 0; i < numClientsOfServer; ++i)
{
bufSizes[numStreams][i]
bufSizes[streamIdx][i]
= computeClientStreamBufSize(id, &setup->collectorData);
setup->collectorData.sendRPCData = 0;
}
setup->numStreams = ++numStreams;
size_t insertPos = insertID(&openStreams, id);
xassert(insertPos == numStreams - 1);
return CDI_APPLY_GO_ON;
}
......@@ -144,7 +142,7 @@ serverBufSizes(struct serverBufferSetup *setup)
};
reshLock();
unsigned streamCount = reshCountType(&streamOps);
setup->numStreams = 0;
setup->numStreams = streamCount;
setup->bufSizes = Malloc((size_t)numClientsOfServer * streamCount
* sizeof (setup->bufSizes[0]));
cdiResHFilterApply(&streamOps, serverStreamBufSizeIter, setup);
......@@ -925,19 +923,20 @@ void getTimeStepData(int *streamActivity)
/************************************************************************/
#if defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
static int
cdiPioStreamCDFOpenWrap(const char *filename, char filemode,
int filetype, stream_t *streamptr,
int recordBufIsToBeCreated)
cdiPioServerStreamOpen(const char *filename, char filemode,
int filetype, stream_t *streamptr,
int recordBufIsToBeCreated)
{
int fileID;
switch (filetype)
{
#if defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
/* Only needs initialization to shut up gcc/clang */
int rank = -1, fileID = -1;
/* Only needs initialization to shut up gcc */
int rank = -1;
int ioMode = commInqIOMode();
if (ioMode == PIO_NONE
|| commInqRankColl() == (rank = cdiPioNextOpenRank()))
......@@ -950,41 +949,51 @@ cdiPioStreamCDFOpenWrap(const char *filename, char filemode,
xmpi(MPI_Bcast(&fileID, 1, MPI_INT, rank, commInqCommColl()));
streamptr->ownerRank = rank;
cdiPioOpenFileOnRank(rank);
return fileID;
}
break;
#endif
default:
return cdiStreamOpenDefaultDelegate(filename, filemode, filetype,
streamptr, recordBufIsToBeCreated);
fileID = cdiStreamOpenDefaultDelegate(filename, filemode, filetype,
streamptr, recordBufIsToBeCreated);
}
if (fileID >= 0)
insertID(&openStreams, streamptr->self);
return fileID;
}
static void
cdiPioStreamCDFCloseWrap(stream_t *streamptr, int recordBufIsToBeDeleted)
cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
int fileID = streamptr->fileID;
int filetype = streamptr->filetype;
if ( fileID == CDI_UNDEFID )
Warning("File %s not open!", streamptr->filename);
else
switch (filetype)
{
case FILETYPE_NC:
case FILETYPE_NC2:
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
switch (filetype)
{
int rank, rankOpen = cdiPioSerialOpenFileMap(streamptr->self);
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl()) == rankOpen))
cdiStreamCloseDefaultDelegate(streamptr, recordBufIsToBeDeleted);
cdiPioCloseFileOnRank(rankOpen);
#if defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
case FILETYPE_NC:
case FILETYPE_NC2:
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
int rank, rankOpen = cdiPioSerialOpenFileMap(streamptr->self);
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl()) == rankOpen))
cdiStreamCloseDefaultDelegate(streamptr, recordBufIsToBeDeleted);
cdiPioCloseFileOnRank(rankOpen);
}
break;
#endif
default:
cdiStreamCloseDefaultDelegate(streamptr, recordBufIsToBeDeleted);
}
default:
cdiStreamCloseDefaultDelegate(streamptr, recordBufIsToBeDeleted);
}
removeID(&openStreams, streamptr->self);
}
}
#if defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
static void
cdiPioCdfDefTimestep(stream_t *streamptr, int tsID)
{
......@@ -994,7 +1003,6 @@ cdiPioCdfDefTimestep(stream_t *streamptr, int tsID)
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
cdfDefTimestep(streamptr, tsID);
}
#endif
/**
......@@ -1008,6 +1016,10 @@ void cdiPioCollectorMessageLoop(void)
xdebug("%s", "START");
MPI_Comm pioInterComm = cdiPioInqInterComm();
namespaceSwitchSet(NSSWITCH_STREAM_OPEN_BACKEND,
NSSW_FUNC(cdiPioServerStreamOpen));
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND,
NSSW_FUNC(cdiPioServerStreamClose));
#ifdef HAVE_PARALLEL_NC4
cdiPioEnableNetCDFParAccess();
numPioPrimes = PPM_prime_factorization_32((uint32_t)commInqSizeColl(),
......@@ -1015,10 +1027,6 @@ void cdiPioCollectorMessageLoop(void)
#elif defined (HAVE_LIBNETCDF)
cdiSerialOpenFileCount = Calloc(sizeof (cdiSerialOpenFileCount[0]),
(size_t)commInqSizeColl());
namespaceSwitchSet(NSSWITCH_STREAM_OPEN_BACKEND,
NSSW_FUNC(cdiPioStreamCDFOpenWrap));
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND,
NSSW_FUNC(cdiPioStreamCDFCloseWrap));
namespaceSwitchSet(NSSWITCH_CDF_DEF_TIMESTEP,
NSSW_FUNC(cdiPioCdfDefTimestep));
namespaceSwitchSet(NSSWITCH_CDF_STREAM_SETUP,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment