Commit cbc55fc5 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

added mpi nonblocking and posix nonblocking

parent 7881570a
......@@ -171,8 +171,10 @@ src/pio.c -text
src/pio.h -text
src/pio_dbuffer.c -text
src/pio_impl.h -text
src/pio_mpinonb.c -text
src/pio_posixasynch.c -text
src/pio_posixfpguardsendrecv.c -text
src/pio_posixnonb.c -text
src/pio_queue.c -text
src/service.h -text
src/servicelib.c -text
......
......@@ -22,7 +22,7 @@
INTEGER, PARAMETER :: dp = SELECTED_REAL_KIND(12,307)
INTEGER, PARAMETER :: PTYPE = PIO_POSIX_FPGUARD_SENDRECV
INTEGER, PARAMETER :: PTYPE = PIO_MPI_NONB
INTEGER, PARAMETER :: NLON = 38 ! Number of longitudes 384
INTEGER, PARAMETER :: NLAT = 19 ! Number of latitudes 192
......
......@@ -94,11 +94,13 @@ libcdi_a_SOURCES = \
tablepar.h \
gaussgrid.h \
varscan.h \
pio.c \
pio_dbuffer.c \
pio_mpinonb.c \
pio_queue.c \
pio.c \
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixnonb.c \
pio.h \
pio_impl.h
#
......
......@@ -77,8 +77,9 @@ am_libcdi_a_OBJECTS = cdiFortran.$(OBJEXT) cdi_error.$(OBJEXT) \
extralib.$(OBJEXT) ieglib.$(OBJEXT) cdf.$(OBJEXT) \
cdf_int.$(OBJEXT) file.$(OBJEXT) binary.$(OBJEXT) \
swap.$(OBJEXT) cgribexlib.$(OBJEXT) gribapi.$(OBJEXT) \
pio_dbuffer.$(OBJEXT) pio_queue.$(OBJEXT) pio.$(OBJEXT) \
pio_posixasynch.$(OBJEXT) pio_posixfpguardsendrecv.$(OBJEXT)
pio.$(OBJEXT) pio_dbuffer.$(OBJEXT) pio_mpinonb.$(OBJEXT) \
pio_queue.$(OBJEXT) pio_posixasynch.$(OBJEXT) \
pio_posixfpguardsendrecv.$(OBJEXT) pio_posixnonb.$(OBJEXT)
libcdi_a_OBJECTS = $(am_libcdi_a_OBJECTS)
DEFAULT_INCLUDES = -I.@am__isrc@
depcomp = $(SHELL) $(top_srcdir)/config/depcomp
......@@ -303,11 +304,13 @@ libcdi_a_SOURCES = \
tablepar.h \
gaussgrid.h \
varscan.h \
pio.c \
pio_dbuffer.c \
pio_mpinonb.c \
pio_queue.c \
pio.c \
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixnonb.c \
pio.h \
pio_impl.h
......@@ -426,8 +429,10 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/model.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_dbuffer.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpinonb.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixasynch.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixnonb.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/servicelib.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream.Po@am__quote@
......
......@@ -182,9 +182,10 @@ extern "C" {
#ifndef NOMPI
#define PIO_NONE 0
#define PIO_POSIX_FPGUARD_SENDRECV 1
#define PIO_MPI_NONB 1
#define PIO_POSIX_ASYNCH 2
#define PIO_POSIX_FPGUARD_SENDRECV 3
#define PIO_POSIX_NONB 4
int pioInit ( int, int, int* );
void pioFinalize ( void );
......
......@@ -4,7 +4,7 @@
!
! Author:
! -------
! Uwe Schulzweida, MPI-MET, Hamburg, March 2010
! Uwe Schulzweida, MPI-MET, Hamburg, April 2010
!
INTEGER CDI_UNDEFID
......@@ -304,10 +304,14 @@
PARAMETER (CALENDAR_NONE = 5)
INTEGER PIO_NONE
PARAMETER (PIO_NONE = 0)
INTEGER PIO_POSIX_FPGUARD_SENDRECV
PARAMETER (PIO_POSIX_FPGUARD_SENDRECV = 1)
INTEGER PIO_MPI_NONB
PARAMETER (PIO_MPI_NONB = 1)
INTEGER PIO_POSIX_ASYNCH
PARAMETER (PIO_POSIX_ASYNCH = 2)
INTEGER PIO_POSIX_FPGUARD_SENDRECV
PARAMETER (PIO_POSIX_FPGUARD_SENDRECV = 3)
INTEGER PIO_POSIX_NONB
PARAMETER (PIO_POSIX_NONB = 4)
INTEGER pioInit
! (INTEGER ,
! INTEGER ,
......
......@@ -120,12 +120,14 @@ c="dmemory.c \
gribapi.c \
swap.c \
binary.c \
cdf.c \
pio_dbuffer.c \
pio_queue.c \
cdf.c \
pio.c \
pio_dbuffer.c \
pio_mpinonb.c \
pio_posixasynch.c \
pio_posixfpguardsendrecv.c\
pio_posixasynch.c"
pio_posixnonb.c \
pio_queue.c"
h="cdi_limits.h taxis.h error.h dtypes.h file.h cgribex.h gribapi.h service.h extra.h \
ieg.h cdi.h timebase.h calendar.h basetime.h datetime.h stream_int.h \
......
......@@ -12,8 +12,10 @@
bool ddebug = false;
long initial_buffer_size = 16 * 1024 * 1024;
/* 16 * 1024 * 1024, 4 KB <= x < 256 MB */
long initial_buffersize = 16 * 1024 * 1024;
/* 4 KB <= x < 256 MB */
/* 16 * 1024 * 1024; */
/* 4 * 1024; */
double startTime;
double accumProbe = 0.0;
......@@ -25,6 +27,10 @@ double accumWrite = 0.0;
pioInfo *pioinfo;
MPI_Request request = MPI_REQUEST_NULL;
char *token = "%";
/*****************************************************************************/
void check_mpi ( int line, int iret )
......@@ -71,12 +77,18 @@ size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
case PIO_MPI_NONB:
iret = fwMPINONB ( id, tsId, buffer, len );
break;
case PIO_POSIX_ASYNCH:
iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
break;
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
break;
case PIO_POSIX_NONB:
iret = fwPOSIXNONB ( id, tsId, buffer, len );
break;
}
return iret;
......@@ -90,12 +102,18 @@ int pioFileClose ( int id )
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fcPOSIXFPGUARDSENDRECV ( id );
case PIO_MPI_NONB:
iret = fcMPINONB ( id );
break;
case PIO_POSIX_ASYNCH:
iret = fcPOSIXASYNCH ( id );
break;
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fcPOSIXFPGUARDSENDRECV ( id );
break;
case PIO_POSIX_NONB:
iret = fcPOSIXNONB ( id );
break;
}
return iret;
......@@ -109,12 +127,18 @@ int pioFileOpenW ( const char *filename )
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
case PIO_MPI_NONB:
iret = fowMPINONB ( filename );
break;
case PIO_POSIX_ASYNCH:
iret = fowPOSIXASYNCH ( filename );
break;
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
break;
case PIO_POSIX_NONB:
iret = fowPOSIXNONB ( filename );
break;
}
return iret;
......@@ -145,17 +169,23 @@ int pioInit ( int ptype, int comm, int *ncollectors )
if ( ddebug && pioinfo->rank == 0 )
fprintf ( stdout,
"pe%d in pioDefPtype(), ptype=%d, initial_buffer_size=%ld: init pioinfo ...\n",
pioinfo->rank, pioinfo->type, initial_buffer_size );
"pe%d in pioDefPtype(), ptype=%d, initial_buffersize=%ld: init pioinfo ...\n",
pioinfo->rank, pioinfo->type, initial_buffersize );
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
collectingData = initPOSIXFPGUARDSENDRECV ( ncollectors );
case PIO_MPI_NONB:
collectingData = initMPINONB ( ncollectors );
break;
case PIO_POSIX_ASYNCH:
collectingData = initPOSIXASYNCH ( ncollectors );
break;
case PIO_POSIX_FPGUARD_SENDRECV:
collectingData = initPOSIXFPGUARDSENDRECV ( ncollectors );
break;
case PIO_POSIX_NONB:
collectingData = initPOSIXNONB ( ncollectors );
break;
}
return collectingData;
......@@ -171,3 +201,4 @@ void pioFinalize ()
}
#endif
......@@ -2,7 +2,6 @@
#define _PIO_IMPL_H
#include <stdbool.h>
//#include <stdarg.h>
#ifndef NOMPI
#include "mpi.h"
......@@ -13,15 +12,12 @@ typedef enum
IO_Close_file,
IO_Get_fp,
IO_Set_fp,
IO_Send_buffer,
IO_Send_buffer_size,
IO_Finish
IO_Send_buffer
} IO_Server_command;
const char * command2charP[7] = {"IO_Open_file", "IO_Close_file",
const char * command2charP[5] = {"IO_Open_file", "IO_Close_file",
"IO_Get_fp","IO_Set_fp",
"IO_Send_buffer", "IO_Send_buffer_size",
"IO_Finish"};
"IO_Send_buffer"};
struct dBuffer
{
......@@ -72,16 +68,15 @@ size_t dbuffer_free ( struct dBuffer * );
/* pio_queue.c */
queue * queueInit ( valDestroyFunction, keyCompareFunction );
void queueDestroy ( queue * );
void queuePush ( queue *, void *, int, ... );
int queuePush ( queue *, void *, int, ... );
void * queueIdx2val ( queue *, int );
int queueDelNode ( queue *, int );
/* pio_posixfpguardsendrecv.c */
void fpgPOSIXFPGUARDSENDRECV ( int );
int fowPOSIXFPGUARDSENDRECV ( const char * );
int fcPOSIXFPGUARDSENDRECV ( int );
size_t fwPOSIXFPGUARDSENDRECV ( int, int, const void *, size_t );
int initPOSIXFPGUARDSENDRECV ( int * );
/* pio_mpinonb.c */
int fowMPINONB ( const char * );
int fcMPINONB ( int );
size_t fwMPINONB( int, int, const void *, size_t );
int initMPINONB ( int * );
/* pio_posixasynch.c */
void pwPOSIXASYNCH ( int );
......@@ -90,5 +85,19 @@ int fcPOSIXASYNCH ( int );
size_t fwPOSIXASYNCH( int, int, const void *, size_t );
int initPOSIXASYNCH ( int * );
/* pio_posixfpguardsendrecv.c */
void fpgPOSIXFPGUARDSENDRECV ( int );
int fowPOSIXFPGUARDSENDRECV ( const char * );
int fcPOSIXFPGUARDSENDRECV ( int );
size_t fwPOSIXFPGUARDSENDRECV ( int, int, const void *, size_t );
int initPOSIXFPGUARDSENDRECV ( int * );
/* pio_posixnonb.c */
void pwPOSIXNONB ( int );
int fowPOSIXNONB ( const char * );
int fcPOSIXNONB ( int );
size_t fwPOSIXNONB( int, int, const void *, size_t );
int initPOSIXNONB ( int * );
#endif
#endif
#ifndef NOMPI
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include "mpi.h"
#include "pio.h"
#include "pio_impl.h"
extern bool ddebug;
extern double startTime;
extern double accumProbe;
extern double accumRecv;
extern double accumSend;
extern double accumSuspend;
extern double accumWait;
extern double accumWrite;
extern long initial_buffersize;
extern pioInfo *pioinfo;
typedef struct
{
char *name;
size_t size;
struct dBuffer *db1;
struct dBuffer *db2;
struct dBuffer *db;
MPI_File fh;
MPI_Request request;
} aFiledataM;
static queue *bibAFiledataM;
/***************************************************************/
static aFiledataM *initAFiledataMPINONB ( const char *filename, size_t bs )
{
aFiledataM *of;
size_t len;
int status;
if ( ddebug )
fprintf ( stdout, "pe%d initAFiledataMPINONB (): filename=%s, buffersize=%d, in\n",
pioinfo->rank, filename, bs );
of = ( aFiledataM * ) malloc ( sizeof ( aFiledataM ));
memset ( of, 0, sizeof ( aFiledataM ));
len = strlen ( filename );
of->name = ( char * ) malloc (( len ) * sizeof ( char ));
strcpy ( of->name, filename );
of->size = bs;
/* init output buffer */
if ( ddebug )
fprintf ( stdout, "pe%d initOpenFileCollector(): name=%s, init output buffer\n",
pioinfo->rank, of->name );
if (( status = dbuffer_init ( &( of->db1 ), of->size )) == 1 )
{
fprintf ( stderr, "dbuffer_init outputBuffer1: %d\n", status );
MPI_Abort ( pioinfo->comm, 1 );
}
if (( status = dbuffer_init ( &( of->db2 ), of->size )) == 1)
{
fprintf ( stderr, "dbuffer_init outputBuffer2: %d\n", status );
MPI_Abort ( pioinfo->comm, 1 );
}
of->db = of->db1;
/* open file */
MPI_File_open ( pioinfo->comm, of->name, ( MPI_MODE_CREATE|MPI_MODE_WRONLY ),
MPI_INFO_NULL, &( of->fh ));
of->request = MPI_REQUEST_NULL;
if ( ddebug )
fprintf ( stdout, "pe%d initAFiledataMPINONB (), enqueued name=%s, opened file, return\n",
pioinfo->rank, of->name );
return of;
}
/***************************************************************/
int destroyAFiledataMPINONB ( void *v )
{
int iret = 0;
aFiledataM *of;
MPI_Status status;
of = (aFiledataM * ) v;
if ( ddebug )
fprintf ( stdout, "pe%d destroyAFiledataMPINONB(): name=%s, close file, in\n",
pioinfo->rank, of->name );
/* close file */
startTime = MPI_Wtime ();
MPI_Wait ( & ( of->request ), &status );
accumWait += ( MPI_Wtime () - startTime );
MPI_File_sync ( of->fh );
iret = MPI_File_close ( & ( of->fh ));
/* file closed, cleanup */
dbuffer_cleanup ( & ( of->db1 ));
dbuffer_cleanup ( & ( of->db2 ));
free ( of->name );
free ( of );
if ( ddebug )
fprintf ( stdout, "pe%d destroyAFiledataMPINONB(), closed file, cleaned up, return\n",
pioinfo->rank );
return iret == MPI_SUCCESS ? 0 : -1;
}
/***************************************************************/
bool compareNamesMPINONB ( void *v1, void *v2 )
{
aFiledataM *afm1, *afm2;
size_t len;
bool bret;
afm1 = ( aFiledataM * ) v1;
afm2 = ( aFiledataM * ) v2;
len = strlen ( afm1->name );
bret = ( len == strlen ( afm2->name ) &&
memcmp ( afm1->name, afm2->name, len ) == 0 );
return bret;
}
/***************************************************************/
void writeMPINONB ( aFiledataM *of, int id )
{
int iret, amount;
MPI_Offset displ;
MPI_Status status;
/* write buffer */
amount = ( int ) dbuffer_data_size ( of->db );
if ( ddebug )
fprintf ( stdout, "pe%d writeMPINONB(): Write buffer, size %zu bytes, in\n",
pioinfo->rank, amount );
MPI_Wait ( & ( of->request ), &status );
MPI_File_iwrite_shared ( of->fh, of->db->buffer, amount, MPI_CHAR,
& ( of->request ));
/* change outputBuffer */
dbuffer_reset ( of->db );
if ( of->db == of->db1 )
{
if ( ddebug )
fprintf ( stdout, "pe%d writeMPINONB (): id=%d, change to buffer 2 ...\n",
pioinfo->rank, id );
of->db = of->db2;
}
else
{
if ( ddebug )
fprintf ( stdout, "pe%d writeMPINONB (): id=%d, change to buffer 1 ...\n",
pioinfo->rank, id );
of->db = of->db1;
}
return;
}
/***************************************************************/
size_t fwMPINONB ( int id, int tsId, const void *buffer, size_t len )
{
static int oldTsId = 0;
int error = 0;
int flush = 0;
int filled = 0;
aFiledataM *of;
node *curr;
flush = ( tsId != oldTsId ) ? 1 : 0;
if ( flush == 1 )
{
if ( ddebug )
fprintf ( stdout, "pe%d fileWriter (): tsId = %d, flush buffer\n",
pioinfo->rank, tsId );
curr = bibAFiledataM->head;
while ( curr )
{
writeMPINONB (( aFiledataM *) curr->val, curr->idx );
curr = curr->next;
}
oldTsId = tsId;
MPI_Barrier ( pioinfo->comm );
}
of = ( aFiledataM * ) queueIdx2val ( bibAFiledataM, id );
filled = dbuffer_push ( of->db, ( unsigned char * ) buffer, len );
if ( ddebug )
fprintf ( stdout, "pe%d fileWriter (): id = %d, tsId = %d, pushed data on buffer, filled = %d\n",
pioinfo->rank, id, tsId, filled );
if ( filled == 1 )
{
if ( flush )
error = filled;
else
{
writeMPINONB ( of, id );
error = dbuffer_push ( of->db, ( unsigned char * ) buffer, len );
}
}
if ( error == 1 )
{
fprintf ( stderr, "pe%d, id=%d, problem filling output buffer ...\n",
pioinfo->rank, id);
MPI_Abort ( pioinfo->comm, 1 );
}
return len;
}
/***************************************************************/
int fcMPINONB ( int id )
{
aFiledataM *of;
int iret;
double accumWaitMax;
if ( ddebug )
fprintf ( stdout, "pe%d fileClose (%d ): write buffer, close file and cleanup, in\n",
pioinfo->rank, id );
if ( ! ( of = ( aFiledataM * ) queueIdx2val ( bibAFiledataM, id )))
{
fprintf ( stderr, "pe%d error, fileID=%d not found\n",
pioinfo->rank, id);
MPI_Abort ( pioinfo->comm, 1 );
}
writeMPINONB ( of, id );
/* dequeue file element */
iret = queueDelNode ( bibAFiledataM, id );
if ( !bibAFiledataM->head )
{
if ( ddebug )
fprintf ( stdout, "pe%d fileClose (): cleanup queue \n",
pioinfo->rank );
free ( bibAFiledataM );
}
/* timer output */
if ( ddebug )
{
MPI_Reduce ( &accumWait, &accumWaitMax,
1, MPI_DOUBLE, MPI_MAX, 0, pioinfo->comm );
fprintf ( stdout, "pe%d pioFileClose(): Wait time %15.10lf s\n",
pioinfo->rank, accumWait );
if ( pioinfo->rank == 0 )
fprintf ( stdout, "pe%d pioFileClose(): Max wait time %15.10lf s\n",
pioinfo->rank, accumWaitMax );
}
return iret;
}
/***************************************************************/
int fowMPINONB ( const char *filename )
{
static aFiledataM *of;
static long buffersize = 0;
int id, bcastRank = 0;
/* broadcast buffersize to collectors ( just once, for all files )*/
if ( ! buffersize )
{
if ( ddebug )
fprintf ( stdout,
"pe%d fowMPINONB(): Broadcast buffersize to collectors ...\n",
pioinfo->rank );
if ( pioinfo->rank == bcastRank )
{