Commit 4bc1485a authored by Deike Kleberg's avatar Deike Kleberg
Browse files

IO mode PIO_POSIXFPGUARD running with new pioinfo.

parent 4e4cb9fa
......@@ -25,8 +25,8 @@ static int nlev[nVars] = {1,1,5,5,2};
#define maxlev 5
#define ntsteps 3
//int IOModus = PIO_POSIX_FPGUARD_SENDRECV;
int IOModus = PIO_MPI_NONB;
int IOModus = PIO_POSIX_FPGUARD_SENDRECV;
//int IOModus = PIO_MPI_NONB;
void modelRun ()
......
......@@ -61,6 +61,7 @@ libcdi_la_SOURCES = \
pio_interface.c \
pio_interface.h \
pio_mpinonb.c \
pio_posixfpguardsendrecv.c \
pio_queue.c \
pio_rpc.c \
pio_rpc.h \
......
......@@ -82,9 +82,10 @@ am_libcdi_la_OBJECTS = basetime.lo binary.lo calendar.lo cdf.lo \
cgribexlib.lo dmemory.lo error.lo extralib.lo file.lo \
gaussgrid.lo gribapi.lo grid.lo ieglib.lo institution.lo \
model.lo namespace.lo pio.lo pio_comm.lo pio_dbuffer.lo \
pio_interface.lo pio_mpinonb.lo pio_queue.lo pio_rpc.lo \
pio_server.lo pio_util.lo resource_handle.lo servicelib.lo \
stream_cdf.lo stream_cgribex.lo stream_ext.lo stream_grb.lo \
pio_interface.lo pio_mpinonb.lo pio_posixfpguardsendrecv.lo \
pio_queue.lo pio_rpc.lo pio_server.lo pio_util.lo \
resource_handle.lo servicelib.lo stream_cdf.lo \
stream_cgribex.lo stream_ext.lo stream_grb.lo \
stream_gribapi.lo stream_history.lo stream_ieg.lo \
stream_int.lo stream_record.lo stream_srv.lo stream_var.lo \
table.lo taxis.lo timebase.lo tsteps.lo util.lo varscan.lo \
......@@ -330,6 +331,7 @@ libcdi_la_SOURCES = \
pio_interface.c \
pio_interface.h \
pio_mpinonb.c \
pio_posixfpguardsendrecv.c \
pio_queue.c \
pio_rpc.c \
pio_rpc.h \
......@@ -510,6 +512,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_dbuffer.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_interface.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpinonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_rpc.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_server.Plo@am__quote@
......
......@@ -203,7 +203,7 @@ extern "C" {
#define PIO_POSIX_FPGUARD_THREAD_REFUSE 5
#define PIO_POSIX_FPGUARD_THREAD 6
#define PIO_MAX_IOMODUS PIO_MPI_NONB
#define PIO_MAX_IOMODUS PIO_POSIX_FPGUARD_SENDRECV
//#define TEST_LOCAL 0
// set to 1 if running compareResourceArray with one PE
......
......@@ -92,11 +92,11 @@ size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
break;
*/
#endif
/*
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fwPOSIXFPGUARDSENDRECV ( fileID, tsID, buffer, len );
break;
*/
/*
case PIO_POSIX_FPGUARD_THREAD:
iret = fwPOSIXFPGUARDTHREAD ( fileID, tsID, buffer, len );
......@@ -134,11 +134,11 @@ int pioFileClose ( int id )
break;
*/
#endif
/*
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fcPOSIXFPGUARDSENDRECV ( id );
break;
*/
/*
case PIO_POSIX_FPGUARD_THREAD:
iret = fcPOSIXFPGUARDTHREAD ( id );
......@@ -177,11 +177,11 @@ int pioFileOpenW ( const char *filename )
break;
*/
#endif
/*
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
break;
*/
/*
case PIO_POSIX_FPGUARD_THREAD:
iret = fowPOSIXFPGUARDTHREAD ( filename );
......@@ -231,11 +231,11 @@ void backendInit ( void )
break;
*/
#endif
/*
case PIO_POSIX_FPGUARD_SENDRECV:
collectingData = initPOSIXFPGUARDSENDRECV ();
case PIO_POSIX_FPGUARD_SENDRECV:
initPOSIXFPGUARDSENDRECV ();
break;
*/
/*
case PIO_POSIX_FPGUARD_THREAD:
collectingData = initPOSIXFPGUARDTHREAD ();
......@@ -260,6 +260,11 @@ void backendInit ( void )
void backendFinalize ( void )
{
#ifdef USE_MPI
//xdebug ("#############################");
commDestroy ();
//xdebug ("#############################");
MPI_Finalize ();
exit ( 0 );
#endif
}
/*
......
......@@ -248,6 +248,29 @@ void commDefCommColl ( int isProcColl )
}
MPI_Comm commInqCommColl ( void )
{
assert ( pioinfo2 != NULL &&
pioinfo2->commColl != MPI_COMM_NULL );
return pioinfo2->commColl;
}
int commInqSizeColl ( void )
{
assert ( pioinfo2 != NULL &&
pioinfo2->sizeColl != CDI_UNDEFID );
return pioinfo2->sizeColl;
}
int commInqRankColl ( void )
{
assert ( pioinfo2 != NULL &&
pioinfo2->rankColl != CDI_UNDEFID );
return pioinfo2->rankColl;
}
static int cmpr ( const void *a, const void *b )
{
......@@ -308,6 +331,7 @@ void commDefCommNode ( void )
&pioinfo2->commNode ));
xmpi ( MPI_Comm_size ( pioinfo2->commNode, &pioinfo2->sizeNode ));
xmpi ( MPI_Comm_rank ( pioinfo2->commNode, &pioinfo2->rankNode ));
pioinfo2->specialRankNode = pioinfo2->sizeNode - 1;
free ( allHosts0 );
free ( allHosts );
......@@ -341,6 +365,14 @@ int commInqRankNode ( void )
}
int commInqSpecialRankNode ( void )
{
assert ( pioinfo2 != NULL &&
pioinfo2->specialRankNode != CDI_UNDEFID );
return pioinfo2->specialRankNode;
}
void commSendNodeInfo ( void )
{
assert ( pioinfo2 != NULL &&
......@@ -469,7 +501,7 @@ void commEvalPhysNodes ( void )
for ( IOID = 0; IOID < pioinfo2->nProcsIO; IOID++ )
{
assert ( p1[nodeInfo[IOID].hostID - 1] < p2[nodeInfo[IOID].hostID - 1] );
//assert ( p1[nodeInfo[IOID].hostID - 1] < p2[nodeInfo[IOID].hostID - 1] );
if ( nodeInfo[IOID].isProcColl )
* p1[nodeInfo[IOID].hostID - 1]++ = IOID + pioinfo2->nProcsModel;
}
......@@ -573,6 +605,14 @@ MPI_Comm commInqCommCalc ( void )
}
int commInqNProcsColl ( void )
{
assert ( pioinfo2 != NULL &&
pioinfo2->nProcsColl != CDI_UNDEFID );
return pioinfo2->nProcsColl;
}
void commPrint ( FILE * fp )
{
int i;
......@@ -635,8 +675,8 @@ void commPrint ( FILE * fp )
if ( pioinfo2->nodeMap != NULL )
{
fprintf ( fp, "# nodeMap = " );
assert ( pioinfo2->nProcsIO != CDI_UNDEFID );
for ( i = 0; i < pioinfo2->nProcsIO; i++ )
assert ( pioinfo2->nProcsColl != CDI_UNDEFID );
for ( i = 0; i < pioinfo2->nProcsColl; i++ )
fprintf ( fp, "%d ", pioinfo2->nodeMap[i] );
fprintf ( fp, "\n" );
}
......
......@@ -86,16 +86,22 @@ void commDefCommNode ( void );
MPI_Comm commInqCommNode ( void );
int commInqSizeNode ( void );
int commInqRankNode ( void );
int commInqSpecialRankNode ( void );
void commDefCommColl ( int );
MPI_Comm commInqCommColl ( void );
int commInqSizeColl ( void );
int commInqRankColl ( void );
void commSendNodeInfo ( void );
void commRecvNodeMap ( void );// todo switch to gatherNodeInfo inside commpio
int * commInqNodeSizes ( void );
int commInqNNodes ( void );
void commDefCommColl ( int );
void commEvalPhysNodes ( void );
void commDefCommsIO ( void );
MPI_Comm commInqCommsIO ( int );
MPI_Comm commInqCommCalc ( void );
int commInqNProcsColl ( void );
void commPrint ( FILE * );
......
......@@ -53,30 +53,30 @@ typedef struct
} tag_t;
/* pio.c */
int setTag ( int, int );
tag_t * getTag ( int );
void ungetTag ( tag_t * );
int setTag ( int, int );
tag_t * getTag ( int );
void ungetTag ( tag_t * );
/* pio_dbuffer.c */
int dbuffer_init ( struct dBuffer **, size_t );
int dbuffer_push ( struct dBuffer *, unsigned char *, size_t );
size_t dbuffer_data_size ( struct dBuffer * );
int dbuffer_reset ( struct dBuffer * );
void dbuffer_cleanup ( struct dBuffer ** );
size_t dbuffer_free ( struct dBuffer * );
int dbuffer_init ( struct dBuffer **, size_t );
int dbuffer_push ( struct dBuffer *, unsigned char *, size_t );
size_t dbuffer_data_size ( struct dBuffer * );
int dbuffer_reset ( struct dBuffer * );
void dbuffer_cleanup ( struct dBuffer ** );
size_t dbuffer_free ( struct dBuffer * );
/* pio_queue.c */
queue_t * queueInit ( valDestroyFunction, keyCompareFunction );
void queueDestroy ( queue_t * );
int queuePush ( queue_t *, void *, int, ... );
void * queueIdx2val ( queue_t *, int );
int queueDelNode ( queue_t *, int );
void queueDestroy ( queue_t * );
int queuePush ( queue_t *, void *, int, ... );
void * queueIdx2val ( queue_t *, int );
int queueDelNode ( queue_t *, int );
/* pio_mpinonb.c */
int fowMPINONB ( const char * );
int fcMPINONB ( int );
size_t fwMPINONB( int, int, const void *, size_t );
void initMPINONB ( void );
int fowMPINONB ( const char * );
int fcMPINONB ( int );
size_t fwMPINONB( int, int, const void *, size_t );
void initMPINONB ( void );
/* pio_posixasynch.c */
#ifndef _SX
......@@ -87,11 +87,11 @@ size_t fwPOSIXASYNCH( int, int, const void *, size_t );
int initPOSIXASYNCH ( void );
#endif
/* pio_posixfpguardsendrecv.c */
void fpgPOSIXFPGUARDSENDRECV ( int );
int fowPOSIXFPGUARDSENDRECV ( const char * );
int fcPOSIXFPGUARDSENDRECV ( int );
size_t fwPOSIXFPGUARDSENDRECV ( int, int, const void *, size_t );
int initPOSIXFPGUARDSENDRECV ( void );
void fpgPOSIXFPGUARDSENDRECV ( int );
int fowPOSIXFPGUARDSENDRECV ( const char * );
int fcPOSIXFPGUARDSENDRECV ( int );
size_t fwPOSIXFPGUARDSENDRECV ( int, int, const void *, size_t );
void initPOSIXFPGUARDSENDRECV ( void );
/* pio_posixfpguardthread.c */
void * fpgPOSIXFPGUARDTHREAD ( void * );
......
......@@ -199,10 +199,10 @@ void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
int * weightsVarsNode;
int * varMappingNode;
int nVarsNode, summandRank = 0;
int nProcsIO = commInqNProcsIO ();
int nProcsCalc = commInqNProcsModel ();
int nProcsColl = commInqNProcsColl ();
int nProcsModel = commInqNProcsModel ();
int buckets[nProcsIO];
int buckets[nProcsColl];
for ( i = 0; i < nStreams; i++ )
{
......@@ -256,7 +256,7 @@ void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
if ( * ( streamMapping + j ) == i )
for ( k = 0; k < * ( sSizes + j ); k ++ )
* ( varMapping + offset ++ ) =
* ( varMappingNode + offsetN ++ ) + nProcsCalc + summandRank;
* ( varMappingNode + offsetN ++ ) + nProcsModel + summandRank;
else
offset += * ( sSizes + j );
......@@ -270,11 +270,11 @@ void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
if ( ddebug )
{
xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT );
for ( i = 0; i < nProcsIO; i++ )
for ( i = 0; i < nProcsColl; i++ )
buckets[i] = 0;
for ( i = 0; i < nVars; i ++ )
buckets[*(varMapping + i ) - nProcsCalc] += * ( vSizes + i );
xprintArray ( "buckets", buckets, nProcsIO, DATATYPE_INT );
buckets[*(varMapping + i ) - nProcsModel] += * ( vSizes + i );
xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
}
}
#endif
......@@ -285,16 +285,16 @@ void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
void defVarDeco ( int vlistID, int varID )
{
int varSize, cRank, lChunk, rem, lOffset;
int nProcsCalc = commInqNProcsModel ();
deco_t deco[nProcsCalc];
int nProcsModel = commInqNProcsModel ();
deco_t deco[nProcsModel];
varSize = vlistInqVarSize ( vlistID, varID );
for ( cRank = 0; cRank < nProcsCalc; cRank++ )
for ( cRank = 0; cRank < nProcsModel; cRank++ )
{
lChunk = varSize / nProcsCalc;
lChunk = varSize / nProcsModel;
lOffset = cRank * lChunk;
rem = varSize % nProcsCalc;
rem = varSize % nProcsModel;
if ( cRank < rem )
{
lChunk++;
......@@ -308,7 +308,7 @@ void defVarDeco ( int vlistID, int varID )
deco[cRank].chunk = lChunk;
}
vlistDefVarDeco ( vlistID, varID, nProcsCalc, &deco[0] );
vlistDefVarDeco ( vlistID, varID, nProcsModel, &deco[0] );
}
#endif
......@@ -373,7 +373,7 @@ void modelWinCleanup ( void )
#ifdef USE_MPI
int IOID;
for ( IOID = 0; IOID < commInqNProcsIO (); IOID ++ )
for ( IOID = 0; IOID < commInqNProcsColl (); IOID ++ )
{
if ( winPostSet[IOID] )
xmpi ( MPI_Win_wait ( win[IOID] ));
......@@ -404,8 +404,8 @@ static
int IOID, nstreams, * streamIndexArray, streamNo, vlistID, nvars, varID;
int IOIDchunk = 0;
int rankGlob = commInqRankGlob ();
int nProcsIO = commInqNProcsIO ();
int nProcsCalc = commInqNProcsModel ();
int nProcsColl = commInqNProcsColl ();
int nProcsModel = commInqNProcsModel ();
nstreams = reshCountType ( &streamOps );
streamIndexArray = xmalloc ( nstreams * sizeof ( int ));
......@@ -417,7 +417,7 @@ static
for ( varID = 0; varID < nvars; varID++ )
{
IOID = CDI_UNDEFID;
IOID = vlistInqVarIOrank ( vlistID, varID ) - nProcsCalc;
IOID = vlistInqVarIOrank ( vlistID, varID ) - nProcsModel;
IOIDchunk = vlistInqVarDecoChunk ( vlistID, varID, rankGlob );
assert ( IOID != CDI_UNDEFID &&
IOIDchunk > 0 );
......@@ -427,12 +427,12 @@ static
}
free ( streamIndexArray );
for ( IOID = 0; IOID < nProcsIO; IOID++ )
for ( IOID = 0; IOID < nProcsColl; IOID++ )
{
winBufferSize[IOID] += ( winBufferOverhead * sizeof ( double ));
assert ( winBufferSize[IOID] < winBufferSizeMax );
}
xprintArray ( "winBufferSize", winBufferSize, nProcsIO, DATATYPE_INT );
xprintArray ( "winBufferSize", winBufferSize, nProcsColl, DATATYPE_INT );
return;
#endif
......@@ -447,21 +447,21 @@ void modelWinCreate ( void )
#ifdef USE_MPI
int IOID, ranks[1];
int nProcsIO = commInqNProcsIO ();
int nProcsCalc = commInqNProcsModel ();
int nProcsColl = commInqNProcsColl ();
int nProcsModel = commInqNProcsModel ();
winBufferSize = xmalloc ( nProcsIO * sizeof ( winBufferSize[0] ));
winBuffer = xmalloc ( nProcsIO * sizeof ( winBuffer[0] ));
winBufferHead = xmalloc ( nProcsIO * sizeof ( winBufferHead[0] ));
win = xmalloc ( nProcsIO * sizeof ( win[0] ));
winPostSet = xmalloc ( nProcsIO * sizeof ( winPostSet[0] ));
winFirstWrite = xmalloc ( nProcsIO * sizeof ( winFirstWrite[0] ));
groupsIONetto = xmalloc ( nProcsIO * sizeof ( groupsIONetto[0] ));
winBufferSize = xmalloc ( nProcsColl * sizeof ( winBufferSize[0] ));
winBuffer = xmalloc ( nProcsColl * sizeof ( winBuffer[0] ));
winBufferHead = xmalloc ( nProcsColl * sizeof ( winBufferHead[0] ));
win = xmalloc ( nProcsColl * sizeof ( win[0] ));
winPostSet = xmalloc ( nProcsColl * sizeof ( winPostSet[0] ));
winFirstWrite = xmalloc ( nProcsColl * sizeof ( winFirstWrite[0] ));
groupsIONetto = xmalloc ( nProcsColl * sizeof ( groupsIONetto[0] ));
getWinBufferSizes ();
ranks[0] = commInqNProcsModel ();
for ( IOID = 0; IOID < nProcsIO; IOID ++ )
for ( IOID = 0; IOID < nProcsColl; IOID ++ )
{
assert ( winBufferSize[IOID] > 0 );
winBuffer[IOID] = NULL;
......@@ -534,10 +534,9 @@ void pioEndDef ( void )
int bufferSize;
int rankGlob = commInqRankGlob ();
//varsMapNDeco ( nNodes, nodeSizes );
varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());
if ( rankGlob < commInqNProcsIO ())
if ( rankGlob < commInqNProcsColl ())
{
reshPackBufferCreate ( &buffer, &bufferSize, commInqCommsIO ( rankGlob ));
......@@ -571,7 +570,7 @@ void pioFinalize ()
#ifdef USE_MPI
int i, ibuffer = 1111;
for ( i = 0; i < commInqNProcsIO (); i++ )
for ( i = 0; i < commInqNProcsColl (); i++ )
{
xmpi ( MPI_Send ( &ibuffer, 1, MPI_INT, commInqNProcsModel (),
FINALIZE, commInqCommsIO ( i )));
......@@ -591,8 +590,8 @@ void pioWriteTimestep ( int ts, int vdate, int vtime )
int IOID, buffer[timestepSize], iAssert = 0;
double end = ( double ) END, size = sizeof ( double );
int rankGlob = commInqRankGlob ();
int nProcsIO = commInqNProcsIO ();
int nProcsCalc = commInqNProcsModel ();
int nProcsColl = commInqNProcsColl ();
int nProcsModel = commInqNProcsModel ();
assert ( ts >= 0 &&
vdate >= 0 &&
......@@ -602,7 +601,7 @@ void pioWriteTimestep ( int ts, int vdate, int vtime )
buffer[1] = vdate;
buffer[2] = vtime;
for ( IOID = 0; IOID < nProcsIO; IOID++ )
for ( IOID = 0; IOID < nProcsColl; IOID++ )
{
memcpy ( winBufferHead[IOID], &end, size);
winBufferHead[IOID] ++;
......@@ -612,9 +611,9 @@ void pioWriteTimestep ( int ts, int vdate, int vtime )
winFirstWrite[IOID] = 1;
}
if ( rankGlob < nProcsIO )
if ( rankGlob < nProcsColl )
{
xmpi ( MPI_Send ( &buffer[0], timestepSize, MPI_INTEGER, nProcsCalc,
xmpi ( MPI_Send ( &buffer[0], timestepSize, MPI_INTEGER, nProcsModel,
WRITETS, commInqCommsIO ( rankGlob )));
xdebug ( "sent message WRITETS" );
}
......
......@@ -16,6 +16,7 @@
#include "mpi.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_impl.h"
#include "pio_util.h"
......@@ -33,8 +34,6 @@ extern double accumSuspend;
extern double accumWait;
extern double accumWrite;
extern pioInfo *pioinfo;
typedef struct
{
char *name;
......@@ -189,6 +188,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
queue_t *bibBFiledataPF;
long amount;
char errorString[maxErrorString];
MPI_Comm commNode = commInqCommNode ();
xdebug ( "ncollectors=%d", ncollectors);
......@@ -196,8 +196,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
for ( ;; )
{
xdebug ();
xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, pioinfo->comm, &status ));
xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, &status ));
source = status.MPI_SOURCE;
rtag = getTag ( status.MPI_TAG );
......@@ -229,7 +228,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
xmpi ( MPI_Sendrecv ( &( bfd->offset ), 1, MPI_LONG, source, status.MPI_TAG,
&amount, 1, MPI_LONG, source, status.MPI_TAG,
pioinfo->comm, &status ));
commNode, &status ));
bfd->offset += amount;
......@@ -254,7 +253,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
xmpi ( MPI_Sendrecv ( &( bfd->offset ), 1, MPI_LONG, source, status.MPI_TAG,
&amount, 1, MPI_LONG, source, status.MPI_TAG,
pioinfo->comm, &status ));
commNode, &status ));
bfd->offset += amount;
......@@ -279,7 +278,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
xmpi ( MPI_Sendrecv ( &( bfd->offset ), 1, MPI_LONG, source, status.MPI_TAG,
&amount, 1, MPI_LONG, source, status.MPI_TAG,
pioinfo->comm, &status ));
commNode, &status ));
bfd->offset += amount;
......@@ -321,6 +320,8 @@ void writePF ( aFiledataPF *afd, int id )
int error, tag;
MPI_Status status;
char errorString[maxErrorString];
int specialRank = commInqSpecialRankNode ();
MPI_Comm commNode = commInqCommNode ();
/* send buffersize, recv offset */
......@@ -329,9 +330,9 @@ void writePF ( aFiledataPF *afd, int id )
tag = setTag ( id, afd->command );
xmpi ( MPI_Sendrecv ( &amountL, 1, MPI_LONG, pioinfo->specialRank, tag,
&offset, 1, MPI_LONG, pioinfo->specialRank, tag,
pioinfo->comm, &status ));
xmpi ( MPI_Sendrecv ( &amountL, 1, MPI_LONG, specialRank, tag,
&offset, 1, MPI_LONG, specialRank, tag,
commNode, &status ));
xdebug ( "id=%d, command=%d, amount=%llu, send amountL=%ld, recv offset=%ld",
id, afd->command, (unsigned long long)amount,
......@@ -400,7 +401,7 @@ size_t fwPOSIXFPGUARDSENDRECV( int fileID, int tsID, const void *buffer, size_t
curr = curr->next;
}
oldTsID = tsID;
xmpi ( MPI_Barrier ( pioinfo->collectorComm ));
xmpi ( MPI_Barrier ( commInqCommColl ()));
}
afd = ( aFiledataPF * ) queueIdx2val ( bibAFiledataPF, fileID );
......@@ -490,8 +491,7 @@ void queueCheckFP ( queue_t *q, const char *name )
int fowPOSIXFPGUARDSENDRECV ( const char *filename )
{
int bcastRank, id;
MPI_Comm bcastComm;
int root = 0, id;
static aFiledataPF *afd;
static bool firstOpen = true;
static long buffersize = 0;
......@@ -500,26 +500,21 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
/* broadcast buffersize to collectors */
if ( firstOpen )
{
if ( ddebug && pioinfo->rank == 0)
xdebug ( "name=%s, broadcast buffersize to collectors ...",
filename );
bcastRank = 0;
bcastComm = pioinfo->collectorComm;
if ( pioinfo->rank == bcastRank )
{
if ( commInqRankColl () == root )
{
xdebug ( "name=%s, broadcast buffersize to collectors ...",
filename );
if ( getenv( "BUFSIZE" ) != NULL )
buffersize = atol ( getenv ( "BUFSIZE" ));
if ( buffersize < initial_buffersize )
buffersize = initial_buffersize;
}
xmpi ( MPI_Bcast ( &buffersize, 1, MPI_LONG, 0, bcastComm ));
firstOpen = false;
}
xmpi ( MPI_Bcast ( &buffersize, 1, MPI_LONG, root, commInqCommColl ()));
firstOpen = false;
}