Skip to content
Snippets Groups Projects
Commit f552abc0 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Improve guard handling of file IDs.

parent e2395682
No related branches found
No related tags found
2 merge requests!34Version 2.2.0,!13Consolidation with CDI-PIO (develop)
......@@ -314,7 +314,7 @@ initPOSIXFPGUARDSENDRECV(void)
/***************************************************************/
/* the rank running the message loop below responds to queries of the form
* long query[3] = { file_id, operation code, amount }
* long query[3] = { fileID, operation code, amount }
* where the answer depends on the operation code:
* IO_Open_file, IO_Set_fp or IO_Close_file:
* long answer = offset to write to
......@@ -326,51 +326,23 @@ initPOSIXFPGUARDSENDRECV(void)
struct sharedFP
{
long offset;
int fileID;
int unfinished;
};
static int
lookupSharedFP(void *a, void *fileID)
{
return ((struct sharedFP *) a)->fileID == (int) (intptr_t) fileID;
}
static bool
compareSharedFPFileID(void *a, void *b)
{
return ((struct sharedFP *) a)->fileID == ((struct sharedFP *) b)->fileID;
}
static struct sharedFP *
newSharedFP(int fileID, int nProcsColl)
static inline void
initSharedFP(struct sharedFP *bfd, int nProcsColl)
{
struct sharedFP *bfd = Malloc(sizeof(*bfd));
bfd->offset = 0;
bfd->fileID = fileID;
bfd->unfinished = (int) nProcsColl;
return bfd;
}
static int
deleteSharedFP(void *v)
{
int iret = 0;
struct sharedFP *bfd = (struct sharedFP *) v;
Free(bfd);
return iret;
bfd->unfinished = nProcsColl;
}
static void
fpgPOSIXFPGUARDSENDRECV(void)
{
int source, iret;
int source;
MPI_Status status;
struct sharedFP *bfd;
listSet *bibBFiledataPF;
struct sharedFP *restrict sharedFPs = NULL;
size_t sharedFPsSize = 0, sharedFPsFill = 0;
MPI_Comm commPio = commInqCommPio();
size_t nProcsColl = (size_t) (commInqSizeColl()), sentFinalize = nProcsColl;
long(*msgWords)[msgNumWords] = Malloc(sizeof(*msgWords) * nProcsColl);
......@@ -378,64 +350,59 @@ fpgPOSIXFPGUARDSENDRECV(void)
xdebug("ncollectors=%zu", nProcsColl);
/* create empty set of files identified by id */
bibBFiledataPF = listSetNew(deleteSharedFP, compareSharedFPFileID);
for (size_t i = 0; i < nProcsColl; ++i)
xmpi(MPI_Irecv(msgWords[i], msgNumWords, MPI_LONG, (int) i, collGuardTag, commPio, msgReq + i));
for (;;)
{
xmpiStat(MPI_Waitany((int) nProcsColl, msgReq, &source, &status), &status);
int file_id = (int) msgWords[source][0];
int fileID = (int) msgWords[source][0];
assert(fileID >= 0);
int opcode = (int) msgWords[source][1];
long amount = msgWords[source][2];
/* re-instate listening */
if (opcode != IO_Finalize) xmpi(MPI_Irecv(msgWords[source], 3, MPI_LONG, source, collGuardTag, commPio, msgReq + source));
xdebug("receive message from source=%d, id=%d, command=%d (%s)", source, file_id, opcode, cdiPioCmdStrTab[opcode]);
xdebug("receive message from source=%d, id=%d, command=%d (%s)", source, fileID, opcode, cdiPioCmdStrTab[opcode]);
if (opcode >= IO_Open_file && opcode < IO_Finalize)
{
if (opcode == IO_Open_file)
{
if (!(bfd = listSetGet(bibBFiledataPF, lookupSharedFP, (void *) (intptr_t) file_id)))
if (sharedFPsSize <= (size_t) fileID)
{
bfd = newSharedFP(file_id, (int) nProcsColl);
if ((iret = listSetAdd(bibBFiledataPF, bfd)) < 0) xabort("fileID=%d not unique", file_id);
bfd->fileID = iret;
size_t oldSize = sharedFPsSize;
sharedFPsSize = sharedFPsSize ? sharedFPsSize * 2 : 2;
sharedFPs = Realloc(sharedFPs, sharedFPsSize * sizeof(*sharedFPs));
for (size_t i = oldSize; i < sharedFPsSize; ++i) sharedFPs[i].offset = -1;
}
if (sharedFPs[fileID].offset < 0)
{
initSharedFP(sharedFPs + fileID, (int) nProcsColl);
++sharedFPsFill;
}
}
else
{
if (!(bfd = listSetGet(bibBFiledataPF, lookupSharedFP, (void *) (intptr_t) file_id)))
xabort("fileId=%d not in set", file_id);
}
xdebug("id=%d, command=%d (%s), send offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], bfd->offset);
xdebug("id=%d, command=%d (%s), send offset=%lld", fileID, opcode, cdiPioCmdStrTab[opcode],
(long long) sharedFPs[fileID].offset);
int unfinished = bfd->unfinished;
if (opcode == IO_Close_file && unfinished == 1) bfd->offset = -bfd->offset - 1;
xmpi(MPI_Send(&bfd->offset, 1, MPI_LONG, source, collGuardTag, commPio));
int unfinished = sharedFPs[fileID].unfinished;
if (opcode == IO_Close_file && unfinished == 1) sharedFPs[fileID].offset = -sharedFPs[fileID].offset - 1;
xmpi(MPI_Send(&sharedFPs[fileID].offset, 1, MPI_LONG, source, collGuardTag, commPio));
xdebug("id=%d, command=%d (%s), recv amount=%ld, set offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], amount,
bfd->offset);
xdebug("id=%d, command=%d (%s), recv amount=%ld, set offset=%ld", fileID, opcode, cdiPioCmdStrTab[opcode], amount,
sharedFPs[fileID].offset);
if (opcode == IO_Close_file && !(bfd->unfinished = --unfinished))
listSetRemove(bibBFiledataPF, lookupSharedFP, (void *) (intptr_t) file_id);
if (opcode == IO_Close_file && !(sharedFPs[fileID].unfinished = --unfinished))
--sharedFPsFill;
else
bfd->offset += amount;
sharedFPs[fileID].offset += amount;
}
else if (opcode == IO_Finalize)
{
if (!--sentFinalize)
{
if (!listSetIsEmpty(bibBFiledataPF))
xabort("set bibBFiledataM not empty");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibBFiledataPF);
}
if (sharedFPsFill) xabort("still files open");
Free(sharedFPs);
Free(msgReq);
Free(msgWords);
return;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment