Skip to content
Snippets Groups Projects

Consolidation with CDI-PIO (develop)

Merged Sergey Kosukhin requested to merge m300488/develop-rebase into develop
1 file
+ 106
144
Compare changes
  • Side-by-side
  • Inline
+ 106
144
@@ -49,9 +49,8 @@ fileIDTestA(void *a, void *fileID)
typedef struct
{
long offset;
bool finished;
int fileID;
bool nfinished[];
int unfinished;
} bFiledataPF;
static int
@@ -104,14 +103,12 @@ initAFiledataPF(const char *key, size_t bs)
/***************************************************************/
static bFiledataPF *
initBFiledataPF(int fileID, int nc)
initBFiledataPF(int fileID, int nProcsColl)
{
bFiledataPF *bfd;
size_t bfdSize = sizeof(bFiledataPF) + (size_t) nc * sizeof(bool);
bfd = Calloc(1, bfdSize);
bFiledataPF *bfd = Malloc(sizeof(*bfd));
bfd->offset = 0;
bfd->finished = false;
bfd->fileID = fileID;
bfd->unfinished = (int) nProcsColl;
return bfd;
}
@@ -162,126 +159,98 @@ compareNamesAPF(void *v1, void *v2)
}
/***************************************************************/
/* the rank running this message loop responds to queries of the form
* long query[3] = { file_id, operation code, amount }
* where the answer depends on the operation code:
* IO_Open_file, IO_Set_fp or IO_Close_file:
* long answer = offset to write to
* IO_Finalize: no answer
*/
enum
{
msgNumWords = 3,
collGuardTag = 777,
};
static void
fpgPOSIXFPGUARDSENDRECV(void)
{
int i, source, iret;
struct fileOpTag rtag;
int source, iret;
MPI_Status status;
bFiledataPF *bfd;
listSet *bibBFiledataPF;
long amount;
MPI_Comm commPio = commInqCommPio();
int nProcsColl = commInqSizeColl();
bool *sentFinalize, doFinalize = false;
size_t nProcsColl = (size_t) (commInqSizeColl()), sentFinalize = nProcsColl;
long(*msgWords)[msgNumWords] = Malloc(sizeof(*msgWords) * nProcsColl);
MPI_Request *msgReq = Malloc(sizeof(*msgReq) * nProcsColl);
xdebug("ncollectors=%d", nProcsColl);
xdebug("ncollectors=%zu", nProcsColl);
/* create empty set of files identified by id */
bibBFiledataPF = listSetNew(destroyBFiledataPF, fileIDCmpB);
sentFinalize = Calloc((size_t) nProcsColl, sizeof(sentFinalize[0]));
for (size_t i = 0; i < nProcsColl; ++i)
xmpi(MPI_Irecv(msgWords[i], msgNumWords, MPI_LONG, (int) i, collGuardTag, commPio, msgReq + i));
for (;;)
{
xmpi(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, commPio, &status));
source = status.MPI_SOURCE;
rtag = decodeFileOpTag(status.MPI_TAG);
xmpiStat(MPI_Waitany((int) nProcsColl, msgReq, &source, &status), &status);
int file_id = (int) msgWords[source][0];
int opcode = (int) msgWords[source][1];
long amount = msgWords[source][2];
/* re-instate listening */
if (opcode != IO_Finalize) xmpi(MPI_Irecv(msgWords[source], 3, MPI_LONG, source, collGuardTag, commPio, msgReq + source));
xdebug("receive message from source=%d, id=%d, command=%d ( %s )", source, rtag.id, rtag.command,
cdiPioCmdStrTab[rtag.command]);
xdebug("receive message from source=%d, id=%d, command=%d (%s)", source, file_id, opcode, cdiPioCmdStrTab[opcode]);
switch (rtag.command)
if (opcode >= IO_Open_file && opcode < IO_Finalize)
{
case IO_Open_file:
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) rtag.id)))
if (opcode == IO_Open_file)
{
bfd = initBFiledataPF(rtag.id, nProcsColl);
if ((iret = listSetAdd(bibBFiledataPF, bfd)) < 0) xabort("fileID=%d not unique", rtag.id);
bfd->fileID = iret;
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) file_id)))
{
bfd = initBFiledataPF(file_id, (int) nProcsColl);
if ((iret = listSetAdd(bibBFiledataPF, bfd)) < 0) xabort("fileID=%d not unique", file_id);
bfd->fileID = iret;
}
}
else
{
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) file_id)))
xabort("fileId=%d not in set", file_id);
}
xdebug("id=%d, command=%d (%s), send offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], bfd->offset);
xdebug("id=%d, command=%d ( %s ), send offset=%ld", rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command], bfd->offset);
xmpi(MPI_Sendrecv(&(bfd->offset), 1, MPI_LONG, source, status.MPI_TAG, &amount, 1, MPI_LONG, source, status.MPI_TAG,
commPio, &status));
bfd->offset += amount;
xdebug("id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld", rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command],
amount, bfd->offset);
break;
case IO_Set_fp:
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) rtag.id)))
xabort("fileId=%d not in set", rtag.id);
xdebug("id=%d, command=%d ( %s ), send offset=%ld", rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command], bfd->offset);
xmpi(MPI_Sendrecv(&(bfd->offset), 1, MPI_LONG, source, status.MPI_TAG, &amount, 1, MPI_LONG, source, status.MPI_TAG,
commPio, &status));
bfd->offset += amount;
xdebug("id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld", rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command],
amount, bfd->offset);
break;
case IO_Close_file:
if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) rtag.id)))
xabort("fileId=%d not in set", rtag.id);
xdebug("id=%d, command=%d ( %s )), send offset=%ld", rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command], bfd->offset);
xmpi(MPI_Sendrecv(&(bfd->offset), 1, MPI_LONG, source, status.MPI_TAG, &amount, 1, MPI_LONG, source, status.MPI_TAG,
commPio, &status));
bfd->offset += amount;
xdebug("id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld", rtag.id, rtag.command, cdiPioCmdStrTab[rtag.command],
amount, bfd->offset);
bfd->nfinished[source] = true;
bfd->finished = true;
for (i = 0; i < nProcsColl; i++)
if (!(bfd->nfinished[i]))
{
bfd->finished = false;
break;
}
int unfinished = bfd->unfinished;
if (opcode == IO_Close_file && unfinished == 1) bfd->offset = -bfd->offset - 1;
xmpi(MPI_Send(&bfd->offset, 1, MPI_LONG, source, collGuardTag, commPio));
if (bfd->finished) listSetRemove(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) rtag.id);
break;
case IO_Finalize:
{
int buffer = CDI_UNDEFID, collID;
xdebug("id=%d, command=%d (%s), recv amount=%ld, set offset=%ld", file_id, opcode, cdiPioCmdStrTab[opcode], amount,
bfd->offset);
xmpi(MPI_Recv(&buffer, 1, MPI_INT, source, status.MPI_TAG, commPio, &status));
sentFinalize[source] = true;
doFinalize = true;
for (collID = 0; collID < nProcsColl; collID++) doFinalize &= sentFinalize[collID];
if (doFinalize)
{
if (!listSetIsEmpty(bibBFiledataPF))
xabort("set bibBFiledataM not empty");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibBFiledataPF);
}
Free(sentFinalize);
return;
}
}
break;
default: xabort("COMMAND NOT IMPLEMENTED");
if (opcode == IO_Close_file && !(bfd->unfinished = --unfinished))
listSetRemove(bibBFiledataPF, fileIDTestB, (void *) (intptr_t) file_id);
else
bfd->offset += amount;
}
else if (opcode == IO_Finalize)
{
if (!--sentFinalize)
{
if (!listSetIsEmpty(bibBFiledataPF))
xabort("set bibBFiledataM not empty");
else
{
xdebug("%s", "destroy set");
listSetDelete(bibBFiledataPF);
}
Free(msgReq);
Free(msgWords);
return;
}
}
else
xabort("COMMAND NOT IMPLEMENTED");
}
}
@@ -302,44 +271,41 @@ pwrite(int fd, const void *buf, size_t count, off_t offset)
static void
writePF(aFiledataPF *afd)
{
size_t amount;
ssize_t written;
long offset;
long amountL;
int tag;
MPI_Status status;
/* FIXME: pretend there's only one special rank for now */
int file_id = afd->fileID;
int specialRank = commInqSizePio() - 1;
MPI_Comm commPio = commInqCommPio();
/* send buffersize, recv offset */
size_t amount = dbuffer_data_size(afd->db);
amount = dbuffer_data_size(afd->db);
amountL = (long) amount;
int id = afd->fileID;
tag = encodeFileOpTag(id, afd->command);
xmpi(MPI_Sendrecv(&amountL, 1, MPI_LONG, specialRank, tag, &offset, 1, MPI_LONG, specialRank, tag, commPio, &status));
xdebug("id=%d, command=%d, amount=%zu, send amountL=%ld, recv offset=%ld", id, afd->command, amount, amountL, offset);
long query[msgNumWords] = { file_id, afd->command, (long) amount };
long offset;
xmpiStat(MPI_Sendrecv(query, msgNumWords, MPI_LONG, specialRank, collGuardTag, &offset, 1, MPI_LONG, specialRank, collGuardTag,
commPio, &status),
&status);
xdebug("id=%d, command=%d, amount=%zu, sent amount=%ld, recv offset=%ld", file_id, afd->command, amount, (long) amount, offset);
bool doTruncate = offset < 0;
offset = labs(offset) - (offset < 0);
/* write buffer */
if ((written = pwrite(afd->fd, afd->db->buffer, amount, offset)) != (ssize_t) amount)
xabort("fileId=%d, expect to write %zu byte, written %zu byte", id, amount, written);
xdebug("written %zu bytes in file %d with offset %ld", written, id, offset);
xabort("fileId=%d, expect to write %zu byte, written %zu byte", file_id, amount, written);
xdebug("written %zu bytes in file %d with offset %ld", written, file_id, offset);
if (doTruncate) ftruncate(afd->fd, offset + (off_t) amount);
/* change outputBuffer */
dbuffer_reset(afd->db);
if (afd->db == afd->db1)
{
xdebug("id=%d, change to buffer 2 ...", id);
xdebug("id=%d, change to buffer 2 ...", file_id);
afd->db = afd->db2;
}
else
{
xdebug("id=%d, change to buffer 1 ...", id);
xdebug("id=%d, change to buffer 1 ...", file_id);
afd->db = afd->db1;
}
@@ -368,7 +334,6 @@ static size_t
fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
{
int error = 0;
int filled = 0;
aFiledataPF *afd = listSetGet(bibAFiledataPF, fileIDTestA, (void *) (intptr_t) fileID);
bool flush = tsID != afd->tsID;
@@ -380,7 +345,7 @@ fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
xmpi(MPI_Barrier(commInqCommColl()));
}
filled = dbuffer_push(afd->db, (unsigned char *) buffer, len);
int filled = dbuffer_push(afd->db, buffer, len);
xdebug("fileID = %d, tsID = %d, pushed data on buffer, filled = %d", fileID, tsID, filled);
@@ -437,14 +402,11 @@ elemCheck(void *q, void *nm)
static int
fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
{
int id;
enum
{
bcastRoot = 0
};
if ((mode[0] != 'w' && mode[0] != 'W') || mode[0] == 0 || mode[1] != 0)
xabort("Unsupported mode \"%s\" in parallel file open.", mode);
aFiledataPF *afd;
if ((mode[0] != 'w' && mode[0] != 'W') || mode[1] != 0) xabort("Unsupported mode \"%s\" in parallel file open.", mode);
static unsigned long buffersize = 0;
/* broadcast buffersize to collectors */
@@ -457,21 +419,22 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
/* init and add file element */
listSetForeach(bibAFiledataPF, elemCheck, (void *) filename);
afd = initAFiledataPF(filename, (size_t) buffersize);
aFiledataPF *afd = initAFiledataPF(filename, (size_t) buffersize);
if ((id = listSetAdd(bibAFiledataPF, afd)) < 0) xabort("filename %s not unique", afd->name);
afd->fileID = id;
xdebug("name=%s, init and add aFiledataPF, return id = %d", filename, id);
{
long offset, amount = 0L;
int tag = encodeFileOpTag(afd->fileID, afd->command);
int specialRank = commInqSpecialRank();
MPI_Status status;
MPI_Comm commPio = commInqCommPio();
xmpi(MPI_Sendrecv(&amount, 1, MPI_LONG, specialRank, tag, &offset, 1, MPI_LONG, specialRank, tag, commPio, &status));
}
int file_id;
if ((file_id = listSetAdd(bibAFiledataPF, afd)) < 0) xabort("filename %s not unique", afd->name);
afd->fileID = file_id;
xdebug("name=%s, init and add aFiledataPF, return id = %d", filename, file_id);
long offset;
int specialRank = commInqSpecialRank();
MPI_Status status;
MPI_Comm commPio = commInqCommPio();
long query[msgNumWords] = { file_id, afd->command, 0L };
xmpiStat(MPI_Sendrecv(query, msgNumWords, MPI_LONG, specialRank, collGuardTag, &offset, 1, MPI_LONG, specialRank, collGuardTag,
commPio, &status),
&status);
afd->command = IO_Set_fp;
return id;
return file_id;
}
/***************************************************************/
@@ -479,9 +442,8 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
static void
finalizePOSIXFPGUARDSENDRECV(void)
{
int buffer = 0, tag = encodeFileOpTag(0, IO_Finalize);
xmpi(MPI_Send(&buffer, 1, MPI_INT, commInqSpecialRank(), tag, commInqCommPio()));
long query[msgNumWords] = { 0, IO_Finalize, 0L };
xmpi(MPI_Send(query, msgNumWords, MPI_LONG, commInqSpecialRank(), collGuardTag, commInqCommPio()));
if (!listSetIsEmpty(bibAFiledataPF))
xabort("set bibAFiledataM not empty");
Loading