Skip to content
Snippets Groups Projects
Commit ba1b6249 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

Extract common functionality.

parent 10a2ae37
No related branches found
No related tags found
No related merge requests found
......@@ -27,6 +27,46 @@
#include "pio_util.h"
#include "pio_serialize.h"
struct ResHListUpdate
{
unsigned char *msgBuffer;
MPI_Comm comm;
int numClients, clientRank, numColl, collRank, groupLeader;
int msgSize, msgPos;
bool sendRPCData;
};
static struct ResHListUpdate
cdiPioClientUpdateResHList()
{
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int groupLeader = cdiPioClientRangeStart(collRank, numClients, numColl);
int sendRPCData = (clientRank == groupLeader);
unsigned char *msgBuffer = NULL;
int msgSize = 0, msgBufPos = 0;
if (sendRPCData)
{
msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
return (struct vlistMetaReturn){ .msgBuffer = msgBuffer,
.comm = comm,
.numClients = numClients, .clientRank = clientRank,
.groupLeader = groupLeader,
.numColl = numColl, .collRank = collRank,
.msgSize = msgSize, .msgPos = msgBufPos,
.sendRPCData = sendRPCData };
}
static int
cdiPioClientStreamOpen(const char *filename, char filemode,
int filetype, stream_t *streamptr,
......@@ -36,24 +76,19 @@ cdiPioClientStreamOpen(const char *filename, char filemode,
int fileID;
if ( filemode == 'w' )
{
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int streamID = streamptr->self;
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
struct ResHListUpdate rup = cdiPioClientUpdateResHList();
MPI_Comm comm = rup.comm;
int collRank = rup.collRank;
if (rup.sendRPCData)
{
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps)
& ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
char *msgBuffer = rup.msgBuffer;
int msgSize = rup.msgSize;
int size;
size_t filename_len = strlen(filename);
xassert(filename_len < (size_t)(INT_MAX - msgBufPos));
xassert(filename_len < (size_t)(INT_MAX - rup.msgPos));
int soHdr[3] = { streamptr->self, filetype, (int)filename_len };
xmpi(MPI_Pack_size(3, MPI_INT, comm, &size));
msgSize += size;
......@@ -64,11 +99,11 @@ cdiPioClientStreamOpen(const char *filename, char filemode,
/* optimize to pos + size */
msgBuffer = (char *)Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(soHdr, 3, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Pack(&filemode, 1, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Pack((void *)filename, (int)filename_len, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Sendrecv(msgBuffer, msgSize, MPI_PACKED, collRank,
STREAMOPEN,
&fileID, 1, MPI_INT, collRank, STREAMOPEN,
......@@ -77,9 +112,6 @@ cdiPioClientStreamOpen(const char *filename, char filemode,
}
else
{
#ifdef HAVE_PPM_DIST_ARRAY_H
cdiPioDistGridPackAssist();
#endif
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN,
comm, MPI_STATUS_IGNORE));
}
......@@ -117,46 +149,41 @@ checkVlistForPIO(int vlistID)
}
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
cdiPioClientStreamDefVlistCommon(int streamID, int vlistID)
{
checkVlistForPIO(vlistID);
cdiStreamDefVlist_(streamID, vlistID);
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int sendRPCData
= (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl));
if (sendRPCData)
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
}
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
{
cdiPioClientStreamDefVlistCommon(streamID, vlistID);
struct ResHListUpdate rup = cdiPioClientUpdateResHList();
if (rup.sendRPCData)
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
unsigned char *msgBuffer = rup.msgBuffer;
/* optimize: pos + size */
int msgSize = rup.msgSize;
{
int size;
xmpi(MPI_Pack_size(defVlistNInts, MPI_INT, comm, &size));
msgSize += size;
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
int msgData[defVlistNInts] = { streamID, streamInqVlist(streamID) };
xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Send(msgBuffer, rup.msgPos, MPI_PACKED, rup.collRank,
STREAMDEFVLIST, comm));
Free(msgBuffer);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
struct collSpec cspec = { .numClients = numClients,
.numServers = numColl,
.sendRPCData = sendRPCData };
struct collSpec cspec = { .numClients = rup.numClients,
.numServers = rup.numColl,
.sendRPCData = rup.sendRPCData };
cdiPioClientStreamWinCreate(streamID, &cspec);
}
......@@ -257,16 +284,15 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
struct ResHListUpdate rup = cdiPioClientUpdateResHList();
bool needsFlush = cdiPioClientStreamNeedsFlush(streamID);
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
if (rup.sendRPCData)
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
MPI_Comm comm = rup.comm;
char *msgBuffer = rup.msgBuffer;
int msgSize = rup.msgSize;
{
int size;
xmpi(MPI_Pack_size(1, MPI_INT, comm, &size));
......@@ -275,15 +301,11 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
msgBuffer, msgSize, &rup.msgPos, comm));
int tag = needsFlush ? STREAMFLUSHCLOSE : STREAMCLOSE;
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, tag, comm));
xmpi(MPI_Send(msgBuffer, rup.msgPos, MPI_PACKED, collRank, tag, comm));
Free(msgBuffer);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
if (needsFlush)
cdiPioClientStreamWinPost(streamID);
cdiPioClientStreamWinDestroy(streamID);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment