Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
2 files
+ 315
310
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 155
152
@@ -355,7 +355,6 @@ pioWriterAIO(void)
const struct cdiPioConf *conf = cdiPioGetConf();
unsigned aioQueueDepth = conf->aioQueueDepth;
size_t bufSize = conf->writeAggBufLim;
MPI_Status status;
assert(aioQueueDepth >= 1);
xdebug("nProcsColl=%d ", nProcsColl);
@@ -377,10 +376,10 @@ pioWriterAIO(void)
* concurrently in flight: open/sync/close/shutdown requests and
* data record aggregates for each file */
MPI_Request(*req)[nProcsColl] = Malloc(sizeof(*req) * (numMetaOp + openFileFunnelsSize));
MPI_Status *sync_stat = Malloc(sizeof(*sync_stat) * (size_t) nProcsColl);
size_t numOpenSends = 0;
size_t maxOpenFileIDp1 = 0;
size_t numOpenRequests = (size_t) nProcsColl * (numMetaOp + maxOpenFileIDp1);
MPI_Status *statui = Malloc(sizeof(*statui) * (size_t) nProcsColl * (numMetaOp + openFileFunnelsSize)),
*wstats = Malloc(sizeof(*wstats) * (size_t) nProcsColl);
int numCompleted, *completed = Malloc(sizeof(*completed) * (size_t) nProcsColl * (numMetaOp + openFileFunnelsSize));
size_t numOpenSends = 0, maxOpenFileIDp1 = 0, numOpenRequests = (size_t) nProcsColl * (numMetaOp + maxOpenFileIDp1);
for (size_t i = 0; i < (size_t) nProcsColl; ++i)
{
xmpi(MPI_Irecv(openReqBuffer + i * openMsgSize, (int) openMsgSize, MPI_PACKED, (int) i, IO_Open_file, commPio,
@@ -393,168 +392,172 @@ pioWriterAIO(void)
for (;;)
{
int rcvIdx;
assert(numOpenRequests < INT_MAX);
/* if still unsent, include sync sends waiting to complete */
xmpiStat(MPI_Waitany((int) numOpenRequests, *req, &rcvIdx, &status), &status);
int source = rcvIdx % nProcsColl;
int command;
switch (rcvIdx / nProcsColl)
assert(numOpenRequests <= INT_MAX);
xmpiStats(MPI_Waitsome((int) numOpenRequests, *req, &numCompleted, completed, statui), numCompleted, statui);
for (size_t cmpltIdx = 0; cmpltIdx < (size_t) numCompleted; ++cmpltIdx)
{
case MOOpenRx: command = IO_Open_file; break;
case MOSyncRx: command = syncMsgs[source].command; break;
case MOSyncTx: --numOpenSends; continue;
default: command = IO_Send_buffer;
}
int rcvIdx = completed[cmpltIdx];
int source = rcvIdx % nProcsColl;
int command;
switch (rcvIdx / nProcsColl)
{
case MOOpenRx: command = IO_Open_file; break;
case MOSyncRx: command = syncMsgs[source].command; break;
case MOSyncTx: --numOpenSends; continue;
default: command = IO_Send_buffer;
}
xdebug("receive message from source=%d, command=%d (%s)", source, command, cdiPioCmdStrTab[command]);
xdebug("receive message from source=%d, command=%d (%s)", source, command, cdiPioCmdStrTab[command]);
switch (command)
{
case IO_Open_file:
{
int openMsgHdr[2];
int position = 0;
xmpi(MPI_Unpack(openReqBuffer + (size_t) source * openMsgSize, (int) openMsgSize, &position, openMsgHdr, 2, MPI_INT,
commPio));
size_t len = (size_t) openMsgHdr[1];
assert(len <= maxPathLen);
xmpi(MPI_Unpack(openReqBuffer + (size_t) source * openMsgSize, (int) openMsgSize, &position, filename, openMsgHdr[1],
MPI_CHAR, commPio));
filename[len] = '\0';
size_t buffersize = conf->writeAggBufLim;
xdebug("command %s, filename=%s, buffersize=%zu", cdiPioCmdStrTab[command], filename, buffersize);
int fileID = openMsgHdr[0];
size_t prevOpenFileFunnelsSize = openFileFunnelsSize;
if (openFileFunnelsSize <= (unsigned) fileID)
switch (command)
{
case IO_Open_file:
{
while (openFileFunnelsSize <= (unsigned) fileID) openFileFunnelsSize *= 2;
if (openFileFunnelsSize > (unsigned) INT_MAX + 1) openFileFunnelsSize = (unsigned) INT_MAX + 1;
openFileFunnels = Realloc(openFileFunnels, sizeof(*openFileFunnels) * openFileFunnelsSize);
for (size_t i = prevOpenFileFunnelsSize; i < openFileFunnelsSize; ++i) openFileFunnels[i].name = NULL;
req = Realloc(req, sizeof(**req) * (openFileFunnelsSize + numMetaOp));
for (size_t j = prevOpenFileFunnelsSize; j < openFileFunnelsSize; ++j)
for (size_t i = 0; i < (size_t) nProcsColl; ++i) req[numMetaOp + j][i] = MPI_REQUEST_NULL;
maxOpenFileIDp1 = (size_t) fileID + 1;
int openMsgHdr[2];
int position = 0;
unsigned char *reqBuffer = openReqBuffer + (size_t) source * openMsgSize;
xmpi(MPI_Unpack(reqBuffer, (int) openMsgSize, &position, openMsgHdr, 2, MPI_INT, commPio));
size_t len = (size_t) openMsgHdr[1];
assert(len <= maxPathLen);
xmpi(MPI_Unpack(reqBuffer, (int) openMsgSize, &position, filename, openMsgHdr[1], MPI_CHAR, commPio));
filename[len] = '\0';
size_t buffersize = conf->writeAggBufLim;
xdebug("command %s, filename=%s, buffersize=%zu", cdiPioCmdStrTab[command], filename, buffersize);
int fileID = openMsgHdr[0];
size_t prevOpenFileFunnelsSize = openFileFunnelsSize;
if (openFileFunnelsSize <= (unsigned) fileID)
{
while (openFileFunnelsSize <= (unsigned) fileID) openFileFunnelsSize *= 2;
if (openFileFunnelsSize > (unsigned) INT_MAX + 1) openFileFunnelsSize = (unsigned) INT_MAX + 1;
openFileFunnels = Realloc(openFileFunnels, sizeof(*openFileFunnels) * openFileFunnelsSize);
for (size_t i = prevOpenFileFunnelsSize; i < openFileFunnelsSize; ++i) openFileFunnels[i].name = NULL;
req = Realloc(req, sizeof(*req) * (openFileFunnelsSize + numMetaOp));
statui = Realloc(statui, sizeof(*statui) * (size_t) nProcsColl * (openFileFunnelsSize + numMetaOp));
completed = Realloc(completed, sizeof(*completed) * (size_t) nProcsColl * (openFileFunnelsSize + numMetaOp));
for (size_t j = prevOpenFileFunnelsSize; j < openFileFunnelsSize; ++j)
for (size_t i = 0; i < (size_t) nProcsColl; ++i) req[numMetaOp + j][i] = MPI_REQUEST_NULL;
maxOpenFileIDp1 = (size_t) fileID + 1;
}
else if (maxOpenFileIDp1 < (size_t) fileID + 1)
maxOpenFileIDp1 = (size_t) fileID + 1;
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
if (!bfd->name)
{
for (size_t i = 0; i < prevOpenFileFunnelsSize; ++i)
if (openFileFunnels[i].name && !strcmp(openFileFunnels[i].name, filename))
xabort("filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles.",
filename);
initBFiledataPA(bfd, filename, conf, (size_t) nProcsColl);
++openFileFunnelsFill;
}
else if (strcmp(filename, bfd->name) != 0)
xabort("filename is not consistent, fileID=%d,"
" \"%s\" vs. \"%s\"",
fileID, filename, bfd->name);
reinstallListenReq(bfd, fileID, source, (size_t) nProcsColl, bufSize, aioQueueDepth, req, commPio, conf);
xmpi(MPI_Irecv(reqBuffer, (int) openMsgSize, MPI_PACKED, source, IO_Open_file, commPio, req[MOOpenRx] + source));
numOpenRequests = (size_t) nProcsColl * (numMetaOp + maxOpenFileIDp1);
}
else if (maxOpenFileIDp1 < (size_t) fileID + 1)
maxOpenFileIDp1 = (size_t) fileID + 1;
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
if (!bfd->name)
break;
case IO_Send_buffer:
{
for (size_t i = 0; i < prevOpenFileFunnelsSize; ++i)
if (openFileFunnels[i].name && !strcmp(openFileFunnels[i].name, filename))
xabort("filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles.",
filename);
initBFiledataPA(bfd, filename, conf, (size_t) nProcsColl);
++openFileFunnelsFill;
int fileID = rcvIdx / nProcsColl - numMetaOp;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("command: %s, fileID=%d, name=%s", cdiPioCmdStrTab[command], fileID, bfd->name);
int messagesize;
xmpi(MPI_Get_count(statui + cmpltIdx, MPI_UNSIGNED_CHAR, &messagesize));
size_t amount = (size_t) messagesize;
writePA(bfd, source, (size_t) nProcsColl, amount, conf);
if (!bfd->outstandingSync && bfd->syncOffset == bfd->offset)
sendCollSync(bfd, fileID, commPio, nProcsColl, req, &numOpenSends, wstats);
off_t *restrict perCollAmounts = bfd->perCollAmounts;
/* end of collector stream not yet reached? */
if (perCollAmounts[source] != perCollAmounts[nProcsColl + source])
reinstallListenReq(bfd, fileID, source, (size_t) nProcsColl, bufSize, aioQueueDepth, req, commPio, conf);
/* end of collector stream reached */
else if (!--bfd->activeCollectors)
funnelFileCleanup(openFileFunnels, fileID, &openFileFunnelsFill, &maxOpenFileIDp1, &numOpenRequests, commPio,
nProcsColl);
}
else if (strcmp(filename, bfd->name) != 0)
xabort("filename is not consistent, fileID=%d,"
" \"%s\" vs. \"%s\"",
fileID, filename, bfd->name);
reinstallListenReq(bfd, fileID, source, (size_t) nProcsColl, bufSize, aioQueueDepth, req, commPio, conf);
xmpi(MPI_Irecv(openReqBuffer + (size_t) source * openMsgSize, (int) openMsgSize, MPI_PACKED, source, IO_Open_file,
commPio, req[MOOpenRx] + source));
numOpenRequests = (size_t) nProcsColl * (numMetaOp + maxOpenFileIDp1);
}
break;
break;
case IO_Send_buffer:
{
int fileID = rcvIdx / nProcsColl - numMetaOp;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("command: %s, fileID=%d, name=%s", cdiPioCmdStrTab[command], fileID, bfd->name);
int messagesize;
xmpi(MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &messagesize));
size_t amount = (size_t) messagesize;
writePA(bfd, source, (size_t) nProcsColl, amount, conf);
if (!bfd->outstandingSync && bfd->syncOffset == bfd->offset)
sendCollSync(bfd, fileID, commPio, nProcsColl, req, &numOpenSends, sync_stat);
off_t *restrict perCollAmounts = bfd->perCollAmounts;
/* end of collector stream not yet reached? */
if (perCollAmounts[source] != perCollAmounts[nProcsColl + source])
reinstallListenReq(bfd, fileID, source, (size_t) nProcsColl, bufSize, aioQueueDepth, req, commPio, conf);
/* end of collector stream reached */
else if (!--bfd->activeCollectors)
funnelFileCleanup(openFileFunnels, fileID, &openFileFunnelsFill, &maxOpenFileIDp1, &numOpenRequests, commPio,
nProcsColl);
}
break;
case IO_Sync_file:
{
int fileID = syncMsgs[source].fileID;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("COMMAND %s, FILE%d, SOURCE%d", cdiPioCmdStrTab[command], fileID, source);
bfd->syncOffset += syncMsgs[source].amount;
if (!--bfd->outstandingSync && bfd->syncOffset == bfd->offset)
sendCollSync(bfd, fileID, commPio, nProcsColl, req, &numOpenSends, sync_stat);
xmpi(MPI_Irecv(syncMsgs + source, 1, cdiPioSyncMsgDt, source, IO_Sync_file, commPio, req[MOSyncRx] + source));
}
break;
case IO_Sync_file:
{
int fileID = syncMsgs[source].fileID;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("COMMAND %s, FILE%d, SOURCE%d", cdiPioCmdStrTab[command], fileID, source);
bfd->syncOffset += syncMsgs[source].amount;
if (!--bfd->outstandingSync && bfd->syncOffset == bfd->offset)
sendCollSync(bfd, fileID, commPio, nProcsColl, req, &numOpenSends, wstats);
xmpi(MPI_Irecv(syncMsgs + source, 1, cdiPioSyncMsgDt, source, IO_Sync_file, commPio, req[MOSyncRx] + source));
}
break;
case IO_Close_file:
{
int fileID = syncMsgs[source].fileID;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("COMMAND %s, FILE%d, SOURCE%d", cdiPioCmdStrTab[command], fileID, source);
bfd->syncOffset += syncMsgs[source].amount;
off_t *restrict perCollAmounts = bfd->perCollAmounts;
perCollAmounts[nProcsColl + source] = syncMsgs[source].amount;
/* did this source collector already send enough data? */
if (perCollAmounts[source] == perCollAmounts[nProcsColl + source])
case IO_Close_file:
{
size_t fileReqIdx = numMetaOp + (size_t) fileID;
assert(req[fileReqIdx][source] != MPI_REQUEST_NULL);
xmpi(MPI_Cancel(req[fileReqIdx] + source));
xmpi(MPI_Request_free(req[fileReqIdx] + source));
if (!--bfd->activeCollectors)
funnelFileCleanup(openFileFunnels, fileID, &openFileFunnelsFill, &maxOpenFileIDp1, &numOpenRequests, commPio,
nProcsColl);
int fileID = syncMsgs[source].fileID;
assert(fileID >= 0 && (size_t) fileID < openFileFunnelsSize && openFileFunnels[fileID].name);
struct fileFunnelAIO *bfd = openFileFunnels + fileID;
xdebug("COMMAND %s, FILE%d, SOURCE%d", cdiPioCmdStrTab[command], fileID, source);
bfd->syncOffset += syncMsgs[source].amount;
off_t *restrict perCollAmounts = bfd->perCollAmounts;
perCollAmounts[nProcsColl + source] = syncMsgs[source].amount;
/* did this source collector already send enough data? */
if (perCollAmounts[source] == perCollAmounts[nProcsColl + source])
{
size_t fileReqIdx = numMetaOp + (size_t) fileID;
assert(req[fileReqIdx][source] != MPI_REQUEST_NULL);
xmpi(MPI_Cancel(req[fileReqIdx] + source));
xmpi(MPI_Request_free(req[fileReqIdx] + source));
if (!--bfd->activeCollectors)
funnelFileCleanup(openFileFunnels, fileID, &openFileFunnelsFill, &maxOpenFileIDp1, &numOpenRequests, commPio,
nProcsColl);
}
xmpi(MPI_Irecv(syncMsgs + source, 1, cdiPioSyncMsgDt, source, IO_Sync_file, commPio, req[MOSyncRx] + source));
}
xmpi(MPI_Irecv(syncMsgs + source, 1, cdiPioSyncMsgDt, source, IO_Sync_file, commPio, req[MOSyncRx] + source));
}
break;
break;
case IO_Finalize:
if (req[MOSyncTx][source] != MPI_REQUEST_NULL)
{
xmpiStat(MPI_Wait(req[MOSyncTx] + source, &status), &status);
--numOpenSends;
}
if (req[MOOpenRx][source] != MPI_REQUEST_NULL)
{
xmpi(MPI_Cancel(req[MOOpenRx] + source));
xmpi(MPI_Request_free(req[MOOpenRx] + source));
}
if (!--outstandingFinalizations)
{
if (openFileFunnelsFill)
xabort("some files still not closed.");
else
case IO_Finalize:
if (req[MOSyncTx][source] != MPI_REQUEST_NULL)
{
xmpiStat(MPI_Wait(req[MOSyncTx] + source, wstats), wstats);
--numOpenSends;
}
if (req[MOOpenRx][source] != MPI_REQUEST_NULL)
{
xmpi(MPI_Cancel(req[MOOpenRx] + source));
xmpi(MPI_Request_free(req[MOOpenRx] + source));
}
if (!--outstandingFinalizations)
{
xdebug("%s", "all files are finished, destroy file set,"
" return");
if (openFileFunnelsFill)
xabort("some files still not closed.");
else
{
xdebug("%s", "all files are finished, destroy file set,"
" return");
}
Free(completed);
Free(wstats);
Free(statui);
Free(req);
Free(syncMsgs);
Free(openReqBuffer);
Free(openFileFunnels);
return;
}
Free(sync_stat);
Free(req);
Free(syncMsgs);
Free(openReqBuffer);
Free(openFileFunnels);
return;
break;
default: xabort("COMMAND NOT IMPLEMENTED");
}
break;
default: xabort("COMMAND NOT IMPLEMENTED");
}
}
}
Loading