Commit 5bf73ffb authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Add mode to write aligned blocks only.

* This version is still sub-optimal and transfers more data amongst the
  I/O server processes than strictly necessary.
parent 66844e26
......@@ -294,6 +294,7 @@ src/pio_interface.c -text
src/pio_interface.h -text
src/pio_list_set.c -text
src/pio_mpi_fw_at_all.c -text
src/pio_mpi_fw_at_reblock.c -text
src/pio_mpi_fw_ordered.c -text
src/pio_mpinonb.c -text
src/pio_posixasynch.c -text
......@@ -397,6 +398,7 @@ tests/pio_cksum_asynch.in -text
tests/pio_cksum_cdf.in -text
tests/pio_cksum_fpguard.in -text
tests/pio_cksum_mpi_fw_at_all.in -text
tests/pio_cksum_mpi_fw_at_reblock.in -text
tests/pio_cksum_mpi_fw_ordered.in -text
tests/pio_cksum_mpinonb.in -text
tests/pio_cksum_writer.in -text
......
......@@ -30716,7 +30716,7 @@ cat >>confdefs.h <<_ACEOF
_ACEOF
 
 
ac_config_files="$ac_config_files tests/test_cksum_grib tests/test_cksum_nc tests/test_cksum_nc2 tests/test_cksum_nc4 tests/test_cksum_extra tests/test_cksum_service tests/test_cksum_ieg tests/test_chunk_cksum tests/test_f2003 tests/pio_write_run tests/pio_write_deco2d_run tests/pio_cksum_mpinonb tests/pio_cksum_mpi_fw_ordered tests/pio_cksum_mpi_fw_at_all tests/pio_cksum_fpguard tests/pio_cksum_asynch tests/pio_cksum_writer tests/pio_cksum_cdf tests/test_resource_copy_mpi_run tests/test_cdf_transformation tables/gen_tableheaderfile util/serialrun"
ac_config_files="$ac_config_files tests/test_cksum_grib tests/test_cksum_nc tests/test_cksum_nc2 tests/test_cksum_nc4 tests/test_cksum_extra tests/test_cksum_service tests/test_cksum_ieg tests/test_chunk_cksum tests/test_f2003 tests/pio_write_run tests/pio_write_deco2d_run tests/pio_cksum_mpinonb tests/pio_cksum_mpi_fw_ordered tests/pio_cksum_mpi_fw_at_all tests/pio_cksum_mpi_fw_at_reblock tests/pio_cksum_fpguard tests/pio_cksum_asynch tests/pio_cksum_writer tests/pio_cksum_cdf tests/test_resource_copy_mpi_run tests/test_cdf_transformation tables/gen_tableheaderfile util/serialrun"
 
 
ac_config_files="$ac_config_files Makefile src/Makefile interfaces/Makefile app/Makefile tests/Makefile examples/Makefile cdi.settings examples/pio/Makefile src/pkgconfig/cdi.pc src/pkgconfig/cdipio.pc"
......@@ -32096,6 +32096,7 @@ do
"tests/pio_cksum_mpinonb") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_mpinonb" ;;
"tests/pio_cksum_mpi_fw_ordered") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_mpi_fw_ordered" ;;
"tests/pio_cksum_mpi_fw_at_all") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_mpi_fw_at_all" ;;
"tests/pio_cksum_mpi_fw_at_reblock") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_mpi_fw_at_reblock" ;;
"tests/pio_cksum_fpguard") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_fpguard" ;;
"tests/pio_cksum_asynch") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_asynch" ;;
"tests/pio_cksum_writer") CONFIG_FILES="$CONFIG_FILES tests/pio_cksum_writer" ;;
......@@ -33919,6 +33920,7 @@ _LT_EOF
"tests/pio_cksum_mpinonb":F) chmod a+x "$ac_file" ;;
"tests/pio_cksum_mpi_fw_ordered":F) chmod a+x "$ac_file" ;;
"tests/pio_cksum_mpi_fw_at_all":F) chmod a+x "$ac_file" ;;
"tests/pio_cksum_mpi_fw_at_reblock":F) chmod a+x "$ac_file" ;;
"tests/pio_cksum_fpguard":F) chmod a+x "$ac_file" ;;
"tests/pio_cksum_asynch":F) chmod a+x "$ac_file" ;;
"tests/pio_cksum_writer":F) chmod a+x "$ac_file" ;;
......@@ -386,6 +386,7 @@ AC_CONFIG_FILES([tests/test_cksum_grib \
tests/pio_cksum_mpinonb \
tests/pio_cksum_mpi_fw_ordered \
tests/pio_cksum_mpi_fw_at_all \
tests/pio_cksum_mpi_fw_at_reblock \
tests/pio_cksum_fpguard \
tests/pio_cksum_asynch \
tests/pio_cksum_writer \
......
......@@ -180,6 +180,7 @@ libcdipio_la_SOURCES = \
pio_mpinonb.c \
pio_mpi_fw_ordered.c \
pio_mpi_fw_at_all.c \
pio_mpi_fw_at_reblock.c \
pio_record_send.c \
pio_posixasynch.c \
pio_posixfpguardsendrecv.c \
......
......@@ -221,22 +221,22 @@ am__libcdipio_la_SOURCES_DIST = cdipio.h cdipioFortran.c cfortran.h \
pio.c pio.h pio_comm.c pio_comm.h pio_conf.c pio_conf.h \
pio_dbuffer.c pio_id_set.h pio_impl.h pio_interface.c \
pio_interface.h pio_mpinonb.c pio_mpi_fw_ordered.c \
pio_mpi_fw_at_all.c pio_record_send.c pio_posixasynch.c \
pio_posixfpguardsendrecv.c pio_posixnonb.c pio_list_set.c \
resource_unpack.h resource_unpack.c pio_client.c pio_client.h \
pio_roles.c pio_rpc.c pio_rpc.h pio_server.c pio_server.h \
pio_serialize.h pio_serialize.c pio_util.c pio_util.h \
pio_cdf_int.h pio_cdf_int.c
pio_mpi_fw_at_all.c pio_mpi_fw_at_reblock.c pio_record_send.c \
pio_posixasynch.c pio_posixfpguardsendrecv.c pio_posixnonb.c \
pio_list_set.c resource_unpack.h resource_unpack.c \
pio_client.c pio_client.h pio_roles.c pio_rpc.c pio_rpc.h \
pio_server.c pio_server.h pio_serialize.h pio_serialize.c \
pio_util.c pio_util.h pio_cdf_int.h pio_cdf_int.c
am__objects_5 = pio_cdf_int.lo
@HAVE_PARALLEL_NC4_TRUE@@USE_MPI_TRUE@am__objects_6 = \
@HAVE_PARALLEL_NC4_TRUE@@USE_MPI_TRUE@ $(am__objects_5)
am_libcdipio_la_OBJECTS = cdipioFortran.lo pio.lo pio_comm.lo \
pio_conf.lo pio_dbuffer.lo pio_interface.lo pio_mpinonb.lo \
pio_mpi_fw_ordered.lo pio_mpi_fw_at_all.lo pio_record_send.lo \
pio_posixasynch.lo pio_posixfpguardsendrecv.lo \
pio_posixnonb.lo pio_list_set.lo resource_unpack.lo \
pio_client.lo pio_roles.lo pio_rpc.lo pio_server.lo \
pio_serialize.lo pio_util.lo $(am__objects_6)
pio_mpi_fw_ordered.lo pio_mpi_fw_at_all.lo \
pio_mpi_fw_at_reblock.lo pio_record_send.lo pio_posixasynch.lo \
pio_posixfpguardsendrecv.lo pio_posixnonb.lo pio_list_set.lo \
resource_unpack.lo pio_client.lo pio_roles.lo pio_rpc.lo \
pio_server.lo pio_serialize.lo pio_util.lo $(am__objects_6)
libcdipio_la_OBJECTS = $(am_libcdipio_la_OBJECTS)
@ENABLE_CDI_LIB_FALSE@@USE_MPI_TRUE@am_libcdipio_la_rpath =
@ENABLE_CDI_LIB_TRUE@@USE_MPI_TRUE@am_libcdipio_la_rpath = -rpath \
......@@ -553,12 +553,12 @@ libcdipio_la_SOURCES = cdipio.h cdipioFortran.c cfortran.h pio.c pio.h \
pio_comm.c pio_comm.h pio_conf.c pio_conf.h pio_dbuffer.c \
pio_id_set.h pio_impl.h pio_interface.c pio_interface.h \
pio_mpinonb.c pio_mpi_fw_ordered.c pio_mpi_fw_at_all.c \
pio_record_send.c pio_posixasynch.c pio_posixfpguardsendrecv.c \
pio_posixnonb.c pio_list_set.c resource_unpack.h \
resource_unpack.c pio_client.c pio_client.h pio_roles.c \
pio_rpc.c pio_rpc.h pio_server.c pio_server.h pio_serialize.h \
pio_serialize.c pio_util.c pio_util.h pio_cdf_int.h \
$(am__append_7)
pio_mpi_fw_at_reblock.c pio_record_send.c pio_posixasynch.c \
pio_posixfpguardsendrecv.c pio_posixnonb.c pio_list_set.c \
resource_unpack.h resource_unpack.c pio_client.c pio_client.h \
pio_roles.c pio_rpc.c pio_rpc.h pio_server.c pio_server.h \
pio_serialize.h pio_serialize.c pio_util.c pio_util.h \
pio_cdf_int.h $(am__append_7)
libcdipio_la_HAVE_PARALLEL_NC4_extra_sources = \
pio_cdf_int.c
......@@ -726,6 +726,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_interface.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_list_set.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpi_fw_at_all.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpi_fw_at_reblock.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpi_fw_ordered.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpinonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixasynch.Plo@am__quote@
......
......@@ -12,16 +12,17 @@
/* parallel IO IOMode */
#define PIO_NONE 0
#define PIO_MPI 1
#define PIO_WRITER 2
#define PIO_ASYNCH 3
#define PIO_FPGUARD 4
#define PIO_MPI_FW_ORDERED 5
#define PIO_MPI_FW_AT_ALL 6
#define PIO_MINIOMODE PIO_NONE
#define PIO_MAXIOMODE PIO_MPI_FW_AT_ALL
#define PIO_NONE 0
#define PIO_MPI 1
#define PIO_WRITER 2
#define PIO_ASYNCH 3
#define PIO_FPGUARD 4
#define PIO_MPI_FW_ORDERED 5
#define PIO_MPI_FW_AT_ALL 6
#define PIO_MPI_FW_AT_REBLOCK 7
#define PIO_MINIOMODE PIO_NONE
#define PIO_MAXIOMODE PIO_MPI_FW_AT_REBLOCK
#define PIO_ROLE_CLIENT 0
#define PIO_ROLE_COLLECTOR 1
......
......@@ -24,6 +24,8 @@
PARAMETER (PIO_MPI_FW_ORDERED = 5)
INTEGER PIO_MPI_FW_AT_ALL
PARAMETER (PIO_MPI_FW_AT_ALL = 6)
INTEGER PIO_MPI_FW_AT_REBLOCK
PARAMETER (PIO_MPI_FW_AT_REBLOCK = 7)
INTEGER PIO_ROLE_CLIENT
PARAMETER (PIO_ROLE_CLIENT = 0)
INTEGER PIO_ROLE_COLLECTOR
......
......@@ -55,6 +55,7 @@ static const struct intCodeStrMap modeMap[] = {
{ "PIO_WRITER", PIO_WRITER },
{ "PIO_MPI_FW_ORDERED", PIO_MPI_FW_ORDERED },
{ "PIO_MPI_FW_AT_ALL", PIO_MPI_FW_AT_ALL },
{ "PIO_MPI_FW_AT_REBLOCK", PIO_MPI_FW_AT_REBLOCK },
};
int
......
......@@ -99,6 +99,9 @@ void cdiPioFileWriteOrderedInit(void);
/* pio_mpi_fw_at_all.c */
void cdiPioFileWriteAtAllInit(void);
/* pio_mpi_fw_at_reblock.c */
void cdiPioFileWriteAtReblockInit(void);
/* common functionality for file split between collectors and writer(s) */
void pioSendInitialize(void);
......
......@@ -559,6 +559,9 @@ cdiPioFileWritingInit(void)
case PIO_MPI_FW_AT_ALL:
cdiPioFileWriteAtAllInit();
break;
case PIO_MPI_FW_AT_REBLOCK:
cdiPioFileWriteAtReblockInit();
break;
}
}
......
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <mpi.h>
#include "cdi.h"
#include "dmemory.h"
#include "error.h"
#include "namespace.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_impl.h"
#include "pio_rpc.h"
#include "pio_util.h"
/* kind of transfer an entry of the per-file message table describes */
enum direction {
INACTIVE_TX, /* slot is unused, state set by initReblockPendingMsg */
RECV, /* entry belongs to an MPI_Irecv into a local block buffer */
SEND, /* entry belongs to an MPI_Isend of locally held data */
};
/* book-keeping for one non-blocking transfer; entry i of the msgs
 * array of a file belongs to request i of its reqs array */
struct IOmsg
{
int pos, len; /* pos: local index of block buffer
* affected (RECV only), -1 for SEND and
* unused entries;
* len: number of bytes transferred */
void *sendBuf; /* private copy of the data handed to MPI_Isend (SEND
* only), NULL otherwise */
int rank; /* rank of the peer in the PIO communicator */
enum direction direction; /* whether this entry is a send, a receive
* or unused */
};
/* fill state of a single block-sized buffer */
struct pendingBufWrite
{
int pass; /* pass the buffered block belongs to, -1 while the buffer
* holds no data waiting to be written */
int incoming; /* number of posted receives into this buffer which
* have not been completed yet */
};
/* per-file state of the block-aligned writing mode */
typedef struct
{
MPI_File fh; /* handle of the file opened on the PIO communicator */
int fileID; /* id the entry is looked up by, see fileIDTest */
int numMsg; /* number of leading entries of msgs/reqs in use */
MPI_Offset pos; /* file offset directly behind all data accepted
* by completed write calls */
char *name; /* file name, points into the allocation of this
* struct, directly behind collWriteSize */
int blockSize; /* size of one block buffer in bytes */
int numBlockBuf; /* number of block-sized buffers
* pointed to by blockBuf */
struct pendingBufWrite *pending; /* pending[i].pass
* indicates pass if non-negative,
* pending[i].incoming gives number of
* outstanding recvs still to be
* processed for block buf i */
unsigned char *blockBuf; /* storage of the numBlockBuf block buffers */
int msgSize; /* number of entries allocated for msgs and reqs */
struct IOmsg *msgs; /* descriptions of the transfers in flight */
MPI_Request *reqs; /* requests matching the entries of msgs */
long collWriteSize[]; /* used to allgather sizes of writes
* on different processes */
} aFiledataM;
/* set of aFiledataM entries, searched by file id via fileIDTest */
static listSet *bibAFiledataM;
static int
fileIDTest(void *a, void *fileID)
{
return ((aFiledataM *)a)->fileID == (int)(intptr_t)fileID;
}
/***************************************************************/
/* Put entry i of the message table of of into the unused state:
 * no buffer index, no length, no send buffer, no peer and a null
 * request. */
static inline void
initReblockPendingMsg(aFiledataM *of, size_t i)
{
  struct IOmsg *slot = of->msgs + i;
  slot->direction = INACTIVE_TX;
  slot->rank = -1;
  slot->sendBuf = NULL;
  slot->len = -1;
  slot->pos = -1;
  of->reqs[i] = MPI_REQUEST_NULL;
}
static aFiledataM *
initAFiledataFileWriteAtReblock(const char *filename, size_t bufSize)
{
MPI_Comm commPio = commInqCommPio();
int sizePio = commInqSizePio();
size_t nameSize = strlen(filename) + 1;
MPI_Info open_info = MPI_INFO_NULL;
xmpi(MPI_Info_create(&open_info));
xmpi(MPI_Info_set(open_info, "access_style", "write_once"));
xmpi(MPI_Info_set(open_info, "collective_buffering", "false"));
/* tell IBM PE to not buffer anything, we block-align all writes */
xmpi(MPI_Info_set(open_info, "IBM_io_buffer_size", "0"));
xmpi(MPI_Info_set(open_info, "IBM_largeblock_io", "true"));
MPI_File fh;
xmpi(MPI_File_open(commPio, (char *)filename,
MPI_MODE_CREATE|MPI_MODE_WRONLY|MPI_MODE_UNIQUE_OPEN,
open_info, &fh));
xmpi(MPI_Info_free(&open_info));
/* find block size of underlying file system */
size_t blockSize;
{
struct stat posixstat;
int rc = stat(filename, &posixstat);
if (rc < 0)
{
perror("failed to stat file after open, block size unavailable,"
" assuming 4MiB");
posixstat.st_blksize = 4 * 1024 * 1024;
}
blockSize = (size_t)posixstat.st_blksize < SIZE_MAX
? (size_t)posixstat.st_blksize
: SIZE_MAX/2;
}
/* round bufSize to next multiple of block size */
bufSize = roundUpToMultiple(bufSize, blockSize);
size_t numBlockBuf = bufSize / blockSize;
assert(blockSize <= INT_MAX && numBlockBuf <= INT_MAX);
aFiledataM *of
= Malloc(sizeof (*of) + sizeof (of->collWriteSize[0]) * (size_t)sizePio
+ nameSize);
of->fh = fh;
of->name = (char *)((unsigned char *)of + sizeof (*of)
+ sizeof (of->collWriteSize[0]) * (size_t)sizePio);
memcpy(of->name, filename, nameSize);
of->blockBuf = Malloc(bufSize);
of->pending = (struct pendingBufWrite *)
Malloc(numBlockBuf * sizeof (struct pendingBufWrite));
for (size_t i = 0; i < numBlockBuf; ++i)
{
of->pending[i].pass = -1;
of->pending[i].incoming = 0;
}
of->blockSize = (int)blockSize;
of->numBlockBuf = (int)numBlockBuf;
size_t numPeers = (size_t)commInqSizePio();
/* start with 2 sends and 2 recvs per peer simultaneously */
size_t msgSize = numPeers * 4;
assert(msgSize <= INT_MAX);
of->msgSize = (int)msgSize;
of->msgs = Malloc(sizeof (of->msgs[0]) * msgSize);
of->reqs = Malloc(sizeof (of->reqs[0]) * msgSize);
for (size_t i = 0; i < (size_t)msgSize; ++i)
initReblockPendingMsg(of, i);
of->pos = 0;
of->numMsg = 0;
return of;
}
/***************************************************************/
/* defined further down, declared here because
 * destroyAFiledataFileWriteAtReblock writes out buffers still holding
 * data through it */
static void
flushReblockBuffer(aFiledataM *of, int blockBufIdx);
static int
destroyAFiledataFileWriteAtReblock(void *v)
{
aFiledataM *of = v;
size_t numBlockBuf = (size_t)of->numBlockBuf;
/* flush pending buffers */
/** 1. handle all outstanding messages */
xmpi(MPI_Waitall(of->numMsg, of->reqs, MPI_STATUSES_IGNORE));
of->numMsg = 0;
for (size_t block = 0; block < numBlockBuf; ++block)
{
of->pending[block].incoming = 0;
if (of->pending[block].pass != -1)
flushReblockBuffer(of, (int)block);
}
/* close file */
MPI_Offset endpos, fsize;
endpos = of->pos;
xmpi(MPI_File_get_size(of->fh, &fsize));
/* does the file need to be truncated? */
MPI_Comm commPio = commInqCommPio();
int trailingOctets = fsize > endpos;
xmpi(MPI_Allreduce(MPI_IN_PLACE, &trailingOctets, 1, MPI_INT, MPI_LOR,
commPio));
if (trailingOctets)
xmpi(MPI_File_set_size(of->fh, endpos));
int iret = MPI_File_close(&of->fh);
for (size_t i = 0; i < (size_t)of->msgSize; ++i)
Free(of->msgs[i].sendBuf);
Free(of->blockBuf);
Free(of->msgs);
Free(of->reqs);
Free(of->pending);
Free(of);
return iret == MPI_SUCCESS ? 0 : -1;
}
/***************************************************************/
/* Returns true iff the two aFiledataM entries v1 and v2 carry the
 * same file name. */
static bool
compareNamesFileWriteAtReblock(void *v1, void *v2)
{
  const aFiledataM *lhs = v1;
  const aFiledataM *rhs = v2;
  return strcmp(lhs->name, rhs->name) == 0;
}
/***************************************************************/
/* Returns the smaller one of a and b. */
static inline long
lmin(long a, long b)
{
  if (a < b)
    return a;
  return b;
}
/* Write the contents of block buffer blockBufIdx of file of to its
 * place in the file and mark the buffer as empty.
 *
 * Before writing, transfers of the file are completed one at a time
 * until no receive into the buffer is outstanding any more. Entries of
 * completed transfers are removed from the message table, send buffers
 * of completed sends are freed. of->numMsg is updated accordingly.
 *
 * The number of octets written is limited by of->pos, i.e. a block
 * extending beyond of->pos is only written up to that offset. */
static void
flushReblockBuffer(aFiledataM *of, int blockBufIdx)
{
  int blockSize = of->blockSize;
  /* multiply in size_t, the total buffer size may exceed INT_MAX */
  unsigned char *blockBuf
    = of->blockBuf + (size_t)blockSize * (size_t)blockBufIdx;
  int finIdx, numMsg = of->numMsg;
  while (of->pending[blockBufIdx].incoming)
    {
      /* todo: switch to MPI_Waitsome */
      xmpi(MPI_Waitany(numMsg, of->reqs, &finIdx, MPI_STATUS_IGNORE));
      if (finIdx != MPI_UNDEFINED)
        {
          /* the completed transfer may concern any buffer, not
           * necessarily the one to be flushed */
          int finBlockBufIdx = of->msgs[finIdx].pos;
          if (finBlockBufIdx >= 0 && of->msgs[finIdx].direction == RECV)
            --(of->pending[finBlockBufIdx].incoming);
          else if (of->msgs[finIdx].direction == SEND)
            Free(of->msgs[finIdx].sendBuf);
          else
            xabort("internal error");
          /* fill the gap with the last entry in use */
          of->msgs[finIdx] = of->msgs[numMsg - 1];
          of->msgs[numMsg - 1].sendBuf = NULL;
          of->reqs[finIdx] = of->reqs[numMsg - 1];
          --numMsg;
        }
      else
        /* no active request is left although receives are still
         * counted as outstanding, waiting again would never return
         * anything else */
        xabort("internal error: receives outstanding without active request");
    }
  int sizePio = commInqSizePio(),
    rankPio = commInqRankPio();
  /* blocks are distributed round-robin over the tasks, a pass covers
   * numBlockBuf blocks on each task */
  MPI_Offset ofs = (MPI_Offset)blockSize
    * (((MPI_Offset)of->pending[blockBufIdx].pass * (MPI_Offset)sizePio
        * of->numBlockBuf)
       + (MPI_Offset)blockBufIdx * (MPI_Offset)sizePio
       + (MPI_Offset)rankPio);
  int wsize = of->pos >= ofs + blockSize ? blockSize : (int)(of->pos - ofs);
  xmpi(MPI_File_write_at(of->fh, ofs, blockBuf, wsize, MPI_UNSIGNED_CHAR,
                         MPI_STATUS_IGNORE));
  of->pending[blockBufIdx].pass = -1;
  of->numMsg = numMsg;
}
/* Make sure the message table of of has room for one more entry when
 * numMsg entries are in use: if the table is completely filled, its
 * capacity is doubled and the added entries are set to the unused
 * state. */
static void
reblockMoreMsgs(aFiledataM *of, int numMsg)
{
  /* optimize with MPI_Testsome */
  if (of->msgSize != numMsg)
    return;
  size_t usedEntries = (size_t)numMsg;
  size_t grownCapacity = usedEntries * 2;
  of->msgs = Realloc(of->msgs, sizeof (of->msgs[0]) * grownCapacity);
  of->reqs = Realloc(of->reqs, sizeof (of->reqs[0]) * grownCapacity);
  size_t slot = usedEntries;
  while (slot < grownCapacity)
    {
      initReblockPendingMsg(of, slot);
      ++slot;
    }
  assert(grownCapacity <= INT_MAX);
  of->msgSize = (int)grownCapacity;
}
static size_t
fwFileWriteAtReblock(int fileID, const void *buffer, size_t len)
{
aFiledataM *of
= listSetGet(bibAFiledataM, fileIDTest, (void *)(intptr_t)fileID);
xassert(of && len <= INT_MAX);
MPI_Comm commPio = commInqCommPio();
int sizePio = commInqSizePio(),
rankPio = commInqRankPio();
/* find position to write to */
of->collWriteSize[rankPio] = (long)len;
xmpi(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
of->collWriteSize, 1, MPI_LONG, commPio));
/* figure out which block buffers intersect locally held data and
* what remotely held data intersects buffers on task */
int blockSize = of->blockSize;
MPI_Offset fWOfs = of->pos;
int numBlockBuf = of->numBlockBuf;
int numMsg = of->numMsg;
const unsigned char *inBuf = buffer;
for (int collRank = 0; collRank < sizePio; ++collRank)
if (of->collWriteSize[collRank])
{
long remaining = of->collWriteSize[collRank];
do {
MPI_Offset collBlockSize = (MPI_Offset)blockSize * (MPI_Offset)sizePio;
int pass = (int)(fWOfs / collBlockSize / numBlockBuf);
int inBlockPos = (int)(fWOfs % (MPI_Offset)blockSize);
int txLen = (int)lmin(blockSize - inBlockPos, remaining);
int destRank = (int)(fWOfs / (MPI_Offset)blockSize) % sizePio;
if (destRank == rankPio)
{
int blockBufIdx = (int)(fWOfs / collBlockSize) % numBlockBuf;
if (of->pending[blockBufIdx].pass >= 0
&& of->pending[blockBufIdx].pass != pass)
{
of->numMsg = numMsg;
flushReblockBuffer(of, blockBufIdx);
numMsg = of->numMsg;
}
if (collRank != rankPio)
{
reblockMoreMsgs(of, numMsg);
/* this rank is to write out (part of) the data, but it
* resides on another rank */
of->msgs[numMsg] = (struct IOmsg){
.pos = blockBufIdx,
.len = txLen,
.rank = collRank,
.direction = RECV,
};
xmpi(MPI_Irecv(of->blockBuf
+ (size_t)blockBufIdx * (size_t)blockSize
+ (size_t)inBlockPos,
txLen, MPI_UNSIGNED_CHAR, collRank, BLOCK_XFER,
commPio, of->reqs + numMsg));
++numMsg;
++(of->pending[blockBufIdx].incoming);
}
else /* if (collRank == rankPio) */
{
memcpy(of->blockBuf
+ (size_t)blockBufIdx * (size_t)blockSize
+ (size_t)inBlockPos,
inBuf, (size_t)txLen);
inBuf += txLen;
}
of->pending[blockBufIdx].pass = pass;
}
else if (collRank == rankPio)
{
reblockMoreMsgs(of, numMsg);
/* this rank has the data and will send it to the one doing
* the writing */
of->msgs[numMsg] = (struct IOmsg){
.pos = -1,
.len = txLen,
.rank = destRank,
.direction = SEND,
};
void *restrict buf = of->msgs[numMsg].sendBuf
= Realloc(of->msgs[numMsg].sendBuf, (size_t)txLen);
memcpy(buf, inBuf, (size_t)txLen);
xmpi(MPI_Isend(buf, txLen, MPI_UNSIGNED_CHAR,
destRank, BLOCK_XFER, commPio, of->reqs + numMsg));
inBuf += txLen;
++numMsg;
}
fWOfs += txLen;
remaining -= txLen;
} while (remaining);
}
of->numMsg = numMsg;
of->pos = fWOfs;
return len;
}
/***************************************************************/
/* Close file fileID: aborts if no such file is registered, otherwise
 * removes its entry from the file set and returns the result of the
 * removal. */
static int fcFileWriteAtReblock(int fileID)
{
  void *key = (void *)(intptr_t)fileID;
  aFiledataM *entry = listSetGet(bibAFiledataM, fileIDTest, key);
  if (entry == NULL)
    xabort("listSet, fileID=%d not found", fileID);
  return listSetRemove(bibAFiledataM, fileIDTest, key);
}
/***************************************************************/
/* Aborts if the aFiledataM entry q already carries the file name nm. */
static void
elemCheck(void *q, void *nm)
{
  const char *wantedName = nm;
  const aFiledataM *entry = q;
  if (strcmp(wantedName, entry->name) == 0)
    xabort("Filename %s has already been added to set\n", wantedName);
}
static int
fowFileWriteAtReblock(const char *filename, const char *mode)
{
static unsigned long buffersize = 0;
int id;
enum {
bcastRoot = 0
};
MPI_Comm commPio = commInqCommPio ();
int rankPio = commInqRankPio ();
if ((mode[0] != 'w' && mode[0] != 'W') || mode[0] == 0 || mode[1] != 0)
xabort("Unsupported mode \"%s\" in parallel file open.", mode);
/* broadcast buffersize to collectors ( just once, for all files )*/
if (!buffersize)
{
if (rankPio == bcastRoot)
buffersize = findWriteAccumBufsize();
xmpi(MPI_Bcast(&buffersize, 1, MPI_UNSIGNED_LONG, bcastRoot, commPio));
}