Commit d12c6b53 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

added pio*, initial changes

parent d3d42fb1
......@@ -163,6 +163,12 @@ src/make_cdilib -text
src/make_fint.c -text
src/mo_cdi.f90 -text
src/model.c -text
src/pio.c -text
src/pio.h -text
src/pio_dbuffer.c -text
src/pio_impl.h -text
src/pio_posixfpguardsendrecv.c -text
src/pio_queue.c -text
src/service.h -text
src/servicelib.c -text
src/stream.c -text
......
......@@ -185,6 +185,7 @@ target_alias = @target_alias@
target_cpu = @target_cpu@
target_os = @target_os@
target_vendor = @target_vendor@
top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
......
......@@ -11,8 +11,8 @@
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.
m4_if(m4_PACKAGE_VERSION, [2.61],,
[m4_fatal([this file was generated for autoconf 2.61.
m4_if(m4_PACKAGE_VERSION, [2.63],,
[m4_fatal([this file was generated for autoconf 2.63.
You have another version of autoconf. If you want to use that,
you should regenerate the build system entirely.], [63])])
......
......@@ -178,6 +178,7 @@ target_alias = @target_alias@
target_cpu = @target_cpu@
target_os = @target_os@
target_vendor = @target_vendor@
top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
#
......
This diff is collapsed.
......@@ -196,6 +196,7 @@ target_alias = @target_alias@
target_cpu = @target_cpu@
target_os = @target_os@
target_vendor = @target_vendor@
top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
......
......@@ -189,6 +189,7 @@ target_alias = @target_alias@
target_cpu = @target_cpu@
target_os = @target_os@
target_vendor = @target_vendor@
top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
......
......@@ -93,7 +93,13 @@ libcdi_a_SOURCES = \
table.h \
tablepar.h \
gaussgrid.h \
varscan.h
varscan.h \
pio_dbuffer.c \
pio_queue.c \
pio.c \
pio_posixfpguardsendrecv.c \
pio.h \
pio_impl.h
#
cdiFortran.o: cdiFortran.c
source='$<' object='$@' libtool=no \
......
......@@ -76,7 +76,9 @@ am_libcdi_a_OBJECTS = cdiFortran.$(OBJEXT) cdi_error.$(OBJEXT) \
tsteps.$(OBJEXT) stream_int.$(OBJEXT) servicelib.$(OBJEXT) \
extralib.$(OBJEXT) ieglib.$(OBJEXT) cdf.$(OBJEXT) \
cdf_int.$(OBJEXT) file.$(OBJEXT) binary.$(OBJEXT) \
swap.$(OBJEXT) cgribexlib.$(OBJEXT) gribapi.$(OBJEXT)
swap.$(OBJEXT) cgribexlib.$(OBJEXT) gribapi.$(OBJEXT) \
pio_dbuffer.$(OBJEXT) pio_queue.$(OBJEXT) pio.$(OBJEXT) \
pio_posixfpguardsendrecv.$(OBJEXT)
libcdi_a_OBJECTS = $(am_libcdi_a_OBJECTS)
DEFAULT_INCLUDES = -I.@am__isrc@
depcomp = $(SHELL) $(top_srcdir)/config/depcomp
......@@ -205,6 +207,7 @@ target_alias = @target_alias@
target_cpu = @target_cpu@
target_os = @target_os@
target_vendor = @target_vendor@
top_build_prefix = @top_build_prefix@
top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
......@@ -299,7 +302,13 @@ libcdi_a_SOURCES = \
table.h \
tablepar.h \
gaussgrid.h \
varscan.h
varscan.h \
pio_dbuffer.c \
pio_queue.c \
pio.c \
pio_posixfpguardsendrecv.c \
pio.h \
pio_impl.h
LOCALTARGETS = cdilib.o $(am__append_2)
#
......@@ -414,6 +423,10 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ieglib.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/institution.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/model.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_dbuffer.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/servicelib.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream_cdf.Po@am__quote@
......
......@@ -177,6 +177,16 @@ extern "C" {
#define CALENDAR_366DAYS 4
#define CALENDAR_NONE 5
/* deike */
/* parallel io types, A: asynchronous, B: blocking */
#ifndef NOMPI
#define PIO_NONE 0
#define PIO_POSIX_FPGUARD_SENDRECV 1
int pioInit ( int, int, int* );
void pioFinalize ( void );
#endif
/* CDI control routines */
......
! This file was automatically created, don't edit!
!
! Fortran interface for CDI library version 1.4.3
! Fortran interface for CDI library version 1.4.3.1
!
! Author:
! -------
! Uwe Schulzweida, MPI-MET, Hamburg, February 2010
! Uwe Schulzweida, MPI-MET, Hamburg, March 2010
!
INTEGER CDI_UNDEFID
......@@ -302,6 +302,19 @@
PARAMETER (CALENDAR_366DAYS = 4)
INTEGER CALENDAR_NONE
PARAMETER (CALENDAR_NONE = 5)
INTEGER PIO_NONE
PARAMETER (PIO_NONE = 0)
INTEGER PIO_POSIX_FPGUARD_SENDRECV
PARAMETER (PIO_POSIX_FPGUARD_SENDRECV = 1)
INTEGER pioInit
! (INTEGER ,
! INTEGER ,
! INTEGER )
EXTERNAL pioInit
! pioFinalize
EXTERNAL pioFinalize
!
! CDI control routines
!
......
......@@ -51,6 +51,8 @@
/* CALENDAR types */
FCALLSCFUN3 (INT, pioInit, PIOINIT, pioinit, INT, INT, PINT)
FCALLSCSUB0 (pioFinalize, PIOFINALIZE, piofinalize)
/* CDI control routines */
......
......@@ -19,6 +19,16 @@ size_t getpagesize(void);
#include "error.h"
#include "file.h"
/* begin deike */
#ifndef NOMPI
#include "pio.h"
#include "cdi.h"
extern pioInfo *pioinfo;
#endif
/* end deike */
#if ! defined(O_BINARY)
#define O_BINARY 0
#endif
......@@ -1070,6 +1080,13 @@ int fileOpen(const char *filename, const char *mode)
struct stat filestat;
F_I_L_E *fileptr = NULL;
/* begin deike */
#ifndef NOMPI
if ( memcmp ( mode, "w", 1 ) == 0 && pioinfo->type != PIO_NONE )
return pioFileOpenW ( filename );
#endif
/* end deike */
FILE_INIT
fmode = tolower((int) mode[0]);
......@@ -1159,6 +1176,13 @@ int fileClose(int fileID)
char *ftname[] = {"unknown", "open", "fopen"};
F_I_L_E *fileptr;
/* begin deike */
#ifndef NOMPI
if ( pioinfo->type != PIO_NONE )
return pioFileClose ( fileID );
#endif
/* end deike */
fileptr = file_to_pointer(fileID);
if ( fileptr == NULL )
......
......@@ -47,6 +47,11 @@ cat > ${PROG} << EOR
#include <fcntl.h>
#include <unistd.h>
#include <stdbool.h>
#ifndef NOMPI
#include "mpi.h"
#endif
#if defined (HAVE_LIBGRIB_API)
# include <grib_api.h>
#endif
......@@ -74,7 +79,6 @@ cat > ${PROG} << EOR
EOR
c="dmemory.c \
dmemory.h \
taxis.c \
error.c \
timebase.c \
......@@ -116,14 +120,17 @@ c="dmemory.c \
swap.c \
binary.c \
cdf.c \
"
pio_dbuffer.c \
pio_queue.c \
pio.c \
pio_posixfpguardsendrecv.c"
h="cdi_limits.h taxis.h error.h dtypes.h file.h cgribex.h gribapi.h service.h extra.h \
ieg.h cdi.h timebase.h calendar.h basetime.h datetime.h stream_int.h \
stream_cgribex.h stream_gribapi.h stream_grb.h stream_cdf.h \
tablepar.h table.h gaussgrid.h grid.h varscan.h binary.h swap.h \
service.h stream_srv.h stream_ext.h stream_ieg.h cdf_int.h \
cdf.h vlist.h"
cdf.h vlist.h pio.h pio_impl.h"
#cat $h >> ${PROG}
#cat $c | grep -v '#include' | grep -v '# include' >> ${PROG}
......
#ifndef NOMPI
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"
bool ddebug = false;
long initial_buffer_size = 128 * 1024; //16 * 1024 * 1024, 4 KB <= x < 256 MB
double startTime;
double accumProbe = 0.0;
double accumRecv = 0.0;
double accumSend = 0.0;
double accumSuspend = 0.0;
double accumWait = 0.0;
double accumWrite = 0.0;
pioInfo *pioinfo;
/*****************************************************************************/
void check_mpi ( int line, int iret )
{
char error_string[MPI_MAX_ERROR_STRING+1];
int len;
if ( iret != MPI_SUCCESS )
{
MPI_Error_string ( iret, error_string, &len );
error_string[len] = '\0';
fprintf ( stderr,"\nLine %8d MPI error %4d: %s\n\n", line, iret,
error_string );
}
return;
}
/***************************************************************/
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
size_t iret;
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
break;
}
return iret;
}
/***************************************************************/
int pioFileClose ( int id )
{
int iret;
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fcPOSIXFPGUARDSENDRECV ( id );
break;
}
return iret;
}
/***************************************************************/
int pioFileOpenW ( const char *filename )
{
int iret;
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
break;
}
return iret;
}
/***************************************************************/
int pioInit ( int ptype, int comm, int *ncollectors )
{
int collectingData;
int size;
pioinfo = ( pioInfo * ) malloc ( sizeof ( pioInfo ));
pioinfo->type = ptype;
pioinfo->comm = ( MPI_Comm ) comm;
MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));
if ( pioinfo->type == PIO_NONE && pioinfo->size != 1 )
{
fprintf ( stderr,
"PTYPE should be set to a parallel I/O type or npe should be 1." );
fprintf ( stderr, "PTYPE = %d, npe = %d\n",
pioinfo->type , pioinfo->size );
MPI_Abort ( pioinfo->comm, 1 );
}
if ( ddebug && pioinfo->rank == 0 )
fprintf ( stdout,
"pe%d in pioDefPtype(), ptype=%d, initial_buffer_size=%ld: init pioinfo ...\n",
pioinfo->rank, pioinfo->type, initial_buffer_size );
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_SENDRECV:
collectingData = initPOSIXFPGUARDSENDRECV ( ncollectors );
break;
}
return collectingData;
}
/***************************************************************/
void pioFinalize ()
{
free ( pioinfo );
return;
}
#endif
#ifndef _PIO_H
#define _PIO_H
#ifndef NOMPI
#include <stdlib.h>
#include "mpi.h"
typedef struct{
int type;
MPI_Comm comm;
int size;
int rank;
int specialRank;
MPI_Comm collectorComm;
}pioInfo;
int pioFileOpenW ( const char* );
int pioFileClose ( int );
size_t pioFileWrite ( int, int, const void*, size_t );
#endif
#endif
#define _XOPEN_SOURCE 600
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#ifndef NOMPI
#include "pio_impl.h"
bool ldebug = false;
int dbuffer_init ( struct dBuffer **dbuffer, size_t size )
{
struct dBuffer *db;
int status;
size_t pagesize;
pagesize = ( size_t ) sysconf ( _SC_PAGESIZE );
if ( ldebug )
fprintf ( stdout, "dbuffer_init(): pagesize = %zu bytes, size = %zu \n", pagesize, size );
if ( dbuffer == NULL || size < pagesize )
{
return 1;
}
db = ( struct dBuffer * ) malloc ( sizeof ( struct dBuffer ));
if ( db == NULL )
{
perror ( "Not enough memory" );
return 1;
}
db->size = pagesize;
while ( db->size < size )
{
db->size <<= 1;
if ( ldebug )
fprintf ( stdout,"size correction: %zu\n", db->size );
}
db->wr_pointer = 0;
if ( ( status = posix_memalign ( ( void ** ) &db->buffer, pagesize,
sizeof ( char ) * ( db->size ))) != 0 )
{
switch ( status )
{
case EINVAL:
fprintf ( stderr,
"The alignment argument was not a power of two, or was not a multiple of sizeof(void *).\n" );
break;
case ENOMEM:
fprintf ( stderr,
"There was insufficient memory to fulfill the allocation request.\n" );
break;
}
}
*dbuffer = db;
return 0;
}
void dbuffer_cleanup ( struct dBuffer **dbuffer )
{
struct dBuffer *db;
db = *dbuffer;
free ( db->buffer );
free ( db );
return;
}
size_t dbuffer_data_size ( struct dBuffer *dbuffer )
{
size_t data_size;
data_size = ( size_t )( dbuffer->wr_pointer & ( dbuffer->size-1 ));
return data_size;
}
size_t dbuffer_free ( struct dBuffer *dbuffer )
{
size_t free_size;
free_size = ( size_t )( dbuffer->size - 1 - dbuffer_data_size ( dbuffer ));
return free_size;
}
int dbuffer_reset ( struct dBuffer *dbuffer )
{
dbuffer->wr_pointer = 0;
return 0;
}
int dbuffer_push ( struct dBuffer *dbuffer, unsigned char *buffer, size_t len )
{
size_t space_left;
size_t wr_ptr;
space_left = dbuffer_free ( dbuffer );
if ( len > space_left )
{
return 1; /* not enough space left */
}
wr_ptr = dbuffer->wr_pointer;
memcpy ( dbuffer->buffer + wr_ptr, buffer, len );
dbuffer->wr_pointer = wr_ptr + len;
return 0;
}
#endif
#ifndef _PIO_IMPL_H
#define _PIO_IMPL_H
#include <stdbool.h>
#ifndef NOMPI
#include "mpi.h"
typedef enum
{
IO_Open_file,
IO_Close_file,
IO_Get_fp,
IO_Set_fp,
IO_Send_buffer,
IO_Send_buffer_size,
IO_Finish
} IO_Server_command;
const char *command2charP[7] = {"IO_Open_file", "IO_Close_file",
"IO_Get_fp","IO_Set_fp",
"IO_Send_buffer", "IO_Send_buffer_size",
"IO_Finish"};
struct dBuffer
{
size_t wr_pointer;
size_t size;
unsigned char *buffer;
};
struct listElem
{
int idx;
void *val;
struct listElem * next;
};
typedef struct listElem node;
typedef int ( * valDestroyFunction ) ( void * );
typedef bool ( * keyCompareFunction ) ( void *, void * );
typedef struct
{
node *head;
node *tail;
valDestroyFunction valDestroy;
keyCompareFunction keyCompare;
int count;
} queue;
// pio_dbuffer.c
int dbuffer_init ( struct dBuffer**, size_t );
int dbuffer_push ( struct dBuffer*, unsigned char*, size_t );
size_t dbuffer_data_size ( struct dBuffer* );
int dbuffer_reset ( struct dBuffer* );
void dbuffer_cleanup ( struct dBuffer ** );
size_t dbuffer_free ( struct dBuffer* );
// pio_queue.c
queue *queueInit ( valDestroyFunction, keyCompareFunction );
void queueDestroy ( queue * );
void queuePush ( queue *, void * );
void *queueIdx2val ( queue *, int );
int queueDelNode ( queue *, int );
// pio_posixfpguardsendrecv.c
void fpgPOSIXFPGUARDSENDRECV ( int );
int fowPOSIXFPGUARDSENDRECV ( const char* );
int fcPOSIXFPGUARDSENDRECV ( int );
size_t fwPOSIXFPGUARDSENDRECV ( int, int, const void*, size_t );
int initPOSIXFPGUARDSENDRECV ( int* );
#endif
#endif
#ifndef NOMPI
#include <stdbool.h>
#include <stdio.h>
#include <string.h>