Commit fa7f4855 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

send message WRITETS to IO server

parent 07522ca6
......@@ -102,6 +102,10 @@ void modelRun ()
start = CDI_UNDEFID;
chunk = CDI_UNDEFID;
}
#ifdef USE_MPI
pioWriteTimestep ( tsID, vdate, vtime );
#endif
}
......
......@@ -210,6 +210,7 @@ extern "C" {
int pioInit ( int, int, int );
void pioFinalize ( void );
void pioMetadata ( int, int * );
void pioWriteTimestep ( int, int, int );
void pioNamespaceInit ( int, int * );
void pioNamespaceCleanup ( void );
......
......@@ -19,9 +19,10 @@ typedef struct{
MPI_Comm collectorComm;
}pioInfo;
MPI_Comm bInit ( int, MPI_Comm, int *, int * );
int pioFileOpenW ( const char* );
int pioFileClose ( int );
MPI_Comm bInit ( int, MPI_Comm, int *, int * );
void bFinalize ( void );
int pioFileOpenW ( const char* );
int pioFileClose ( int );
size_t pioFileWrite ( int, int, const void*, size_t );
#else
......
......@@ -23,6 +23,8 @@
#include "vlist.h"
#endif
int tsSize = 3;
/*****************************************************************************/
......@@ -63,18 +65,18 @@ MPI_Comm pioInit_c ( MPI_Comm comm, int nIOP, int mode )
commGlob = comm;
MPI_Comm_size ( commGlob, &sizeGlob );
xmpi ( MPI_Comm_size ( commGlob, &sizeGlob ));
nProcsCalc = sizeGlob - nIOP;
MPI_Comm_rank ( commGlob, &rankGlob );
MPI_Comm_set_name ( commGlob, "commGlob" );
xmpi ( MPI_Comm_rank ( commGlob, &rankGlob ));
xmpi ( MPI_Comm_set_name ( commGlob, "commGlob" ));
if ( ddebug >= 1 ) xdebugComm ( &commGlob );
if ( rankGlob >= nProcsCalc )
isCalcPE = 0;
else
else
isCalcPE = 1;
xmpi ( MPI_Comm_split ( comm, isCalcPE, 0, &commModel ));
......@@ -290,7 +292,6 @@ void defVarDeco ( int vlistID, int varID )
deco_t deco[nProcsCalc];
varSize = vlistInqVarSize ( vlistID, varID );
//xdebug ("+++++++++vlistID=%d, varID=%d, varSize=%d", vlistID, varID, varSize );
for ( cRank = 0; cRank < nProcsCalc; cRank++ )
{
......@@ -308,11 +309,7 @@ void defVarDeco ( int vlistID, int varID )
deco[cRank].rank = cRank;
deco[cRank].offset = lOffset;
deco[cRank].chunk = lChunk;
// xdebug("deco %d: %d, %d, %d",
// cRank, deco[cRank].rank, deco[cRank].offset,
// deco[cRank].chunk );
}
//xdebug ("+++++++++vlistID=%d, varID=%d, varSize=%d", vlistID, varID, varSize );
vlistDefVarDeco ( vlistID, varID, nProcsCalc, &deco[0] );
}
......@@ -374,7 +371,7 @@ static
void modelWinCreate ( void )
{
#ifdef USE_MPI
xdebug ();
xdebug ( "nothing happens here" );
#endif
}
......@@ -393,8 +390,8 @@ void pioMetadata ( int nNodes, int * nodeSizes )
{
reshPackBufferCreate ( &buffer, &bufferSize, commsIO[rankGlob] );
MPI_Send ( buffer, bufferSize, MPI_PACKED, nProcsCalc,
METADATA, * ( commsIO + rankGlob ));
xmpi ( MPI_Send ( buffer, bufferSize, MPI_PACKED, nProcsCalc,
METADATA, * ( commsIO + rankGlob )));
xdebug ( "sent message metadata" );
......@@ -424,10 +421,40 @@ void pioFinalize ()
int i, ibuffer = 1111;
for ( i = 0; i < nProcsIO; i++ )
MPI_Send ( &ibuffer, 1, MPI_INT, nProcsCalc, FINALIZE, *( commsIO + i ));
{
xmpi ( MPI_Send ( &ibuffer, 1, MPI_INT, nProcsCalc,
FINALIZE, *( commsIO + i )));
xdebug ( "sent message FINALIZE" );
}
commsIOCleanup ();
#endif
}
/************************************************************************/
void pioWriteTimestep ( int ts, int vdate, int vtime )
{
#ifdef USE_MPI
int size, IOID, buffer[tsSize];
assert ( ts >= 0 &&
vdate >= 0 &&
vtime >= 0 );
buffer[0] = ts;
buffer[1] = vdate;
buffer[2] = vtime;
size = sizeof ( int );
if ( rankGlob < nProcsIO )
{
xmpi ( MPI_Send ( &buffer[0], tsSize, MPI_INTEGER, nProcsCalc,
WRITETS, * ( commsIO + rankGlob )));
xdebug ( "sent message WRITETS" );
}
#endif
}
......
......@@ -17,6 +17,8 @@ extern int nProcsCalc;
extern MPI_Comm * commsIO;
extern MPI_Comm commCalc;
extern int tsSize;
enum { GRID = 1,
ZAXIS = 2,
......
......@@ -11,6 +11,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include "limits.h"
#include "cdi.h"
#include "pio.h"
......@@ -43,7 +44,7 @@ void serverWinCreate ()
void IOServer ( MPI_Comm comm, int ptype )
{
int source, tag, iBuffer;
int source, tag, * iBuffer;
MPI_Status status;
static int nfinished;
MPI_Comm commColl;
......@@ -65,9 +66,11 @@ void IOServer ( MPI_Comm comm, int ptype )
switch ( tag )
{
case FINALIZE:
MPI_Recv ( &iBuffer, 1, MPI_INTEGER, source,
tag, commCalc, &status );
case FINALIZE:
iBuffer = xmalloc ( sizeof ( int ));
xmpi ( MPI_Recv ( iBuffer, 1, MPI_INTEGER, source,
tag, commCalc, &status ));
free ( iBuffer );
nfinished++;
if ( nfinished == nProcsCalc )
......@@ -81,23 +84,31 @@ void IOServer ( MPI_Comm comm, int ptype )
break;
case METADATA:
MPI_Get_count ( &status, MPI_CHAR, &size );
xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
buffer = xmalloc ( size * sizeof ( char ));
MPI_Recv ( buffer, size, MPI_PACKED, source,
tag, commCalc, &status );
xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
tag, commCalc, &status ));
rpcUnpackResources ( buffer, size, commCalc );
free ( buffer );
if ( rankGlob == nProcsCalc ) reshArrayPrint ( "reshArrayIOServer" );
if ( ddebug >= 2 ) xdebugMsg ( tag, source, nfinished );
serverWinCreate ();
break;
case WRITEVAR:
xdebug("WRITEVAR");
case WRITETS:
if ( ddebug >= 2 ) xdebugMsg ( tag, source, nfinished );
xmpi ( MPI_Get_count ( &status, MPI_INTEGER, &size ));
assert ( size == tsSize );
iBuffer = xmalloc ( size * sizeof ( int ));
xmpi ( MPI_Recv ( iBuffer, size, MPI_INTEGER, source,
tag, commCalc, &status ));
xdebug (" recved iBuffer %d %d %d", iBuffer[0], iBuffer[1], iBuffer[2] );
free ( iBuffer );
break;
default:
......
......@@ -8,8 +8,11 @@
#include "pio_util.h"
#include "cdi.h"
char commands[][12] = { "FINALIZE\0",
"METADATA\0"};
char commands[][13] = { "FINALIZE\0",
"METADATA\0",
"WINCREATE\0",
"WRITETS\0"};
/****************************************************/
......
......@@ -41,7 +41,7 @@ typedef enum
FINALIZE,
METADATA,
WINCREATE,
WRITEVAR
WRITETS
} command;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment