Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 98
88
Compare changes
  • Side-by-side
  • Inline
+ 98
88
@@ -2,6 +2,7 @@
#include "config.h"
#endif
#include <assert.h>
#include <inttypes.h>
#include <stdlib.h>
#include <unistd.h>
@@ -11,159 +12,144 @@
#include "namespace.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_id_set.h"
#include "pio_impl.h"
#include "pio_util.h"
struct remoteFileBuf
{
char *name;
struct dBuffer *db1;
struct dBuffer *db2;
struct dBuffer *db;
enum IO_Server_command command;
MPI_Request request;
int tsID, fileID;
char name[];
int tsID;
};
static listSet *bibRemoteFileBuf;
static struct remoteFileBuf *openRemoteFiles;
static unsigned openRemoteFilesSize, openRemoteFilesFill;
static int
fileIDTest(void *a, void *fileID)
{
return ((struct remoteFileBuf *) a)->fileID == (int) (intptr_t) fileID;
}
static struct remoteFileBuf *
initRemoteFileBuf(const char *filename, size_t bs)
static void
initRemoteFileBuf(struct remoteFileBuf *rfile, const char *filename, size_t bs)
{
xdebug("filename=%s, buffersize=%zu, in", filename, bs);
size_t len = strlen(filename);
struct remoteFileBuf *afp = Malloc(sizeof(struct remoteFileBuf) + len + 1);
strcpy(afp->name, filename);
afp->tsID = 0;
rfile->name = Malloc(len + 1);
memcpy(rfile->name, filename, len + 1);
rfile->tsID = 0;
/* init output buffer */
xdebug("filename=%s, init output buffer", afp->name);
xdebug("filename=%s, init output buffer", filename);
int iret = dbuffer_init(&afp->db1, bs);
iret += dbuffer_init(&afp->db2, bs);
int iret = dbuffer_init(&rfile->db1, bs);
iret += dbuffer_init(&rfile->db2, bs);
if (iret > 0) xabort("dbuffer_init did not succeed");
afp->db = afp->db1;
rfile->db = rfile->db1;
afp->command = IO_Open_file;
afp->request = MPI_REQUEST_NULL;
rfile->command = IO_Open_file;
rfile->request = MPI_REQUEST_NULL;
xdebug("added name=%s, return", afp->name);
return afp;
xdebug("added name=%s, return", rfile->name);
}
static int
destroyRemoteFileBuf(void *v)
destroyRemoteFileBuf(struct remoteFileBuf *rfile)
{
struct remoteFileBuf *afp = (struct remoteFileBuf *) v;
MPI_Status status;
xdebug("filename=%s, cleanup, in", afp->name);
xmpiStat(MPI_Wait(&afp->request, &status), &status);
dbuffer_cleanup(&afp->db1);
dbuffer_cleanup(&afp->db2);
xdebug("filename=%s, cleanup, in", rfile->name);
Free(afp);
xmpiStat(MPI_Wait(&rfile->request, &status), &status);
dbuffer_cleanup(&rfile->db1);
dbuffer_cleanup(&rfile->db2);
Free(rfile->name);
rfile->name = NULL;
xdebug("%s", "cleaned up, return");
return 0;
}
static bool
compareNames(void *v1, void *v2)
{
struct remoteFileBuf *afd1 = v1, *afd2 = v2;
return !strcmp(afd1->name, afd2->name);
}
/***************************************************************/
/* send buffer to writer and swap buffer for filling */
static void
sendP(struct remoteFileBuf *afd, int id)
sendP(struct remoteFileBuf *rfile)
{
MPI_Status status;
size_t amount = dbuffer_data_size(afd->db);
int tag = encodeFileOpTag(id, afd->command);
size_t amount = dbuffer_data_size(rfile->db);
int fileID = (int) (rfile - openRemoteFiles);
int tag = encodeFileOpTag(fileID, rfile->command);
xdebug("send buffer for %s, size: %zu bytes, command=%s, in", afd->name, amount, cdiPioCmdStrTab[afd->command]);
xdebug("send buffer for %s, size: %zu bytes, command=%s, in", rfile->name, amount, cdiPioCmdStrTab[rfile->command]);
xmpiStat(MPI_Wait(&(afd->request), &status), &status);
xmpiStat(MPI_Wait(&(rfile->request), &status), &status);
/* FIXME: amount > INT_MAX unhandled */
xmpi(MPI_Issend(afd->db->buffer, (int) amount, MPI_UNSIGNED_CHAR, commInqSizePio() - 1, tag, commInqCommPio(), &afd->request));
xmpi(
MPI_Issend(rfile->db->buffer, (int) amount, MPI_UNSIGNED_CHAR, commInqSizePio() - 1, tag, commInqCommPio(), &rfile->request));
/* change outputBuffer */
dbuffer_reset(afd->db);
if (afd->db == afd->db1)
dbuffer_reset(rfile->db);
if (rfile->db == rfile->db1)
{
xdebug("%s", "Change to buffer 2 ...");
afd->db = afd->db2;
rfile->db = rfile->db2;
}
else
{
xdebug("%s", "Change to buffer 1 ...");
afd->db = afd->db1;
rfile->db = rfile->db1;
}
afd->command = IO_Send_buffer;
return;
rfile->command = IO_Send_buffer;
}
static void
defTimestep(struct remoteFileBuf *afd, int tsID)
defTimestep(struct remoteFileBuf *rfile, int tsID)
{
if (afd == NULL || tsID != afd->tsID + 1) xabort(" defTimestepPA () didn't succeed.");
afd->tsID = tsID;
if (rfile == NULL || tsID != rfile->tsID + 1) xabort(" defTimestepPA () didn't succeed.");
rfile->tsID = tsID;
}
static void
flushOp(struct remoteFileBuf *fb, int tsID)
{
sendP(fb, fb->fileID);
sendP(fb);
defTimestep(fb, (int) (intptr_t) tsID);
}
static size_t
pioSendWrite(int id, const void *buffer, size_t len, int tsID)
pioSendWrite(int fileID, const void *buffer, size_t len, int tsID)
{
assert(fileID >= 0 && (size_t) fileID < openRemoteFilesSize && openRemoteFiles[fileID].name);
int error = 0;
int flush = 0;
int filled;
struct remoteFileBuf *afd;
afd = listSetGet(bibRemoteFileBuf, fileIDTest, (void *) (intptr_t) id);
struct remoteFileBuf *rfile = openRemoteFiles + fileID;
flush = tsID != afd->tsID;
bool flush = tsID != rfile->tsID;
if (flush)
{
xdebug("tsID = %d, flush buffer for fileID=%d", tsID, afd->fileID);
xdebug("tsID = %d, flush buffer for fileID=%d", tsID, fileID);
flushOp(afd, tsID);
flushOp(rfile, tsID);
{
MPI_Status status;
xmpiStat(MPI_Wait(&(afd->request), &status), &status);
xmpiStat(MPI_Wait(&(rfile->request), &status), &status);
}
xmpi(MPI_Barrier(commInqCommColl()));
}
filled = dbuffer_push(afd->db, buffer, len);
filled = dbuffer_push(rfile->db, buffer, len);
xdebug("id = %d, tsID = %d, pushed %lu byte data on buffer, filled = %d", id, tsID, len, filled);
xdebug("id = %d, tsID = %d, pushed %lu byte data on buffer, filled = %d", fileID, tsID, len, filled);
if (filled == 1)
{
@@ -171,35 +157,36 @@ pioSendWrite(int id, const void *buffer, size_t len, int tsID)
error = filled;
else
{
sendP(afd, id);
error = dbuffer_push(afd->db, buffer, len);
sendP(rfile);
error = dbuffer_push(rfile->db, buffer, len);
}
}
if (error == 1) xabort("did not succeed filling output buffer, id=%d", id);
if (error == 1) xabort("did not succeed filling output buffer, id=%d", fileID);
return len;
}
static int
pioSendClose(int id)
pioSendClose(int fileID)
{
struct remoteFileBuf *afd;
xdebug("fileID %d: send buffer, close file and cleanup", id);
assert(fileID >= 0 && (size_t) fileID < openRemoteFilesSize && openRemoteFiles[fileID].name);
afd = listSetGet(bibRemoteFileBuf, fileIDTest, (void *) (intptr_t) id);
struct remoteFileBuf *rfile = openRemoteFiles + fileID;
afd->command = IO_Close_file;
xdebug("fileID %d: send buffer, close file and cleanup", fileID);
sendP(afd, id);
rfile->command = IO_Close_file;
sendP(rfile);
/* wait for other collectors to also close the file
* this prevents one process from re-using the file ID before
* another has sent the close */
xmpi(MPI_Barrier(commInqCommColl()));
/* remove file element */
int iret = listSetRemove(bibRemoteFileBuf, fileIDTest, (void *) (intptr_t) id);
return iret;
destroyRemoteFileBuf(rfile);
return 0;
}
static int
@@ -211,21 +198,45 @@ pioSendOpen(const char *filename, const char *mode)
struct cdiPioConf *conf = cdiPioGetConf();
/* init and add struct remoteFileBuf */
struct remoteFileBuf *afd = initRemoteFileBuf(filename, conf->writeAggBufLim);
int id;
if ((id = listSetAdd(bibRemoteFileBuf, afd)) < 0) xabort("filename %s is not unique", afd->name);
afd->fileID = id;
for (size_t i = 0; i < openRemoteFilesSize; ++i)
if (openRemoteFiles[i].name && !strcmp(openRemoteFiles[i].name, filename))
{
Warning("filename %s is already open! CDI-PIO does not support concurrent access through different filehandles.", filename);
return CDI_EINVAL;
}
int fileID = CDI_ELIMIT;
if (openRemoteFilesSize == openRemoteFilesFill)
{
fileID = (int) openRemoteFilesSize;
if (openRemoteFilesSize == (size_t) INT_MAX + 1) return CDI_ELIMIT;
openRemoteFilesSize = openRemoteFilesSize ? openRemoteFilesSize * 2 : 4;
if (openRemoteFilesSize > (size_t) INT_MAX + 1) openRemoteFilesSize = (size_t) INT_MAX + 1;
openRemoteFiles = Realloc(openRemoteFiles, sizeof(*openRemoteFiles) * openRemoteFilesSize);
for (size_t i = (size_t) fileID; i < openRemoteFilesSize; ++i) openRemoteFiles[i].name = NULL;
}
else
{
for (size_t i = 0; i < openRemoteFilesSize; ++i)
if (openRemoteFiles[i].name == NULL)
{
fileID = (int) i;
break;
}
}
struct remoteFileBuf *rfile = openRemoteFiles + fileID;
initRemoteFileBuf(rfile, filename, conf->writeAggBufLim);
xdebug("filename=%s, init and added remoteFileBuf, return id = %d", filename, id);
xdebug("filename=%s, init and added remoteFileBuf, return fileID = %d", filename, fileID);
/* put filename, id and buffersize on buffer */
int iret = dbuffer_push(afd->db, filename, strlen(filename) + 1);
int iret = dbuffer_push(rfile->db, filename, strlen(filename) + 1);
xassert(iret == 0);
sendP(afd, afd->fileID);
sendP(rfile);
MPI_Comm commCollectors = commInqCommColl();
xmpi(MPI_Barrier(commCollectors));
return id;
return fileID;
}
static void
@@ -239,12 +250,12 @@ pioSendFinalize(void)
xmpi(MPI_Send(&buffer, 1, MPI_INT, specialRank, tag, commPio));
xdebug("%s", "SENT MESSAGE WITH TAG \"IO_FINALIZE\" TO SPECIAL PROCESS");
if (!listSetIsEmpty(bibRemoteFileBuf))
xabort("set bibRemoteFileBuf not empty.");
if (openRemoteFilesFill)
xabort("files still open.");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibRemoteFileBuf);
Free(openRemoteFiles);
}
}
@@ -268,7 +279,6 @@ pioSendInitialize(void)
namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(pioSendClose));
namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(pioSendWrite));
namespaceSwitchSet(cdiPioExtraNSKeys[cdiPioEKFileWritingFinalize], NSSW_FUNC(pioSendFinalize));
bibRemoteFileBuf = listSetNew(destroyRemoteFileBuf, compareNames);
}
}
Loading