Commit 92c20f21 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Merge common parts of distinct collector/writer variants.

parent be831657
......@@ -227,6 +227,7 @@ src/pio_posixasynch.c -text
src/pio_posixfpguardsendrecv.c -text
src/pio_posixnonb.c -text
src/pio_queue.c -text
src/pio_record_send.c -text
src/pio_rpc.c -text
src/pio_rpc.h -text
src/pio_server.c -text
......
......@@ -61,6 +61,7 @@ libcdi_la_SOURCES = \
pio_interface.c \
pio_interface.h \
pio_mpinonb.c \
pio_record_send.c\
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixnonb.c \
......
......@@ -90,15 +90,16 @@ am_libcdi_la_OBJECTS = basetime.lo binary.lo calendar.lo cdf.lo \
cgribexlib.lo dmemory.lo error.lo extralib.lo file.lo \
gaussgrid.lo gribapi.lo grid.lo ieglib.lo institution.lo \
model.lo namespace.lo pio.lo pio_comm.lo pio_dbuffer.lo \
pio_interface.lo pio_mpinonb.lo pio_posixasynch.lo \
pio_posixfpguardsendrecv.lo pio_posixnonb.lo pio_queue.lo \
pio_rpc.lo pio_server.lo pio_util.lo resource_handle.lo \
servicelib.lo stream_cdf.lo stream_cgribex.lo stream_ext.lo \
stream_grb.lo stream_gribapi.lo stream_history.lo \
stream_ieg.lo stream_int.lo stream_record.lo stream_srv.lo \
stream_var.lo table.lo taxis.lo timebase.lo tsteps.lo util.lo \
varscan.lo version.lo vlist.lo vlist_att.lo vlist_var.lo \
zaxis.lo stream.lo swap.lo
pio_interface.lo pio_mpinonb.lo pio_record_send.lo \
pio_posixasynch.lo pio_posixfpguardsendrecv.lo \
pio_posixnonb.lo pio_queue.lo pio_rpc.lo pio_server.lo \
pio_util.lo resource_handle.lo servicelib.lo stream_cdf.lo \
stream_cgribex.lo stream_ext.lo stream_grb.lo \
stream_gribapi.lo stream_history.lo stream_ieg.lo \
stream_int.lo stream_record.lo stream_srv.lo stream_var.lo \
table.lo taxis.lo timebase.lo tsteps.lo util.lo varscan.lo \
version.lo vlist.lo vlist_att.lo vlist_var.lo zaxis.lo \
stream.lo swap.lo
libcdi_la_OBJECTS = $(am_libcdi_la_OBJECTS)
@ENABLE_CDI_LIB_FALSE@am_libcdi_la_rpath =
@ENABLE_CDI_LIB_TRUE@am_libcdi_la_rpath = -rpath $(libdir)
......@@ -354,6 +355,7 @@ libcdi_la_SOURCES = \
pio_interface.c \
pio_interface.h \
pio_mpinonb.c \
pio_record_send.c\
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixnonb.c \
......@@ -541,6 +543,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixnonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_record_send.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_rpc.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_server.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_util.Plo@am__quote@
......
......@@ -74,15 +74,13 @@ size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
break;
#ifndef _SX
case PIO_ASYNCH:
iret = fwPOSIXASYNCH ( fileID, tsID, buffer, len );
break;
#endif
case PIO_WRITER:
iret = pioSendWrite(fileID, tsID, buffer, len);
break;
case PIO_FPGUARD:
iret = fwPOSIXFPGUARDSENDRECV ( fileID, tsID, buffer, len );
break;
case PIO_WRITER:
iret = fwPOSIXNONB ( fileID, tsID, buffer, len );
break;
}
return iret;
......@@ -100,15 +98,13 @@ int pioFileClose ( int id )
break;
#ifndef _SX
case PIO_ASYNCH:
iret = fcPOSIXASYNCH ( id );
break;
#endif
case PIO_WRITER:
iret = pioSendClose(id);
break;
case PIO_FPGUARD:
iret = fcPOSIXFPGUARDSENDRECV ( id );
break;
case PIO_WRITER:
iret = fcPOSIXNONB ( id );
break;
}
return iret;
......@@ -127,15 +123,13 @@ int pioFileOpenW ( const char *filename )
break;
#ifndef _SX
case PIO_ASYNCH:
iret = fowPOSIXASYNCH ( filename );
break;
#endif
case PIO_WRITER:
iret = pioSendOpen(filename);
break;
case PIO_FPGUARD:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
break;
case PIO_WRITER:
iret = fowPOSIXNONB ( filename );
break;
}
return iret;
......@@ -164,15 +158,13 @@ void backendInit ( void )
break;
#ifndef _SX
case PIO_ASYNCH:
initPOSIXASYNCH ();
break;
#endif
case PIO_WRITER:
pioSendInitialize();
break;
case PIO_FPGUARD:
initPOSIXFPGUARDSENDRECV ();
break;
case PIO_WRITER:
initPOSIXNONB ();
break;
}
}
......@@ -188,15 +180,15 @@ void backendCleanup ( void )
case PIO_MPI:
finalizeMPINONB ();
break;
#ifndef _SX
case PIO_ASYNCH:
finalizePOSIXASYNCH ();
#endif
case PIO_WRITER:
pioSendFinalize();
break;
case PIO_FPGUARD:
finalizePOSIXFPGUARDSENDRECV ();
break;
case PIO_WRITER:
finalizePOSIXNONB ();
break;
default:
xdebug("%s", " BACKENDCLEANUP FUNCTION NOT IMPLEMENTED YET.");
}
......
......@@ -79,14 +79,18 @@ size_t fwMPINONB( int, int, const void *, size_t );
void initMPINONB ( void );
void finalizeMPINONB ( void );
/* common functionality for file split between collectors and writer(s) */
int pioSendClose(int);
int pioSendOpen(const char *);
size_t pioSendWrite(int, int, const void *, size_t);
void pioSendInitialize(void);
void pioSendFinalize(void);
/* pio_posixasynch.c */
#ifndef _SX
void pwPOSIXASYNCH ( void );
int fowPOSIXASYNCH ( const char * );
int fcPOSIXASYNCH ( int );
size_t fwPOSIXASYNCH( int, int, const void *, size_t );
void initPOSIXASYNCH ( void );
void finalizePOSIXASYNCH ( void );
void pioWriterAIO(void);
#endif
/* pio_posixfpguardsendrecv.c */
......@@ -98,12 +102,7 @@ void initPOSIXFPGUARDSENDRECV ( void );
void finalizePOSIXFPGUARDSENDRECV ( void );
/* pio_posixnonb.c */
void pwPOSIXNONB ( void );
int fowPOSIXNONB ( const char * );
int fcPOSIXNONB ( int );
size_t fwPOSIXNONB( int, int, const void *, size_t );
void initPOSIXNONB ( void );
void finalizePOSIXNONB ( void );
void pioWriterStdIO(void);
#endif
#endif
......
......@@ -29,7 +29,6 @@
extern char * command2charP[6];
extern long initial_buffersize;
extern char *token;
extern double accumProbe;
......@@ -40,18 +39,6 @@ extern double accumWait;
extern double accumWrite;
typedef struct
{
char *name;
size_t size;
int tsID;
struct dBuffer *db1;
struct dBuffer *db2;
struct dBuffer *db;
IO_Server_command command;
MPI_Request request;
} aFiledataPA;
typedef struct
{
char *name;
......@@ -67,50 +54,10 @@ typedef struct
bool *nfinished;
} bFiledataPA;
static queue_t *bibAFiledataPA;
int nPrefStreams = 4;
/***************************************************************/
static aFiledataPA *
initAFiledataPA(const char *filename, size_t bs)
{
aFiledataPA *afd;
size_t len;
int iret;
xdebug ( "filename=%s, buffersize=%zu, in", filename, bs );
afd = xmalloc ( sizeof ( aFiledataPA ));
len = strlen ( filename );
afd->name = xmalloc (( len + 1) * sizeof ( afd->name[0] ));
strcpy ( afd->name, filename );
afd->size = bs;
afd->tsID = 0;
/* init output buffer */
xdebug ( "filename=%s, init output buffer", afd->name );
iret = dbuffer_init ( &( afd->db1 ), afd->size );
iret += dbuffer_init ( &( afd->db2 ), afd->size );
if ( iret > 0 ) xabort ( "dbuffer_init did not succeed" );
afd->db = afd->db1;
afd->command = IO_Open_file;
afd->request = MPI_REQUEST_NULL;
xdebug ( "enqueued name=%s, return", afd->name );
return afd;
}
/***************************************************************/
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
{
......@@ -157,29 +104,6 @@ initBFiledataPA(char *filename, size_t bs, int nc)
/***************************************************************/
static int
destroyAFiledataPA(void *v)
{
aFiledataPA *afd = (aFiledataPA * ) v;
MPI_Status status;
xdebug ( "filename=%s, cleanup, in", afd->name );
xmpiStat ( MPI_Wait ( &( afd->request ), &status ), &status );
dbuffer_cleanup ( &( afd->db1 ));
dbuffer_cleanup ( &( afd->db2 ));
free ( afd->name );
free ( afd );
xdebug("%s", "cleaned up, return");
return 0;
}
/***************************************************************/
static int
destroyBFiledataPA ( void *v )
{
......@@ -239,16 +163,6 @@ destroyBFiledataPA ( void *v )
/***************************************************************/
static bool
compareNamesAPA(void *v1, void *v2)
{
aFiledataPA *afd1 = v1, *afd2 = v2;
return !strcmp(afd1->name, afd2->name);
}
/***************************************************************/
static bool
compareNamesBPA(void *v1, void *v2)
{
......@@ -344,7 +258,7 @@ queueCheckPA(queue_t *q, char *name)
/***************************************************************/
void pwPOSIXASYNCH ( void )
void pioWriterAIO(void)
{
bFiledataPA *bfd;
queue_t * bibBFiledataPA;
......@@ -357,6 +271,7 @@ void pwPOSIXASYNCH ( void )
int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
bool * sentFinalize, doFinalize;
if ( nPrefStreams < 1 ) xabort("USAGE: # PREFETCH STREAMS >= 1");
xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
bibBFiledataPA = queueInit ( destroyBFiledataPA, compareNamesBPA );
......@@ -520,267 +435,11 @@ void pwPOSIXASYNCH ( void )
}
/***************************************************************/
/* send buffer to writer and swap buffer for filling */
static void
sendPA(aFiledataPA *afd, int id)
{
int tag;
long amount;
MPI_Status status;
double startTime;
amount = ( long ) dbuffer_data_size ( afd->db );
tag = setTag ( id, afd->command );
xdebug ( "send buffer for %s, size: %zu bytes, command=%s, in",
afd->name, amount, command2charP [ afd->command ] );
if ( ddebug ) startTime = MPI_Wtime ();
xmpiStat ( MPI_Wait ( &( afd->request ), &status ), &status );
if ( ddebug ) accumWait += ( MPI_Wtime () - startTime );
xmpi(MPI_Issend(afd->db->buffer, amount, MPI_CHAR, commInqSpecialRankNode(),
tag, commInqCommNode(), &afd->request));
/* change outputBuffer */
dbuffer_reset ( afd->db );
if ( afd->db == afd->db1 )
{
xdebug("%s", "Change to buffer 2 ...");
afd->db = afd->db2;
}
else
{
xdebug("%s", "Change to buffer 1 ...");
afd->db = afd->db1;
}
afd->command = IO_Send_buffer;
return;
}
/***************************************************************/
static void
defTimestepPA(aFiledataPA * afd, int tsID)
{
if ( afd == NULL || tsID != afd->tsID + 1 )
xabort ( " defTimestepPA () didn't succeed." );
afd->tsID = tsID;
}
/***************************************************************/
size_t fwPOSIXASYNCH( int id, int tsID, const void *buffer, size_t len )
{
int error = 0;
int flush = 0;
int filled = 0;
aFiledataPA *afd;
listElem_t *curr;
afd = ( aFiledataPA * ) queueIdx2val ( bibAFiledataPA, id );
flush = ( tsID != afd->tsID ) ? 1 : 0;
if ( flush == 1 )
{
xdebug ( "fileWriter (): tsID = %d, flush buffer", tsID );
curr = bibAFiledataPA->head;
while ( curr )
{
sendPA (( aFiledataPA *) curr->val, curr->idx );
defTimestepPA (( aFiledataPA * ) curr->val, tsID );
curr = curr->next;
}
{
double startTime;
MPI_Status status;
if (ddebug) startTime = MPI_Wtime();
xmpiStat(MPI_Wait(&(afd->request), &status), &status);
if (ddebug) accumWait += MPI_Wtime() - startTime;
}
xmpi ( MPI_Barrier ( commInqCommColl ()));
}
filled = dbuffer_push ( afd->db, ( unsigned char * ) buffer, len );
xdebug ( "id = %d, tsID = %d, pushed %zu byte data on buffer, filled = %d",
id, tsID, len, filled );
if ( filled == 1 )
{
if ( flush )
error = filled;
else
{
sendPA ( afd, id );
error = dbuffer_push ( afd->db, ( unsigned char * )buffer, len );
}
}
if ( error == 1 )
xabort("did not succeed filling output buffer, id=%d", id);
return len;
}
/***************************************************************/
int fcPOSIXASYNCH ( int id )
{
double accumWaitMax;
aFiledataPA *afd;
int iret, root = 0;
xdebug ( "fileID %d: send buffer, close file and cleanup", id );
afd = ( aFiledataPA * ) queueIdx2val ( bibAFiledataPA, id );
afd->command = IO_Close_file;
sendPA ( afd, id );
/* dequeue file element */
iret = queueDelNode ( bibAFiledataPA, id );
/* timer output */
if ( ddebug )
{
xmpi ( MPI_Reduce ( &accumWait, &accumWaitMax,
1, MPI_DOUBLE, MPI_MAX, root, commInqCommColl ()));
xdebug ( "Wait time %15.10lf s", accumWait );
if ( commInqRankColl () == root )
xdebug ( "Max wait time %15.10lf s", accumWaitMax );
}
return iret;
}
/***************************************************************/
int fowPOSIXASYNCH ( const char *filename )
{
aFiledataPA *afd;
static long buffersize = 0;
int root = 0, iret, id, messageLength = 32;
char message[messageLength];
/* broadcast buffersize to collectors */
if ( !buffersize )
{
if ( commInqRankColl () == root )
{
if ( getenv( "BUFSIZE" ) != NULL )
buffersize = atol ( getenv ( "BUFSIZE" ));
if ( buffersize < initial_buffersize )
buffersize = initial_buffersize;
xdebug("filename=%s, broadcast buffersize=%ld to collectors ...",
filename, buffersize);
}
xmpi ( MPI_Bcast ( &buffersize, 1, MPI_LONG, root, commInqCommColl ()));
}
/* init and enqueue aFiledataPA */
afd = initAFiledataPA ( filename, buffersize );
if ((id = queuePush ( bibAFiledataPA, afd, 0)) < 0)
xabort("filename %s is not unique", afd->name);
xdebug ( "filename=%s, init and enqueued aFiledataPA, return id = %d",
filename, id );
/* put filename, id and buffersize on buffer */
iret = dbuffer_push ( afd->db, ( unsigned char *) filename,
strlen ( filename ));
xassert(iret == 0);
iret = dbuffer_push ( afd->db, ( unsigned char *) token, 1);
xassert(iret == 0);
sprintf ( message,"%lX", buffersize);
iret = dbuffer_push ( afd->db, ( unsigned char *) message,
strlen ( message ));
xassert(iret == 0);
iret = dbuffer_push ( afd->db, ( unsigned char *) token, 1);
xassert(iret == 0);
if ( ddebug )
{
size_t l = strlen(filename) + strlen(message) + 2;
char *temp = xmalloc(l + 1);
strncpy(temp, (char *)afd->db->buffer, l);
temp[l] = '\0';
xdebug("filename=%s, put Open file message on buffer:\n%s,\t return",
filename, temp);
free(temp);
}
return id;
}
/***************************************************************/
void finalizePOSIXASYNCH ( void )
{
int buffer = 0, tag, specialRank = commInqSpecialRankNode ();
MPI_Comm commNode = commInqCommNode ();
tag = setTag ( 0, IO_Finalize );
xmpi ( MPI_Send ( &buffer, 1, MPI_INT, specialRank, tag, commNode ));
if ( bibAFiledataPA->head != NULL )
{
xabort ( "Queue bibAFiledataP not empty." );
}
else
{
xdebug("%s", "cleanup queue");
queueDestroy ( bibAFiledataPA );
}
}
/***************************************************************/
void initPOSIXASYNCH ( void )
{
if ( commInqSizeNode () < 2 )
xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );
if ( nPrefStreams < 1 ) xabort ( "USAGE: # PREFETCH STREAMS >= 1" );
if ( commInqRankNode () == commInqSpecialRankNode ())
{
commDefCommColl ( 0 );
commSendNodeInfo ();
commRecvNodeMap ();
commDefCommsIO ();
pwPOSIXASYNCH ();
}
else
{
commDefCommColl ( 1 );
commSendNodeInfo ();
commRecvNodeMap ();
commDefCommsIO ();
bibAFiledataPA = queueInit ( destroyAFiledataPA,
compareNamesAPA );
}
}
#endif
#endif
......
......@@ -17,7 +17,6 @@
extern char * command2charP[6];
extern long initial_buffersize;
extern char *token;
extern double accumProbe;
......@@ -28,18 +27,6 @@ extern double accumWait;
extern double accumWrite;
typedef struct
{
char *name;
int tsID;
size_t size;
struct dBuffer *db1;
struct dBuffer *db2;
struct dBuffer *db;
IO_Server_command command;
MPI_Request request;
} aFiledataP;
typedef struct
{
char *name;
......@@ -50,47 +37,6 @@ typedef struct
bool *nfinished;
} bFiledataP;
static queue_t * bibAFiledataP;
/***************************************************************/
static aFiledataP *
initAFiledataP(const char *filename, size_t bs)