Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 50
63
Compare changes
  • Side-by-side
  • Inline
+ 50
63
@@ -36,31 +36,22 @@ struct fileFunnelAIO
unsigned short currOpIndex, aioQueueDepth;
unsigned numQueuedWrites;
int activeCollectors;
int handle, fileID;
int handle;
};
static int
fileIDTest(void *a, void *fileID)
{
return ((struct fileFunnelAIO *) a)->fileID == (int) (intptr_t) fileID;
}
/***************************************************************/
static struct fileFunnelAIO *
initBFiledataPA(char *filename, struct cdiPioConf *conf, int nc)
static void
initBFiledataPA(struct fileFunnelAIO *bfd, const char *filename, struct cdiPioConf *conf, int nc)
{
struct fileFunnelAIO *bfd;
size_t bufSize = conf->writeAggBufLim;
size_t aioQueueDepth = conf->aioQueueDepth;
xdebug("filename=%s, buffersize=%zu, ncollectors=%d, AIO queue depth=%zu", filename, bufSize, nc, aioQueueDepth);
bfd = (struct fileFunnelAIO *) Malloc(sizeof(*bfd));
{
size_t filenameSize = strlen(filename) + 1;
char *name = bfd->name = Malloc(filenameSize);
strcpy(name, filename);
memcpy(name, filename, filenameSize);
}
if ((bfd->handle = open(bfd->name, O_CREAT | O_WRONLY, 0666)) == -1) xabort("Failed to open %s", bfd->name);
@@ -83,19 +74,15 @@ initBFiledataPA(char *filename, struct cdiPioConf *conf, int nc)
bfd->activeCollectors = nc;
xdebug("filename=%s, opened file, return", bfd->name);
return bfd;
}
/***************************************************************/
static int
destroyBFiledataPA(void *v)
destroyBFiledataPA(struct fileFunnelAIO *bfd, struct cdiPioConf *conf)
{
struct fileFunnelAIO *bfd = (struct fileFunnelAIO *) v;
const struct aiocb *ccBP[1];
int iret = 0;
struct cdiPioConf *conf = cdiPioGetConf();
size_t aioQueueDepth = conf->aioQueueDepth;
size_t numQueuedWrites = bfd->numQueuedWrites;
size_t nextOpIndex = ((size_t) bfd->currOpIndex + 1) % aioQueueDepth;
@@ -129,11 +116,10 @@ destroyBFiledataPA(void *v)
/* file closed, cleanup */
dbuffer_cleanup(&(bfd->fb));
dbuffer_cleanup(&bfd->fb);
Free(bfd->name);
bfd->name = NULL;
Free(bfd->ctrlBlks);
Free(bfd);
xdebug("%s", "closed file and cleaned up, return");
@@ -142,16 +128,6 @@ destroyBFiledataPA(void *v)
/***************************************************************/
static bool
compareNamesBPA(void *v1, void *v2)
{
struct fileFunnelAIO *bfd1 = v1, *bfd2 = v2;
return !strcmp(bfd1->name, bfd2->name);
}
/***************************************************************/
static void
writePA(struct fileFunnelAIO *bfd, size_t amount, struct cdiPioConf *conf)
{
@@ -203,23 +179,13 @@ writePA(struct fileFunnelAIO *bfd, size_t amount, struct cdiPioConf *conf)
xdebug("filename=%s, numQueuedWrites=%zu, return", bfd->name, numQueuedWrites);
}
/***************************************************************/
static void
elemCheck(void *q, void *nm)
{
struct fileFunnelAIO *bfd = q;
const char *name = nm;
if (!strcmp(name, bfd->name)) xabort("Filename %s has already been inserted\n", name);
}
/***************************************************************/
void
pioWriterAIO(void)
{
struct fileFunnelAIO *bfd;
listSet *bibBFiledataPA;
struct fileFunnelAIO *openFileFunnels;
size_t openFileFunnelsSize, openFileFunnelsFill;
MPI_Comm commPio = commInqCommPio();
int nProcsColl = commInqSizeColl();
struct cdiPioConf *conf = cdiPioGetConf();
@@ -228,7 +194,11 @@ pioWriterAIO(void)
assert(aioQueueDepth >= 1);
xdebug("nProcsColl=%d ", nProcsColl);
bibBFiledataPA = listSetNew(destroyBFiledataPA, compareNamesBPA);
openFileFunnelsSize = 4;
openFileFunnelsFill = 0;
openFileFunnels = Malloc(sizeof(*openFileFunnels) * openFileFunnelsSize);
for (size_t i = 0; i < openFileFunnelsSize; ++i) openFileFunnels[i].name = NULL;
bool *sentFinalize = Calloc((size_t) nProcsColl, sizeof(sentFinalize[0]));
for (;;)
@@ -261,17 +231,31 @@ pioWriterAIO(void)
xdebug("command %s, filename=%s, amount=%zd", cdiPioCmdStrTab[rtag.command], filename, amount);
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
size_t prevOpenFileFunnelsSize = openFileFunnelsSize;
int fileID = rtag.id;
if (openFileFunnelsSize <= (unsigned) fileID)
{
listSetForeach(bibBFiledataPA, elemCheck, filename);
bfd = initBFiledataPA(filename, conf, nProcsColl);
int id;
if ((id = listSetAdd(bibBFiledataPA, bfd)) < 0) xabort("fileID=%d not unique", rtag.id);
bfd->fileID = id;
while (openFileFunnelsSize <= (unsigned) fileID) openFileFunnelsSize *= 2;
if (openFileFunnelsSize > (unsigned) INT_MAX + 1) openFileFunnelsSize = (unsigned) INT_MAX + 1;
openFileFunnels = Realloc(openFileFunnels, sizeof(*openFileFunnels) * openFileFunnelsSize);
for (size_t i = prevOpenFileFunnelsSize; i < openFileFunnelsSize; ++i) openFileFunnels[i].name = NULL;
}
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
if (!bfd->name)
{
for (size_t i = 0; i < prevOpenFileFunnelsSize; ++i)
if (openFileFunnels[i].name && !strcmp(openFileFunnels[i].name, filename))
xabort("filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles.",
filename);
initBFiledataPA(bfd, filename, conf, nProcsColl);
++openFileFunnelsFill;
}
else if (strcmp(filename, bfd->name) != 0)
xabort("filename is not consistent, fileID=%d", rtag.id);
xabort("filename is not consistent, fileID=%d,"
" \"%s\" vs. \"%s\"",
fileID, filename, bfd->name);
size_t currOpIndex = ((size_t) bfd->currOpIndex + 1) % aioQueueDepth;
bfd->currOpIndex = (unsigned short) currOpIndex;
@@ -285,10 +269,11 @@ pioWriterAIO(void)
case IO_Send_buffer:
{
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
int fileID = rtag.id;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("command: %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
xdebug("command: %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], fileID, bfd->name);
size_t currOpIndex = ((size_t) bfd->currOpIndex + 1) % aioQueueDepth;
bfd->currOpIndex = (unsigned short) currOpIndex;
@@ -301,10 +286,11 @@ pioWriterAIO(void)
case IO_Close_file:
{
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
int fileID = rtag.id;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug(" command %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
xdebug(" command %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], fileID, bfd->name);
size_t currOpIndex = ((size_t) bfd->currOpIndex + 1) % aioQueueDepth;
bfd->currOpIndex = (unsigned short) currOpIndex;
@@ -314,10 +300,11 @@ pioWriterAIO(void)
writePA(bfd, (size_t) messagesize, conf);
if (!--(bfd->activeCollectors))
if (!--bfd->activeCollectors)
{
xdebug("all are finished with file %d, delete entry", rtag.id);
listSetRemove(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id);
xdebug("all are finished with file %d, delete entry", fileID);
destroyBFiledataPA(bfd, conf);
--openFileFunnelsFill;
}
}
break;
@@ -325,20 +312,20 @@ pioWriterAIO(void)
{
int buffer, collID;
xmpi(MPI_Recv(&buffer, 1, MPI_INT, source, tag, commPio, &status));
xmpiStat(MPI_Recv(&buffer, 1, MPI_INT, source, tag, commPio, &status), &status);
sentFinalize[source] = true;
bool doFinalize = true;
for (collID = 0; collID < nProcsColl; collID++) doFinalize &= sentFinalize[collID];
if (doFinalize)
{
if (!listSetIsEmpty(bibBFiledataPA))
if (openFileFunnelsFill)
xabort("Set bibBfiledataP is not empty.");
else
{
xdebug("%s", "all files are finished, destroy set,"
" return");
listSetDelete(bibBFiledataPA);
}
Free(openFileFunnels);
Free(sentFinalize);
return;
}
Loading