Skip to content
Snippets Groups Projects
Commit 4183d00f authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Fix representation of file offset.

parent c92bf337
No related branches found
No related tags found
2 merge requests!34Version 2.2.0,!13Consolidation with CDI-PIO (develop)
......@@ -33,10 +33,18 @@
enum
{
msgNumWords = 3,
collGuardTag = 777,
};
/* datatype to communicate an off_t and a request message */
static MPI_Datatype dt4Offset, guardMsgDt;
struct guardMsg
{
off_t amount;
int fileID, opCode;
};
/*
* Represents file opened by multiple writers and the corresponding
* buffer(s) for accumulation of timestep data.
......@@ -125,15 +133,16 @@ writePF(struct mwFileBuf *afd)
/* send buffersize, recv offset */
size_t amount = dbuffer_data_size(afd->db);
long query[msgNumWords] = { fileID, afd->command, (long) amount };
long offset;
xmpiStat(MPI_Sendrecv(query, msgNumWords, MPI_LONG, specialRank, collGuardTag, &offset, 1, MPI_LONG, specialRank, collGuardTag,
commPio, &status),
struct guardMsg query = { .amount = (off_t) amount, .fileID = fileID, .opCode = afd->command };
off_t offset;
xmpiStat(MPI_Sendrecv(&query, 1, guardMsgDt, specialRank, collGuardTag, &offset, 1, dt4Offset, specialRank, collGuardTag, commPio,
&status),
&status);
xdebug("id=%d, command=%d, amount=%zu, sent amount=%ld, recv offset=%ld", fileID, afd->command, amount, (long) amount, offset);
xdebug("id=%d, command=%d, amount=%zu, sent amount=%lld, recv offset=%ld", fileID, afd->command, amount,
(long long) (off_t) amount, offset);
bool doTruncate = offset < 0;
offset = labs(offset) - (offset < 0);
if (offset < 0) offset = -offset - 1;
/* write buffer */
if ((written = pwrite(afd->fd, afd->db->buffer, amount, offset)) != (ssize_t) amount)
xabort("fileId=%d, expect to write %zu byte, written %zu byte", fileID, amount, written);
......@@ -264,13 +273,13 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
newMultiwriterFileBuf(afd, filename, conf->writeAggBufLim);
xdebug("name=%s, init and add struct mwFileBuf, return id = %zu", filename, fileID);
long offset;
off_t offset;
int specialRank = commInqSpecialRank();
MPI_Status status;
MPI_Comm commPio = commInqCommPio();
long query[msgNumWords] = { (int) fileID, afd->command, 0L };
xmpiStat(MPI_Sendrecv(query, msgNumWords, MPI_LONG, specialRank, collGuardTag, &offset, 1, MPI_LONG, specialRank, collGuardTag,
commPio, &status),
struct guardMsg query = { .amount = 0, .fileID = (int) fileID, .opCode = afd->command };
xmpiStat(MPI_Sendrecv(&query, 1, guardMsgDt, specialRank, collGuardTag, &offset, 1, dt4Offset, specialRank, collGuardTag, commPio,
&status),
&status);
afd->command = IO_Set_fp;
return (int) fileID;
......@@ -281,13 +290,12 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
static void
finalizePOSIXFPGUARDSENDRECV(void)
{
long query[msgNumWords] = { 0, IO_Finalize, 0L };
xmpi(MPI_Send(query, msgNumWords, MPI_LONG, commInqSpecialRank(), collGuardTag, commInqCommPio()));
static const struct guardMsg query = { .amount = 0, .fileID = 0, .opCode = IO_Finalize };
xmpi(MPI_Send((void *) &query, 1, guardMsgDt, commInqSpecialRank(), collGuardTag, commInqCommPio()));
if (openFilesFill)
xabort("files still open at CDI-PIO finalization");
else
Free(openFiles);
if (openFilesFill) xabort("files still open at CDI-PIO finalization");
Free(openFiles);
MPI_Type_free(&guardMsgDt);
}
/***************************************************************/
......@@ -300,6 +308,21 @@ initPOSIXFPGUARDSENDRECV(void)
int numIOServers = commInqSizePio();
if (numIOServers < 2) xabort("error: # of I/O processes must be >= 2 for mode, but is %d", numIOServers);
{
MPI_Comm commPio = commInqCommPio();
int so = sizeof(off_t);
xmpi(MPI_Type_match_size(MPI_TYPECLASS_INTEGER, so, &dt4Offset));
}
{
struct guardMsg dummy;
int bl[2] = { 1, 2 };
MPI_Aint displ[2];
displ[0] = 0;
displ[1] = (unsigned char *) &dummy.fileID - (unsigned char *) &dummy.amount;
MPI_Datatype dt[2] = { dt4Offset, MPI_INT };
xmpi(MPI_Type_create_struct(2, bl, displ, dt, &guardMsgDt));
xmpi(MPI_Type_commit(&guardMsgDt));
}
int isCollector = commInqRankColl() != -1;
if (!isCollector)
fpgPOSIXFPGUARDSENDRECV();
......@@ -314,10 +337,10 @@ initPOSIXFPGUARDSENDRECV(void)
/***************************************************************/
/* the rank running the message loop below responds to queries of the form
* long query[3] = { fileID, operation code, amount }
* struct guardMsg { int fileID, int operation code, off_t amount }
* where the answer depends on the operation code:
* IO_Open_file, IO_Set_fp or IO_Close_file:
* long answer = offset to write to
* off_t answer = offset to write to
* IO_Finalize: no answer
*
* In essence this is an implementation of a shared file pointer.
......@@ -325,7 +348,7 @@ initPOSIXFPGUARDSENDRECV(void)
struct sharedFP
{
long offset;
off_t offset;
int unfinished;
};
......@@ -345,23 +368,22 @@ fpgPOSIXFPGUARDSENDRECV(void)
size_t sharedFPsSize = 0, sharedFPsFill = 0;
MPI_Comm commPio = commInqCommPio();
size_t nProcsColl = (size_t) (commInqSizeColl()), sentFinalize = nProcsColl;
long(*msgWords)[msgNumWords] = Malloc(sizeof(*msgWords) * nProcsColl);
struct guardMsg *msgWords = Malloc(sizeof(*msgWords) * nProcsColl);
MPI_Request *msgReq = Malloc(sizeof(*msgReq) * nProcsColl);
xdebug("ncollectors=%zu", nProcsColl);
for (size_t i = 0; i < nProcsColl; ++i)
xmpi(MPI_Irecv(msgWords[i], msgNumWords, MPI_LONG, (int) i, collGuardTag, commPio, msgReq + i));
for (size_t i = 0; i < nProcsColl; ++i) xmpi(MPI_Irecv(msgWords + i, 1, guardMsgDt, (int) i, collGuardTag, commPio, msgReq + i));
for (;;)
{
xmpiStat(MPI_Waitany((int) nProcsColl, msgReq, &source, &status), &status);
int fileID = (int) msgWords[source][0];
int fileID = msgWords[source].fileID;
assert(fileID >= 0);
int opcode = (int) msgWords[source][1];
long amount = msgWords[source][2];
int opcode = msgWords[source].opCode;
off_t amount = msgWords[source].amount;
/* re-instate listening */
if (opcode != IO_Finalize) xmpi(MPI_Irecv(msgWords[source], 3, MPI_LONG, source, collGuardTag, commPio, msgReq + source));
if (opcode != IO_Finalize) xmpi(MPI_Irecv(msgWords + source, 1, guardMsgDt, source, collGuardTag, commPio, msgReq + source));
xdebug("receive message from source=%d, id=%d, command=%d (%s)", source, fileID, opcode, cdiPioCmdStrTab[opcode]);
......@@ -387,10 +409,10 @@ fpgPOSIXFPGUARDSENDRECV(void)
int unfinished = sharedFPs[fileID].unfinished;
if (opcode == IO_Close_file && unfinished == 1) sharedFPs[fileID].offset = -sharedFPs[fileID].offset - 1;
xmpi(MPI_Send(&sharedFPs[fileID].offset, 1, MPI_LONG, source, collGuardTag, commPio));
xmpi(MPI_Send(&sharedFPs[fileID].offset, 1, dt4Offset, source, collGuardTag, commPio));
xdebug("id=%d, command=%d (%s), recv amount=%ld, set offset=%ld", fileID, opcode, cdiPioCmdStrTab[opcode], amount,
sharedFPs[fileID].offset);
xdebug("id=%d, command=%d (%s), recv amount=%lld, set offset=%ld", fileID, opcode, cdiPioCmdStrTab[opcode],
(long long) amount, sharedFPs[fileID].offset);
if (opcode == IO_Close_file && !(sharedFPs[fileID].unfinished = --unfinished))
--sharedFPsFill;
......@@ -405,6 +427,7 @@ fpgPOSIXFPGUARDSENDRECV(void)
Free(sharedFPs);
Free(msgReq);
Free(msgWords);
MPI_Type_free(&guardMsgDt);
return;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment