Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 36
38
Compare changes
  • Side-by-side
  • Inline
@@ -10,6 +10,7 @@
#define _XOPEN_SOURCE 500
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
@@ -35,7 +36,11 @@ enum
collGuardTag = 777,
};
typedef struct
/*
* Represents file opened by multiple writers and the corresponding
* buffer(s) for accumulation of timestep data.
*/
struct mwFileBuf
{
struct dBuffer *db1;
struct dBuffer *db2;
@@ -44,27 +49,23 @@ typedef struct
enum IO_Server_command command;
int tsID, fileID;
char name[];
} aFiledataPF;
};
static int
fileIDTestA(void *a, void *fileID)
lookupMwFBFileID(void *a, void *fileID)
{
return ((aFiledataPF *) a)->fileID == (int) (intptr_t) fileID;
return ((struct mwFileBuf *) a)->fileID == (int) (intptr_t) fileID;
}
static listSet *bibAFiledataPF;
static listSet *openedFiles;
/***************************************************************/
static aFiledataPF *
initAFiledataPF(const char *key, size_t bs)
static struct mwFileBuf *
newMultiwriterFileBuf(const char *key, size_t bs)
{
aFiledataPF *afd;
size_t len;
int iret;
len = strlen(key);
afd = Calloc(1, sizeof(*afd) + len + 1);
size_t len = strlen(key);
struct mwFileBuf *afd = Calloc(1, sizeof(*afd) + len + 1);
strcpy(afd->name, key);
afd->tsID = 0;
@@ -72,7 +73,7 @@ initAFiledataPF(const char *key, size_t bs)
xdebug(" name=%s, init output buffer", afd->name);
iret = dbuffer_init(&(afd->db1), bs);
int iret = dbuffer_init(&(afd->db1), bs);
iret += dbuffer_init(&(afd->db2), bs);
if (iret > 0) xabort("dbuffer_init did not succeed");
@@ -89,10 +90,10 @@ initAFiledataPF(const char *key, size_t bs)
}
static int
destroyAFiledataPF(void *v)
deleteMultiwriterFileBuf(void *v)
{
int iret = 0;
aFiledataPF *afd = (aFiledataPF *) v;
struct mwFileBuf *afd = (struct mwFileBuf *) v;
/* close file */
xdebug("name=%s, close file", afd->name);
@@ -111,7 +112,7 @@ destroyAFiledataPF(void *v)
static bool
compareNamesAPF(void *v1, void *v2)
{
aFiledataPF *afd1 = v1, *afd2 = v2;
struct mwFileBuf *afd1 = v1, *afd2 = v2;
return !strcmp(afd1->name, afd2->name);
}
@@ -131,7 +132,7 @@ pwrite(int fd, const void *buf, size_t count, off_t offset)
#endif
static void
writePF(aFiledataPF *afd)
writePF(struct mwFileBuf *afd)
{
ssize_t written;
MPI_Status status;
@@ -177,16 +178,16 @@ writePF(aFiledataPF *afd)
/***************************************************************/
static void
defTimestepPF(aFiledataPF *afd, int tsID)
defTimestepPF(struct mwFileBuf *afd, int tsID)
{
if (afd == NULL || tsID < 0 || tsID != afd->tsID + 1) xabort(" defTimestepPF() didn't succeed.");
assert(afd != NULL && tsID >= 0 && tsID == afd->tsID + 1);
afd->tsID = tsID;
}
/***************************************************************/
static void
flushOp(aFiledataPF *a, int tsID)
flushOp(struct mwFileBuf *a, int tsID)
{
writePF(a);
defTimestepPF(a, tsID);
@@ -196,7 +197,7 @@ static size_t
fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
{
int error = 0;
aFiledataPF *afd = listSetGet(bibAFiledataPF, fileIDTestA, (void *) (intptr_t) fileID);
struct mwFileBuf *afd = listSetGet(openedFiles, lookupMwFBFileID, (void *) (intptr_t) fileID);
bool flush = tsID != afd->tsID;
@@ -233,19 +234,16 @@ fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
static int
fcPOSIXFPGUARDSENDRECV(int id)
{
aFiledataPF *afd;
int iret;
xdebug("write buffer, close file %d and cleanup", id);
afd = listSetGet(bibAFiledataPF, fileIDTestA, (void *) (intptr_t) id);
struct mwFileBuf *afd = listSetGet(openedFiles, lookupMwFBFileID, (void *) (intptr_t) id);
afd->command = IO_Close_file;
writePF(afd);
/* remove file element */
iret = listSetRemove(bibAFiledataPF, fileIDTestA, (void *) (intptr_t) id);
int iret = listSetRemove(openedFiles, lookupMwFBFileID, (void *) (intptr_t) id);
/* make sure the file is closed on all collectors before proceeding */
xmpi(MPI_Barrier(commInqCommColl()));
return iret;
@@ -253,9 +251,9 @@ fcPOSIXFPGUARDSENDRECV(int id)
/***************************************************************/
static void
elemCheck(void *q, void *nm)
lookupMwFBFilename(void *q, void *nm)
{
aFiledataPF *afd = q;
struct mwFileBuf *afd = q;
const char *name = nm;
if (!strcmp(name, afd->name)) xabort("Filename %s has already been added to set\n", name);
@@ -279,14 +277,14 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
}
/* init and add file element */
listSetForeach(bibAFiledataPF, elemCheck, (void *) filename);
listSetForeach(openedFiles, lookupMwFBFilename, (void *) filename);
aFiledataPF *afd = initAFiledataPF(filename, (size_t) buffersize);
struct mwFileBuf *afd = newMultiwriterFileBuf(filename, (size_t) buffersize);
int file_id;
if ((file_id = listSetAdd(bibAFiledataPF, afd)) < 0) xabort("filename %s not unique", afd->name);
if ((file_id = listSetAdd(openedFiles, afd)) < 0) xabort("filename %s not unique", afd->name);
afd->fileID = file_id;
xdebug("name=%s, init and add aFiledataPF, return id = %d", filename, file_id);
xdebug("name=%s, init and add struct mwFileBuf, return id = %d", filename, file_id);
long offset;
int specialRank = commInqSpecialRank();
MPI_Status status;
@@ -307,12 +305,12 @@ finalizePOSIXFPGUARDSENDRECV(void)
long query[msgNumWords] = { 0, IO_Finalize, 0L };
xmpi(MPI_Send(query, msgNumWords, MPI_LONG, commInqSpecialRank(), collGuardTag, commInqCommPio()));
if (!listSetIsEmpty(bibAFiledataPF))
xabort("set bibAFiledataM not empty");
if (!listSetIsEmpty(openedFiles))
xabort("files still open at CDI-PIO finalization");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibAFiledataPF);
listSetDelete(openedFiles);
}
}
@@ -335,12 +333,12 @@ initPOSIXFPGUARDSENDRECV(void)
namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(fcPOSIXFPGUARDSENDRECV));
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(fwPOSIXFPGUARDSENDRECV));
namespaceSwitchSet(cdiPioExtraNSKeys[cdiPioEKFileWritingFinalize], NSSW_FUNC(finalizePOSIXFPGUARDSENDRECV));
bibAFiledataPF = listSetNew(destroyAFiledataPF, compareNamesAPF);
openedFiles = listSetNew(deleteMultiwriterFileBuf, compareNamesAPF);
}
}
/***************************************************************/
/* the rank running this message loop responds to queries of the form
/* the rank running the message loop below responds to queries of the form
* long query[3] = { file_id, operation code, amount }
* where the answer depends on the operation code:
* IO_Open_file, IO_Set_fp or IO_Close_file:
Loading