Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 142
144
Compare changes
  • Side-by-side
  • Inline
+ 142
144
@@ -29,6 +29,12 @@
#include "pio_impl.h"
#include "pio_util.h"
enum
{
msgNumWords = 3,
collGuardTag = 777,
};
typedef struct
{
struct dBuffer *db1;
@@ -46,25 +52,6 @@ fileIDTestA(void *a, void *fileID)
return ((aFiledataPF *) a)->fileID == (int) (intptr_t) fileID;
}
typedef struct
{
long offset;
int fileID;
int unfinished;
} bFiledataPF;
static int
fileIDTestB(void *a, void *fileID)
{
return ((bFiledataPF *) a)->fileID == (int) (intptr_t) fileID;
}
static bool
fileIDCmpB(void *a, void *b)
{
return ((bFiledataPF *) a)->fileID == ((bFiledataPF *) b)->fileID;
}
static listSet *bibAFiledataPF;
/***************************************************************/
@@ -101,20 +88,6 @@ initAFiledataPF(const char *key, size_t bs)
return afd;
}
/***************************************************************/
static bFiledataPF *
initBFiledataPF(int fileID, int nProcsColl)
{
bFiledataPF *bfd = Malloc(sizeof(*bfd));
bfd->offset = 0;
bfd->fileID = fileID;
bfd->unfinished = (int) nProcsColl;
return bfd;
}
/***************************************************************/
static int
destroyAFiledataPF(void *v)
{
@@ -135,21 +108,6 @@ destroyAFiledataPF(void *v)
return iret;
}
/***************************************************************/
static int
destroyBFiledataPF(void *v)
{
int iret = 0;
bFiledataPF *bfd = (bFiledataPF *) v;
Free(bfd);
return iret;
}
/***************************************************************/
static bool
compareNamesAPF(void *v1, void *v2)
{
@@ -158,102 +116,6 @@ compareNamesAPF(void *v1, void *v2)
return !strcmp(afd1->name, afd2->name);
}
/***************************************************************/
/* the rank running this message loop responds to queries of the form
* long query[3] = { file_id, operation code, amount }
* where the answer depends on the operation code:
* IO_Open_file, IO_Set_fp or IO_Close_file:
* long answer = offset to write to
* IO_Finalize: no answer
*/
enum
{
msgNumWords = 3,
collGuardTag = 777,
};
static void
fpgPOSIXFPGUARDSENDRECV(void)
{
int source, iret;
MPI_Status status;
bFiledataPF *bfd;
listSet *bibBFiledataPF;
MPI_Comm commPio = commInqCommPio();
size_t nProcsColl = (size_t) (commInqSizeColl()), sentFinalize = nProcsColl;
long(*msgWords)[msgNumWords] = Malloc(sizeof(*msgWords) * nProcsColl);
MPI_Request *msgReq = Malloc(sizeof(*msgReq) * nProcsColl);
xdebug("ncollectors=%zu", nProcsColl);
/* create empty set of files identified by id */
bibBFiledataPF = listSetNew(destroyBFiledataPF, fileIDCmpB);
for (size_t i = 0; i < nProcsColl; ++i)
xmpi(MPI_Irecv(msgWords[i], msgNumWords, MPI_LONG, (int) i, collGuardTag, commPio, msgReq + i));
for (;;)
{
xmpiStat(MPI_Waitany((int) nProcsColl, msgReq, &source, &status), &status);
int file_id = (int) msgWords[source][0];
int opcode = (int) msgWords[source][1];
long amount = msgWords[source][2];
/* re-instate listening */
if (opcode != IO_Finalize) xmpi(MPI_Irecv(msgWords[source], 3, MPI_LONG, source, collGuardTag, commPio, msgReq + source));
xdebug("receive message from source=%d, id=%d, command=%d (%s)", source, file_id, opcode, cdiPioCmdStrTab[opcode]);
if (opcode >= IO_Open_file && opcode < IO_Finalize)
{
if (opcode == IO_Open_file)
{
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) file_id)))
{
bfd = initBFiledataPF(file_id, (int) nProcsColl);
if ((iret = listSetAdd(bibBFiledataPF, bfd)) < 0) xabort("fileID=%d not unique", file_id);
bfd->fileID = iret;
}
}
else
{
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) file_id)))
xabort("fileId=%d not in set", file_id);
}
xdebug("id=%d, command=%d (%s), send offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], bfd->offset);
int unfinished = bfd->unfinished;
if (opcode == IO_Close_file && unfinished == 1) bfd->offset = -bfd->offset - 1;
xmpi(MPI_Send(&bfd->offset, 1, MPI_LONG, source, collGuardTag, commPio));
xdebug("id=%d, command=%d (%s), recv amount=%ld, set offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], amount,
bfd->offset);
if (opcode == IO_Close_file && !(bfd->unfinished = --unfinished))
listSetRemove(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) file_id);
else
bfd->offset += amount;
}
else if (opcode == IO_Finalize)
{
if (!--sentFinalize)
{
if (!listSetIsEmpty(bibBFiledataPF))
xabort("set bibBFiledataM not empty");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibBFiledataPF);
}
Free(msgReq);
Free(msgWords);
return;
}
}
else
xabort("COMMAND NOT IMPLEMENTED");
}
}
//*******************************************************
#ifndef HAVE_PWRITE
static ssize_t
@@ -456,6 +318,8 @@ finalizePOSIXFPGUARDSENDRECV(void)
/***************************************************************/
static void fpgPOSIXFPGUARDSENDRECV(void);
void
initPOSIXFPGUARDSENDRECV(void)
{
@@ -475,6 +339,140 @@ initPOSIXFPGUARDSENDRECV(void)
}
}
/***************************************************************/
/* the rank running this message loop responds to queries of the form
* long query[3] = { file_id, operation code, amount }
* where the answer depends on the operation code:
* IO_Open_file, IO_Set_fp or IO_Close_file:
* long answer = offset to write to
* IO_Finalize: no answer
*
* In essence this is an implementation of a shared file pointer.
*/
struct sharedFP
{
long offset;
int fileID;
int unfinished;
};
static int
lookupSharedFP(void *a, void *fileID)
{
return ((struct sharedFP *) a)->fileID == (int) (intptr_t) fileID;
}
static bool
compareSharedFPFileID(void *a, void *b)
{
return ((struct sharedFP *) a)->fileID == ((struct sharedFP *) b)->fileID;
}
static struct sharedFP *
newSharedFP(int fileID, int nProcsColl)
{
struct sharedFP *bfd = Malloc(sizeof(*bfd));
bfd->offset = 0;
bfd->fileID = fileID;
bfd->unfinished = (int) nProcsColl;
return bfd;
}
static int
deleteSharedFP(void *v)
{
int iret = 0;
struct sharedFP *bfd = (struct sharedFP *) v;
Free(bfd);
return iret;
}
static void
fpgPOSIXFPGUARDSENDRECV(void)
{
int source, iret;
MPI_Status status;
struct sharedFP *bfd;
listSet *bibBFiledataPF;
MPI_Comm commPio = commInqCommPio();
size_t nProcsColl = (size_t) (commInqSizeColl()), sentFinalize = nProcsColl;
long(*msgWords)[msgNumWords] = Malloc(sizeof(*msgWords) * nProcsColl);
MPI_Request *msgReq = Malloc(sizeof(*msgReq) * nProcsColl);
xdebug("ncollectors=%zu", nProcsColl);
/* create empty set of files identified by id */
bibBFiledataPF = listSetNew(deleteSharedFP, compareSharedFPFileID);
for (size_t i = 0; i < nProcsColl; ++i)
xmpi(MPI_Irecv(msgWords[i], msgNumWords, MPI_LONG, (int) i, collGuardTag, commPio, msgReq + i));
for (;;)
{
xmpiStat(MPI_Waitany((int) nProcsColl, msgReq, &source, &status), &status);
int file_id = (int) msgWords[source][0];
int opcode = (int) msgWords[source][1];
long amount = msgWords[source][2];
/* re-instate listening */
if (opcode != IO_Finalize) xmpi(MPI_Irecv(msgWords[source], 3, MPI_LONG, source, collGuardTag, commPio, msgReq + source));
xdebug("receive message from source=%d, id=%d, command=%d (%s)", source, file_id, opcode, cdiPioCmdStrTab[opcode]);
if (opcode >= IO_Open_file && opcode < IO_Finalize)
{
if (opcode == IO_Open_file)
{
if (!(bfd = listSetGet(bibBFiledataPF, lookupSharedFP, (void *) (intptr_t) file_id)))
{
bfd = newSharedFP(file_id, (int) nProcsColl);
if ((iret = listSetAdd(bibBFiledataPF, bfd)) < 0) xabort("fileID=%d not unique", file_id);
bfd->fileID = iret;
}
}
else
{
if (!(bfd = listSetGet(bibBFiledataPF, lookupSharedFP, (void *) (intptr_t) file_id)))
xabort("fileId=%d not in set", file_id);
}
xdebug("id=%d, command=%d (%s), send offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], bfd->offset);
int unfinished = bfd->unfinished;
if (opcode == IO_Close_file && unfinished == 1) bfd->offset = -bfd->offset - 1;
xmpi(MPI_Send(&bfd->offset, 1, MPI_LONG, source, collGuardTag, commPio));
xdebug("id=%d, command=%d (%s), recv amount=%ld, set offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], amount,
bfd->offset);
if (opcode == IO_Close_file && !(bfd->unfinished = --unfinished))
listSetRemove(bibBFiledataPF, lookupSharedFP, (void *) (intptr_t) file_id);
else
bfd->offset += amount;
}
else if (opcode == IO_Finalize)
{
if (!--sentFinalize)
{
if (!listSetIsEmpty(bibBFiledataPF))
xabort("set bibBFiledataM not empty");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibBFiledataPF);
}
Free(msgReq);
Free(msgWords);
return;
}
}
else
xabort("COMMAND NOT IMPLEMENTED");
}
}
/*
* Local Variables:
* c-file-style: "Java"
Loading