Commit 0c4617c4 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

interim 8

parent 0c84f4e7
......@@ -5221,7 +5221,6 @@ int gribOpen(const char *filename, const char *mode)
fileSetBufferType(fileID, FILE_BUFTYPE_MMAP);
}
#endif
return (fileID);
}
......
......@@ -23,6 +23,7 @@ size_t getpagesize(void);
#ifdef USE_MPI
#include "pio.h"
#include "pio_comm.h"
#include "cdi.h"
#include "namespace.h"
......@@ -1074,7 +1075,7 @@ int fileOpen(const char *filename, const char *mode)
/* begin deike */
#ifdef USE_MPI
if ( memcmp ( mode, "w", 1 ) == 0 && pioinfo->type != PIO_NONE )
if ( memcmp ( mode, "w", 1 ) == 0 && commInqIOModus () != PIO_NONE )
return pioFileOpenW ( filename );
#endif
/* end deike */
......@@ -1170,7 +1171,7 @@ int fileClose(int fileID)
/* begin deike */
#ifdef USE_MPI
if ( pioinfo->type != PIO_NONE )
if ( commInqIOModus () != PIO_NONE )
return pioFileClose ( fileID );
#endif
......
......@@ -8,6 +8,8 @@
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include "pio.h"
#include "cdi.h"
#include "pio_util.h"
......@@ -80,7 +82,7 @@ size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
{
size_t iret = CDI_UNDEFID;
switch ( pioinfo->type )
switch ( commInqIOModus ())
{
case PIO_MPI_NONB:
iret = fwMPINONB ( fileID, tsID, buffer, len );
......@@ -120,7 +122,7 @@ size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
int pioFileClose ( int id )
{
int iret = CDI_UNDEFID;
switch ( pioinfo->type )
switch ( commInqIOModus ())
{
case PIO_MPI_NONB:
iret = fcMPINONB ( id );
......@@ -161,7 +163,7 @@ int pioFileOpenW ( const char *filename )
{
int iret = CDI_UNDEFID;
switch ( pioinfo->type )
switch ( commInqIOModus ())
{
case PIO_MPI_NONB:
iret = fowMPINONB ( filename );
......@@ -195,95 +197,6 @@ int pioFileOpenW ( const char *filename )
return iret;
}
/***************************************************************/
static int cmpr ( const void *a, const void *b )
{
return strcmp ( *( char ** ) a, *( char ** ) b);
}
/***************************************************************/
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
int *nnodes )
{
int size, rank, len, npes_node, key, test, i, j;
char *myHost, **allHosts, *allHosts0, *curr;
char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];
xmpi ( MPI_Comm_size ( commF2C, &size ));
xmpi ( MPI_Comm_rank ( commF2C, &rank ));
myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
xmpi ( MPI_Get_processor_name ( myHost, &len ));
if ( myHost[0] == '\0' ) xabort ( "did not succeed to set hostname" );
if ( ddebug == MAXDEBUG )
{
strncpy ( hostname, myHost, len );
hostname [ len ] = '\0';
xdebug ( "myHost = %s", hostname );
}
allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
allHosts[0] = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME *
sizeof ( char ));
allHosts0 = allHosts[0];
for ( i = 1; i < size; i++ )
allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
xmpi ( MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
& ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME,
MPI_CHAR, commF2C ));
qsort ( allHosts, size, sizeof ( char * ), cmpr );
*color = 0;
i = 0;
j = 0;
while ( i < size )
{
curr = allHosts[i];
j++;
if (( test = strcmp ( myHost, curr )) == 0 ) *color = j;
while ( ++i < size )
if (( test = strcmp ( allHosts[i], curr )) != 0)
break;
}
*nnodes = j;
if ( *color == 0 ) xabort ( "Color is not set" );
npes_node = size / ( * nnodes );
key = rank % npes_node;
xmpi ( MPI_Comm_split ( commF2C, *color, key, myComm));
commDefCommNode ( * myComm );
commDefHostName ( myHost, len );
commDefHostID ( * color );
commDefNNodes ( * nnodes );
free ( allHosts0 );
free ( allHosts );
free ( myHost );
xdebug ( "color=%d", *color );
return;
}
#endif
/***************************************************************/
......@@ -292,47 +205,37 @@ MPI_Comm backendInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
{
int size, rank;
int collectingData = 1;
*nnodes = 1;
*color = 1;
#ifdef USE_MPI
xmpi ( MPI_Comm_size ( comm, &size ));
xmpi ( MPI_Comm_rank ( comm, &rank ));
/*
if ( ptype < 0 || ptype > maxPtype )
xabort ( "PIOTYPE is no valid modus" );
#ifdef _SX
if ( ptype == PIO_POSIX_ASYNCH )
xabort ( "PIO_POSIX_ASYNCH does not work on SX" );
#endif
*/
pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
pioinfo->type = ptype;
setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes );
commDefCommNode ();
/*
if (( pioinfo->type == PIO_NONE ) && ( *nnodes != size ))
xabort ( "PIOTYPE, NNODES: not a valid combination" );
*/
pioinfo->color = *color;
xmpi ( MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank )));
xmpi ( MPI_Comm_size ( pioinfo->comm, &( pioinfo->size )));
xdebug( "IOPE%d in bInit(), ptype=%d, initial_buffersize=%ld: "
"after init pioinfo ...",
pioinfo->rank, pioinfo->type, initial_buffersize );
pioinfo->collectorComm = MPI_COMM_NULL;
switch ( pioinfo->type )
switch ( commInqIOModus ())
{
/*
case PIO_NONE:
xmpi ( MPI_Comm_dup ( pioinfo->comm, &( pioinfo->collectorComm )));
collectingData = 1;
break;
*/
case PIO_MPI_NONB:
collectingData = initMPINONB ();
break;
......@@ -343,9 +246,11 @@ MPI_Comm backendInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
break;
*/
#endif
case PIO_POSIX_FPGUARD_SENDRECV:
/*
case PIO_POSIX_FPGUARD_SENDRECV:
collectingData = initPOSIXFPGUARDSENDRECV ();
break;
*/
/*
case PIO_POSIX_FPGUARD_THREAD:
collectingData = initPOSIXFPGUARDTHREAD ();
......@@ -365,7 +270,9 @@ MPI_Comm backendInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
#endif
#ifdef USE_MPI
return pioinfo->collectorComm;
// TODO: change interface!
assert ( pioinfo2 != NULL && pioinfo2->commColl != MPI_COMM_NULL );
return pioinfo2->commColl;
#else
return 0;
#endif
......@@ -376,13 +283,14 @@ MPI_Comm backendInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
void backendFinalize ()
{
#ifdef USE_MPI
/*
switch ( pioinfo->type )
{
/*
case PIO_POSIX_FPGUARD_THREAD:
finalizePOSIXFPGUARDTHREAD ();
break;
*/
}
if ( pioinfo->collectorComm != MPI_COMM_NULL )
......@@ -392,6 +300,7 @@ void backendFinalize ()
xmpi ( MPI_Comm_free ( &( pioinfo->comm )));
free ( pioinfo );
*/
#endif
}
/*
......
......@@ -234,7 +234,7 @@ MPI_Comm commInqCommModel( void )
}
void commDefIsProcColl ( int isProcColl )
void commDefCommColl ( int isProcColl )
{
assert ( pioinfo2 != NULL &&
pioinfo2->commNode != MPI_COMM_NULL &&
......@@ -248,26 +248,88 @@ void commDefIsProcColl ( int isProcColl )
}
void commDefCommNode ( MPI_Comm commNode )
static int cmpr ( const void *a, const void *b )
{
assert ( pioinfo2 != NULL );
xmpi ( MPI_Comm_dup ( commNode, &pioinfo2->commNode ));
return strcmp ( *( char ** ) a, *( char ** ) b);
}
void commDefCommNode ( void )
{
int size, rank, len, i, j, test;
char * myHost, ** allHosts, * allHosts0, * curr;
char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];
assert ( pioinfo2 != NULL &&
pioinfo2->commPio != MPI_COMM_NULL );
size = pioinfo2->sizePio;
rank = pioinfo2->rankPio;
myHost = xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( myHost[0] ));
xmpi ( MPI_Get_processor_name ( myHost, &len ));
if ( myHost[0] == '\0' ) xabort ( "did not succeed to set hostname" );
strncpy ( pioinfo2->hostname, myHost, len );
pioinfo2->hostname[len] = '\0';
allHosts = xmalloc ( size * sizeof ( allHosts[0] ));
allHosts[0] = xmalloc ( size * MPI_MAX_PROCESSOR_NAME *
sizeof ( allHosts[0][0] ));
allHosts0 = allHosts[0];
for ( i = 1; i < size; i++ )
allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
xmpi ( MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
& ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME,
MPI_CHAR, pioinfo2->commPio ));
qsort ( allHosts, size, sizeof ( char * ), cmpr );
i = 0;
j = 0;
while ( i < size )
{
curr = allHosts[i];
j++;
if (( test = strcmp ( myHost, curr )) == 0 ) pioinfo2->nodeInfo.hostID = j;
while ( ++i < size )
if (( test = strcmp ( allHosts[i], curr )) != 0)
break;
}
pioinfo2->nodeInfo.nNodes = j;
if ( pioinfo2->nodeInfo.hostID == CDI_UNDEFID ) xabort ( "HostID is not set" );
xmpi ( MPI_Comm_split ( pioinfo2->commPio, pioinfo2->nodeInfo.hostID, 0,
&pioinfo2->commNode ));
xmpi ( MPI_Comm_size ( pioinfo2->commNode, &pioinfo2->sizeNode ));
xmpi ( MPI_Comm_rank ( pioinfo2->commNode, &pioinfo2->rankNode ));
free ( allHosts0 );
free ( allHosts );
free ( myHost );
return;
}
void commDefHostName ( char * hostname, size_t len )
MPI_Comm commInqCommNode ( void )
{
assert ( pioinfo2 != NULL );
strncpy ( pioinfo2->hostname, hostname, len );
}
assert ( pioinfo2 != NULL &&
pioinfo2->commNode != MPI_COMM_NULL );
return pioinfo2->commNode;
}
void commDefHostID ( int hostID )
int commInqRankNode ( void )
{
assert ( pioinfo2 != NULL );
pioinfo2->nodeInfo.hostID = hostID;
assert ( pioinfo2 != NULL &&
pioinfo2->rankNode != CDI_UNDEFID );
return pioinfo2->rankNode;
}
......@@ -288,19 +350,24 @@ void commSendNodeInfo ( void )
void commRecvNodeMap ( void )
{
MPI_Status status;
int source;
assert ( pioinfo2 != NULL &&
pioinfo2->nProcsColl != CDI_UNDEFID &&
pioinfo2->commGlob != MPI_COMM_NULL &&
pioinfo2->commGlob != MPI_COMM_NULL &&
pioinfo2->rankGlob != CDI_UNDEFID &&
pioinfo2->nProcsModel != CDI_UNDEFID &&
pioinfo2->nodeMap == NULL );
pioinfo2->nodeMap == NULL );
source = pioinfo2->rankGlob - pioinfo2->nProcsModel;
xmpi ( MPI_Probe ( source, NODEMAP, pioinfo2->commGlob, &status ));
xmpi ( MPI_Get_count ( &status, MPI_INTEGER, &pioinfo2->nProcsColl ));
pioinfo2->nodeMap = xmalloc ( pioinfo2->nProcsColl *
sizeof ( pioinfo2->nodeMap[0] ));
xmpi ( MPI_Recv ( pioinfo2->nodeMap, pioinfo2->nProcsColl, MPI_INTEGER,
pioinfo2->rankGlob - pioinfo2->nProcsModel, NODEMAP,
pioinfo2->commGlob, &status ));
source, NODEMAP, pioinfo2->commGlob, &status ));
}
......@@ -319,13 +386,6 @@ int commInqNNodes ( void )
}
void commDefNProcsColl ( int nProcsColl )
{
assert ( pioinfo2 != NULL );
pioinfo2->nProcsColl = nProcsColl;
}
void commEvalPhysNodes ( void )
{
nodeInfo_t * nodeInfo;
......
......@@ -36,6 +36,7 @@ typedef struct {
int sizeNode;
int rankNode;
char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];
nodeInfo_t nodeInfo;
int specialRankNode;
MPI_Comm commColl;
......@@ -47,7 +48,6 @@ typedef struct {
int rankCalc;
MPI_Comm * commsIO;
nodeInfo_t nodeInfo;
int nProcsColl;
int * nodeSizes;
int * nodeMap;
......@@ -78,13 +78,11 @@ void commDefCommPio ( void );
MPI_Comm commInqCommPio ( void );
MPI_Comm commInqCommModel ( void );
int commInqIsProcIO ( void );
void commDefIsProcColl ( int );
void commDefCommNode ( MPI_Comm );
void commDefHostName ( char *, size_t );
void commDefHostID ( int );
void commDefNNodes ( int );
void commDefCommNode ( void );
MPI_Comm commInqCommNode ( void );
int commInqRankNode ( void );
int commInqNNodes ( void );
void commDefNProcsColl ( int );
void commDefCommColl ( int );
void commEvalPhysNodes ( void );
void commDefCommsIO ( void );
MPI_Comm commInqCommsIO ( int );
......@@ -93,7 +91,6 @@ void commPrint ( FILE * );
void commSendNodeInfo ( void );
void commRecvNodeMap ( void );// todo switch to gatherNodeInfo inside commpio
void commDefCommsIO ( void );
int * commInqNodeSizes ( void );
......
......@@ -51,9 +51,11 @@ static aFiledataM *initAFiledataMPINONB ( const char *filename, size_t bs )
aFiledataM *of;
size_t len;
int iret;
MPI_Comm commNode = commInqCommNode ();
int rankNode = commInqRankNode ();
xdebug ( "IOPE%d: filename=%s, buffersize=%zu, in",
pioinfo->rank, filename, bs );
rankNode, filename, bs );
of = ( aFiledataM * ) xmalloc ( sizeof ( aFiledataM ));
memset ( of, 0, sizeof ( aFiledataM ));
......@@ -66,7 +68,7 @@ static aFiledataM *initAFiledataMPINONB ( const char *filename, size_t bs )
/* init output buffer */
xdebug ( "IOPE%d: name=%s, init output buffer",
pioinfo->rank, of->name );
rankNode, of->name );
iret = dbuffer_init ( &( of->db1 ), of->size );
iret += dbuffer_init ( &( of->db2 ), of->size );
......@@ -76,14 +78,14 @@ static aFiledataM *initAFiledataMPINONB ( const char *filename, size_t bs )
of->db = of->db1;
/* open file */
xmpi ( MPI_File_open ( pioinfo->comm, of->name,
xmpi ( MPI_File_open ( commNode, of->name,
( MPI_MODE_CREATE|MPI_MODE_WRONLY ),
MPI_INFO_NULL, &( of->fh )));
of->request = MPI_REQUEST_NULL;
of->finished = false;
xdebug ( "IOPE%d opened file %s, return",
pioinfo->rank, of->name );
rankNode, of->name );
return of;
}
......@@ -95,11 +97,12 @@ int destroyAFiledataMPINONB ( void *v )
int iret = 0;
aFiledataM *of;
MPI_Status status;
int rankNode = commInqRankNode ();
of = (aFiledataM * ) v;
xdebug ( "IOPE%d: name=%s, close file, in",
pioinfo->rank, of->name );
rankNode, of->name );
/* close file */
......@@ -118,7 +121,7 @@ int destroyAFiledataMPINONB ( void *v )
free ( of );
xdebug ( "IOPE%d: closed file, cleaned up, return",
pioinfo->rank );
rankNode );
return iret == MPI_SUCCESS ? 0 : -1;
}
......@@ -147,6 +150,7 @@ void writeMPINONB ( aFiledataM *of, int fileID )
{
int amount;
MPI_Status status;
int rankNode = commInqRankNode ();
/* write buffer */
......@@ -155,7 +159,7 @@ void writeMPINONB ( aFiledataM *of, int fileID )
if ( amount == 0 ) return;
xdebug3 ( "IOPI%d: Write buffer, size %d bytes, in",
pioinfo->rank, amount );
rankNode, amount );
xmpi ( MPI_Wait ( & ( of->request ), &status ));
xmpi ( MPI_File_iwrite_shared ( of->fh, of->db->buffer, amount, MPI_CHAR,
......@@ -169,13 +173,13 @@ void writeMPINONB ( aFiledataM *of, int fileID )
if ( of->db == of->db1 )
{
xdebug3 ( "IOPE%d: fileID=%d, change to buffer 2 ...",
pioinfo->rank, fileID );
rankNode, fileID );
of->db = of->db2;
}
else
{
xdebug3 ( "IOPE%d: fileID=%d, change to buffer 1 ...",
pioinfo->rank, fileID );
rankNode, fileID );
of->db = of->db1;
}
......@@ -193,13 +197,15 @@ size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
aFiledataM *of;
listElem_t *curr;
char errorString[maxErrorString];
MPI_Comm commNode = commInqCommNode ();
int rankNode = commInqRankNode ();
flush = ( tsID != oldTsID ) ? 1 : 0;
if ( flush == 1 )
{
xdebug3 ( "IOPE%d: tsID = %d, flush buffer",
pioinfo->rank, tsID );
rankNode, tsID );
curr = bibAFiledataM->head;
while ( curr )
{
......@@ -207,7 +213,7 @@ size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
curr = curr->next;
}
oldTsID = tsID;
xmpi ( MPI_Barrier ( pioinfo->comm ));
xmpi ( MPI_Barrier ( commNode ));
}
of = ( aFiledataM * ) queueIdx2val ( bibAFiledataM, fileID );
......@@ -215,7 +221,7 @@ size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
xdebug3 ( "IOPE%d: fileID = %d, tsID = %d,"
" pushed data on buffer, filled = %d",
pioinfo->rank, fileID, tsID, filled );
rankNode, fileID, tsID, filled );
if ( filled == 1 )
{
......@@ -246,9 +252,11 @@ int fcMPINONB ( int fileID )
int iret;
double accumWaitMax;
char errorString[maxErrorString];
MPI_Comm commNode = commInqCommNode ();
int rankNode = commInqRankNode ();
xdebug ( "IOPE%d: write buffer, close file and cleanup, in",
pioinfo->rank, fileID );
rankNode, fileID );
if ( ! ( of = ( aFiledataM * ) queueIdx2val ( bibAFiledataM, fileID )))
{
......@@ -265,7 +273,7 @@ int fcMPINONB ( int fileID )
if ( !bibAFiledataM->head )
{
xdebug ( "IOPE%d: cleanup queue",
pioinfo->rank );
rankNode );
free ( bibAFiledataM );
}
......@@ -274,12 +282,12 @@ int fcMPINONB ( int fileID )
if ( ddebug == MAXDEBUG )
{
xmpi ( MPI_Reduce ( &accumWait, &accumWaitMax,
1, MPI_DOUBLE, MPI_MAX, 0, pioinfo->comm ));
1, MPI_DOUBLE, MPI_MAX, 0, commNode ));
xdebug ( "IOPE%d: Wait time %15.10lf s",
pioinfo->rank, accumWait );
if ( pioinfo->rank == 0 )
rankNode, accumWait );
if ( rankNode == 0 )
xdebug ( "IOPE%d: Max wait time %15.10lf s",
pioinfo->rank, accumWaitMax );
rankNode, accumWaitMax );
}
return iret;
......@@ -321,15 +329,17 @@ int fowMPINONB ( const char *filename )
static long buffersize = 0;
int id, bcastRank = 0;
char errorString[maxErrorString];
MPI_Comm commNode = commInqCommNode ();
int rankNode = commInqRankNode ();
/* broadcast buffersize to collectors ( just once, for all files )*/
if ( ! buffersize )
{
xdebug ( "IOPE%d: Broadcast buffersize to collectors ...",
pioinfo->rank );
rankNode );