Commit 0c63ad1e authored by Deike Kleberg's avatar Deike Kleberg
Browse files

Bugfix, feature close/open files with defined resources runs in POSIXNONB.

parent 12ea9548
......@@ -11,19 +11,19 @@
#include "pio_util.h"
enum {
nProcsIO = 2,
//IOMode = PIO_NONE,
nProcsIO = 3,
//IOMode = PIO_NONE,
//IOMode = PIO_MPI_NONB,
IOMode = PIO_POSIX_FPGUARD_SENDRECV,
//IOMode = PIO_POSIX_ASYNCH,
//IOMode = PIO_POSIX_NONB,
//IOMode = PIO_POSIX_ASYNCH,
//IOMode = PIO_POSIX_NONB,
filetype = FILETYPE_GRB,
ntfiles = 2,
ntsteps = 3,
nVars = 5,
nlon = 12,
nlat = 6,
maxlev = 5};
maxlev = 5 };
static int hasLocalFile[] = { 0 };
static int nlev[nVars] = {1,1,5,5,2};
......
......@@ -192,6 +192,9 @@ void backendCleanup ( void )
case PIO_POSIX_FPGUARD_SENDRECV:
finalizePOSIXFPGUARDSENDRECV ();
break;
case PIO_POSIX_NONB:
finalizePOSIXNONB ();
break;
default:
xdebug ( " BACKENDCLEANUP FUNCTION NOT IMPLEMENTED YET." );
}
......
......@@ -102,6 +102,7 @@ int fowPOSIXNONB ( const char * );
int fcPOSIXNONB ( int );
size_t fwPOSIXNONB( int, int, const void *, size_t );
void initPOSIXNONB ( void );
void finalizePOSIXNONB ( void );
#endif
#endif
......
......@@ -192,10 +192,12 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
char errorString[maxErrorString];
MPI_Comm commNode = commInqCommNode ();
int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
bool * sentFinalize, doFinalize = false;
xdebug ( "ncollectors=%d on this node", nProcsCollNode );
bibBFiledataPF = queueInit ( destroyBFiledataPF, NULL );
sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize[0] ));
for ( ;; )
{
......@@ -305,21 +307,32 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
break;
case IO_Finalize:
{
int buffer = CDI_UNDEFID;
int buffer = CDI_UNDEFID, collID;
xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, status.MPI_TAG,
commNode, &status ));
if ( bibBFiledataPF->head != NULL )
sentFinalize[source] = true;
doFinalize = true;
for ( collID = 0; collID < nProcsCollNode; collID++ )
if ( !sentFinalize[collID] )
{
doFinalize = false;
break;
}
if ( doFinalize )
{
xabort ( "queue bibBFiledataM not empty" );
if ( bibBFiledataPF->head != NULL )
{
xabort ( "queue bibBFiledataM not empty" );
}
else
{
xdebug ( "cleanup queue" );
queueDestroy ( bibBFiledataPF );
}
if ( rtag ) ungetTag ( rtag );
return;
}
else
{
xdebug ( "cleanup queue" );
queueDestroy ( bibBFiledataPF );
}
return;
}
break;
default:
......
......@@ -33,6 +33,7 @@ extern double accumWrite;
typedef struct
{
char *name;
int tsID;
size_t size;
struct dBuffer *db1;
struct dBuffer *db2;
......@@ -73,6 +74,7 @@ static aFiledataP *initAFiledataP ( const char *filename, size_t bs )
afp->name = xmalloc (( len + 1) * sizeof ( afp->name[0] ));
strcpy ( afp->name, filename );
afp->size = bs;
afp->tsID = 0;
/* init output buffer */
......@@ -282,10 +284,12 @@ void pwPOSIXNONB ( void )
MPI_Comm commNode = commInqCommNode ();
char errorString[maxErrorString];
int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
bool * sentFinalize, doFinalize;
xdebug ( "ncollectors=%d on this node", nProcsCollNode );
bibBFiledataP = queueInit ( destroyBFiledataP, compareNamesBP );
sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
for ( ;; )
{
......@@ -421,14 +425,39 @@ void pwPOSIXNONB ( void )
{
xdebug ( "all are finished with file %d, delete node", rtag->id );
queueDelNode ( bibBFiledataP, rtag->id );
if ( !bibBFiledataP->head )
{
xdebug ( "all files are finished, destroy queue, return" );
queueDestroy ( bibBFiledataP );
ungetTag ( rtag );
return;
}
}
}
break;
case IO_Finalize:
{
int buffer = CDI_UNDEFID, collID;
xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
sentFinalize[source] = true;
doFinalize = true;
for ( collID = 0; collID < nProcsCollNode; collID++ )
if ( !sentFinalize[collID] )
{
doFinalize = false;
break;
}
if ( doFinalize )
{
if ( bibBFiledataP->head != NULL )
{
xabort ( "Queue bibBfiledataP is not empty." );
}
else
{
xdebug ( "all files are finished, destroy queue, return" );
queueDestroy ( bibBFiledataP );
}
if ( rtag ) ungetTag ( rtag );
return;
}
}
break;
default:
xabort ( "COMMAND NOT IMPLEMENTED" );
}
}
}
......@@ -480,11 +509,20 @@ void sendP ( aFiledataP *afd, int id )
return;
}
/***************************************************************/
void defTimestepP ( aFiledataP *afd, int tsID)
{
if ( afd == NULL || tsID != afd->tsID + 1 )
xabort ( " defTimestepPF() didn't succeed." );
afd->tsID = tsID;
}
/***************************************************************/
size_t fwPOSIXNONB ( int id, int tsId, const void *buffer, size_t len )
size_t fwPOSIXNONB ( int id, int tsID, const void *buffer, size_t len )
{
static int oldTsId = 0;
int error = 0;
int flush = 0;
int filled = 0;
......@@ -492,29 +530,29 @@ size_t fwPOSIXNONB ( int id, int tsId, const void *buffer, size_t len )
listElem_t *curr;
char errorString[maxErrorString];
flush = ( tsId != oldTsId ) ? 1 : 0;
afd = ( aFiledataP * ) queueIdx2val ( bibAFiledataP, id );
flush = ( tsID != afd->tsID ) ? 1 : 0;
if ( flush == 1 )
{
xdebug ( "tsId = %d, flush buffer", tsId );
xdebug ( "tsID = %d, flush buffer", tsID );
curr = bibAFiledataP->head;
while ( curr )
{
sendP (( aFiledataP *) curr->val, curr->idx );
defTimestepP (( aFiledataP * ) curr->val, tsID );
curr = curr->next;
}
oldTsId = tsId;
xmpi ( MPI_Barrier ( commInqCommColl ()));
}
afd = ( aFiledataP * ) queueIdx2val ( bibAFiledataP, id );
filled = dbuffer_push ( afd->db, ( unsigned char * ) buffer, len );
xdebug ( "id = %d, tsId = %d, pushed %lu byte data on buffer, filled = %d",
id, tsId, len, filled );
xdebug ( "id = %d, tsID = %d, pushed %lu byte data on buffer, filled = %d",
id, tsID, len, filled );
if ( filled == 1 )
{
......@@ -557,12 +595,6 @@ int fcPOSIXNONB ( int id )
iret = queueDelNode ( bibAFiledataP, id );
if ( !bibAFiledataP->head )
{
xdebug ( "cleanup queue" );
queueDestroy ( bibAFiledataP );
}
/* timer output */
if ( ddebug )
......@@ -641,6 +673,28 @@ int fowPOSIXNONB ( const char *filename )
/***************************************************************/
void finalizePOSIXNONB ( void )
{
int buffer = 0, tag, specialRank = commInqSpecialRankNode ();
MPI_Comm commNode = commInqCommNode ();
tag = setTag ( 0, IO_Finalize );
xmpi ( MPI_Send ( &buffer, 1, MPI_INT, specialRank, tag, commNode ));
if ( bibAFiledataP->head != NULL )
{
xabort ( "Queue bibAFiledataP not empty." );
}
else
{
xdebug ( "cleanup queue" );
queueDestroy ( bibAFiledataP );
}
}
/***************************************************************/
void initPOSIXNONB ( void )
{
if ( commInqSizeNode () < 2 ) xabort ( "usage: #pes/#nodes >= 2" );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment