From ba1b62498cee07b70edec9a1d537c812e4d52420 Mon Sep 17 00:00:00 2001 From: Thomas Jahns <jahns@dkrz.de> Date: Sat, 24 Feb 2018 23:34:17 +0100 Subject: [PATCH] Extract common functionality. --- src/pio_client.c | 140 +++++++++++++++++++++++++++-------------------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/src/pio_client.c b/src/pio_client.c index 5556c9aec..27af5ab95 100644 --- a/src/pio_client.c +++ b/src/pio_client.c @@ -27,6 +27,46 @@ #include "pio_util.h" #include "pio_serialize.h" + +struct ResHListUpdate +{ + unsigned char *msgBuffer; + MPI_Comm comm; + int numClients, clientRank, numColl, collRank, groupLeader; + int msgSize, msgPos; + bool sendRPCData; +}; + +static struct ResHListUpdate +cdiPioClientUpdateResHList() +{ + MPI_Comm comm = cdiPioInqInterComm(); + int clientRank = commInqRankModel(), + numClients = cdiPioCommInqSizeClients(), + numColl = commInqSizeColl(), + collRank = cdiPioCollRank(clientRank, numClients, numColl); + int groupLeader = cdiPioClientRangeStart(collRank, numClients, numColl); + int sendRPCData = (clientRank == groupLeader); + unsigned char *msgBuffer = NULL; + int msgSize = 0, msgBufPos = 0; + if (sendRPCData) + { + msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm); + } +#ifdef HAVE_PPM_DIST_ARRAY_H + else + cdiPioDistGridPackAssist(); +#endif + return (struct vlistMetaReturn){ .msgBuffer = msgBuffer, + .comm = comm, + .numClients = numClients, .clientRank = clientRank, + .groupLeader = groupLeader, + .numColl = numColl, .collRank = collRank, + .msgSize = msgSize, .msgPos = msgBufPos, + .sendRPCData = sendRPCData }; +} + + static int cdiPioClientStreamOpen(const char *filename, char filemode, int filetype, stream_t *streamptr, @@ -36,24 +76,19 @@ cdiPioClientStreamOpen(const char *filename, char filemode, int fileID; if ( filemode == 'w' ) { - MPI_Comm comm = cdiPioInqInterComm(); - int clientRank = commInqRankModel(), - numClients = cdiPioCommInqSizeClients(), - numColl = commInqSizeColl(), - collRank = cdiPioCollRank(clientRank, numClients, numColl); int streamID = streamptr->self; - if (clientRank - == cdiPioClientRangeStart(collRank, numClients, numColl)) + reshSetStatus(streamID, &streamOps, + reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT); + struct ResHListUpdate rup = cdiPioClientUpdateResHList(); + MPI_Comm comm = rup.comm; + int collRank = rup.collRank; + if (rup.sendRPCData) { - reshSetStatus(streamID, &streamOps, - reshGetStatus(streamID, &streamOps) - & ~RESH_SYNC_BIT); - char *msgBuffer; - int msgSize = 0; - int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm); + char *msgBuffer = rup.msgBuffer; + int msgSize = rup.msgSize; int size; size_t filename_len = strlen(filename); - xassert(filename_len < (size_t)(INT_MAX - msgBufPos)); + xassert(filename_len < (size_t)(INT_MAX - rup.msgPos)); int soHdr[3] = { streamptr->self, filetype, (int)filename_len }; xmpi(MPI_Pack_size(3, MPI_INT, comm, &size)); msgSize += size; @@ -64,11 +99,11 @@ cdiPioClientStreamOpen(const char *filename, char filemode, /* optimize to pos + size */ msgBuffer = (char *)Realloc(msgBuffer, (size_t)msgSize); xmpi(MPI_Pack(soHdr, 3, MPI_INT, - msgBuffer, msgSize, &msgBufPos, comm)); + msgBuffer, msgSize, &rup.msgPos, comm)); xmpi(MPI_Pack(&filemode, 1, MPI_CHAR, - msgBuffer, msgSize, &msgBufPos, comm)); + msgBuffer, msgSize, &rup.msgPos, comm)); xmpi(MPI_Pack((void *)filename, (int)filename_len, MPI_CHAR, - msgBuffer, msgSize, &msgBufPos, comm)); + msgBuffer, msgSize, &rup.msgPos, comm)); xmpi(MPI_Sendrecv(msgBuffer, msgSize, MPI_PACKED, collRank, STREAMOPEN, &fileID, 1, MPI_INT, collRank, STREAMOPEN, @@ -77,9 +112,6 @@ cdiPioClientStreamOpen(const char *filename, char filemode, } else { -#ifdef HAVE_PPM_DIST_ARRAY_H - cdiPioDistGridPackAssist(); -#endif xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN, comm, MPI_STATUS_IGNORE)); } @@ -117,46 +149,41 @@ checkVlistForPIO(int vlistID) } static void -cdiPioClientStreamDefVlist_(int streamID, int vlistID) +cdiPioClientStreamDefVlistCommon(int streamID, int vlistID) { checkVlistForPIO(vlistID); cdiStreamDefVlist_(streamID, vlistID); - int clientRank = commInqRankModel(), - numClients = cdiPioCommInqSizeClients(), - numColl = commInqSizeColl(), - collRank = cdiPioCollRank(clientRank, numClients, numColl); - int sendRPCData - = (clientRank - == cdiPioClientRangeStart(collRank, numClients, numColl)); - if (sendRPCData) + reshSetStatus(streamID, &streamOps, + reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT); +} + + +static void +cdiPioClientStreamDefVlist_(int streamID, int vlistID) +{ + cdiPioClientStreamDefVlistCommon(streamID, vlistID); + struct ResHListUpdate rup = cdiPioClientUpdateResHList(); + if (rup.sendRPCData) { - MPI_Comm comm = cdiPioInqInterComm(); - reshSetStatus(streamID, &streamOps, - reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT); - char *msgBuffer; - int msgSize = 0; - int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm); + unsigned char *msgBuffer = rup.msgBuffer; + /* optimize: pos + size */ + int msgSize = rup.msgSize; { int size; xmpi(MPI_Pack_size(defVlistNInts, MPI_INT, comm, &size)); msgSize += size; } - /* optimize: pos + size */ msgBuffer = Realloc(msgBuffer, (size_t)msgSize); int msgData[defVlistNInts] = { streamID, streamInqVlist(streamID) }; xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT, - msgBuffer, msgSize, &msgBufPos, comm)); - xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, + msgBuffer, msgSize, &rup.msgPos, comm)); + xmpi(MPI_Send(msgBuffer, rup.msgPos, MPI_PACKED, rup.collRank, STREAMDEFVLIST, comm)); Free(msgBuffer); } -#ifdef HAVE_PPM_DIST_ARRAY_H - else - cdiPioDistGridPackAssist(); -#endif - struct collSpec cspec = { .numClients = numClients, - .numServers = numColl, - .sendRPCData = sendRPCData }; + struct collSpec cspec = { .numClients = rup.numClients, + .numServers = rup.numColl, + .sendRPCData = rup.sendRPCData }; cdiPioClientStreamWinCreate(streamID, &cspec); } @@ -257,16 +284,15 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted) numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl(), collRank = cdiPioCollRank(clientRank, numClients, numColl); + reshSetStatus(streamID, &streamOps, + reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT); + struct ResHListUpdate rup = cdiPioClientUpdateResHList(); bool needsFlush = cdiPioClientStreamNeedsFlush(streamID); - if (clientRank - == cdiPioClientRangeStart(collRank, numClients, numColl)) + if (rup.sendRPCData) { - MPI_Comm comm = cdiPioInqInterComm(); - reshSetStatus(streamID, &streamOps, - reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT); - char *msgBuffer; - int msgSize = 0; - int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm); + MPI_Comm comm = rup.comm; + char *msgBuffer = rup.msgBuffer; + int msgSize = rup.msgSize; { int size; xmpi(MPI_Pack_size(1, MPI_INT, comm, &size)); @@ -275,15 +301,11 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted) /* optimize: pos + size */ msgBuffer = Realloc(msgBuffer, (size_t)msgSize); xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT, - msgBuffer, msgSize, &msgBufPos, comm)); + msgBuffer, msgSize, &rup.msgPos, comm)); int tag = needsFlush ? STREAMFLUSHCLOSE : STREAMCLOSE; - xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, tag, comm)); + xmpi(MPI_Send(msgBuffer, rup.msgPos, MPI_PACKED, collRank, tag, comm)); Free(msgBuffer); } -#ifdef HAVE_PPM_DIST_ARRAY_H - else - cdiPioDistGridPackAssist(); -#endif if (needsFlush) cdiPioClientStreamWinPost(streamID); cdiPioClientStreamWinDestroy(streamID); -- GitLab