Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 23
23
Compare changes
  • Side-by-side
  • Inline
+ 23
23
@@ -18,7 +18,6 @@
typedef struct
{
struct dBuffer *fb;
FILE *fp;
int fileID;
int activeCollectors;
@@ -46,7 +45,6 @@ initBFiledataP(char *filename, size_t bs, int nc, int fileID)
if ((bfp->fp = fopen(filename, "w")) == NULL) xabort("Failed to open %s", bfp->name);
int fd = fileno(bfp->fp);
ftruncate(fd, (off_t) 0);
dbuffer_init(&bfp->fb, bs);
bfp->activeCollectors = nc;
@@ -72,8 +70,6 @@ destroyBFiledataP(void *v)
/* file closed, cleanup */
dbuffer_cleanup(&(bfp->fb));
Free(bfp);
xdebug("%s", "cleaned up, return");
@@ -94,13 +90,13 @@ compareNamesBP(void *v1, void *v2)
/***************************************************************/
static void
writeP(bFiledataP *bfd, size_t amount)
writeP(bFiledataP *bfd, void *buffer, size_t amount)
{
size_t written;
xdebug("filename=%s, amount=%ld, in", bfd->name, amount);
if ((written = fwrite(bfd->fb->buffer, 1, amount, bfd->fp)) != amount) xabort("did not succeed writing buffer in %s", bfd->name);
if ((written = fwrite(buffer, 1, amount, bfd->fp)) != amount) xabort("did not succeed writing buffer to %s", bfd->name);
xdebug("filename=%s, written=%ld, amount=%ld, return", bfd->name, written, amount);
}
@@ -120,7 +116,7 @@ pioWriterStdIO(void)
{
bFiledataP *bfd;
listSet *bibBFiledataP;
int messagesize, source, tag, id;
int messageSize, source, tag, id;
struct fileOpTag rtag;
MPI_Status status;
MPI_Comm commPio = commInqCommPio();
@@ -131,6 +127,8 @@ pioWriterStdIO(void)
bibBFiledataP = listSetNew(destroyBFiledataP, compareNamesBP);
sentFinalize = Calloc((size_t) nProcsColl, sizeof(sentFinalize[0]));
char *messageBuffer = NULL;
size_t messageBufferSize = 0;
for (;;)
{
@@ -142,19 +140,24 @@ pioWriterStdIO(void)
rtag = decodeFileOpTag(tag);
xmpi(MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &messagesize));
xmpi(MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &messageSize));
xdebug("RECEIVE MESSAGE FROM SOURCE=%d, ID=%d, COMMAND=%d ( %s ),"
"MESSAGESIZE=%d",
source, rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command], messagesize);
source, rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command], messageSize);
if ((size_t) messageSize > messageBufferSize)
{
messageBuffer = Realloc(messageBuffer, (size_t) messageSize);
messageBufferSize = (size_t) messageSize;
}
switch (rtag.command)
{
case IO_Open_file:
{
char *messageBuffer = Malloc((size_t) messagesize * sizeof(messageBuffer[0]));
char *pMB = messageBuffer;
xmpi(MPI_Recv(messageBuffer, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio, &status));
xmpi(MPI_Recv(messageBuffer, messageSize, MPI_UNSIGNED_CHAR, source, tag, commPio, &status));
xdebug("%s", "after recv, in loop");
@@ -163,7 +166,7 @@ pioWriterStdIO(void)
char *temp = pMB;
long buffersize = strtol(temp, NULL, 16);
pMB += (strlen(temp) + 1);
size_t amount = (size_t) (messageBuffer + messagesize - pMB);
size_t amount = (size_t) (messageBuffer + messageSize - pMB);
xdebug("command %s, filename=%s, buffersize=%ld, amount=%zu", cdiPioCmdStrTab[rtag.command], filename, buffersize,
amount);
@@ -177,11 +180,7 @@ pioWriterStdIO(void)
}
else if (strcmp(filename, bfd->name) != 0)
xabort("filename is not consistent, fileID=%d", rtag.id);
memcpy(bfd->fb->buffer, pMB, amount);
writeP(bfd, amount);
Free(messageBuffer);
writeP(bfd, pMB, amount);
}
break;
@@ -189,13 +188,13 @@ pioWriterStdIO(void)
{
if (!(bfd = listSetGet(bibBFiledataP, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
size_t amount = (size_t) messagesize;
size_t amount = (size_t) messageSize;
xdebug("COMMAND %s, ID=%d, NAME=%s", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name);
xmpi(MPI_Recv(bfd->fb->buffer, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio, &status));
xmpi(MPI_Recv(messageBuffer, messageSize, MPI_UNSIGNED_CHAR, source, tag, commPio, &status));
writeP(bfd, amount);
writeP(bfd, messageBuffer, amount);
}
break;
@@ -205,12 +204,12 @@ pioWriterStdIO(void)
if (!(bfd = listSetGet(bibBFiledataP, fileIDTest, (void *) (intptr_t) rtag.id)))
xabort("fileID=%d is not in set", rtag.id);
size_t amount = (size_t) messagesize;
size_t amount = (size_t) messageSize;
xdebug("COMMAND %s, ID=%d, NAME=%s, AMOUNT=%zu", cdiPioCmdStrTab[rtag.command], rtag.id, bfd->name, amount);
xmpi(MPI_Recv(bfd->fb->buffer, messagesize, MPI_UNSIGNED_CHAR, source, tag, commPio, &status));
xmpi(MPI_Recv(messageBuffer, messageSize, MPI_UNSIGNED_CHAR, source, tag, commPio, &status));
writeP(bfd, amount);
writeP(bfd, messageBuffer, amount);
if (!--(bfd->activeCollectors))
{
@@ -246,6 +245,7 @@ pioWriterStdIO(void)
listSetDelete(bibBFiledataP);
}
Free(sentFinalize);
Free(messageBuffer);
return;
}
}
Loading