Commit c97c2e22 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

IO mode PIO_POSIXASYNCH running

parent 5f5ee7b6
......@@ -27,8 +27,8 @@ static int nlev[nVars] = {1,1,5,5,2};
//int IOModus = PIO_POSIX_FPGUARD_SENDRECV;
//int IOModus = PIO_MPI_NONB;
int IOModus = PIO_POSIX_NONB;
//int IOModus = PIO_POSIX_NONB;
int IOModus = PIO_POSIX_ASYNCH;
void modelRun ()
{
......
......@@ -61,6 +61,7 @@ libcdi_la_SOURCES = \
pio_interface.c \
pio_interface.h \
pio_mpinonb.c \
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixnonb.c \
pio_queue.c \
......
......@@ -82,15 +82,15 @@ am_libcdi_la_OBJECTS = basetime.lo binary.lo calendar.lo cdf.lo \
cgribexlib.lo dmemory.lo error.lo extralib.lo file.lo \
gaussgrid.lo gribapi.lo grid.lo ieglib.lo institution.lo \
model.lo namespace.lo pio.lo pio_comm.lo pio_dbuffer.lo \
pio_interface.lo pio_mpinonb.lo pio_posixfpguardsendrecv.lo \
pio_posixnonb.lo pio_queue.lo pio_rpc.lo pio_server.lo \
pio_util.lo resource_handle.lo servicelib.lo stream_cdf.lo \
stream_cgribex.lo stream_ext.lo stream_grb.lo \
stream_gribapi.lo stream_history.lo stream_ieg.lo \
stream_int.lo stream_record.lo stream_srv.lo stream_var.lo \
table.lo taxis.lo timebase.lo tsteps.lo util.lo varscan.lo \
version.lo vlist.lo vlist_att.lo vlist_var.lo zaxis.lo \
stream.lo swap.lo
pio_interface.lo pio_mpinonb.lo pio_posixasynch.lo \
pio_posixfpguardsendrecv.lo pio_posixnonb.lo pio_queue.lo \
pio_rpc.lo pio_server.lo pio_util.lo resource_handle.lo \
servicelib.lo stream_cdf.lo stream_cgribex.lo stream_ext.lo \
stream_grb.lo stream_gribapi.lo stream_history.lo \
stream_ieg.lo stream_int.lo stream_record.lo stream_srv.lo \
stream_var.lo table.lo taxis.lo timebase.lo tsteps.lo util.lo \
varscan.lo version.lo vlist.lo vlist_att.lo vlist_var.lo \
zaxis.lo stream.lo swap.lo
libcdi_la_OBJECTS = $(am_libcdi_la_OBJECTS)
@ENABLE_CDI_LIB_FALSE@am_libcdi_la_rpath =
@ENABLE_CDI_LIB_TRUE@am_libcdi_la_rpath = -rpath $(libdir)
......@@ -331,6 +331,7 @@ libcdi_la_SOURCES = \
pio_interface.c \
pio_interface.h \
pio_mpinonb.c \
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixnonb.c \
pio_queue.c \
......@@ -513,6 +514,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_dbuffer.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_interface.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpinonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixasynch.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixnonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Plo@am__quote@
......
......@@ -86,11 +86,9 @@ size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
iret = fwMPINONB ( fileID, tsID, buffer, len );
break;
#ifndef _SX
/*
case PIO_POSIX_ASYNCH:
iret = fwPOSIXASYNCH ( fileID, tsID, buffer, len );
break;
*/
#endif
case PIO_POSIX_FPGUARD_SENDRECV:
......@@ -126,13 +124,10 @@ int pioFileClose ( int id )
iret = fcMPINONB ( id );
break;
#ifndef _SX
/*
case PIO_POSIX_ASYNCH:
iret = fcPOSIXASYNCH ( id );
break;
*/
#endif
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fcPOSIXFPGUARDSENDRECV ( id );
break;
......@@ -167,13 +162,10 @@ int pioFileOpenW ( const char *filename )
iret = fowMPINONB ( filename );
break;
#ifndef _SX
/*
case PIO_POSIX_ASYNCH:
iret = fowPOSIXASYNCH ( filename );
break;
*/
#endif
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
break;
......@@ -219,13 +211,10 @@ void backendInit ( void )
initMPINONB ();
break;
#ifndef _SX
/*
case PIO_POSIX_ASYNCH:
collectingData = initPOSIXASYNCH ();
initPOSIXASYNCH ();
break;
*/
#endif
case PIO_POSIX_FPGUARD_SENDRECV:
initPOSIXFPGUARDSENDRECV ();
break;
......@@ -252,6 +241,15 @@ void backendInit ( void )
void backendFinalize ( void )
{
#ifdef USE_MPI
/*
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_THREAD:
finalizePOSIXFPGUARDTHREAD ();
break;
}
*/
commDestroy ();
MPI_Finalize ();
exit ( 0 );
......
......@@ -80,11 +80,11 @@ void initMPINONB ( void );
/* pio_posixasynch.c */
#ifndef _SX
void pwPOSIXASYNCH ( int );
int fowPOSIXASYNCH ( const char * );
int fcPOSIXASYNCH ( int );
size_t fwPOSIXASYNCH( int, int, const void *, size_t );
int initPOSIXASYNCH ( void );
void pwPOSIXASYNCH ( void );
int fowPOSIXASYNCH ( const char * );
int fcPOSIXASYNCH ( int );
size_t fwPOSIXASYNCH( int, int, const void *, size_t );
void initPOSIXASYNCH ( void );
#endif
/* pio_posixfpguardsendrecv.c */
void fpgPOSIXFPGUARDSENDRECV ( void );
......
This diff is collapsed.
......@@ -102,11 +102,11 @@ static bFiledataP * initBFiledataP ( char *filename,
int i;
char errorString[maxErrorString];
xdebug ( "filename=%s, buffersize=%lu, ncollectors=%d, in", filename, bs, nc );
xdebug ( "filename=%s, buffersize=%lu, ncollectors=%d", filename, bs, nc );
bfp = xmalloc ( sizeof ( bFiledataP ));
bfp = xmalloc ( sizeof ( bfp[0] ));
bfp->name = xmalloc (( strlen ( filename ) + 1 ) * sizeof ( char ));
bfp->name = xmalloc (( strlen ( filename ) + 1 ) * sizeof ( bfp->name[0] ));
strcpy ( bfp->name, filename );
bfp->size = bs;
......@@ -134,7 +134,6 @@ static bFiledataP * initBFiledataP ( char *filename,
int destroyAFiledataP ( void *v )
{
int iret = 0;
aFiledataP *afp = ( aFiledataP * ) v;
MPI_Status status;
......@@ -150,7 +149,7 @@ int destroyAFiledataP ( void *v )
xdebug ( "cleaned up, return" );
return iret;
return 0;
}
/***************************************************************/
......@@ -238,12 +237,11 @@ void writeP ( bFiledataP *bfd, long amount )
xdebug ( "filename=%s, written=%ld, amount=%ld, return",
bfd->name, written, amount );
return;
}
/***************************************************************/
// baustelle, in modi vereinheitlichen, kapseln in pio_queue.c
// TODO: unify in IOModi, encapsulate in pio_queue.c
void queueCheckP ( queue_t *q, char *name )
{
listElem_t *curr;
......@@ -267,10 +265,9 @@ void queueCheckP ( queue_t *q, char *name )
curr = curr->next;
}
return;
}
//
/***************************************************************/
void pwPOSIXNONB ( void )
{
......@@ -292,8 +289,8 @@ void pwPOSIXNONB ( void )
for ( ;; )
{
xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, &status ));
xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode,
&status ), &status );
source = status.MPI_SOURCE;
tag = status.MPI_TAG;
......@@ -313,7 +310,7 @@ void pwPOSIXNONB ( void )
{
case IO_Open_file:
messageBuffer = ( char *) xmalloc ( messagesize * sizeof ( char ));
messageBuffer = xmalloc ( messagesize * sizeof ( messageBuffer[0] ));
pMB = messageBuffer;
xmpi ( MPI_Recv ( messageBuffer, messagesize, MPI_CHAR, source,
......@@ -389,8 +386,7 @@ void pwPOSIXNONB ( void )
case IO_Close_file:
xdebug ( "command %s, file%d, source%d",
command2charP [ rtag->command ],
rtag->id, source );
command2charP [ rtag->command ], rtag->id, source );
if ( ! ( bfd = ( bFiledataP * ) queueIdx2val ( bibBFiledataP,
rtag->id )))
......@@ -402,8 +398,7 @@ void pwPOSIXNONB ( void )
amount = messagesize;
xdebug ( "command %s, id=%d, name=%s, amount=%ld",
command2charP [ rtag->command ],
rtag->id, bfd->name, amount );
command2charP [ rtag->command ], rtag->id, bfd->name, amount );
xmpi ( MPI_Recv ( bfd->fb->buffer, amount, MPI_CHAR, source, tag,
commNode, &status ));
......@@ -458,7 +453,7 @@ void sendP ( aFiledataP *afd, int id )
if ( ddebug ) startTime = MPI_Wtime ();
xmpi ( MPI_Wait ( &( afd->request ), &status ));
xmpiStat ( MPI_Wait ( &( afd->request ), &status ), &status );
if ( ddebug ) accumWait += ( MPI_Wtime () - startTime );
......@@ -598,16 +593,15 @@ int fowPOSIXNONB ( const char *filename )
if ( ! buffersize )
{
xdebug (" Broadcast buffersize to collectors ..." );
if ( commInqRankColl () == root )
{
if ( getenv( "BUFSIZE" ) != NULL )
buffersize = atol ( getenv ( "BUFSIZE" ));
if ( buffersize < initial_buffersize )
buffersize = initial_buffersize;
}
xdebug ( "filename=%s, broadcast buffersize=%d to collectors ...",
filename, buffersize );
}
xmpi ( MPI_Bcast ( &buffersize, 1, MPI_LONG, root, commInqCommColl ()));
}
......@@ -666,7 +660,7 @@ void initPOSIXNONB ( void )
commRecvNodeMap ();
commDefCommsIO ();
bibAFiledataP = queueInit ( destroyAFiledataP,
compareNamesAP );
compareNamesAP );
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment