Commit b076010b authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Remove useless switch.

* PIO client can only ever be invoked on active namespaces, other stages
  no longer exist.
parent 2b60e240
......@@ -30,62 +30,52 @@ cdiPioClientStreamOpen(const char *filename, char filemode,
int fileID;
if ( filemode == 'w' )
{
enum namespaceStatus nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int streamID = streamptr->self;
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
{
case NAMESPACE_STATUS_INUSE:
{
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int streamID = streamptr->self;
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
{
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps)
& ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
int size;
size_t filename_len = strlen(filename);
xassert(filename_len < (size_t)(INT_MAX - msgBufPos));
int soHdr[3] = { streamptr->self, filetype, (int)filename_len };
xmpi(MPI_Pack_size(3, MPI_INT, comm, &size));
msgSize += size;
xmpi(MPI_Pack_size(1, MPI_CHAR, comm, &size));
msgSize += size;
xmpi(MPI_Pack_size((int)filename_len, MPI_CHAR, comm, &size));
msgSize += size;
/* optimize to pos + size */
msgBuffer = (char *)Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(soHdr, 3, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack(&filemode, 1, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack((void *)filename, (int)filename_len, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Sendrecv(msgBuffer, msgSize, MPI_PACKED, collRank,
STREAMOPEN,
&fileID, 1, MPI_INT, collRank, STREAMOPEN,
comm, MPI_STATUS_IGNORE));
Free(msgBuffer);
}
else
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN,
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps)
& ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
int size;
size_t filename_len = strlen(filename);
xassert(filename_len < (size_t)(INT_MAX - msgBufPos));
int soHdr[3] = { streamptr->self, filetype, (int)filename_len };
xmpi(MPI_Pack_size(3, MPI_INT, comm, &size));
msgSize += size;
xmpi(MPI_Pack_size(1, MPI_CHAR, comm, &size));
msgSize += size;
xmpi(MPI_Pack_size((int)filename_len, MPI_CHAR, comm, &size));
msgSize += size;
/* optimize to pos + size */
msgBuffer = (char *)Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(soHdr, 3, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack(&filemode, 1, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack((void *)filename, (int)filename_len, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Sendrecv(msgBuffer, msgSize, MPI_PACKED, collRank,
STREAMOPEN,
&fileID, 1, MPI_INT, collRank, STREAMOPEN,
comm, MPI_STATUS_IGNORE));
if (fileID >= 0)
{
streamptr->filetype = filetype;
cdiPioClientStreamWinInit(streamID);
}
}
break;
default:
xabort ( "INTERNAL ERROR" );
Free(msgBuffer);
}
else
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN,
comm, MPI_STATUS_IGNORE));
if (fileID >= 0)
{
streamptr->filetype = filetype;
cdiPioClientStreamWinInit(streamID);
}
}
else
......@@ -96,50 +86,40 @@ cdiPioClientStreamOpen(const char *filename, char filemode,
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
{
enum namespaceStatus nspStatus = namespaceInqResStatus ();
cdiStreamDefVlist_(streamID, vlistID);
switch ( nspStatus )
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int sendRPCData
= (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl));
if (sendRPCData)
{
case NAMESPACE_STATUS_INUSE:
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
{
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int sendRPCData
= (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl));
if (sendRPCData)
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
{
int size;
xmpi(MPI_Pack_size(defVlistNInts, MPI_INT, comm, &size));
msgSize += size;
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
int msgData[defVlistNInts] = { streamID, streamInqVlist(streamID) };
xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
STREAMDEFVLIST, comm));
Free(msgBuffer);
}
struct collSpec cspec = { .numClients = numClients,
.numServers = numColl,
.sendRPCData = sendRPCData };
cdiPioClientStreamWinCreate(streamID, &cspec);
int size;
xmpi(MPI_Pack_size(defVlistNInts, MPI_INT, comm, &size));
msgSize += size;
}
break;
default:
xabort ( "INTERNAL ERROR" );
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
int msgData[defVlistNInts] = { streamID, streamInqVlist(streamID) };
xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
STREAMDEFVLIST, comm));
Free(msgBuffer);
}
struct collSpec cspec = { .numClients = numClients,
.numServers = numColl,
.sendRPCData = sendRPCData };
cdiPioClientStreamWinCreate(streamID, &cspec);
}
static void
......@@ -182,14 +162,7 @@ static void
cdiPioClientStreamWriteVarPart(int streamID, int varID, const void *data,
int nmiss, Xt_idxlist partDesc)
{
switch (namespaceInqResStatus())
{
case NAMESPACE_STATUS_INUSE:
pioBufferPartData(streamID, varID, data, nmiss, partDesc);
return;
default:
xabort("INTERNAL ERROR");
}
pioBufferPartData(streamID, varID, data, nmiss, partDesc);
}
static void
......@@ -199,15 +172,8 @@ cdiPioClientStreamWriteScatteredVarPart(int streamID, int varID,
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
switch (namespaceInqResStatus())
{
case NAMESPACE_STATUS_INUSE:
cdiPioBufferPartDataGather(streamID, varID, data, numBlocks,
blocklengths, displacements, nmiss, partDesc);
return;
default:
xabort("INTERNAL ERROR");
}
cdiPioBufferPartDataGather(streamID, varID, data, numBlocks,
blocklengths, displacements, nmiss, partDesc);
}
#if defined HAVE_LIBNETCDF
......@@ -228,45 +194,35 @@ cdiPioClientStreamNOP(stream_t *streamptr)
static void
cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
enum namespaceStatus nspStatus = namespaceInqResStatus ();
(void)recordBufIsToBeDeleted;
int streamID = streamptr->self;
switch ( nspStatus )
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
{
case NAMESPACE_STATUS_INUSE:
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
{
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
{
int size;
xmpi(MPI_Pack_size(1, MPI_INT, comm, &size));
msgSize += size;
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
STREAMCLOSE, comm));
Free(msgBuffer);
}
cdiPioClientStreamWinDestroy(streamID);
int size;
xmpi(MPI_Pack_size(1, MPI_INT, comm, &size));
msgSize += size;
}
break;
default:
xabort ( "INTERNAL ERROR" );
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
STREAMCLOSE, comm));
Free(msgBuffer);
}
cdiPioClientStreamWinDestroy(streamID);
}
static void
......@@ -280,23 +236,13 @@ cdiPioTaxisPackWrap(void *data, void *buf, int size, int *pos,
static int
cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
{
struct winHeaderEntry header;
enum namespaceStatus nspStatus = namespaceInqResStatus ();
int taxisID;
switch ( nspStatus )
{
case NAMESPACE_STATUS_INUSE:
taxisID = vlistInqTaxis(streamptr->vlistID);
header = (struct winHeaderEntry){
.id = STREAMDEFTIMESTEP,
.specific.funcArgs.streamNewTimestep = { streamptr->self, tsID } };
xassert(sizeof (void *) >= sizeof (int));
pioBufferFuncCall(streamptr->self, header,
(void *)(intptr_t)taxisID, cdiPioTaxisPackWrap);
break;
default:
xabort ( "INTERNAL ERROR" );
}
int taxisID = vlistInqTaxis(streamptr->vlistID);
struct winHeaderEntry header = (struct winHeaderEntry){
.id = STREAMDEFTIMESTEP,
.specific.funcArgs.streamNewTimestep = { streamptr->self, tsID } };
xassert(sizeof (void *) >= sizeof (int));
pioBufferFuncCall(streamptr->self, header,
(void *)(intptr_t)taxisID, cdiPioTaxisPackWrap);
return cdiStreamDefTimestep_(streamptr, tsID);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment