Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 59
61
Compare changes
  • Side-by-side
  • Inline
@@ -45,30 +45,22 @@ struct mwFileBuf
{
char *name;
struct dBuffer *db;
int fd;
int fd, tsID;
enum IO_Server_command command;
int tsID, fileID;
};
static int
lookupMwFBFileID(void *a, void *fileID)
{
return ((struct mwFileBuf *) a)->fileID == (int) (intptr_t) fileID;
}
static listSet *openedFiles;
static struct mwFileBuf *openFiles;
static unsigned openFilesSize, openFilesFill;
/***************************************************************/
static struct mwFileBuf *
newMultiwriterFileBuf(const char *key, size_t bs)
static void
newMultiwriterFileBuf(struct mwFileBuf *afd, const char *filename, size_t bs)
{
size_t len = strlen(key);
struct mwFileBuf *afd = Calloc(1, sizeof(*afd));
{
size_t nameSize = len + 1;
size_t nameSize = strlen(filename) + 1;
char *name = afd->name = Malloc(nameSize);
memcpy(afd->name, key, nameSize);
memcpy(name, filename, nameSize);
}
afd->tsID = 0;
@@ -86,14 +78,12 @@ newMultiwriterFileBuf(const char *key, size_t bs)
if ((afd->fd = open(afd->name, O_CREAT | O_WRONLY, 0666)) == -1) xabort("Failed to open %s", afd->name);
afd->command = IO_Open_file;
return afd;
}
static int
deleteMultiwriterFileBuf(void *v)
deleteMultiwriterFileBuf(struct mwFileBuf *afd)
{
int iret = 0;
struct mwFileBuf *afd = (struct mwFileBuf *) v;
int iret;
/* close file */
xdebug("name=%s, close file", afd->name);
@@ -104,19 +94,10 @@ deleteMultiwriterFileBuf(void *v)
dbuffer_cleanup(&afd->db);
Free(afd->name);
afd->name = NULL;
Free(afd);
return iret;
}
static bool
compareNamesAPF(void *v1, void *v2)
{
struct mwFileBuf *afd1 = v1, *afd2 = v2;
return !strcmp(afd1->name, afd2->name);
}
//*******************************************************
#ifndef HAVE_PWRITE
static ssize_t
@@ -137,26 +118,26 @@ writePF(struct mwFileBuf *afd)
ssize_t written;
MPI_Status status;
/* FIXME: pretend there's only one special rank for now */
int file_id = afd->fileID;
int fileID = (int) (afd - openFiles);
int specialRank = commInqSizePio() - 1;
MPI_Comm commPio = commInqCommPio();
/* send buffersize, recv offset */
size_t amount = dbuffer_data_size(afd->db);
long query[msgNumWords] = { file_id, afd->command, (long) amount };
long query[msgNumWords] = { fileID, afd->command, (long) amount };
long offset;
xmpiStat(MPI_Sendrecv(query, msgNumWords, MPI_LONG, specialRank, collGuardTag, &offset, 1, MPI_LONG, specialRank, collGuardTag,
commPio, &status),
&status);
xdebug("id=%d, command=%d, amount=%zu, sent amount=%ld, recv offset=%ld", file_id, afd->command, amount, (long) amount, offset);
xdebug("id=%d, command=%d, amount=%zu, sent amount=%ld, recv offset=%ld", fileID, afd->command, amount, (long) amount, offset);
bool doTruncate = offset < 0;
offset = labs(offset) - (offset < 0);
/* write buffer */
if ((written = pwrite(afd->fd, afd->db->buffer, amount, offset)) != (ssize_t) amount)
xabort("fileId=%d, expect to write %zu byte, written %zu byte", file_id, amount, written);
xdebug("written %zu bytes in file %d with offset %ld", written, file_id, offset);
xabort("fileId=%d, expect to write %zu byte, written %zu byte", fileID, amount, written);
xdebug("written %zu bytes in file %d with offset %ld", written, fileID, offset);
if (doTruncate) ftruncate(afd->fd, offset + (off_t) amount);
/* change outputBuffer */
dbuffer_reset(afd->db);
@@ -185,8 +166,8 @@ flushOp(struct mwFileBuf *a, int tsID)
static size_t
fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
{
int error = 0;
struct mwFileBuf *afd = listSetGet(openedFiles, lookupMwFBFileID, (void *) (intptr_t) fileID);
assert(fileID >= 0 && (size_t) fileID < openFilesSize && openFiles[fileID].name);
struct mwFileBuf *afd = openFiles + fileID;
bool flush = tsID != afd->tsID;
@@ -201,6 +182,7 @@ fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
xdebug("fileID = %d, tsID = %d, pushed data on buffer, filled = %d", fileID, tsID, filled);
int error = 0;
if (filled == 1)
{
if (flush)
@@ -221,32 +203,25 @@ fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
/***************************************************************/
static int
fcPOSIXFPGUARDSENDRECV(int id)
fcPOSIXFPGUARDSENDRECV(int fileID)
{
xdebug("write buffer, close file %d and cleanup", id);
assert(fileID >= 0 && (size_t) fileID < openFilesSize && openFiles[fileID].name);
struct mwFileBuf *afd = openFiles + fileID;
struct mwFileBuf *afd = listSetGet(openedFiles, lookupMwFBFileID, (void *) (intptr_t) id);
xdebug("write buffer, close file %d and cleanup", fileID);
afd->command = IO_Close_file;
writePF(afd);
/* remove file element */
int iret = listSetRemove(openedFiles, lookupMwFBFileID, (void *) (intptr_t) id);
int iret = deleteMultiwriterFileBuf(afd);
/* make sure the file is closed on all collectors before proceeding */
xmpi(MPI_Barrier(commInqCommColl()));
return iret;
}
/***************************************************************/
static void
lookupMwFBFilename(void *q, void *nm)
{
struct mwFileBuf *afd = q;
const char *name = nm;
if (!strcmp(name, afd->name)) xabort("Filename %s has already been added to set\n", name);
}
static int
fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
@@ -255,23 +230,50 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
struct cdiPioConf *conf = cdiPioGetConf();
listSetForeach(openedFiles, lookupMwFBFilename, (void *) filename);
/* init and add file element */
struct mwFileBuf *afd = newMultiwriterFileBuf(filename, conf->writeAggBufLim);
for (size_t i = 0; i < openFilesSize; ++i)
if (openFiles[i].name && !strcmp(openFiles[i].name, filename))
{
Warning("filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles.",
filename);
return CDI_EINVAL;
}
size_t fileID = SIZE_MAX;
if (openFilesSize == openFilesFill)
{
fileID = openFilesSize;
if (openFilesSize == (size_t) INT_MAX + 1) return CDI_ELIMIT;
openFilesSize = openFilesSize ? openFilesSize * 2 : 4;
if (openFilesSize > (size_t) INT_MAX + 1) openFilesSize = (size_t) INT_MAX + 1;
openFiles = Realloc(openFiles, sizeof(*openFiles) * openFilesSize);
for (size_t i = fileID; i < openFilesSize; ++i) openFiles[i].name = NULL;
}
else
{
for (size_t i = 0; i < openFilesSize; ++i)
if (openFiles[i].name == NULL)
{
fileID = i;
break;
}
}
struct mwFileBuf *afd = openFiles + fileID;
newMultiwriterFileBuf(afd, filename, conf->writeAggBufLim);
int file_id;
if ((afd->fileID = file_id = listSetAdd(openedFiles, afd)) < 0) xabort("filename %s not unique", afd->name);
xdebug("name=%s, init and add struct mwFileBuf, return id = %d", filename, file_id);
xdebug("name=%s, init and add struct mwFileBuf, return id = %zu", filename, fileID);
long offset;
int specialRank = commInqSpecialRank();
MPI_Status status;
MPI_Comm commPio = commInqCommPio();
long query[msgNumWords] = { file_id, afd->command, 0L };
long query[msgNumWords] = { (int) fileID, afd->command, 0L };
xmpiStat(MPI_Sendrecv(query, msgNumWords, MPI_LONG, specialRank, collGuardTag, &offset, 1, MPI_LONG, specialRank, collGuardTag,
commPio, &status),
&status);
afd->command = IO_Set_fp;
return file_id;
return (int) fileID;
}
/***************************************************************/
@@ -282,13 +284,10 @@ finalizePOSIXFPGUARDSENDRECV(void)
long query[msgNumWords] = { 0, IO_Finalize, 0L };
xmpi(MPI_Send(query, msgNumWords, MPI_LONG, commInqSpecialRank(), collGuardTag, commInqCommPio()));
if (!listSetIsEmpty(openedFiles))
if (openFilesFill)
xabort("files still open at CDI-PIO finalization");
else
{
xdebug("%s", "destroy set");
listSetDelete(openedFiles);
}
Free(openFiles);
}
/***************************************************************/
@@ -310,7 +309,6 @@ initPOSIXFPGUARDSENDRECV(void)
namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(fcPOSIXFPGUARDSENDRECV));
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(fwPOSIXFPGUARDSENDRECV));
namespaceSwitchSet(cdiPioExtraNSKeys[cdiPioEKFileWritingFinalize], NSSW_FUNC(finalizePOSIXFPGUARDSENDRECV));
openedFiles = listSetNew(deleteMultiwriterFileBuf, compareNamesAPF);
}
}
Loading