Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 35
34
Compare changes
  • Side-by-side
  • Inline
+ 35
34
@@ -32,7 +32,6 @@ typedef struct
struct aiocb *ctrlBlks;
off_t offset;
int currOpIndex;
int nextOpIndex;
int prefIndex;
int activeCollectors;
int handle, fileID;
@@ -73,7 +72,7 @@ initBFiledataPA(char *filename, size_t bs, int nc)
bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;
}
bfd->nextOpIndex = 0;
bfd->currOpIndex = nPrefStreams - 1;
bfd->prefIndex = 0;
bfd->offset = 0;
bfd->activeCollectors = nc;
@@ -93,7 +92,8 @@ destroyBFiledataPA(void *v)
int iret = 0;
ssize_t ssiret;
size_t prefIndex = (size_t) bfd->prefIndex;
size_t nextFinishOp = ((size_t) bfd->nextOpIndex - prefIndex + (size_t) nPrefStreams) % (size_t) nPrefStreams;
size_t nextOpIndex = ((size_t) bfd->currOpIndex + 1) % (size_t) nPrefStreams;
size_t nextFinishOp = (nextOpIndex - prefIndex + (size_t) nPrefStreams) % (size_t) nPrefStreams;
xdebug("filename=%s, cleanup and close file", bfd->name);
@@ -148,7 +148,6 @@ static void
writePA(bFiledataPA *bfd, size_t amount)
{
const struct aiocb *ccBP;
ssize_t iret;
xdebug("file %s, in", bfd->name);
@@ -159,7 +158,7 @@ writePA(bFiledataPA *bfd, size_t amount)
xdebug(" before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%zu", bfd->name, bfd->ctrlBlks[currOpIndex].aio_nbytes,
bfd->ctrlBlks[currOpIndex].aio_offset);
iret = aio_write(bfd->ctrlBlks + currOpIndex);
ssize_t iret = aio_write(bfd->ctrlBlks + currOpIndex);
xdebug("after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%zu,"
"iret=aio_write()=%d",
@@ -177,7 +176,8 @@ writePA(bFiledataPA *bfd, size_t amount)
if (bfd->prefIndex >= nPrefStreams)
{
ccBP = bfd->ctrlBlks + bfd->nextOpIndex;
size_t nextOpIndex = ((size_t) bfd->currOpIndex + 1) % (size_t) nPrefStreams;
ccBP = bfd->ctrlBlks + nextOpIndex;
do
{
iret = aio_suspend(&ccBP, 1, NULL);
@@ -185,7 +185,7 @@ writePA(bFiledataPA *bfd, size_t amount)
}
while (iret != 0);
if ((iret = aio_return(bfd->ctrlBlks + bfd->nextOpIndex)) == -1) xabort("aio_return () failed");
if ((iret = aio_return(bfd->ctrlBlks + nextOpIndex)) == -1) xabort("aio_return () failed");
bfd->prefIndex--;
}
@@ -264,10 +264,10 @@ pioWriterAIO(void)
else if (strcmp(filename, bfd->name) != 0)
xabort("filename is not consistent, fileID=%d", rtag.id);
bfd->currOpIndex = bfd->nextOpIndex;
bfd->nextOpIndex = (bfd->nextOpIndex + 1) % nPrefStreams;
size_t currOpIndex = ((size_t) bfd->currOpIndex + 1) % (size_t) nPrefStreams;
bfd->currOpIndex = (int) currOpIndex;
memcpy((void *) bfd->ctrlBlks[bfd->currOpIndex].aio_buf, pMB, (size_t) amount);
memcpy((void *) bfd->ctrlBlks[currOpIndex].aio_buf, pMB, (size_t) amount);
writePA(bfd, amount);
@@ -276,41 +276,42 @@ pioWriterAIO(void)
break;
case IO_Send_buffer:
{
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
xdebug("command: %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
xdebug("command: %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
bfd->currOpIndex = bfd->nextOpIndex;
bfd->nextOpIndex = (bfd->nextOpIndex + 1) % nPrefStreams;
size_t currOpIndex = ((size_t) bfd->currOpIndex + 1) % (size_t) nPrefStreams;
bfd->currOpIndex = (int) currOpIndex;
xmpi(MPI_Recv((void *) bfd->ctrlBlks[bfd->currOpIndex].aio_buf, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio,
&status));
writePA(bfd, (size_t) messagesize);
xmpi(MPI_Recv((void *) bfd->ctrlBlks[currOpIndex].aio_buf, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio,
&status));
writePA(bfd, (size_t) messagesize);
}
break;
case IO_Close_file:
{
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
xdebug(" command %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
bfd->currOpIndex = bfd->nextOpIndex;
xdebug(" command %s, id=%d, name=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
bfd->nextOpIndex = (bfd->nextOpIndex + 1) % nPrefStreams;
size_t currOpIndex = ((size_t) bfd->currOpIndex + 1) % (size_t) nPrefStreams;
bfd->currOpIndex = (int) currOpIndex;
xmpi(MPI_Recv((void *) bfd->ctrlBlks[bfd->currOpIndex].aio_buf, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio,
&status));
xmpi(MPI_Recv((void *) bfd->ctrlBlks[currOpIndex].aio_buf, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio,
&status));
writePA(bfd, (size_t) messagesize);
writePA(bfd, (size_t) messagesize);
if (!--(bfd->activeCollectors))
{
xdebug("all are finished with file %d, delete entry", rtag.id);
listSetRemove(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id);
}
if (!--(bfd->activeCollectors))
{
xdebug("all are finished with file %d, delete entry", rtag.id);
listSetRemove(bibBFiledataPA, fileIDTest, (void *) (intptr_t) rtag.id);
}
}
break;
case IO_Finalize:
{
Loading