Skip to content
Snippets Groups Projects
Commit 0fb9e996 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Change PIO MPI shared file pointer method to reliable file IDs.

parent b48df30c
No related branches found
No related tags found
2 merge requests!34Version 2.2.0,!13Consolidation with CDI-PIO (develop)
......@@ -2,6 +2,7 @@
#include "config.h"
#endif
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
......@@ -25,25 +26,18 @@ struct fileMPIFWS
struct dBuffer *db;
MPI_File fh;
MPI_Request request;
int fileID;
int tsID;
};
static listSet *bibAFiledataM;
static int
fileIDTest(void *a, void *fileID)
{
return ((struct fileMPIFWS *) a)->fileID == (int) (intptr_t) fileID;
}
static struct fileMPIFWS *openFiles;
static unsigned openFilesSize, openFilesFill;
/***************************************************************/
static struct fileMPIFWS *
initAFiledataMPINONB(const char *filename, size_t bufSize)
static void
initAFiledataMPINONB(struct fileMPIFWS *of, const char *filename, size_t bufSize)
{
MPI_Comm commPio = commInqCommPio();
struct fileMPIFWS *of = (struct fileMPIFWS *) Malloc(sizeof(*of));
{
size_t nameSize = strlen(filename) + 1;
char *name = of->name = Malloc(nameSize);
......@@ -60,7 +54,7 @@ initAFiledataMPINONB(const char *filename, size_t bufSize)
of->db = of->db1;
of->tsID = CDI_UNDEFID;
of->tsID = -1;
MPI_Info open_info = MPI_INFO_NULL;
/* tell IBM PE to buffer just as much as one buffer holds */
......@@ -74,22 +68,18 @@ initAFiledataMPINONB(const char *filename, size_t bufSize)
xmpi(MPI_File_open(commPio, of->name, MPI_MODE_CREATE | MPI_MODE_WRONLY, open_info, &of->fh));
xmpi(MPI_Info_free(&open_info));
of->request = MPI_REQUEST_NULL;
return of;
}
/***************************************************************/
static int
destroyAFiledataMPINONB(void *v)
destroyAFiledataMPINONB(struct fileMPIFWS *of)
{
int iret = 0;
struct fileMPIFWS *of;
MPI_Status status;
MPI_Offset endpos;
of = (struct fileMPIFWS *) v;
xdebug("IOPE%d: close file %d, name=\"%s\"", commInqRankGlob(), of->fileID, of->name);
xdebug("IOPE%d: close file %d, name=\"%s\"", commInqRankGlob(), (int) (of - openFiles), of->name);
/* close file */
xmpi(MPI_Wait(&of->request, &status));
......@@ -104,7 +94,6 @@ destroyAFiledataMPINONB(void *v)
dbuffer_cleanup(&(of->db2));
Free(of->name);
of->name = NULL;
Free(of);
xdebug("IOPE%d: closed file, cleaned up, return", commInqRankGlob());
......@@ -113,21 +102,12 @@ destroyAFiledataMPINONB(void *v)
/***************************************************************/
static bool
compareNamesMPINONB(void *v1, void *v2)
{
struct fileMPIFWS *afm1 = v1, *afm2 = v2;
return !strcmp(afm1->name, afm2->name);
}
/***************************************************************/
static void
writeMPINONB(struct fileMPIFWS *of)
{
int amount;
MPI_Status status;
int fileID = of->fileID;
int fileID = (int) (of - openFiles);
/* write buffer */
......@@ -164,19 +144,14 @@ writeMPINONB(struct fileMPIFWS *of)
static size_t
fwMPINONB(int fileID, const void *buffer, size_t len, int tsID)
{
int error = 0;
int filled = 0;
struct fileMPIFWS *of;
int rankPio = commInqRankPio();
of = listSetGet(bibAFiledataM, fileIDTest, (void *) (intptr_t) fileID);
xassert(of);
assert(fileID >= 0 && (size_t) fileID < openFilesSize && openFiles[fileID].name);
struct fileMPIFWS *of = openFiles + fileID;
bool flush = tsID != of->tsID;
if (flush)
{
xdebug3("IOPE%d: tsID = %d, flush buffer", rankPio, tsID);
xdebug3("IOPE%d: tsID = %d, flush buffer", commInqRankPio(), tsID);
writeMPINONB(of);
of->tsID = tsID;
MPI_Status status;
......@@ -184,12 +159,13 @@ fwMPINONB(int fileID, const void *buffer, size_t len, int tsID)
xmpi(MPI_Barrier(commInqCommPio()));
}
filled = dbuffer_push(of->db, buffer, len);
int filled = dbuffer_push(of->db, buffer, len);
xdebug3("IOPE%d: fileID = %d, tsID = %d,"
" pushed data on buffer, filled = %d",
rankPio, fileID, tsID, filled);
commInqRankPio(), fileID, tsID, filled);
int error = 0;
if (filled == 1)
{
if (flush)
......@@ -212,29 +188,20 @@ fwMPINONB(int fileID, const void *buffer, size_t len, int tsID)
static int
fcMPINONB(int fileID)
{
struct fileMPIFWS *of;
xdebug("IOPE%d: write buffer, close file and cleanup, in %d", commInqRankPio(), fileID);
if (!(of = listSetGet(bibAFiledataM, fileIDTest, (void *) (intptr_t) fileID))) xabort("listSet, fileID=%d not found", fileID);
assert(fileID >= 0 && (size_t) fileID < openFilesSize && openFiles[fileID].name);
struct fileMPIFWS *of = openFiles + fileID;
writeMPINONB(of);
MPI_Status status;
xmpi(MPI_Wait(&(of->request), &status));
xmpiStat(MPI_Wait(&(of->request), &status), &status);
/* remove file element */
int iret = listSetRemove(bibAFiledataM, fileIDTest, (void *) (intptr_t) fileID);
int iret = destroyAFiledataMPINONB(of);
return iret;
}
/***************************************************************/
static void
elemCheck(void *q, void *nm)
{
struct fileMPIFWS *afm = q;
const char *name = nm;
if (!strcmp(name, afm->name)) xabort("Filename %s has already been added to set\n", name);
}
static int
fowMPINONB(const char *filename, const char *mode)
......@@ -244,12 +211,38 @@ fowMPINONB(const char *filename, const char *mode)
struct cdiPioConf *conf = cdiPioGetConf();
listSetForeach(bibAFiledataM, elemCheck, (void *) filename);
struct fileMPIFWS *of = initAFiledataMPINONB(filename, conf->writeAggBufLim);
int id;
if ((of->fileID = id = listSetAdd(bibAFiledataM, of)) < 0) xabort("filename %s not unique", of->name);
return id;
for (size_t i = 0; i < openFilesSize; ++i)
if (openFiles[i].name && !strcmp(openFiles[i].name, filename))
{
Warning("filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles.",
filename);
return CDI_EINVAL;
}
size_t fileID = SIZE_MAX;
if (openFilesSize == openFilesFill)
{
fileID = openFilesSize;
if (openFilesSize == (size_t) INT_MAX + 1) return CDI_ELIMIT;
openFilesSize = openFilesSize ? openFilesSize * 2 : 4;
if (openFilesSize > (size_t) INT_MAX + 1) openFilesSize = (size_t) INT_MAX + 1;
openFiles = Realloc(openFiles, sizeof(*openFiles) * openFilesSize);
for (size_t i = fileID; i < openFilesSize; ++i) openFiles[i].name = NULL;
}
else
{
for (size_t i = 0; i < openFilesSize; ++i)
if (openFiles[i].name == NULL)
{
fileID = i;
break;
}
}
struct fileMPIFWS *of = openFiles + fileID;
initAFiledataMPINONB(of, filename, conf->writeAggBufLim);
return (int) fileID;
}
/***************************************************************/
......@@ -257,13 +250,10 @@ fowMPINONB(const char *filename, const char *mode)
static void
finalizeMPINONB(void)
{
if (!listSetIsEmpty(bibAFiledataM))
xabort("set bibAFiledataM not empty");
if (openFilesFill)
xabort("files still open on exit!");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibAFiledataM);
}
Free(openFiles);
}
/***************************************************************/
......@@ -271,14 +261,10 @@ finalizeMPINONB(void)
void
initMPINONB(void)
{
bibAFiledataM = listSetNew(destroyAFiledataMPINONB, compareNamesMPINONB);
namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(fowMPINONB));
namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(fcMPINONB));
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(fwMPINONB));
namespaceSwitchSet(cdiPioExtraNSKeys[cdiPioEKFileWritingFinalize], NSSW_FUNC(finalizeMPINONB));
if (bibAFiledataM == NULL) xabort("listSetNew did not succeed");
}
/*
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment