Commit 91b16748 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

Started feature close/open streams with defined resources.

parent 5b2dbdfd
......@@ -32,7 +32,7 @@ int IOMode = PIO_MPI_NONB;
void modelRun ()
{
int gridID, zaxisID[nVars], taxisID;
int vlistID, varID[nVars], streamID, tsID;
int vlistID, varID[nVars], streamID[2], tsID;
int i, j, nmiss = 0, rank;
double lons[nlon] = {0, 30, 60, 90, 120, 150, 180, 210, 240, 270, 300, 330};
double lats[nlat] = {-75, -45, -15, 15, 45, 75};
......@@ -63,14 +63,14 @@ void modelRun ()
taxisID = taxisCreate ( TAXIS_ABSOLUTE );
vlistDefTaxis ( vlistID, taxisID );
streamID = streamOpenWrite ( "example.grb", FILETYPE_GRB );
if ( streamID < 0 )
streamID[0] = streamOpenWrite ( "example.grb", FILETYPE_GRB );
if ( streamID[0] < 0 )
{
fprintf ( stderr, "%s\n", cdiStringError ( streamID ));
fprintf ( stderr, "%s\n", cdiStringError ( streamID[0] ));
return;
}
streamDefVlist ( streamID, vlistID);
streamDefVlist ( streamID[0], vlistID);
pioEndDef ();
......@@ -82,7 +82,7 @@ void modelRun ()
{
taxisDefVdate ( taxisID, vdate + tsID );
taxisDefVtime ( taxisID, vtime );
streamDefTimestep ( streamID, tsID );
streamDefTimestep ( streamID[0], tsID );
for ( i = 0; i < nVars; i++ )
{
start = vlistInqVarDecoOff ( vlistID, varID[i] );
......@@ -94,15 +94,16 @@ void modelRun ()
sprintf ( text, "var[%d], start=%d, chunk=%d", i, start, chunk );
xprintArray3 ( text, &var[start], chunk, DATATYPE_FLT );
streamWriteVar ( streamID, varID[i], &var[start], nmiss );
streamWriteVar ( streamID[0], varID[i], &var[start], nmiss );
start = CDI_UNDEFID;
chunk = CDI_UNDEFID;
}
streamID[1] = streamOpenWrite ( "example2.grb", FILETYPE_GRB );
pioWriteTimestep ( tsID, vdate, vtime );
}
streamClose ( streamID );
streamClose ( streamID[0] );
vlistDestroy ( vlistID );
taxisDestroy ( taxisID );
for ( i = 0; i < nVars; i++ )
......
......@@ -11,6 +11,8 @@ static int nNamespaces = 1;
static int activeNamespace = 0;
static int HLF = 1;
static int * hasLocalFile = &HLF;
static int RAS = 0;
static int * resASent = &RAS;
enum {
intbits = sizeof(int) * CHAR_BIT,
......@@ -79,6 +81,7 @@ void pioNamespaceInit ( int nspn, int * argHasLocalFile )
hasLocalFile = xmalloc ( nspn * sizeof ( hasLocalFile[0] ));
for ( i = 0; i < nspn; i++ )
hasLocalFile[i] = argHasLocalFile[i];
resASent = xmalloc ( nspn * sizeof ( resASent[0] ));
}
#endif
}
......@@ -157,6 +160,20 @@ int namespaceAdaptKey2 ( int key )
return namespaceIdxEncode2 ( nsp, tin.idx );
}
void namespaceDefResASent ( void )
{
int nsp = namespaceGetActive ();
resASent[nsp] = 1;
}
int namespaceInqResASent ( void )
{
int nsp = namespaceGetActive ();
return resASent[nsp];
}
/*
* Local Variables:
* c-file-style: "Java"
......
......@@ -4,6 +4,7 @@
typedef struct {
int idx;
int nsp;
int resASent;
}namespaceTuple_t;
......@@ -16,6 +17,8 @@ namespaceTuple_t namespaceResHDecode ( int );
int namespaceHasLocalFile ( int );
int namespaceAdaptKey ( int, int );
int namespaceAdaptKey2 ( int );
void namespaceDefResASent ( void );
int namespaceInqResASent ( void );
#endif
/*
......
......@@ -149,6 +149,14 @@ int commInqRankGlob ( void )
}
int commInqRootGlob ( void )
{
assert ( info != NULL &&
info->root != CDI_UNDEFID );
return info->root;
}
void commDefNProcsIO ( int n )
{
assert ( info != NULL &&
......
......@@ -68,6 +68,7 @@ void commDefCommGlob ( MPI_Comm );
MPI_Comm commInqCommGlob ( void );
int commInqSizeGlob ( void );
int commInqRankGlob ( void );
int commInqRootGlob ( void );
void commDefNProcsIO ( int );
int commInqNProcsIO ( void );
......
......@@ -4,6 +4,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include "limits.h"
#include "cdi.h"
#include "pio_util.h"
......@@ -27,6 +28,9 @@ int * winPostSet = NULL;
MPI_Group * groupsIO = NULL;
MPI_Group * groupsIONetto = NULL;
enum { nFuncs = 3, STREAMOPEN = 0, STREAMDEFVLIST = 1, STREAMCLOSE = 2 };
static char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
/*****************************************************************************/
......@@ -324,13 +328,16 @@ static
{
int collID, nstreams, * streamIndexArray, streamNo, vlistID, nvars, varID;
int collIDchunk = 0;
int nProcsColl = commInqNProcsColl ();
int nProcsColl = commInqNProcsColl ();
int rankGlob = commInqRankGlob ();
int root = commInqRootGlob ();
nstreams = reshCountType ( &streamOps );
streamIndexArray = xmalloc ( nstreams * sizeof ( int ));
reshGetResHArrayOfType ( nstreams, streamIndexArray, &streamOps );
for ( streamNo = 0; streamNo < nstreams; streamNo++ )
{
// space required for data
vlistID = streamInqVlist ( streamIndexArray[streamNo] );
nvars = vlistNvars ( vlistID );
for ( varID = 0; varID < nvars; varID++ )
......@@ -343,6 +350,13 @@ static
winBufferSize[collID] += (( collIDchunk + winBufferOverheadChunk ) *
sizeof ( double ));
}
// space required for the function calls streamOpen, streamDefVlist, streamClose
// once per stream and timestep only on the modelprocs with rank < nProcsModel
if ( rankGlob == root )
for ( collID = 0; collID < nProcsColl; collID++ )
winBufferSize[collID] += ( 3 * winBufferOverheadChunk * sizeof ( double ) +
4 * CDI_MAX_NAME );
}
free ( streamIndexArray );
......@@ -396,10 +410,10 @@ void modelWinCreate ( void )
/************************************************************************/
void pioBuffer ( int streamID, int varID, const double *data, int nmiss )
void pioBufferData ( int streamID, int varID, const double *data, int nmiss )
{
int size = sizeof ( double ), chunk, vlistID, collID = CDI_UNDEFID;
double temp, sep = ( double ) SEPARATOR;
double temp, sep = ( double ) SEPARATOR, datatoken = ( double ) DATATOKEN;
vlistID = streamInqVlist ( streamID );
collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
......@@ -417,7 +431,9 @@ void pioBuffer ( int streamID, int varID, const double *data, int nmiss )
xmpi ( MPI_Win_wait ( win[collID] ));
winPostSet[collID] = 0;
}
memcpy ( winBufferHead[collID], &datatoken, size);
winBufferHead[collID]++;
temp = ( double ) streamID;
memcpy ( winBufferHead[collID], &temp, size);
winBufferHead[collID]++;
......@@ -432,6 +448,71 @@ void pioBuffer ( int streamID, int varID, const double *data, int nmiss )
winBufferHead[collID]++;
}
/************************************************************************/
int func2Int ( const char * func )
{
int iret = CDI_UNDEFID, funcID;
for ( funcID = 0; funcID < nFuncs; funcID ++ )
if ( strlen ( funcMap[funcID] ) == strlen ( func ) &&
memcmp ( func, funcMap[funcID], strlen ( func )) == 0 )
iret = funcID;
return iret;
}
/************************************************************************/
void pioBufferFuncCall ( const char * func, int argc, ... )
{
va_list ap;
int rankGlob = commInqRankGlob ();
int root = commInqRootGlob ();
int collID, nProcsColl = commInqNProcsColl ();
double sep = ( double ) SEPARATOR, funcCall = ( double ) FUNCCALL;
int size = sizeof ( double );
if ( rankGlob == root )
{
va_start ( ap, argc );
switch ( func2Int ( func ))
{
case STREAMOPEN:
{
char * filename;
int filetype;
assert ( argc == 2 );
filename = va_arg ( ap, char * );
filetype = va_arg ( ap, int );
for ( collID = 0; collID < nProcsColl; collID++ )
{
if ( winPostSet[collID] )
{
xmpi ( MPI_Win_wait ( win[collID] ));
winPostSet[collID] = 0;
}
memcpy ( winBufferHead[collID], &funcCall, size);
winBufferHead[collID]++;
memcpy ( winBufferHead[collID], &sep, size);
winBufferHead[collID]++;
}
xdebug ( "WROTE FUNCTION CALL IN BUFFER OF WINS: %s, %s, %d",
funcMap[0], filename, filetype );
break;
}
default:
xabort ( "FUNCTION NOT MAPPED!" );
}
va_end ( ap );
}
}
#endif
/*****************************************************************************/
......@@ -528,6 +609,8 @@ void pioEndDef ( void )
RESOURCES, commInqCommsIO ( rankGlob )));
xdebug ( "sent message RESOURCES" );
namespaceDefResASent ();
reshPackBufferDestroy ( &buffer );
}
......
......@@ -9,7 +9,8 @@
#include <mpi.h>
void pioBuffer ( int, int, const double *, int );
void pioBufferData ( int, int, const double *, int );
void pioBufferFuncCall ( const char *, int, ... );
#endif
......
......@@ -266,12 +266,14 @@ int fcMPINONB ( int fileID )
iret = queueDelNode ( bibAFiledataM, fileID );
if ( !bibAFiledataM->head )
/*
if ( !bibAFiledataM->head )
{
xdebug ( "IOPE%d: cleanup queue",
rankNode );
free ( bibAFiledataM );
xdebug ( "IOPE%d: cleanup queue",
rankNode );
free ( bibAFiledataM );
}
*/
/* timer output */
......
......@@ -26,14 +26,16 @@ enum
STREAM = 6,
VLIST = 7,
START = 55555555,
SEPARATOR = 77777777,
SEPARATOR = 66666666,
DATATOKEN = 77777777,
FUNCCALL = 88888888,
END = 99999999
};
enum
{
winBufferOverhead = 1,
winBufferOverheadChunk = 3,
winBufferOverheadChunk = 4,
winBufferSizeMax = 128 * 1024,
timestepSize = 3
};
......
......@@ -63,12 +63,14 @@ static
int i, decoChunk;
int rankGlob = commInqRankGlob ();
int nProcsCalc = commInqNProcsModel ();
int root = commInqRootGlob ();
nstreams = reshCountType ( &streamOps );
streamIndexArray = xmalloc ( nstreams * sizeof ( int ));
reshGetResHArrayOfType ( nstreams, streamIndexArray, &streamOps );
for ( streamNo = 0; streamNo < nstreams; streamNo++ )
{
// space required for data
vlistID = streamInqVlist ( streamIndexArray[streamNo] );
nvars = vlistNvars ( vlistID );
for ( varID = 0; varID < nvars; varID++ )
......@@ -86,6 +88,10 @@ static
}
}
}
// space required for the function calls streamOpen, streamDefVlist, streamClose
// once per stream and timestep only on the modelprocs with rank < nProcsModel
getBufferSize[root] += ( 3 * winBufferOverheadChunk * sizeof ( double ) +
4 * CDI_MAX_NAME );
}
for ( i = 0; i < nProcsCalc; i++ )
{
......@@ -144,60 +150,77 @@ void writeVars ( int tsID, int vdate, int vtime )
int nmiss = 0;
char text[1024];
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
getBufferHead = xmalloc ( nProcsModel * sizeof ( getBufferHead[0] ));
for ( i = 0; i < nProcsModel; i++ )
getBufferHead[i] = getBuffer[i];
while (( int ) * getBufferHead[0] != END )
while (( int ) * getBufferHead[root] != END )
{
assert (( int ) * getBufferHead[0] != CDI_UNDEFID );
if (( int ) * getBufferHead[0] != streamID )
switch ( ( int ) * getBufferHead[root] )
{
streamID = ( int ) * getBufferHead[0];
vlistID = streamInqVlist ( streamID );
taxisID = vlistInqTaxis ( vlistID );
taxisDefVdate ( taxisID, vdate + tsID );
taxisDefVtime ( taxisID, vtime );
streamDefTimestep ( streamID, tsID );
}
getBufferHead[0]++;
varID = ( int ) * getBufferHead[0];
getBufferHead[0]++;
size = vlistInqVarSize ( vlistID, varID );
data = xmalloc ( size * sizeof ( double ));
dataHead = data;
for ( i = 0; i < nProcsModel; i++ )
{
if ( i )
case DATATOKEN:
getBufferHead[root]++;
assert (( int ) * getBufferHead[root] != CDI_UNDEFID );
if (( int ) * getBufferHead[root] != streamID )
{
assert ( * ( getBufferHead[i] ) == ( double ) streamID );
getBufferHead[i]++;
assert ( * ( getBufferHead[i] ) == ( double ) varID );
streamID = ( int ) * getBufferHead[root];
vlistID = streamInqVlist ( streamID );
taxisID = vlistInqTaxis ( vlistID );
taxisDefVdate ( taxisID, vdate + tsID );
taxisDefVtime ( taxisID, vtime );
streamDefTimestep ( streamID, tsID );
}
getBufferHead[root]++;
varID = ( int ) * getBufferHead[root];
getBufferHead[root]++;
size = vlistInqVarSize ( vlistID, varID );
data = xmalloc ( size * sizeof ( double ));
dataHead = data;
for ( i = 0; i < nProcsModel; i++ )
{
if ( i != root )
{
assert (( int ) * getBufferHead[i] == DATATOKEN );
getBufferHead[i]++;
assert (( int ) * getBufferHead[i] == streamID );
getBufferHead[i]++;
assert (( int ) * getBufferHead[i] == varID );
getBufferHead[i]++;
}
chunk = vlistInqVarDecoChunk3 ( vlistID, varID, i );
memcpy ( dataHead, getBufferHead[i], chunk * sizeof ( double ));
dataHead += chunk;
getBufferHead[i] += chunk;
assert (( int ) * ( getBufferHead[i] ) == SEPARATOR );
getBufferHead[i]++;
}
chunk = vlistInqVarDecoChunk3 ( vlistID, varID, i );
memcpy ( dataHead, getBufferHead[i], chunk * sizeof ( double ));
dataHead += chunk;
getBufferHead[i] += chunk;
assert (( int ) * ( getBufferHead[i] ) == SEPARATOR );
getBufferHead[i]++;
}
streamWriteVar ( streamID, varID, data, nmiss );
streamWriteVar ( streamID, varID, data, nmiss );
if ( ddebug > 2 )
{
sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
xprintArray ( text, data, size, DATATYPE_FLT );
}
if ( ddebug > 2 )
{
sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
xprintArray ( text, data, size, DATATYPE_FLT );
}
free ( data );
break;
case FUNCCALL:
xdebug ( "READ FUNCTION CALL FROM WIN BUFFER!" );
getBufferHead[root]++;
assert (( int ) * ( getBufferHead[root] ) == SEPARATOR );
getBufferHead[root]++;
break;
default:
xabort ( "BUFFER NOT READABLE!" );
free ( data );
}
}
free ( getBufferHead );
free ( getBufferHead );
}
/************************************************************************/
......
......@@ -29,6 +29,8 @@
#include "pio_rpc.h"
#include "pio_comm.h"
#include <string.h>
#define MAX_FNAMES 3
extern resOps streamOps;
......@@ -608,6 +610,7 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
int status;
Record *record = NULL;
stream_t *streamptr = NULL;
int hasLocalFile = namespaceHasLocalFile ( namespaceGetActive ());
if ( CDI_Debug )
Message("Open %s mode %c file %s", strfiletype(filetype), (int) *filemode, filename);
......@@ -617,7 +620,7 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
xdebug( "namespaceHasLocalFile(%d)=%s", namespaceGetActive(),
namespaceHasLocalFile ( namespaceGetActive ()) ? "true":"false");
if ( namespaceHasLocalFile ( namespaceGetActive ()))
if ( hasLocalFile )
{
switch (filetype)
{
......@@ -687,7 +690,7 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
}
}
if ( fileID < 0 && namespaceHasLocalFile ( namespaceGetActive ()) )
if ( fileID < 0 && hasLocalFile )
{
streamID = fileID;
}
......@@ -718,6 +721,11 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
vlistptr = vlist_to_pointer(streamptr->vlistID);
vlistptr->ntsteps = streamNtsteps(streamID);
}
if ( !hasLocalFile &&
namespaceInqResASent () == 1 &&
streamptr->filemode == 'w' )
pioBufferFuncCall ( __func__, 2, filename, filetype );
}
return (streamID);
......@@ -1584,7 +1592,7 @@ void streamWriteVar(int streamID, int varID, const double *data, int nmiss)
#ifdef USE_MPI
if ( ! namespaceHasLocalFile ( namespaceGetActive ()))
{
pioBuffer ( streamID, varID, data, nmiss );
pioBufferData ( streamID, varID, data, nmiss );
return;
}
#endif
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment