Commit d1233979 authored by Thomas Jahns 🤸
Browse files

Allow for arbitrary open/close operations in PIO setups.

parent 281d00c1
......@@ -8,6 +8,7 @@
#include "cdi.h"
#include "cdi_int.h"
#include "dmemory.h"
#include "namespace.h"
#include "taxis.h"
......@@ -20,45 +21,69 @@
#include "pio_util.h"
#include "pio_serialize.h"
/* Packing callback that deliberately does nothing: no argument is read
 * and neither the buffer nor the position is modified. */
static void
nullPackFunc(void *obj, void *buf, int size, int *pos, void *context)
{
  /* silence unused-parameter warnings, one argument per line */
  (void)obj;
  (void)buf;
  (void)size;
  (void)pos;
  (void)context;
}
static int
cdiPioClientStreamOpen(const char *filename, char filemode,
int filetype, stream_t *streamptr,
int recordBufIsToBeCreated)
{
struct winHeaderEntry header;
size_t filename_len;
(void)streamptr; (void)recordBufIsToBeCreated;
int fileID;
if ( filemode == 'w' )
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
streamptr->filetype = filetype;
break;
case STAGE_TIMELOOP:
filename_len = strlen(filename);
xassert(filename_len > 0 && filename_len < MAXDATAFILENAME);
header = (struct winHeaderEntry){
.id = STREAMOPEN,
.specific.funcArgs.newFile
= { .fnamelen = (int)filename_len,
.filetype = filetype } };
pioBufferFuncCall(streamptr->self, header,
&(struct memCpyDataDesc){filename,
filename_len + 1}, memcpyPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, filenamesz=%zu,"
" filename=%s, filetype=%d",
funcMap[(-1 - STREAMOPEN)], filename_len + 1, filename,
filetype);
streamptr->filetype = filetype;
{
MPI_Comm comm = cdiPioInqInterComm();
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int streamID = streamptr->self;
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
{
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps)
& ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
int size;
size_t filename_len = strlen(filename);
xassert(filename_len < (size_t)(INT_MAX - msgBufPos));
int soHdr[3] = { streamptr->self, filetype, (int)filename_len };
xmpi(MPI_Pack_size(3, MPI_INT, comm, &size));
msgSize += size;
xmpi(MPI_Pack_size(1, MPI_CHAR, comm, &size));
msgSize += size;
xmpi(MPI_Pack_size((int)filename_len, MPI_CHAR, comm, &size));
msgSize += size;
/* optimize to pos + size */
msgBuffer = (char *)Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(soHdr, 3, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack(&filemode, 1, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Pack((void *)filename, (int)filename_len, MPI_CHAR,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Sendrecv(msgBuffer, msgSize, MPI_PACKED, collRank,
STREAMOPEN,
&fileID, 1, MPI_INT, collRank, STREAMOPEN,
comm, MPI_STATUS_IGNORE));
Free(msgBuffer);
}
else
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN,
comm, MPI_STATUS_IGNORE));
if (fileID >= 0)
{
streamptr->filetype = filetype;
cdiPioClientStreamWinInit(streamID);
}
}
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
......@@ -69,25 +94,53 @@ cdiPioClientStreamOpen(const char *filename, char filemode,
}
else
Error("cdiPIO read support not implemented");
return 1;
return fileID;
}
static void
cdiPioClientStreamDefVlist_(int streamID, int vlistID)
{
struct winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
cdiStreamDefVlist_(streamID, vlistID);
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
header = (struct winHeaderEntry){
.id = STREAMDEFVLIST,
.specific.funcArgs.streamChange = { streamID, vlistID } };
pioBufferFuncCall(streamID, header, NULL, nullPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d,"
" vlistID=%d", funcMap[(-1 - STREAMDEFVLIST)], streamID, vlistID);
{
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
int sendRPCData
= (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl));
if (sendRPCData)
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
{
int size;
xmpi(MPI_Pack_size(defVlistNInts, MPI_INT, comm, &size));
msgSize += size;
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
int msgData[defVlistNInts] = { streamID, streamInqVlist(streamID) };
xmpi(MPI_Pack(&msgData, defVlistNInts, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
STREAMDEFVLIST, comm));
Free(msgBuffer);
}
struct collSpec cspec = { .numClients = numClients,
.numServers = numColl,
.sendRPCData = sendRPCData };
cdiPioClientStreamWinCreate(streamID, &cspec);
}
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
......@@ -95,7 +148,6 @@ cdiPioClientStreamDefVlist_(int streamID, int vlistID)
default:
xabort ( "INTERNAL ERROR" );
}
cdiStreamDefVlist_(streamID, vlistID);
}
static void
......@@ -196,23 +248,43 @@ cdiPioClientStreamNOP(stream_t *streamptr)
static void
cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
{
struct winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
(void)recordBufIsToBeDeleted;
int streamID = streamptr->self;
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
header = (struct winHeaderEntry){
.id = STREAMCLOSE,
.specific.funcArgs.streamChange
= { streamptr->self, CDI_UNDEFID } };
pioBufferFuncCall(streamptr->self, header, NULL, nullPackFunc);
xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS: %s, streamID=%d",
funcMap[-1 - STREAMCLOSE], streamptr->self);
break;
case STAGE_CLEANUP:
{
int clientRank = commInqRankModel(),
numClients = cdiPioCommInqSizeClients(),
numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
if (clientRank
== cdiPioClientRangeStart(collRank, numClients, numColl))
{
MPI_Comm comm = cdiPioInqInterComm();
reshSetStatus(streamID, &streamOps,
reshGetStatus(streamID, &streamOps) & ~RESH_SYNC_BIT);
char *msgBuffer;
int msgSize = 0;
int msgBufPos = reshPackBufferCreate(&msgBuffer, &msgSize, &comm);
{
int size;
xmpi(MPI_Pack_size(1, MPI_INT, comm, &size));
msgSize += size;
}
/* optimize: pos + size */
msgBuffer = Realloc(msgBuffer, (size_t)msgSize);
xmpi(MPI_Pack(&streamptr->self, 1, MPI_INT,
msgBuffer, msgSize, &msgBufPos, comm));
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank,
STREAMCLOSE, comm));
Free(msgBuffer);
}
cdiPioClientStreamWinDestroy(streamID);
}
break;
default:
xabort ( "INTERNAL ERROR" );
......@@ -236,7 +308,6 @@ cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
taxisID = vlistInqTaxis(streamptr->vlistID);
header = (struct winHeaderEntry){
......
......@@ -46,14 +46,26 @@ static struct rdmaWin
static struct idList openStreams;
/* Printable names of the RPC functions; debug output elsewhere in this
 * listing looks a name up as funcMap[-1 - id] for a (negative) function
 * ID.
 * NOTE(review): four names are listed while the array is sized by
 * numRPCFuncs; confirm the initializer count matches the value of
 * numRPCFuncs that is actually in effect. */
const char * const funcMap[numRPCFuncs] = {
"streamOpen",
"streamDefVlist",
"streamClose",
"streamDefTimestep",
};
float cdiPIOpartInflate_;
static inline void
collWait(size_t streamIdx);
/* Calls collWait() for every slot of openStreams that currently holds a
 * stream ID, i.e. whose entry differs from CDI_UNDEFID.
 *
 * The slot count is read once before the loop starts.
 *
 * Declared with an explicit (void) parameter list so that the definition
 * is a real prototype, like the other zero-argument functions in this
 * file. */
static inline void
collWaitAll(void)
{
  size_t openStreamsSize = openStreams.size;
  for (size_t streamIdx = 0; streamIdx < openStreamsSize; ++streamIdx)
    {
      /* unused slots are marked with CDI_UNDEFID and are skipped */
      if (openStreams.entries[streamIdx] != CDI_UNDEFID)
        collWait(streamIdx);
    }
}
/****************************************************/
void memcpyPackFunc(void *dataDesc, void *buf, int size, int *pos,
......@@ -75,64 +87,6 @@ struct clientBufferSetup
size_t bufSizesCount;
};
/* Per-stream callback handed to cdiResHFilterApply by clientBufSizes.
 *
 * id   - ID of the stream being visited
 * res  - resource pointer, unused
 * data - the struct clientBufferSetup being filled in
 *
 * Appends the buffer size computed for the stream to setup->bufSizes,
 * registers the ID in openStreams and asserts that the slot returned by
 * insertID equals the position just written.  Always returns
 * CDI_APPLY_GO_ON. */
static enum cdiApplyRet
clientStreamBufSizeIter(int id, void *res, void *data)
{
  struct clientBufferSetup *bufSetup = data;
  size_t slot = bufSetup->bufSizesCount;

  (void)res;

  /* record the size for this stream at the next free position */
  bufSetup->bufSizes[slot]
    = computeClientStreamBufSize(id, &bufSetup->collectData);
  size_t bufSizesCount = slot + 1;
  bufSetup->bufSizesCount = bufSizesCount;

  /* the stream must land in the matching slot of openStreams */
  size_t insertPos = insertID(&openStreams, id);
  xassert(insertPos == bufSizesCount - 1);

  return CDI_APPLY_GO_ON;
}
/* Fills *setup with one buffer-size entry per stream resource.
 *
 * setup->collectData receives the client and collector counts plus a
 * flag that is set when this client's rank equals the start of the
 * client range of its collector.  While the resource list is locked,
 * setup->bufSizes is allocated with one element per stream and
 * clientStreamBufSizeIter is applied to every stream. */
static void
clientBufSizes(struct clientBufferSetup *setup)
{
  int numClients = cdiPioCommInqSizeClients();
  int clientRank = commInqRankModel();
  int numColl = commInqSizeColl();
  int collRank = cdiPioCollRank(clientRank, numClients, numColl);
  int isRangeStart
    = (cdiPioClientRangeStart(collRank, numClients, numColl) == clientRank);

  setup->collectData = (struct collSpec){
    .numClients = numClients,
    .numServers = numColl,
    .sendRPCData = isRangeStart,
  };

  reshLock();
  unsigned streamCount = reshCountType(&streamOps);
  setup->bufSizesCount = 0;
  setup->bufSizes = Malloc(sizeof (setup->bufSizes[0]) * streamCount);
  cdiResHFilterApply(&streamOps, clientStreamBufSizeIter, setup);
  reshUnlock();
}
/************************************************************************/
/* Releases all client-side window state held in txWin.
 *
 * When txWin is set, every slot up to openStreams.size is processed in
 * order: MPI_Win_wait is called if the slot's postSet flag is set, then
 * the window is freed, then its buffer is released with MPI_Free_mem.
 * Finally the txWin table itself is freed.  Debug messages bracket the
 * whole operation. */
static void
modelWinDelete(void)
{
  xdebug("%s", "START");

  if (txWin)
    {
      size_t slot = 0;
      while (slot < openStreams.size)
        {
          if (txWin[slot].postSet)
            {
              xmpi(MPI_Win_wait(txWin[slot].win));
            }
          xmpi(MPI_Win_free(&txWin[slot].win));
          xmpi(MPI_Free_mem(txWin[slot].buffer));
          ++slot;
        }
      Free(txWin);
    }

  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
}
/************************************************************************/
static void
......@@ -162,33 +116,64 @@ modelWinFlushBuffer(size_t streamIdx)
/************************************************************************/
static void
cdiPioClientWinCreate(struct clientBufferSetup *setup)
/* Registers streamID in openStreams and prepares its txWin slot.
 *
 * The txWin table is resized to openStreams.size elements and the slot
 * returned by insertID is reset to "no window, no buffer, nothing
 * posted". */
void
cdiPioClientStreamWinInit(int streamID)
{
  size_t slot = insertID(&openStreams, streamID);
  size_t tableBytes = sizeof (txWin[0]) * openStreams.size;

  txWin = Realloc(txWin, tableBytes);

  /* start from a clean, window-less state */
  txWin[slot].postSet = false;
  txWin[slot].buffer = NULL;
  txWin[slot].win = MPI_WIN_NULL;
}
/* Releases the window state of streamID and removes the stream from
 * openStreams.
 *
 * After collWaitAll() has returned, the stream's txWin slot is torn
 * down in this order: MPI_Win_wait if postSet is set, MPI_Win_free if a
 * window exists, MPI_Free_mem if a buffer exists. */
void
cdiPioClientStreamWinDestroy(int streamID)
{
  /* no operation on any other window must overlap this call for
   * platforms that block the servers in MPI_Win_complete */
  collWaitAll();

  size_t slot = indexOfID(&openStreams, streamID);

  if (txWin[slot].postSet)
    {
      xmpi(MPI_Win_wait(txWin[slot].win));
    }
  if (MPI_WIN_NULL != txWin[slot].win)
    {
      xmpi(MPI_Win_free(&txWin[slot].win));
    }
  if (txWin[slot].buffer != NULL)
    {
      xmpi(MPI_Free_mem(txWin[slot].buffer));
    }

  removeID(&openStreams, streamID);
}
/* Creates the client-side RMA window for one stream.
 *
 * streamID - stream whose txWin slot is set up (looked up in openStreams)
 * cspec    - collector specification passed on to
 *            computeClientStreamBufSize
 *
 * The buffer is allocated with MPI_Alloc_mem, the window is created on
 * the communicator returned by cdiPioInqCollClientIntraComm() with the
 * "no_locks" info key set to "true", and modelWinFlushBuffer is called
 * for the slot.
 *
 * NOTE(review): the body below references `setup`, which is neither a
 * parameter nor a local of this function, and contains a loop over all
 * streams that repeats the single-stream statements.  This looks like
 * lines of the earlier cdiPioClientWinCreate(setup) shown together with
 * their replacement; confirm which lines are live before relying on it. */
void
cdiPioClientStreamWinCreate(int streamID, struct collSpec *cspec)
{
/* no operation on any other window must overlap this call for
* platforms that block the servers in MPI_Win_complete
*/
collWaitAll();
struct clientBufSize bufSize
= computeClientStreamBufSize(streamID, cspec);
MPI_Info no_locks_info;
xmpi(MPI_Info_create(&no_locks_info));
xmpi(MPI_Info_set(no_locks_info, "no_locks", "true"));
/* NOTE(review): `setup` is not declared in this function (see header) */
size_t numStreams = setup->bufSizesCount;
txWin = Malloc(numStreams * sizeof (txWin[0]));
struct clientBufSize *restrict bufSpec = setup->bufSizes;
size_t streamIdx = indexOfID(&openStreams, streamID);
/* number of header entries: data records plus RPC records */
int dictSize = bufSize.numDataRecords + bufSize.numRPCRecords;
size_t streamBufSize = bufSize.bufSize;
txWin[streamIdx].dictSize = dictSize;
txWin[streamIdx].size = streamBufSize;
xmpi(MPI_Alloc_mem((MPI_Aint)streamBufSize, MPI_INFO_NULL,
&txWin[streamIdx].buffer));
/* head points just past dictSize header entries at the buffer start */
txWin[streamIdx].head = txWin[streamIdx].buffer
+ (size_t)dictSize * sizeof (struct winHeaderEntry);
MPI_Comm collClientIntraComm = cdiPioInqCollClientIntraComm();
/* NOTE(review): this loop declares its own streamIdx, shadowing the one
 * above, and performs the same per-slot setup for every stream */
for (size_t streamIdx = 0; streamIdx < numStreams; ++streamIdx)
{
int dictSize = bufSpec[streamIdx].numDataRecords + bufSpec[streamIdx].numRPCRecords;
size_t streamBufSize = bufSpec[streamIdx].bufSize;
txWin[streamIdx].dictSize = dictSize;
txWin[streamIdx].size = streamBufSize;
xmpi(MPI_Alloc_mem((MPI_Aint)streamBufSize, MPI_INFO_NULL,
&txWin[streamIdx].buffer));
txWin[streamIdx].head = txWin[streamIdx].buffer
+ (size_t)dictSize * sizeof (struct winHeaderEntry);
xmpi(MPI_Win_create(txWin[streamIdx].buffer, (MPI_Aint)streamBufSize, 1,
no_locks_info, collClientIntraComm,
&txWin[streamIdx].win));
modelWinFlushBuffer(streamIdx);
}
/* window over the stream's buffer, displacement unit 1 byte */
xmpi(MPI_Win_create(txWin[streamIdx].buffer, (MPI_Aint)streamBufSize, 1,
no_locks_info, collClientIntraComm,
&txWin[streamIdx].win));
modelWinFlushBuffer(streamIdx);
xmpi(MPI_Info_free(&no_locks_info));
}
/************************************************************************/
static void
......@@ -598,9 +583,6 @@ void pioEndDef ( void )
xdebug("%s", "START");
struct clientBufferSetup setup;
clientBufSizes(&setup);
if (clientRank == cdiPioClientRangeStart(collRank, numClients, numColl))
{
MPI_Comm comm = cdiPioInqInterComm();
......@@ -614,8 +596,6 @@ void pioEndDef ( void )
reshPackBufferDestroy ( &buffer );
}
cdiPioClientWinCreate(&setup);
Free(setup.bufSizes);
namespaceDefResStatus ( STAGE_TIMELOOP );
xdebug("%s", "RETURN");
}
......@@ -624,9 +604,6 @@ void pioEndDef ( void )
/* Switches the resource status of the current namespace to
 * STAGE_CLEANUP, with debug messages on entry and exit. */
void
pioEndTimestepping(void)
{
  xdebug("%s", "START");

  namespaceDefResStatus(STAGE_CLEANUP);

  xdebug("%s", "RETURN");
}
......@@ -654,7 +631,7 @@ void pioFinalize ( void )
if (clientRank == cdiPioClientRangeStart(collRank, numClients, numColl))
xmpi(MPI_Send(NULL, 0, MPI_INT, collRank, FINALIZE, cdiPioInqInterComm()));
xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
modelWinDelete();
Free(txWin);
cdiPioCommFinalize();
idSetDestroy(&openStreams);
if (xtInitByCDI)
......
......@@ -35,6 +35,13 @@ void memcpyPackFunc(void *dataDesc, void *buf, int size, int *pos, void *context
extern float cdiPIOpartInflate_;
/* Registers streamID among the open streams and resets its window slot
 * (no window, no buffer, nothing posted). */
void
cdiPioClientStreamWinInit(int streamID);
/* Allocates the transfer buffer and creates the RMA window for streamID;
 * cspec is forwarded to the buffer size computation. */
void
cdiPioClientStreamWinCreate(int streamID, struct collSpec *cspec);
/* Frees window and buffer of streamID (if present) and removes the
 * stream from the set of open streams. */
void
cdiPioClientStreamWinDestroy(int streamID);
#endif
/*
* Local Variables:
......
......@@ -52,7 +52,6 @@ computeClientStreamBufSize(int streamID, struct collSpec *collector)
rmaSizeSpec.numRPCRecords = numRPCFuncs;
rmaSizeSpec.bufSize +=
numRPCFuncs * sizeof (struct winHeaderEntry)
+ MAXDATAFILENAME
/* data part of streamDefTimestep */
+ (2 * CDI_MAX_NAME + sizeof (taxis_t));
}
......
......@@ -13,6 +13,9 @@
/* Message tags used for commands sent to the collector processes over
 * the inter-communicator.  The numeric values follow from the order of
 * the enumerators. */
enum collectorCommandTags {
/* tag of the empty message sent from pioFinalize */
FINALIZE,
/* NOTE(review): not used in the code visible here; presumably tags the
 * transfer of packed resources, confirm against the sender */
RESOURCES,
/* tag of the packed stream-open request and of the int sent back */
STREAMOPEN,
/* tag of the packed message carrying the ID of the stream to close */
STREAMCLOSE,
/* tag of the packed message carrying defVlistNInts ints */
STREAMDEFVLIST,
/* NOTE(review): not used in the code visible here; confirm meaning */
WRITETS,
};
......@@ -20,15 +23,16 @@ enum collectorCommandTags {
/* Negative IDs of the RPC functions and two marker values placed just
 * below the function ID range.  The functions' printable names are
 * looked up as funcMap[-1 - id].
 *
 * NOTE(review): numRPCFuncs and STREAMDEFTIMESTEP each appear twice with
 * different values (4 / 1 and -4 / -1).  This looks like the state
 * before and after a change shown together; confirm which set applies. */
enum
{
numRPCFuncs = 4,
STREAMOPEN = -1,
STREAMDEFVLIST = -2,
STREAMCLOSE = -3,
STREAMDEFTIMESTEP = -4,
numRPCFuncs = 1,
STREAMDEFTIMESTEP = -1,
/* markers: first and second value below the smallest function ID */
HEADERSIZEMARKER = -numRPCFuncs - 1,
PARTDESCMARKER = -numRPCFuncs - 2,
};
/* MAXDATAFILENAME: exclusive upper bound asserted for the length of a
 * file name passed through the RPC buffer.
 * NOTE(review): MINFUNCID and MAXFUNCID are defined both here and in the
 * enum that follows; confirm which definition is the current one. */
enum { MAXDATAFILENAME = 256, MINFUNCID = -numRPCFuncs, MAXFUNCID = -1 };
/* Range of valid RPC function IDs and the number of ints packed into a
 * STREAMDEFVLIST message. */
enum {
/* smallest (most negative) function ID */
MINFUNCID = -numRPCFuncs,
/* largest function ID */
MAXFUNCID = -1,
/* stream ID and vlist ID */
defVlistNInts = 2,
};
extern const char * const funcMap[numRPCFuncs];
struct headerSize
......@@ -43,18 +47,10 @@ struct dataRecord
/* Arguments of an RPC function call; which member is valid depends on
 * the function the entry describes. */
union funcArgs
{
/* filled in for STREAMDEFVLIST (stream and vlist ID) and for
 * STREAMCLOSE (stream ID, vlistID set to CDI_UNDEFID) */
struct
{
int streamID, vlistID;
} streamChange;
/* NOTE(review): use not visible in this excerpt; presumably the
 * arguments of STREAMDEFTIMESTEP, confirm */
struct
{
int streamID, tsID;
} streamNewTimestep;
/* filled in for STREAMOPEN: length of the file name (without the
 * terminating NUL) and the file type */
struct
{
int fnamelen, filetype;
} newFile;
};
/* Describes offset and ID of serialized partition descriptor.
......
......@@ -84,74 +84,15 @@ static int numPioPrimes;
/************************************************************************/
/* Releases the receive-side state of the stream in slot streamIdx: frees
 * the memory referenced by the first client buffer and frees the
 * window rxWin[streamIdx].getWin.
 *
 * NOTE(review): two name lines follow (deleteServerStreamWin and
 * cdiPioServerStreamWinDestroy).  This looks like a rename shown with
 * both the old and the new line; only one of them can be kept. */
static void
deleteServerStreamWin(size_t streamIdx)
cdiPioServerStreamWinDestroy(size_t streamIdx)
{
free(rxWin[streamIdx].clientBuf[0].mem);
xmpi(MPI_Win_free(&rxWin[streamIdx].getWin));
}
/* Calls deleteServerStreamWin for every slot of openStreams, in
 * ascending order, and then frees the rxWin table. */
static void
serverWinCleanup(void)
{
  size_t slot = 0;
  while (slot < openStreams.size)
    {
      deleteServerStreamWin(slot);
      ++slot;
    }
  free(rxWin);
}
/* Working data used while the server computes the buffer sizes of all
 * streams; filled in by serverBufSizes and its per-stream callback. */
struct serverBufferSetup
{
/* number of clients in this collector's client range */
int numClientsOfServer;
/* collector specification handed to computeClientStreamBufSize */
struct collSpec collectorData;
/* allocated for numClientsOfServer * numStreams elements and indexed
 * as [stream slot][client] */
struct clientBufSize *bufSizes;
/* number of stream resources counted when bufSizes was allocated */
unsigned numStreams;
};
/* Per-stream callback handed to cdiResHFilterApply by serverBufSizes.
 *
 * id   - ID of the stream being visited
 * res  - resource pointer, unused
 * data - the struct serverBufferSetup being filled in
 *
 * Always returns CDI_APPLY_GO_ON.
 *
 * NOTE(review): the loop body mixes the buffer size computation with a
 * window teardown (Free / MPI_Win_free guarded by a getWin check).
 * These look like lines of two different functions shown together;
 * confirm which statements belong here. */
static enum cdiApplyRet
serverStreamBufSizeIter(int id, void *res, void *data)
{
(void)res;
struct serverBufferSetup *setup = data;
size_t numClientsOfServer = (size_t)setup->numClientsOfServer;
/* view the flat bufSizes array as rows of numClientsOfServer entries */
struct clientBufSize (*bufSizes)[numClientsOfServer]
= (struct clientBufSize (*)[numClientsOfServer])setup->bufSizes;
size_t streamIdx = indexOfID(&openStreams, id);
xassert(streamIdx < setup->numStreams);
/* sendRPCData is set before the loop and cleared inside it */
setup->collectorData.sendRPCData = 1;
for (size_t i = 0; i < numClientsOfServer; ++i)
if (rxWin[streamIdx].getWin != MPI_WIN_NULL)
{
bufSizes[streamIdx][i]
= computeClientStreamBufSize(id, &setup->collectorData);
setup->collectorData.sendRPCData = 0;
Free(rxWin[streamIdx].clientBuf[0].mem);
xmpi(MPI_Win_free(&rxWin[streamIdx].getWin));
}
return CDI_APPLY_GO_ON;
}
/* Fills *setup with the buffer sizes of all streams for every client of
 * this collector.
 *
 * The number of clients served is the distance between the start of the
 * next collector's client range and the start of this collector's
 * range.  While the resource list is locked, setup->bufSizes is
 * allocated with one element per (stream, client) pair and
 * serverStreamBufSizeIter is applied to every stream.
 *
 * Always returns 0. */
static size_t
serverBufSizes(struct serverBufferSetup *setup)
{
  int numClients = cdiPioCommInqSizeClients();
  int numColl = commInqSizeColl();
  int collRank = commInqRankColl();
  int nextRangeStart
    = cdiPioClientRangeStart(collRank + 1, numClients, numColl);
  int ownRangeStart
    = cdiPioClientRangeStart(collRank, numClients, numColl);
  int numClientsOfServer = nextRangeStart - ownRangeStart;

  setup->numClientsOfServer = numClientsOfServer;
  setup->collectorData = (struct collSpec){
    .numClients = numClients,
    .numServers = numColl,
  };

  reshLock();
  unsigned streamCount = reshCountType(&streamOps);
  setup->numStreams = streamCount;
  size_t numEntries = (size_t)numClientsOfServer * streamCount;
  setup->bufSizes = Malloc(numEntries * sizeof (setup->bufSizes[0]));
  cdiResHFilterApply(&streamOps, serverStreamBufSizeIter, setup);
  reshUnlock();

  return 0;
}
static int numClients_, *clientRanks_;
static void
......@@ -173,8 +114,9 @@ setupClientRanks(void)
}
static void
createStreamWin(size_t streamIdx, MPI_Info no_locks_info,
MPI_Comm collClientIntraComm, struct clientBufSize *bufSizes)
cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info,
MPI_Comm collClientIntraComm,
struct clientBufSize *bufSizes)
{
xmpi(MPI_Win_create(MPI_BOTTOM, 0, 1, no_locks_info, collClientIntraComm,
&rxWin[streamIdx].getWin));
......@@ -193,28 +135,6 @@ createStreamWin(size_t streamIdx, MPI_Info no_locks_info,
}
static void
serverWinCreate(void)
{
struct serverBufferSetup setup;
serverBufSizes(&setup);