Skip to content
Snippets Groups Projects
Commit 308ec061 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Use reliable file IDs in PIO MPI_File_write_at_all mode.

parent b370bed6
No related branches found
No related tags found
2 merge requests!34Version 2.2.0,!13Consolidation with CDI-PIO (develop)
......@@ -2,6 +2,7 @@
#include "config.h"
#endif
#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
......@@ -26,23 +27,17 @@ struct fileMPIFWAA
int *collWriteSize;
};
static listSet *bibAFiledataM;
static int
fileIDTest(void *a, void *fileID)
{
return ((struct fileMPIFWAA *) a)->fileID == (int) (intptr_t) fileID;
}
static struct fileMPIFWAA *openFiles;
static unsigned openFilesSize, openFilesFill;
/***************************************************************/
static struct fileMPIFWAA *
initAFiledataFileWriteAtAll(const char *filename, size_t bs)
static void
initAFiledataFileWriteAtAll(struct fileMPIFWAA *of, const char *filename, size_t bs)
{
MPI_Comm commPio = commInqCommPio();
int sizePio = commInqSizePio();
size_t nameSize = strlen(filename) + 1;
struct fileMPIFWAA *of = Malloc(sizeof(*of));
of->collWriteSize = Malloc(sizeof(of->collWriteSize[0]) * (size_t) sizePio);
of->name = Malloc(nameSize);
memcpy(of->name, filename, nameSize);
......@@ -61,17 +56,13 @@ initAFiledataFileWriteAtAll(const char *filename, size_t bs)
xmpi(MPI_File_open(commPio, of->name, MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_UNIQUE_OPEN, open_info, &of->fh));
xmpi(MPI_Info_free(&open_info));
of->pos = 0;
return of;
}
/***************************************************************/
static int
destroyAFiledataFileWriteAtAll(void *v)
destroyAFiledataFileWriteAtAll(struct fileMPIFWAA *of)
{
struct fileMPIFWAA *of = v;
/* close file */
MPI_Offset endpos, fsize;
endpos = of->pos;
......@@ -85,27 +76,18 @@ destroyAFiledataFileWriteAtAll(void *v)
Free(of->collWriteSize);
Free(of->name);
of->name = NULL;
Free(of);
return iret == MPI_SUCCESS ? 0 : -1;
}
/***************************************************************/
static bool
compareNamesFileWriteAtAll(void *v1, void *v2)
{
struct fileMPIFWAA *afm1 = v1, *afm2 = v2;
return !strcmp(afm1->name, afm2->name);
}
/***************************************************************/
static size_t
fwFileWriteAtAll(int fileID, const void *buffer, size_t len)
{
struct fileMPIFWAA *of = listSetGet(bibAFiledataM, fileIDTest, (void *) (intptr_t) fileID);
xassert(of && len <= INT_MAX);
assert(fileID >= 0 && (size_t) fileID < openFilesSize && openFiles[fileID].name);
struct fileMPIFWAA *of = openFiles + fileID;
xassert(len <= INT_MAX);
MPI_Comm commPio = commInqCommPio();
int sizePio = commInqSizePio(), rankPio = commInqRankPio();
/* find position to write to */
......@@ -116,7 +98,6 @@ fwFileWriteAtAll(int fileID, const void *buffer, size_t len)
nextWritePos = myPos;
for (size_t i = (size_t) rankPio; i < (size_t) sizePio; ++i) nextWritePos += of->collWriteSize[i];
/* write buffer */
xassert(len <= INT_MAX);
xmpi(MPI_File_write_at_all(of->fh, myPos, (void *) buffer, (int) len, MPI_UNSIGNED_CHAR, MPI_STATUS_IGNORE));
of->pos = nextWritePos;
return len;
......@@ -127,21 +108,13 @@ fwFileWriteAtAll(int fileID, const void *buffer, size_t len)
static int
fcFileWriteAtAll(int fileID)
{
struct fileMPIFWAA *of = listSetGet(bibAFiledataM, fileIDTest, (void *) (intptr_t) fileID);
if (!of) xabort("listSet, fileID=%d not found", fileID);
int iret = listSetRemove(bibAFiledataM, fileIDTest, (void *) (intptr_t) fileID);
assert(fileID >= 0 && (size_t) fileID < openFilesSize && openFiles[fileID].name);
struct fileMPIFWAA *of = openFiles + fileID;
int iret = destroyAFiledataFileWriteAtAll(of);
return iret;
}
/***************************************************************/
static void
elemCheck(void *q, void *nm)
{
struct fileMPIFWAA *afm = q;
const char *name = nm;
if (!strcmp(name, afm->name)) xabort("Filename %s has already been added to set\n", name);
}
static int
fowFileWriteAtAll(const char *filename, const char *mode)
......@@ -151,12 +124,38 @@ fowFileWriteAtAll(const char *filename, const char *mode)
struct cdiPioConf *conf = cdiPioGetConf();
listSetForeach(bibAFiledataM, elemCheck, (void *) filename);
struct fileMPIFWAA *of = initAFiledataFileWriteAtAll(filename, conf->writeAggBufLim);
int id;
if ((of->fileID = id = listSetAdd(bibAFiledataM, of)) < 0) xabort("filename %s not unique", of->name);
return id;
for (size_t i = 0; i < openFilesSize; ++i)
if (openFiles[i].name && !strcmp(openFiles[i].name, filename))
{
Warning("filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles.",
filename);
return CDI_EINVAL;
}
size_t fileID = SIZE_MAX;
if (openFilesSize == openFilesFill)
{
fileID = openFilesSize;
if (openFilesSize == (size_t) INT_MAX + 1) return CDI_ELIMIT;
openFilesSize = openFilesSize ? openFilesSize * 2 : 4;
if (openFilesSize > (size_t) INT_MAX + 1) openFilesSize = (size_t) INT_MAX + 1;
openFiles = Realloc(openFiles, sizeof(*openFiles) * openFilesSize);
for (size_t i = fileID; i < openFilesSize; ++i) openFiles[i].name = NULL;
}
else
{
for (size_t i = 0; i < openFilesSize; ++i)
if (openFiles[i].name == NULL)
{
fileID = i;
break;
}
}
struct fileMPIFWAA *of = openFiles + fileID;
initAFiledataFileWriteAtAll(of, filename, conf->writeAggBufLim);
return (int) fileID;
}
/***************************************************************/
......@@ -164,13 +163,10 @@ fowFileWriteAtAll(const char *filename, const char *mode)
static void
finalizeFileWriteAtAll(void)
{
if (!listSetIsEmpty(bibAFiledataM))
xabort("set bibAFiledataM not empty");
if (openFilesFill)
xabort("files still open on exit!");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibAFiledataM);
}
Free(openFiles);
}
/***************************************************************/
......@@ -178,14 +174,10 @@ finalizeFileWriteAtAll(void)
void
cdiPioFileWriteAtAllInit(void)
{
bibAFiledataM = listSetNew(destroyAFiledataFileWriteAtAll, compareNamesFileWriteAtAll);
namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(fowFileWriteAtAll));
namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(fcFileWriteAtAll));
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(fwFileWriteAtAll));
namespaceSwitchSet(cdiPioExtraNSKeys[cdiPioEKFileWritingFinalize], NSSW_FUNC(finalizeFileWriteAtAll));
if (bibAFiledataM == NULL) xabort("listSetNew did not succeed");
}
/*
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment