Commit 0bc08301 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

Feature close/open files with defined resources runs in POSIXASYNCH.

parent 0c63ad1e
......@@ -14,8 +14,8 @@ enum {
nProcsIO = 3,
//IOMode = PIO_NONE,
//IOMode = PIO_MPI_NONB,
IOMode = PIO_POSIX_FPGUARD_SENDRECV,
//IOMode = PIO_POSIX_ASYNCH,
//IOMode = PIO_POSIX_FPGUARD_SENDRECV,
IOMode = PIO_POSIX_ASYNCH,
//IOMode = PIO_POSIX_NONB,
filetype = FILETYPE_GRB,
ntfiles = 2,
......
......@@ -189,6 +189,9 @@ void backendCleanup ( void )
case PIO_MPI_NONB:
finalizeMPINONB ();
break;
case PIO_POSIX_ASYNCH:
finalizePOSIXASYNCH ();
break;
case PIO_POSIX_FPGUARD_SENDRECV:
finalizePOSIXFPGUARDSENDRECV ();
break;
......
......@@ -86,6 +86,7 @@ int fowPOSIXASYNCH ( const char * );
int fcPOSIXASYNCH ( int );
size_t fwPOSIXASYNCH( int, int, const void *, size_t );
void initPOSIXASYNCH ( void );
void finalizePOSIXASYNCH ( void );
#endif
/* pio_posixfpguardsendrecv.c */
......
......@@ -46,6 +46,7 @@ typedef struct
{
char *name;
size_t size;
int tsID;
struct dBuffer *db1;
struct dBuffer *db2;
struct dBuffer *db;
......@@ -88,6 +89,7 @@ static aFiledataPA *initAFiledataPA ( const char *filename, size_t bs )
afd->name = xmalloc (( len ) * sizeof ( afd->name[0] ));
strcpy ( afd->name, filename );
afd->size = bs;
afd->tsID = 0;
/* init output buffer */
......@@ -379,10 +381,12 @@ void pwPOSIXASYNCH ( void )
MPI_Comm commNode = commInqCommNode ();
char errorString[maxErrorString];
int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
bool * sentFinalize, doFinalize;
xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
bibBFiledataPA = queueInit ( destroyBFiledataPA, compareNamesBPA );
sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
for ( ;; )
{
......@@ -494,7 +498,7 @@ void pwPOSIXASYNCH ( void )
amount = messagesize;
xdebug ( " command %s, id=%d, name=%s\n",
xdebug ( " command %s, id=%d, name=%s",
command2charP [ rtag->command ], rtag->id, bfd->name );
bfd->currOpIndex = bfd->nextOpIndex;
......@@ -518,13 +522,42 @@ void pwPOSIXASYNCH ( void )
if ( bfd->finished )
{
xdebug ( "all are finished with file %d, delete node", rtag->id );
queueDelNode ( bibBFiledataPA, rtag->id );
if ( !bibBFiledataPA->head )
{
queueDestroy ( bibBFiledataPA );
return;
}
}
}
break;
case IO_Finalize:
{
int buffer = CDI_UNDEFID, collID;
xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
sentFinalize[source] = true;
doFinalize = true;
for ( collID = 0; collID < nProcsCollNode; collID++ )
if ( !sentFinalize[collID] )
{
doFinalize = false;
break;
}
if ( doFinalize )
{
if ( bibBFiledataPA->head != NULL )
{
xabort ( "Queue bibBfiledataP is not empty." );
}
else
{
xdebug ( "all files are finished, destroy queue, return" );
queueDestroy ( bibBFiledataPA );
}
if ( rtag ) ungetTag ( rtag );
return;
}
}
break;
default:
xabort ( "COMMAND NOT IMPLEMENTED" );
}
}
}
......@@ -571,11 +604,22 @@ void sendPA ( aFiledataPA *afd, int id )
afd->command = IO_Send_buffer;
return;
}
}
/***************************************************************/
void defTimestepPA ( aFiledataPA * afd, int tsID )
{
if ( afd == NULL || tsID != afd->tsID + 1 )
xabort ( " defTimestepPA () didn't succeed." );
afd->tsID = tsID;
}
/***************************************************************/
size_t fwPOSIXASYNCH( int id, int tsId, const void *buffer, size_t len )
size_t fwPOSIXASYNCH( int id, int tsID, const void *buffer, size_t len )
{
static int oldTsId = 0;
int error = 0;
......@@ -585,29 +629,27 @@ size_t fwPOSIXASYNCH( int id, int tsId, const void *buffer, size_t len )
listElem_t *curr;
char errorString[maxErrorString];
flush = ( tsId != oldTsId ) ? 1 : 0;
afd = ( aFiledataPA * ) queueIdx2val ( bibAFiledataPA, id );
flush = ( tsID != afd->tsID ) ? 1 : 0;
if ( flush == 1 )
{
xdebug ( "pe%d fileWriter (): tsId = %d, flush buffer", tsId );
xdebug ( "pe%d fileWriter (): tsID = %d, flush buffer", tsID );
curr = bibAFiledataPA->head;
while ( curr )
{
sendPA (( aFiledataPA *) curr->val, curr->idx );
defTimestepPA (( aFiledataPA * ) curr->val, tsID );
curr = curr->next;
}
oldTsId = tsId;
xmpi ( MPI_Barrier ( commInqCommColl ()));
}
afd = ( aFiledataPA * ) queueIdx2val ( bibAFiledataPA, id );
filled = dbuffer_push ( afd->db, ( unsigned char * ) buffer, len );
xdebug ( "id = %d, tsId = %d, pushed %zu byte data on buffer, filled = %d",
id, tsId, len, filled );
xdebug ( "id = %d, tsID = %d, pushed %zu byte data on buffer, filled = %d",
id, tsID, len, filled );
if ( filled == 1 )
{
......@@ -650,12 +692,6 @@ int fcPOSIXASYNCH ( int id )
iret = queueDelNode ( bibAFiledataPA, id );
if ( !bibAFiledataPA->head )
{
xdebug ( "cleanup queue" );
queueDestroy ( bibAFiledataPA );
}
/* timer output */
if ( ddebug )
......@@ -728,6 +764,28 @@ int fowPOSIXASYNCH ( const char *filename )
/***************************************************************/
void finalizePOSIXASYNCH ( void )
{
int buffer = 0, tag, specialRank = commInqSpecialRankNode ();
MPI_Comm commNode = commInqCommNode ();
tag = setTag ( 0, IO_Finalize );
xmpi ( MPI_Send ( &buffer, 1, MPI_INT, specialRank, tag, commNode ));
if ( bibAFiledataPA->head != NULL )
{
xabort ( "Queue bibAFiledataP not empty." );
}
else
{
xdebug ( "cleanup queue" );
queueDestroy ( bibAFiledataPA );
}
}
/***************************************************************/
void initPOSIXASYNCH ( void )
{
if ( commInqSizeNode () < 2 ) xabort ( "usage: #pes/#nodes >= 2");
......
......@@ -514,7 +514,7 @@ void sendP ( aFiledataP *afd, int id )
void defTimestepP ( aFiledataP *afd, int tsID)
{
if ( afd == NULL || tsID != afd->tsID + 1 )
xabort ( " defTimestepPF() didn't succeed." );
xabort ( " defTimestepP () didn't succeed." );
afd->tsID = tsID;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment