Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 74
45
Compare changes
  • Side-by-side
  • Inline
+ 74
45
@@ -27,6 +27,45 @@
#include "pio_util.h"
#include "pio_serialize.h"
struct ResHListUpdate
{
unsigned char *msgBuffer;
MPI_Comm comm;
int numClients, clientRank, numColl, collRank, groupLeader;
int msgSize, msgPos;
bool sendRPCData;
};
static struct ResHListUpdate
cdiPioClientUpdateResHList()
{
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(), numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int groupLeader = cdiPioClientRangeStart(collRank, numClients, numColl);
int sendRPCData = (clientRank == groupLeader);
unsigned char *msgBuffer = NULL;
int msgSize = 0, msgBufPos = 0;
if (sendRPCData)
{
msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
return (struct vlistMetaReturn){ .msgBuffer = msgBuffer,
.comm = comm,
.numClients = numClients,
.clientRank = clientRank,
.groupLeader = groupLeader,
.numColl = numColl,
.collRank = collRank,
.msgSize = msgSize,
.msgPos = msgBufPos,
.sendRPCData = sendRPCData };
}
static int
cdiPioClientStreamOpen(const char *filename, char filemode, int filetype, stream_t *streamptr, int recordBufIsToBeCreated)
{
@@ -35,19 +74,18 @@ cdiPioClientStreamOpen(const char *filename, char filemode, int filetype, stream
int fileID = 0;
if (filemode == 'w')
{
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(), numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int streamID = streamptr->self;
if (clientRank == cdiPioClientRangeStart(collRank, numClients, numColl))
reshSetStatus(streamID, &streamOps, reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
struct ResHListUpdate rup = cdiPioClientUpdateResHList();
MPI_Comm comm = rup.comm;
int collRank = rup.collRank;
if (rup.sendRPCData)
{
reshSetStatus(streamID, &streamOps, reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
char *msgBuffer = rup.msgBuffer;
int msgSize = rup.msgSize;
int size;
size_t filename_len = strlen(filename);
xassert(filename_len < (size_t) (INT_MAX - msgBufPos));
xassert(filename_len < (size_t) (INT_MAX - rup.msgPos));
int soHdr[3] = { streamptr->self, filetype, (int) filename_len };
xmpi(MPI_Pack_size(3, MPI_INT, comm, &size));
msgSize += size;
@@ -57,18 +95,15 @@ cdiPioClientStreamOpen(const char *filename, char filemode, int filetype, stream
msgSize += size;
/* optimize to pos + size */
msgBuffer = (char *) Realloc(msgBuffer, (size_t) msgSize);
xmpi(MPI_Pack(soHdr, 3, MPI_INT, msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack(&filemode, 1, MPI_CHAR, msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack((void *) filename, (int) filename_len, MPI_CHAR, msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack(soHdr, 3, MPI_INT, msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Pack(&filemode, 1, MPI_CHAR, msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Pack((void *) filename, (int) filename_len, MPI_CHAR, msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Sendrecv(msgBuffer, msgSize, MPI_PACKED, collRank, STREAMOPEN, &fileID, 1, MPI_INT, collRank, STREAMOPEN, comm,
MPI_STATUS_IGNORE));
Free(msgBuffer);
}
else
{
#ifdef HAVE_PPM_DIST_ARRAY_H
cdiPioDistGridPackAssist();
#endif
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN, comm, MPI_STATUS_IGNORE));
}
if (fileID >= 0)
@@ -104,37 +139,35 @@ checkVlistForPIO(int vlistID)
}
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
cdiPioClientStreamDefVlistCommon(int streamID, int vlistID)
{
checkVlistForPIO(vlistID);
cdiStreamDefVlist_(streamID, vlistID);
int clientRank = commInqRankModel(), numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int sendRPCData = (clientRank == cdiPioClientRangeStart(collRank, numClients, numColl));
if (sendRPCData)
reshSetStatus(streamID, &streamOps, reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
}
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
{
cdiPioClientStreamDefVlistCommon(streamID, vlistID);
struct ResHListUpdate rup = cdiPioClientUpdateResHList();
if (rup.sendRPCData)
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps, reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
unsigned char *msgBuffer = rup.msgBuffer;
/* optimize: pos + size */
int msgSize = rup.msgSize;
{
int size;
xmpi(MPI_Pack_size(defVlistNInts, MPI_INT, comm, &size));
msgSize += size;
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t) msgSize);
int msgData[defVlistNInts] = { streamID, streamInqVlist(streamID) };
xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT, msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, STREAMDEFVLIST, comm));
xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT, msgBuffer, msgSize, &rup.msgPos, comm));
xmpi(MPI_Send(msgBuffer, rup.msgPos, MPI_PACKED, rup.collRank, STREAMDEFVLIST, comm));
Free(msgBuffer);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
struct collSpec cspec = { .numClients = numClients, .numServers = numColl, .sendRPCData = sendRPCData };
struct collSpec cspec = { .numClients = rup.numClients, .numServers = rup.numColl, .sendRPCData = rup.sendRPCData };
cdiPioClientStreamWinCreate(streamID, &cspec);
}
@@ -221,14 +254,14 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
int streamID = streamptr->self;
int clientRank = commInqRankModel(), numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
reshSetStatus(streamID, &streamOps, reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
struct ResHListUpdate rup = cdiPioClientUpdateResHList();
bool needsFlush = cdiPioClientStreamNeedsFlush(streamID);
if (clientRank == cdiPioClientRangeStart(collRank, numClients, numColl))
if (rup.sendRPCData)
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps, reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
MPI_Comm comm = rup.comm;
char *msgBuffer = rup.msgBuffer;
int msgSize = rup.msgSize;
{
int size;
xmpi(MPI_Pack_size(1, MPI_INT, comm, &size));
@@ -236,15 +269,11 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t) msgSize);
xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT, msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT, msgBuffer, msgSize, &rup.msgPos, comm));
int tag = needsFlush ? STREAMFLUSHCLOSE : STREAMCLOSE;
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, tag, comm));
xmpi(MPI_Send(msgBuffer, rup.msgPos, MPI_PACKED, collRank, tag, comm));
Free(msgBuffer);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
if (needsFlush) cdiPioClientStreamWinPost(streamID);
cdiPioClientStreamWinDestroy(streamID);
}
Loading