Commit 4b94a3e9 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

Catch exceptions, testprograms collectData.c, collectData2003.F90 and compareResArray.c running

parent da134fe9
......@@ -103,11 +103,11 @@ void modelRun ()
int main (int argc, char *argv[])
{
enum {
nProcsIO = 1,
IOMode = PIO_NONE,
nProcsIO = 3,
//IOMode = PIO_NONE,
//IOMode = PIO_MPI_NONB,
//IOMode = PIO_POSIX_FPGUARD_SENDRECV,
//IOMode = PIO_POSIX_ASYNCH,
IOMode = PIO_POSIX_ASYNCH,
//IOMode = PIO_POSIX_NONB,
nNamespaces = 1 };
......
......@@ -14,17 +14,16 @@ PROGRAM COLLECTDATA2003
! For parallel IO:
! Parameter and variables needed.
INTEGER nProcsIO, IOMode, nNamespaces
PARAMETER ( nProcsIO = 4 )
!PARAMETER ( IOMode = PIO_NONE )
!PARAMETER ( IOMode = PIO_MPI_NONB )
PARAMETER ( IOMode = PIO_POSIX_NONB )
!PARAMETER ( IOMode = PIO_POSIX_ASYNCH )
!PARAMETER ( IOMode = PIO_POSIX_FPGUARD_SENDRECV )
PARAMETER ( nNamespaces = 1 )
INTEGER, PARAMETER :: hasLocalFile ( nNamespaces ) = (/ 0 /)
INTEGER nProcsIO, IOMode, nNamespaces
PARAMETER ( nProcsIO = 3 )
!PARAMETER ( IOMode = PIO_NONE )
!PARAMETER ( IOMode = PIO_MPI_NONB )
!PARAMETER ( IOMode = PIO_POSIX_NONB )
!PARAMETER ( IOMode = PIO_POSIX_ASYNCH )
PARAMETER ( IOMode = PIO_POSIX_FPGUARD_SENDRECV )
PARAMETER ( nNamespaces = 1 )
INTEGER hasLocalFile ( nNamespaces )
PARAMETER ( hasLocalFile = (/ 0 /) )
INTEGER commGlob, commModel, error
! Start parallel environment
......@@ -56,12 +55,12 @@ CONTAINS
INTEGER, INTENT ( IN ) :: commModel
INTEGER nlon, nlat, nlev, nts, vdate, vtime, filetype
PARAMETER ( nlon = 12 ) ! Number of longitudes
PARAMETER ( nlat = 6 ) ! Number of latitudes
PARAMETER ( nlev = 5 ) ! Number of levels
PARAMETER ( nts = 3 ) ! Number of time steps
PARAMETER ( vdate = 19850101 )
PARAMETER ( vtime = 120000 )
PARAMETER ( nlon = 12 ) ! Number of longitudes
PARAMETER ( nlat = 6 ) ! Number of latitudes
PARAMETER ( nlev = 5 ) ! Number of levels
PARAMETER ( nts = 3 ) ! Number of time steps
PARAMETER ( vdate = 19850101 )
PARAMETER ( vtime = 120000 )
PARAMETER ( filetype = FILETYPE_GRB )
INTEGER gridID, zaxisID1, zaxisID2, taxisID
......@@ -161,6 +160,10 @@ CONTAINS
CALL pioWriteTimestep ( tsID, vdate, vtime )
END DO
! For parallel IO:
! Preparation for local cleanup
CALL pioEndTimestepping ()
! Close the output stream
CALL streamClose(streamID)
......
......@@ -209,6 +209,7 @@ extern "C" {
int pioInit ( int, int, int, int, int * );
void pioFinalize ( void );
void pioEndDef ( void );
void pioEndTimestepping ( void );
void pioWriteTimestep ( int, int, int );
int pioInqVarDecoChunk ( int, int );
int pioInqVarDecoOff ( int, int );
......
......@@ -344,6 +344,9 @@
! pioEndDef
EXTERNAL pioEndDef
! pioEndTimestepping
EXTERNAL pioEndTimestepping
! pioWriteTimestep
! (INTEGER ,
! INTEGER ,
......
......@@ -59,6 +59,7 @@
FCALLSCFUN5 (INT, pioInit, PIOINIT, pioinit, INT, INT, INT, INT, PINT)
FCALLSCSUB0 (pioFinalize, PIOFINALIZE, piofinalize)
FCALLSCSUB0 (pioEndDef, PIOENDDEF, pioenddef)
FCALLSCSUB0 (pioEndTimestepping, PIOENDTIMESTEPPING, pioendtimestepping)
FCALLSCSUB3 (pioWriteTimestep, PIOWRITETIMESTEP, piowritetimestep, INT, INT, INT)
FCALLSCFUN2 (INT, pioInqVarDecoChunk, PIOINQVARDECOCHUNK, pioinqvardecochunk, INT, INT)
FCALLSCFUN2 (INT, pioInqVarDecoOff, PIOINQVARDECOOFF, pioinqvardecooff, INT, INT)
......
......@@ -8,10 +8,10 @@
static int nNamespaces = 1;
static int activeNamespace = 0;
static int HLF = 1;
static int * hasLocalFile = &HLF;
static int RAS = 0;
static int * resASent = &RAS;
static int serialHLF = 1;
static int * hasLocalFile = &serialHLF;
static int serialRS = 0;
static int * resStatus = &serialRS;
enum {
intbits = sizeof(int) * CHAR_BIT,
......@@ -79,7 +79,7 @@ void namespaceInit ( int nspn, int * argHasLocalFile )
hasLocalFile = xmalloc ( nspn * sizeof ( hasLocalFile[0] ));
for ( nspID = 0; nspID < nspn; nspID++ )
hasLocalFile[nspID] = argHasLocalFile[nspID];
resASent = xmalloc ( nspn * sizeof ( resASent[0] ));
resStatus = xmalloc ( nspn * sizeof ( resStatus[0] ));
}
#endif
}
......@@ -91,7 +91,7 @@ void namespaceCleanup ( void )
{
free ( hasLocalFile );
hasLocalFile = NULL;
free ( resASent );
free ( resStatus );
}
}
......@@ -160,17 +160,17 @@ int namespaceAdaptKey2 ( int key )
}
void namespaceDefResASent ( void )
void namespaceDefResStatus ( statusCode argResStatus )
{
int nsp = namespaceGetActive ();
resASent[nsp] = 1;
resStatus[nsp] = argResStatus;
}
int namespaceInqResASent ( void )
statusCode namespaceInqResStatus ( void )
{
int nsp = namespaceGetActive ();
return resASent[nsp];
return resStatus[nsp];
}
/*
......
......@@ -4,9 +4,15 @@
typedef struct {
int idx;
int nsp;
int resASent;
int resStatus;
}namespaceTuple_t;
typedef enum {
STAGE_DEFINITION = 0,
STAGE_TIMELOOP = 1,
STAGE_CLEANUP = 2
} statusCode;
void namespaceCleanup ( void );
void namespaceInit ( int, int * );
void namespaceShowbits ( int, char * );
......@@ -18,8 +24,8 @@ namespaceTuple_t namespaceResHDecode ( int );
int namespaceHasLocalFile ( int );
int namespaceAdaptKey ( int, int );
int namespaceAdaptKey2 ( int );
void namespaceDefResASent ( void );
int namespaceInqResASent ( void );
void namespaceDefResStatus ( statusCode );
statusCode namespaceInqResStatus ( void );
#endif
/*
......
......@@ -251,7 +251,7 @@ void defVarDeco ( int vlistID, int varID )
static
void varsMapNDeco ( int nNodes, int * nodeSizes )
{
int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping, * getsStreamData;
int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping, * collectsData;
int i, j, k = 0;
int nProcsColl = commInqNProcsColl ();
char text[1024];
......@@ -260,7 +260,7 @@ void varsMapNDeco ( int nNodes, int * nodeSizes )
resHs = xmalloc ( nStreams * sizeof ( resHs[0] ));
streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
getsStreamData = xmalloc ( nProcsColl * sizeof ( getsStreamData[0] ));
collectsData = xmalloc ( nProcsColl * sizeof ( collectsData[0] ));
streamGetIndexArray ( nStreams, resHs );
for ( i = 0; i < nStreams; i++ )
......@@ -281,44 +281,33 @@ void varsMapNDeco ( int nNodes, int * nodeSizes )
k = 0;
for ( i = 0; i < nStreams; i++ )
{
for ( j = 0; j < * ( streamSizes + i ); j++ )
{
defVarDeco ( streamInqVlist ( * resHs + i ), j );
defVarDeco ( streamInqVlistIDorig ( * resHs + i ), j );
vlistDefVarIOrank ( streamInqVlist ( * resHs + i ), j,
* ( varMapping + k ));
vlistDefVarIOrank ( streamInqVlistIDorig ( * resHs + i ), j,
* ( varMapping + k ));
getsStreamData[commRankGlob2CollID ( varMapping[k++] )] = 1;
}
for ( j = 0; j < nProcsColl; j++ )
if ( getsStreamData[j] == 0 )
{
if ( commInqIOMode () == PIO_MPI_NONB )
{
sprintf ( text,
"\nAT LEAST ONE COLLECTOR PROCESS IDLE, "
"\nCURRENTLY NOT COVERED IN IOMODE PIO_MPI_NONB: "
"\nPE%d writes no data for stream %d",
commCollID2RankGlob ( j ), resHs[i]);
xabort ( text );
}
else
{
xwarning ( "PE%d writes no data for stream %d",
commCollID2RankGlob ( j ), resHs[i]);
}
}
memset ( getsStreamData, 0, nProcsColl * sizeof ( getsStreamData[0] ));
}
if ( varMapping ) free ( varMapping );
if ( varSizes ) free ( varSizes );
if ( getsStreamData ) free ( getsStreamData );
if ( streamSizes ) free ( streamSizes );
if ( resHs ) free ( resHs );
for ( j = 0; j < * ( streamSizes + i ); j++ )
{
defVarDeco ( streamInqVlist ( * resHs + i ), j );
defVarDeco ( streamInqVlistIDorig ( * resHs + i ), j );
vlistDefVarIOrank ( streamInqVlist ( * resHs + i ), j,
* ( varMapping + k ));
vlistDefVarIOrank ( streamInqVlistIDorig ( * resHs + i ), j,
* ( varMapping + k ));
collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
}
for ( j = 0; j < nProcsColl; j++ )
if ( collectsData[j] == 0 )
{
sprintf ( text,
"\nAT LEAST ONE COLLECTOR PROCESS IDLES, "
"CURRENTLY NOT COVERED: "
"PE%d collects no data",
commCollID2RankGlob ( j ));
xabort ( text );
}
if ( varMapping ) free ( varMapping );
if ( varSizes ) free ( varSizes );
if ( collectsData ) free ( collectsData );
if ( streamSizes ) free ( streamSizes );
if ( resHs ) free ( resHs );
}
/************************************************************************/
......@@ -842,13 +831,21 @@ void pioEndDef ( void )
RESOURCES, commInqCommsIO ( rankGlob )));
xdebug ( "SENT MESSAGE WITH TAG \"RESOURCES\"" );
namespaceDefResASent ();
reshPackBufferDestroy ( &buffer );
}
modelWinCreate ();
namespaceDefResStatus ( STAGE_TIMELOOP );
#endif
}
/************************************************************************/
void pioEndTimestepping ( void )
{
#ifdef USE_MPI
namespaceDefResStatus ( STAGE_CLEANUP );
#endif
}
......@@ -869,7 +866,6 @@ void pioFinalize ()
{
#ifdef USE_MPI
int collID, ibuffer = 1111;
xdebug ();
namespaceCleanup ();
for ( collID = 0; collID < commInqNProcsColl (); collID++ )
{
......@@ -922,7 +918,7 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
xmpi ( MPI_Win_post ( groupsIONetto[collID], iAssert, win[collID] ));
winPostSet[collID] = 1;
}
xdebug ();
#endif
}
......
......@@ -788,9 +788,10 @@ void finalizePOSIXASYNCH ( void )
void initPOSIXASYNCH ( void )
{
if ( commInqSizeNode () < 2 ) xabort ( "usage: #pes/#nodes >= 2");
if ( commInqSizeNode () < 2 )
xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );
if ( nPrefStreams < 1 ) xabort ( "usage: #prefetch streams >= 1" );
if ( nPrefStreams < 1 ) xabort ( "USAGE: # PREFETCH STREAMS >= 1" );
if ( commInqRankNode () == commInqSpecialRankNode ())
{
......
......@@ -594,7 +594,8 @@ void finalizePOSIXFPGUARDSENDRECV ( void )
void initPOSIXFPGUARDSENDRECV ( void )
{
if ( commInqSizeNode () < 2 ) xabort ( "usage: #pes/#nodes >= 2" );
if ( commInqSizeNode () < 2 )
xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );
if ( commInqRankNode () == commInqSpecialRankNode ())
{
......
......@@ -292,21 +292,23 @@ void pwPOSIXNONB ( void )
sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
for ( ;; )
{
{
xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode,
&status ), &status );
source = status.MPI_SOURCE;
tag = status.MPI_TAG;
if ( rtag ) ungetTag ( rtag );
if (( rtag = getTag ( tag )) == NULL )
xabort ( "getTag () failed" );
xmpi ( MPI_Get_count ( &status, MPI_CHAR, &messagesize ));
xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ),"
"messagesize=%d",
xdebug ( "RECEIVE MESSAGE FROM SOURCE=%d, ID=%d, COMMAND=%d ( %s ),"
"MESSAGESIZE=%d",
source, rtag->id, rtag->command,
command2charP [ rtag->command ], messagesize);
......@@ -376,7 +378,7 @@ void pwPOSIXNONB ( void )
amount = messagesize;
xdebug ( "command %s, id=%d, name=%s",
xdebug ( "COMMAND %s, ID=%d, NAME=%s",
command2charP [ rtag->command ],
rtag->id, bfd->name );
......@@ -389,7 +391,7 @@ void pwPOSIXNONB ( void )
case IO_Close_file:
xdebug ( "command %s, file%d, source%d",
xdebug ( "COMMAND %s, FILE%d, SOURCE%d",
command2charP [ rtag->command ], rtag->id, source );
if ( ! ( bfd = ( bFiledataP * ) queueIdx2val ( bibBFiledataP,
......@@ -401,7 +403,7 @@ void pwPOSIXNONB ( void )
amount = messagesize;
xdebug ( "command %s, id=%d, name=%s, amount=%ld",
xdebug ( "COMMAND %s, ID=%d, NAME=%s, AMOUNT=%ld",
command2charP [ rtag->command ], rtag->id, bfd->name, amount );
xmpi ( MPI_Recv ( bfd->fb->buffer, amount, MPI_CHAR, source, tag,
......@@ -432,14 +434,17 @@ void pwPOSIXNONB ( void )
int buffer = CDI_UNDEFID, collID;
xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
sentFinalize[source] = true;
doFinalize = true;
for ( collID = 0; collID < nProcsCollNode; collID++ )
if ( !sentFinalize[collID] )
{
doFinalize = false;
break;
}
if ( doFinalize )
{
if ( bibBFiledataP->head != NULL )
......@@ -452,6 +457,7 @@ void pwPOSIXNONB ( void )
queueDestroy ( bibBFiledataP );
}
if ( rtag ) ungetTag ( rtag );
return;
}
}
......@@ -682,6 +688,8 @@ void finalizePOSIXNONB ( void )
xmpi ( MPI_Send ( &buffer, 1, MPI_INT, specialRank, tag, commNode ));
xdebug ( "SENT MESSAGE WITH TAG \"IO_FINALIZE\" TO SPECIAL PROCESS" );
if ( bibAFiledataP->head != NULL )
{
xabort ( "Queue bibAFiledataP not empty." );
......@@ -697,7 +705,8 @@ void finalizePOSIXNONB ( void )
void initPOSIXNONB ( void )
{
if ( commInqSizeNode () < 2 ) xabort ( "usage: #pes/#nodes >= 2" );
if ( commInqSizeNode () < 2 )
xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );
if ( commInqRankNode () == commInqSpecialRankNode ())
{
......
......@@ -189,7 +189,6 @@ static
getBufferGetFromEnd ( __func__, __LINE__,
root, &streamID, sizeof ( streamID ));
streamClose ( streamID );
xdebug ( "READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" closed stream",
funcMap[funcID], streamID );
......@@ -212,7 +211,6 @@ static
root, &filetype, sizeof ( filetype ));
streamID = streamOpenWrite ( filename, filetype );
free ( filename );
xdebug ( "READ FUNCTION CALL FROM WIN: %s, filenamesz=%d,"
" filename=%s, filetype=%d, opened stream %d",
funcMap[funcID], filenamesz, filename, filetype, streamID );
......@@ -227,7 +225,6 @@ static
getBufferGetFromEnd ( __func__, __LINE__,
root, &vlistID, sizeof ( vlistID ));
streamDefVlist ( streamID, vlistID );
xdebug ( "READ FUNCTION CALL FROM WIN: %s, streamID=%d,"
" vlistID=%d, called streamDefVlist ().",
funcMap[funcID], streamID, vlistID );
......@@ -257,6 +254,7 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
char text[1024];
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
getBufferGetFromEnd ( __func__, __LINE__,
root, &tokenID, sizeof ( tokenID ));
......@@ -306,9 +304,10 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
getBufferGetFromEnd ( __func__, __LINE__,
modelID, &tokenID2, sizeof ( tokenID2 ));
xassert ( tokenID2 == SEPARATOR );
}
}
streamWriteVar ( streamID, varID, data, nmiss );
if ( ddebug > 2 )
{
sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
......@@ -321,12 +320,12 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
readFuncCall ();
break;
default:
xabort ( "BUFFER NOT READABLE!" );
xabort ( "BUFFER NOT READABLE!" );
}
getBufferGetFromEnd ( __func__, __LINE__,
root, &tokenID, sizeof ( tokenID ));
}
}
/************************************************************************/
......@@ -359,6 +358,7 @@ void getData ( int tsID, int vdate, int vtime )
int iAssert = 0, modelID;
char text[1024];
int nProcsModel = commInqNProcsModel ();
// todo put in correct lbs and ubs
xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
......@@ -383,6 +383,7 @@ void getData ( int tsID, int vdate, int vtime )
DATATYPE_FLT );
}
readGetBuffers ( tsID, vdate, vtime );
}
......@@ -433,7 +434,7 @@ void IOServer ()
{
{
int nStreams = streamSize ();
xdebug ();
if ( nStreams > 0 )
{
int streamNo;
......@@ -446,13 +447,10 @@ void IOServer ()
free ( resHs );
}
}
xdebug ();
backendCleanup ();
xdebug ();
serverWinCleanup ();
xdebug ();
serverWinCleanup ();
arrayDestroy();
xdebug ();
return;
}
......@@ -482,9 +480,9 @@ void IOServer ()
xdebug ( "RECEIVED MESSAGE WITH TAG \"WRITETS\": "
"tsID=%d, vdate=%d, vtime=%d, source=%d",
iBuffer[0], iBuffer[1], iBuffer[2], source );
getData ( iBuffer[0], iBuffer[1], iBuffer[2] );
getData ( iBuffer[0], iBuffer[1], iBuffer[2] );
free ( iBuffer );
break;
default:
......
......@@ -34,7 +34,7 @@
#define MAXSTRING 256
#define ddebug 1
#define ddebug 0
void pcdiAssert ( bool, const char *, const char *, int );
......
......@@ -691,6 +691,26 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
}
}
}
#ifdef USE_MPI
else if ( tolower ( * filemode ) == 'w' )
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
pioBufferFuncCall ( __func__, 2, filename, filetype );
break;
case STAGE_CLEANUP:
xabort ( "TRANSITION TO IO PROCESSES ALREADY FINISHED." );
break;
default:
xabort ( "INTERNAL ERROR" );
}
}
#endif
if ( fileID < 0 && hasLocalFile )
{
......@@ -723,13 +743,6 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
vlistptr = vlist_to_pointer(streamptr->vlistID);
vlistptr->ntsteps = streamNtsteps(streamID);
}
#ifdef USE_MPI
if ( !hasLocalFile &&
namespaceInqResASent () == 1 &&
streamptr->filemode == 'w' )
pioBufferFuncCall ( __func__, 2, filename, filetype );
#endif
}
return (streamID);
......@@ -1126,7 +1139,6 @@ void streamClose(int streamID)
int index;
int vlistID;
stream_t *streamptr;
xdebug ();
streamptr = stream_to_pointer(streamID);
stream_check_ptr(__func__, streamptr);
......@@ -1136,10 +1148,10 @@ void streamClose(int streamID)
fileID = streamptr->fileID;
filetype = streamptr->filetype;
vlistID = streamptr->vlistID;
vlistID = streamptr->vlistID;
if ( namespaceHasLocalFile ( namespaceGetActive ()))
{
{
if ( fileID == CDI_UNDEFID )
Warning("File %s not open!", streamptr->filename);
else
......@@ -1197,9 +1209,21 @@ void streamClose(int streamID)
}
#ifdef USE_MPI
else
if ( namespaceInqResASent () == 1 )
pioBufferFuncCall ( __func__, 1, streamID );
xdebug ();
{
statusCode nspStatus = namespaceInqResStatus ();
switch ( nspStatus )