Commit 06ba417f authored by Deike Kleberg

IOServer now calls bInit

parent 552844fa
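
For orientation, a minimal sketch of how a model program drives the library after this change, mirroring the example program in the hunks below. It is an illustration only: the value of nProcsIO and the assumption that pioInit_c() and pioFinalize() are declared in pio.h are mine, not part of the commit. I/O PEs never return from pioInit_c(); they run IOServer() (which now calls bInit) and exit there.

#include <mpi.h>
#include "pio.h"   /* assumed to declare pioInit_c() and pioFinalize() */

int main ( int argc, char *argv[] )
{
  MPI_Comm commGlob, commModel;
  int nProcsIO = 2;                 /* illustrative value only */

  MPI_Init ( &argc, &argv );
  MPI_Comm_dup ( MPI_COMM_WORLD, &commGlob );
  MPI_Comm_set_errhandler ( commGlob, MPI_ERRORS_RETURN );
  commModel = pioInit_c ( commGlob, nProcsIO, 1 );   /* mode 1 is forwarded to IOServer/bInit */
  /* ... model time stepping and streamWriteVar() calls on the calculator PEs ... */
  pioFinalize ();                   /* posts FINALIZE to every I/O PE */
  MPI_Finalize ();
  return 0;
}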
@@ -209,6 +209,8 @@ src/pio_posixfpguardthread.c -text
src/pio_posixfpguardthreadrefuse.c -text
src/pio_posixnonb.c -text
src/pio_queue.c -text
src/pio_rpc.c -text
src/pio_rpc.h -text
src/pio_server.c -text
src/pio_server.h -text
src/pio_util.c -text
......
@@ -124,7 +124,7 @@ void modelRun ()
for ( i = 0; i < nlon*nlat*nlev; i++ ) var2[i] = 2.2;
void pioServerReturn (); // Write var1 and var2
// Write var1 and var2
streamWriteVar(streamID, varID1, var1, nmiss);
streamWriteVar(streamID, varID2, var2, nmiss);
}
@@ -146,17 +146,15 @@ void pioServerReturn (); // Write var1 and var2
int main (int argc, char *argv[])
{
char fnName[] = "main()";
int sizeGlob, rankGlob, isCalcPE, nProcsCalc;
MPI_Comm commGlob, commModel;
int iret = 0;
int var2ProcsIO[11];
int sizeGlob, rankGlob;
MPI_Comm commGlob, commModel;
MPI_Init(&argc, &argv);
MPI_Comm_dup ( MPI_COMM_WORLD, &commGlob );
MPI_Comm_set_errhandler ( commGlob, MPI_ERRORS_RETURN );
MPI_Comm_rank ( commGlob, &rankGlob );
MPI_Comm_size ( commGlob, &sizeGlob );
MPI_Comm_rank ( commGlob, &rankGlob );
if ( nProcsIO <=0 || nProcsIO >= ( float ) sizeGlob / 2.0 )
myAbort ( "bad distribution of tasks on PEs", __FILE__, __LINE__, rankGlob );
@@ -164,15 +162,12 @@ int main (int argc, char *argv[])
if ( ddebug >= 3 )
myDebug ( __FILE__, fnName, __LINE__ );
nProcsCalc = sizeGlob - nProcsIO;
commModel = pioInit_c ( commGlob, nProcsIO, 0 );
commModel = pioInit_c ( commGlob, nProcsIO, 1 );
//modelRun ();
pioServerFinalize ();
pioFinalize ();
MPI_Finalize ();
return iret;
}
return 0;
}
@@ -63,6 +63,8 @@ libcdi_la_SOURCES = \
pio_posixfpguardthreadrefuse.c \
pio_posixnonb.c \
pio_queue.c \
pio_rpc.c \
pio_rpc.h \
pio_server.c \
pio_server.h \
pio_util.c \
......
@@ -67,13 +67,13 @@ am_libcdi_la_OBJECTS = basetime.lo binary.lo calendar.lo cdf.lo \
model.lo pio.lo pio_dbuffer.lo pio_interface.lo pio_mpinonb.lo \
pio_posixasynch.lo pio_posixfpguardsendrecv.lo \
pio_posixfpguardthread.lo pio_posixfpguardthreadrefuse.lo \
pio_posixnonb.lo pio_queue.lo pio_server.lo pio_util.lo \
servicelib.lo stream_cdf.lo stream_cgribex.lo stream_ext.lo \
stream_grb.lo stream_gribapi.lo stream_history.lo \
stream_ieg.lo stream_int.lo stream_record.lo stream_srv.lo \
stream_var.lo table.lo taxis.lo timebase.lo tsteps.lo util.lo \
varscan.lo version.lo vlist.lo vlist_att.lo vlist_var.lo \
zaxis.lo stream.lo swap.lo
pio_posixnonb.lo pio_queue.lo pio_rpc.lo pio_server.lo \
pio_util.lo servicelib.lo stream_cdf.lo stream_cgribex.lo \
stream_ext.lo stream_grb.lo stream_gribapi.lo \
stream_history.lo stream_ieg.lo stream_int.lo stream_record.lo \
stream_srv.lo stream_var.lo table.lo taxis.lo timebase.lo \
tsteps.lo util.lo varscan.lo version.lo vlist.lo vlist_att.lo \
vlist_var.lo zaxis.lo stream.lo swap.lo
libcdi_la_OBJECTS = $(am_libcdi_la_OBJECTS)
@ENABLE_CDI_LIB_FALSE@am_libcdi_la_rpath =
@ENABLE_CDI_LIB_TRUE@am_libcdi_la_rpath = -rpath $(libdir)
@@ -313,6 +313,8 @@ libcdi_la_SOURCES = \
pio_posixfpguardthreadrefuse.c \
pio_posixnonb.c \
pio_queue.c \
pio_rpc.c \
pio_rpc.h \
pio_server.c \
pio_server.h \
pio_util.c \
@@ -487,6 +489,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardthreadrefuse.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixnonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_rpc.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_server.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_util.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/servicelib.Plo@am__quote@
......
@@ -10,14 +10,15 @@
#ifdef USE_MPI
#include "mpi.h"
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"
//#include "pio_util.h"
#endif
bool ddebug = false;
bool ddebug = true;
#ifdef USE_MPI
@@ -349,25 +350,18 @@ void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
/***************************************************************/
int bInit ( int ptype, int commF, int *color, int *nnodes, int *pioCollComm2F )
MPI_Comm bInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
{
#ifdef USE_MPI
MPI_Comm comm;
char fnName[] = "bInit()";
int size, rank;
#endif
int collectingData = 1;
*nnodes = 1;
*color = 1;
*pioCollComm2F = 0;
#ifdef USE_MPI
comm = MPI_COMM_NULL;
if (( comm = MPI_Comm_f2c (( MPI_Fint ) commF )) == NULL )
errorPIO ( "MPI_Comm_f2c didn't succeed", __FILE__, __LINE__, -1 );
if ( ddebug )
myDebug ( __FILE__, fnName, __LINE__ );
MPI_Comm_set_errhandler ( comm, MPI_ERRORS_RETURN );
#ifdef USE_MPI
MPI_Comm_size ( comm, &size );
MPI_Comm_rank ( comm, &rank );
@@ -427,19 +421,12 @@ int bInit ( int ptype, int commF, int *color, int *nnodes, int *pioCollComm2F )
collectingData = initPOSIXNONB ();
break;
}
/* ????? MPI_Comm_c2f hardwired to 0 ????
*pioCollComm2F = 0;
if (( *pioCollComm2F = MPI_Comm_c2f ( pioinfo->collectorComm ) == 0 )
errorPIO ( "MPI_Comm_c2f didn't succeed\n", __FILE__, __LINE__ , rank);
*/
*pioCollComm2F = MPI_Comm_c2f ( pioinfo->collectorComm );
#endif
if ( ddebug )
fprintf ( stderr, "pe in pioinit out\n" );
fprintf ( stderr, "pe in bInit out\n" );
return collectingData;
return pioinfo->collectorComm;
}
/***************************************************************/
......
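
Caller-side view of the bInit change above, sketched here only for illustration (USE_MPI assumed; exampleCollector is an invented name, and the bInit declaration is taken from the header hunk below):

/* Old interface (removed above): the communicator went in as a Fortran
 * handle and the collector communicator came back through *pioCollComm2F:
 *   int flag = bInit ( ptype, commF, &color, &nnodes, &pioCollComm2F );
 * New interface: plain MPI_Comm in, collector communicator out.          */
static MPI_Comm exampleCollector ( int ptype, MPI_Comm comm )
{
  int color, nnodes;
  return bInit ( ptype, comm, &color, &nnodes );   /* pioinfo->collectorComm */
}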
@@ -19,6 +19,7 @@ typedef struct{
MPI_Comm collectorComm;
}pioInfo;
MPI_Comm bInit ( int, MPI_Comm, int *, int * );
int pioFileOpenW ( const char* );
int pioFileClose ( int );
size_t pioFileWrite ( int, int, const void*, size_t );
......
@@ -8,45 +8,10 @@
#include <stdio.h>
#include "limits.h"
#include "pio_util.h"
#include "pio_rpc.h"
#include "pio_server.h"
#include "cdi.h"
MPI_Comm commGlob;
int sizeGlob;
int rankGlob;
int nProcsIO;
int nProcsCalc;
MPI_Comm * commsIO;
MPI_Comm * commCalc;
int isCalcPE;
/*****************************************************************************/
/**
@brief frees the MPI_Communicators for the
communication between the calculator PEs and the I/O PEs and within the
group of I/O PEs.
@param
@return
*/
void pioFinalize ()
{
char fnName[] = "IOFinalize()";
int i;
if ( ddebug >= 1 ) myDebug ( __FILE__, fnName,__LINE__ );
for ( i = 0; i < nProcsIO; i++ )
if ( rankGlob < nProcsCalc || rankGlob == nProcsCalc + i )
MPI_Comm_free ( commsIO + i );
return;
}
/*****************************************************************************/
@@ -78,26 +43,22 @@ int pioInit ( int comm, int nIOP, int mode )
@return int indicating whether the calling PE is a calculator (1) or not (0)
*/
MPI_Comm pioInit_c ( MPI_Comm comm, int nIOP )
MPI_Comm pioInit_c ( MPI_Comm comm, int nIOP, int mode )
{
char fnName[] = "IOInit()";
int * ranks;
int i, j;
MPI_Group groupGlob, currGroupIO;
int currIORank;
char name[10];
char fnName[] = "pioInit_c()";
MPI_Comm commModel;
int isCalcPE;
commGlob = comm;
MPI_Comm_size ( commGlob, &sizeGlob );
nProcsCalc = sizeGlob - nIOP;
MPI_Comm_rank ( commGlob, &rankGlob );
MPI_Comm_set_name ( commGlob, "commGlob" );
if ( ddebug >= 1 ) myDebugComm ( __FILE__, fnName,__LINE__ , &commGlob );
nProcsIO = nIOP;
nProcsCalc = sizeGlob - nProcsIO;
if ( rankGlob >= nProcsCalc )
isCalcPE = 0;
@@ -105,61 +66,16 @@ MPI_Comm pioInit_c ( MPI_Comm comm, int nIOP )
isCalcPE = 1;
CHECK_MPI ( MPI_Comm_split ( comm, isCalcPE, 0, &commModel ), rankGlob );
commsIO = ( MPI_Comm * ) myMalloc ( nProcsIO * sizeof ( MPI_Comm ),
__FILE__, __LINE__ );
strncpy ( name, "commsIO_", 8 );
name[9] = '\0';
ranks = ( int * ) myMalloc (( nProcsCalc + 1 ) * sizeof ( int ),
__FILE__, __LINE__ );
for ( i = 0; i < nProcsCalc; i++ )
* ( ranks + i ) = i;
MPI_Comm_group ( commGlob, &groupGlob );
commsIOcreate ( comm, nIOP );
for ( i = 0; i < nProcsIO; i++ )
{
currIORank = nProcsCalc + i;
* ( ranks + nProcsCalc ) = currIORank;
MPI_Group_incl ( groupGlob, nProcsCalc + 1, ranks, &currGroupIO );
MPI_Comm_create ( commGlob, currGroupIO, commsIO + i );
if ( rankGlob < nProcsCalc || rankGlob == nProcsCalc + i )
{
sprintf ( &name[8], "%d", i );
MPI_Comm_set_name ( * ( commsIO + i ), name );
}
if ( rankGlob == currIORank )
commCalc = commsIO + i;
}
if ( ddebug >= 2 )
if ( isCalcPE )
for ( i = 0; i < nProcsIO; i++ )
myDebugComm ( __FILE__, fnName, __LINE__, commsIO + i );
else
myDebugComm ( __FILE__, fnName,__LINE__, commCalc );
if ( !isCalcPE )
{
IOServer ();
MPI_Group_free ( &currGroupIO );
MPI_Group_free ( &groupGlob );
free ( ranks );
pioFinalize ();
IOServer ( commModel, mode );
commsIOCleanup ();
MPI_Finalize ();
exit ( 0 );
}
else
{
MPI_Group_free ( &currGroupIO );
MPI_Group_free ( &groupGlob );
free ( ranks );
}
return commModel;
}
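
The split in pioInit_c relies on a fixed rank layout: ranks 0 .. nProcsCalc-1 compute, ranks nProcsCalc .. sizeGlob-1 serve I/O. A tiny helper, written here purely for illustration (isCalculatorRank is not part of the library), makes the rule explicit:

/* Mirrors "if ( rankGlob >= nProcsCalc ) isCalcPE = 0" above.
 * Example: sizeGlob = 8, nProcsIO = 2  ->  calculator PEs {0..5}, I/O PEs {6,7}. */
static int isCalculatorRank ( int rank, int size, int nIOP )
{
  int nCalc = size - nIOP;    /* nProcsCalc in the code above */
  return rank < nCalc ? 1 : 0;
}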
@@ -329,7 +245,7 @@ in order of vSizes
void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
int nStreams, int * nodeSizes, int nNodes )
{
char fnName[] = "IOMapping()";
char fnName[] = "varMapGen()";
int weightsStreams[nStreams];
int streamMapping[nStreams];
int nPEs = 0, nVars = 0;
@@ -444,17 +360,18 @@ void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
@return
*/
void pioServerFinalize ()
void pioFinalize ()
{
char fnName[] = "pioServerFinalize()";
int i;
int ibuffer = 1111;
char fnName[] = "pioFinalize()";
int i, ibuffer = 1111;
if ( ddebug >= 1 )
myDebug ( __FILE__, fnName, __LINE__ );
for ( i = 0; i < nProcsIO; i++ )
MPI_Send ( &ibuffer, 1, MPI_INT, nProcsCalc, FINALIZE, *( commsIO + i ));
commsIOCleanup ();
return;
}
......
#include "pio_rpc.h"
#include "pio_util.h"
MPI_Comm commGlob;
int sizeGlob;
int rankGlob;
int nProcsIO;
int nProcsCalc;
MPI_Comm * commsIO;
MPI_Comm * commCalc;
void commsIOcreate ( MPI_Comm commGlob, int nIOP )
{
char fnName[] = "commsIOcreate()";
int * ranks;
int i, j;
MPI_Group groupGlob, currGroupIO;
int currIORank;
char name[10];
nProcsIO = nIOP;
MPI_Comm_rank ( commGlob, &rankGlob );
commsIO = ( MPI_Comm * ) myMalloc ( nProcsIO * sizeof ( MPI_Comm ),
__FILE__, __LINE__ );
strncpy ( name, "commsIO_", 8 );
name[9] = '\0';
ranks = ( int * ) myMalloc (( nProcsCalc + 1 ) * sizeof ( int ),
__FILE__, __LINE__ );
for ( i = 0; i < nProcsCalc; i++ )
* ( ranks + i ) = i;
MPI_Comm_group ( commGlob, &groupGlob );
for ( i = 0; i < nProcsIO; i++ )
{
currIORank = nProcsCalc + i;
* ( ranks + nProcsCalc ) = currIORank;
MPI_Group_incl ( groupGlob, nProcsCalc + 1, ranks, &currGroupIO );
MPI_Comm_create ( commGlob, currGroupIO, commsIO + i );
if ( rankGlob < nProcsCalc || rankGlob == nProcsCalc + i )
{
sprintf ( &name[8], "%d", i );
MPI_Comm_set_name ( * ( commsIO + i ), name );
}
if ( rankGlob == currIORank )
commCalc = commsIO + i;
}
if ( ddebug >= 2 )
{
if ( rankGlob < nProcsCalc )
for ( i = 0; i < nProcsIO; i++ )
myDebugComm ( __FILE__, fnName, __LINE__, commsIO + i );
else
myDebugComm ( __FILE__, fnName,__LINE__, commCalc );
}
MPI_Group_free ( &currGroupIO );
MPI_Group_free ( &groupGlob );
free ( ranks );
return;
}
/*****************************************************************************/
/**
@brief frees the MPI_Communicators for the
communication between the calculator PEs and the I/O PEs and within the
group of I/O PEs.
@param
@return
*/
void commsIOCleanup ()
{
char fnName[] = "commsIOCleanup()";
int i;
if ( ddebug >= 1 ) myDebug ( __FILE__, fnName,__LINE__ );
for ( i = 0; i < nProcsIO; i++ )
if ( rankGlob < nProcsCalc || rankGlob == nProcsCalc + i )
MPI_Comm_free ( commsIO + i );
return;
}
#ifndef PIO_RPC_H
#define PIO_RPC_H
#include <mpi.h>
extern MPI_Comm commGlob;
extern int sizeGlob;
extern int rankGlob;
extern int nProcsIO;
extern int nProcsCalc;
extern MPI_Comm * commsIO;
extern MPI_Comm * commCalc;
void commsIOcreate ( MPI_Comm, int );
void commsIOCleanup ();
#endif
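
A sketch of how the helpers declared here are paired, taken from their use in pioInit_c and pioFinalize above (the comments paraphrase the diff, they are not library documentation): each commsIO[i] groups all nProcsCalc calculator PEs together with I/O PE i.

commsIOcreate ( commGlob, nProcsIO );   /* at startup, called from pioInit_c() */
/* ... messages between the calculator PEs and I/O PE i travel over commsIO[i],
 *     ending with the FINALIZE message posted by pioFinalize() ...            */
commsIOCleanup ();                      /* frees only the communicators this rank belongs to */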
@@ -7,17 +7,12 @@
#include <stdio.h>
#include "limits.h"
#include "cdi.h"
#include "pio.h"
#include "pio_rpc.h"
#include "pio_util.h"
extern MPI_Comm * commGlob;
extern int sizeGlob;
extern int rankGlob;
extern int nProcsIO;
extern int nProcsCalc;
extern MPI_Comm * commsIO;
extern MPI_Comm * commCalc;
extern int isCalcPE;
MPI_Comm * commPio;
@@ -31,7 +26,7 @@ extern int isCalcPE;
@return
*/
void IOServer ()
void IOServer ( MPI_Comm comm, int ptype )
{
char fnName[] = "IOServer()";
int source, tag, iBuffer;
@@ -39,14 +34,16 @@ void IOServer ()
static int nfinished;
void * vp;
double * dp;
char * outtext;
size_t len, len2;
MPI_Comm commColl;
int nnodes, color;
if ( ddebug >= 1 )
myDebugComm ( __FILE__, fnName, __LINE__, commCalc );
nfinished = 0;
outtext = ( char * ) myMalloc ( MAXSTRING * sizeof ( char ), __FILE__, __LINE__ );
commColl = bInit ( ptype, comm, &color, &nnodes );
for ( ;; )
{
@@ -88,8 +85,6 @@ void IOServer ()
myDebugMsg ( __FILE__, fnName, __LINE__, tag, source, nfinished );
}
}
free ( outtext );
return;
}
......
#ifndef PIO_SERVER_
#define PIO_SERVER_
#include <mpi.h>
void IOServer ();
extern MPI_Comm * commPio;
void IOServer ( MPI_Comm, int );
#endif
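
Taken together, the life of an I/O PE after this commit can be outlined as below. This is a hypothetical outline only (the body of the receive loop is abridged in the hunk above); IOServer() in pio_server.c is the real implementation.

/* Outline only, not library code. */
void ioServerOutline ( MPI_Comm commModel, int ptype )
{
  int color, nnodes;
  MPI_Comm commColl = bInit ( ptype, commModel, &color, &nnodes );  /* collector communicator */
  /* for (;;): receive a message on the calculator<->I/O communicator;
   * data messages are handled by the selected I/O method; FINALIZE messages
   * (one per calculator PE, see pioFinalize above) are presumably counted in
   * nfinished until the loop can end. */
  (void) commColl;   /* silence the unused-variable warning in this sketch */
}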