Commit db181be3 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

Some changes for consistency of resource array. Output for test in files.

parent 164abdce
......@@ -73,6 +73,8 @@ void modelRun ()
int taxisID2, taxisID3, taxisID4, taxisID5;
int instID, modelID;
int rank;
if ( ddebug >= 3 ) xdebug ();
#ifdef USE_MPI
......@@ -196,8 +198,12 @@ void modelRun ()
streamWriteVar(streamID, varID1, var1, nmiss);
streamWriteVar(streamID, varID2, var2, nmiss);
}
//reshArrayPrint ();
#ifdef USE_MPI
MPI_Comm_rank ( MPI_COMM_WORLD, &rank );
if ( rank == 0 )
#endif
reshArrayPrint ( "reshArrayModel" );
// Close the output stream
streamClose(streamID);
......
......@@ -209,7 +209,7 @@ void pioMetadata ( void );
void namespaceInit ( int, int * );
void namespaceSetActive ( int );
void reshArrayPrint ( void );
void reshArrayPrint ( char * );
void gridGetIndexArray ( int, int * );
void zaxisGetIndexArray ( int, int * );
......
......@@ -48,7 +48,7 @@ char *Grids[] = {
static int gridCompareP ( void * gridptr1, void * gridptr2 );
static void gridDestroyP ( void * gridptr );
static void gridPrintP ( void * gridptr );
static void gridPrintP ( void * gridptr, FILE * fp );
#ifdef USE_MPI
static int gridGetSizeP ( void * gridptr, MPI_Comm comm );
static void gridPackP ( void * gridptr, void * buff, int size,
......@@ -3006,9 +3006,9 @@ double *gridInqYboundsPtr(int gridID)
}
void gridPrintKernel ( grid_t * gridptr, int opt)
void gridPrintKernel ( grid_t * gridptr, int opt, FILE * fpArg)
{
FILE *fp = stdout;
FILE *fp = fpArg;
int type;
int gridsize, xsize, ysize, xdim, ydim;
int trunc;
......@@ -3315,16 +3315,16 @@ void gridPrint ( int gridID, int opt )
grid_check_ptr(gridID, gridptr);
gridPrintKernel ( gridptr, opt );
gridPrintKernel ( gridptr, opt, stdout );
}
void gridPrintP ( void * gridptr )
void gridPrintP ( void * gridptr, FILE * fp )
{
assert ( gridptr );
gridPrintKernel (( grid_t * ) gridptr , 0 );
gridPrintKernel (( grid_t * ) gridptr , 0, fp );
}
......
......@@ -9,6 +9,7 @@
#include "pio_util.h"
#include "resource_handle.h"
#include "pio_rpc.h"
#include "namespace.h"
#undef UNDEFID
#define UNDEFID -1
......@@ -31,7 +32,7 @@ institute_t;
static int instituteCompareP ( void * instituteptr1, void * instituteptr2 );
static void instituteDestroyP ( void * instituteptr );
static void institutePrintP ( void * instituteptr );
static void institutePrintP ( void * instituteptr, FILE * fp );
#ifdef USE_MPI
static int instituteGetSizeP ( void * instituteptr, MPI_Comm comm );
static void institutePackP ( void * instituteptr, void *buf, int size,
......@@ -76,19 +77,39 @@ institute_t * instituteNewEntry ( void )
static
void instituteDefaultEntries ( void )
{
ECMWF = institutDef( 98, 0, "ECMWF", "European Centre for Medium-Range Weather Forecasts");
MPIMET = institutDef( 98, 232, "MPIMET", "Max-Planck-Institute for Meteorology");
(void) institutDef( 98, 255, "MPIMET", "Max-Planck-Institute for Meteorology");
(void) institutDef( 98, 232, "MPIMET", "Max-Planck Institute for Meteorology");
(void) institutDef( 78, 255, "DWD", "Deutscher Wetterdienst");
MCH = institutDef(215, 255, "MCH", "MeteoSwiss");
(void) institutDef( 7, 0, "NCEP", "National Centers for Environmental Prediction");
(void) institutDef( 7, 1, "NCEP", "National Centers for Environmental Prediction");
(void) institutDef( 60, 0, "NCAR", "National Center for Atmospheric Research");
(void) institutDef( 74, 0, "METOFFICE", "U.K. Met Office");
(void) institutDef( 97, 0, "ESA", "European Space Agency ");
(void) institutDef( 99, 0, "KNMI", "Royal Netherlands Meteorological Institute");
/* (void) institutDef( 0, 0, "IPSL", "IPSL (Institut Pierre Simon Laplace, Paris, France)"); */
cdiResH resH[12];
int i;
resH[0] = ECMWF = institutDef( 98, 0, "ECMWF",
"European Centre for Medium-Range Weather Forecasts");
resH[1] = MPIMET = institutDef( 98, 232, "MPIMET",
"Max-Planck-Institute for Meteorology");
resH[2] = institutDef( 98, 255, "MPIMET",
"Max-Planck-Institute for Meteorology");
resH[3] = institutDef( 98, 232, "MPIMET",
"Max-Planck Institute for Meteorology");
resH[4] = institutDef( 78, 255, "DWD",
"Deutscher Wetterdienst");
resH[5] = MCH = institutDef(215, 255, "MCH",
"MeteoSwiss");
resH[6] = institutDef( 7, 0, "NCEP",
"National Centers for Environmental Prediction");
resH[7] = institutDef( 7, 1, "NCEP",
"National Centers for Environmental Prediction");
resH[8] = institutDef( 60, 0, "NCAR",
"National Center for Atmospheric Research");
resH[9] = institutDef( 74, 0, "METOFFICE",
"U.K. Met Office");
resH[10] = institutDef( 97, 0, "ESA",
"European Space Agency ");
resH[11] = institutDef( 99, 0, "KNMI",
"Royal Netherlands Meteorological Institute");
/* (void) institutDef( 0, 0, "IPSL",
"IPSL (Institut Pierre Simon Laplace, Paris, France)"); */
if ( ! namespaceHasLocalFile ( namespaceGetActive ()))
for ( i = 0; i < 12 ; i++ )
reshSetStatus ( resH[i], &instituteOps, SUSPENDED );
}
......@@ -307,9 +328,8 @@ void instituteDestroyP ( void * instituteptr )
}
void institutePrintP ( void * instituteptr )
void institutePrintP ( void * instituteptr, FILE * fp )
{
FILE *fp = stdout;
institute_t * ip = ( institute_t * ) instituteptr;
if ( !ip ) return;
......
......@@ -8,6 +8,7 @@
#include "pio_util.h"
#include "resource_handle.h"
#include "pio_rpc.h"
#include "namespace.h"
#undef UNDEFID
#define UNDEFID -1
......@@ -34,7 +35,7 @@ static void modelInit(void);
static int modelCompareP ( void * modelptr1, void * modelptr2 );
static void modelDestroyP ( void * modelptr );
static void modelPrintP ( void * modelptr );
static void modelPrintP ( void * modelptr, FILE * fp );
#ifdef USE_MPI
static int modelGetSizeP ( void * modelptr, MPI_Comm comm );
static void modelPackP ( void * modelptr, void * buff, int size,
......@@ -77,32 +78,39 @@ int modelDef(int instID, int modelgribID, const char *name);
static
void modelDefaultEntries ( void )
{
int instID;
int instID, i;
cdiResH resH[10];
instID = institutInq( 0, 0, "ECMWF", NULL);
/* (void) modelDef(instID, 131, "ERA15"); */
/* (void) modelDef(instID, 199, "ERA40"); */
instID = institutInq( 0, 0, "MPIMET", NULL);
ECHAM5 = modelDef(instID, 64, "ECHAM5.4");
(void) modelDef(instID, 63, "ECHAM5.3");
(void) modelDef(instID, 62, "ECHAM5.2");
(void) modelDef(instID, 61, "ECHAM5.1");
resH[0] = ECHAM5 = modelDef(instID, 64, "ECHAM5.4");
resH[1] = modelDef(instID, 63, "ECHAM5.3");
resH[2] = modelDef(instID, 62, "ECHAM5.2");
resH[3] = modelDef(instID, 61, "ECHAM5.1");
instID = institutInq( 98, 255, "MPIMET", NULL);
(void) modelDef(instID, 60, "ECHAM5.0");
ECHAM4 = modelDef(instID, 50, "ECHAM4");
(void) modelDef(instID, 110, "MPIOM1");
resH[4] = modelDef(instID, 60, "ECHAM5.0");
resH[5] = ECHAM4 = modelDef(instID, 50, "ECHAM4");
resH[6] = modelDef(instID, 110, "MPIOM1");
instID = institutInq( 0, 0, "DWD", NULL);
(void) modelDef(instID, 149, "GME");
resH[7] = modelDef(instID, 149, "GME");
instID = institutInq( 0, 0, "MCH", NULL);
//(void) = modelDef(instID, 137, "COSMO");
COSMO = modelDef(instID, 255, "COSMO");
resH[8] = COSMO = modelDef(instID, 255, "COSMO");
instID = institutInq( 0, 1, "NCEP", NULL);
(void) modelDef(instID, 80, "T62L28MRF");
resH[9] = modelDef(instID, 80, "T62L28MRF");
if ( ! namespaceHasLocalFile ( namespaceGetActive ()))
for ( i = 0; i < 10 ; i++ )
reshSetStatus ( resH[i], &modelOps, SUSPENDED );
}
static
......@@ -245,9 +253,8 @@ void modelDestroyP ( void * modelptr )
}
void modelPrintP ( void * modelptr )
void modelPrintP ( void * modelptr, FILE * fp )
{
FILE *fp = stdout;
model_t * mp = ( model_t * ) modelptr;
if ( !mp ) return;
......
......@@ -199,7 +199,7 @@ void unpackStream ( char * unpackBuffer, int unpackBufferSize,
xmpi ( MPI_Unpack ( unpackBuffer, unpackBufferSize, unpackBufferPos,
&vlistID, 1, MPI_INT, comm ));
//streamDefVlist ( streamID, vlistID );
streamDefVlist ( streamID, vlistID );
}
......@@ -222,7 +222,8 @@ void rpcUnpackResources ( char * unpackBuffer, int unpackBufferSize,
{
case END:
xdebug ( "#### end of packed data" );
reshArrayPrint ();
if ( rankGlob == sizeGlob - 1 )
reshArrayPrint ( "reshArrayIOServer" );
return;
case GRID:
unpackGrid ( unpackBuffer, unpackBufferSize, &unpackBufferPos, comm );
......
......@@ -45,7 +45,7 @@ typedef struct node {
struct node *next;
resOps *ops;
void *ptr;
int sent;
int status;
} node;
......@@ -92,11 +92,11 @@ void arrayInitPointer ( void )
{
for ( j = 0; j < arraySizeAllocated[i]; j++ )
{
arrayResources[i][j].idx = namespaceIdxEncode2 ( i, j );
arrayResources[i][j].next = arrayResources[i] + j + 1;
arrayResources[i][j].ops = NULL;
arrayResources[i][j].ptr = NULL;
arrayResources[i][j].sent = 0;
arrayResources[i][j].idx = namespaceIdxEncode2 ( i, j );
arrayResources[i][j].next = arrayResources[i] + j + 1;
arrayResources[i][j].ops = NULL;
arrayResources[i][j].ptr = NULL;
arrayResources[i][j].status = ASSIGNED;
}
arrayResources[i][arraySizeAllocated[i]-1].next = NULL;
......@@ -199,13 +199,13 @@ int reshPut ( void *p, resOps *ops )
if ( !freeListHead[nsp] ) arraySizeExtend();
newNode = freeListHead[nsp];
freeListHead[nsp] = freeListHead[nsp]->next;
newNode->next = NULL;
idx = newNode->idx;
newNode->ptr = p;
newNode->ops = ops;
newNode->sent = 0;
newNode = freeListHead[nsp];
freeListHead[nsp] = freeListHead[nsp]->next;
newNode->next = NULL;
idx = newNode->idx;
newNode->ptr = p;
newNode->ops = ops;
newNode->status = ASSIGNED;
ARRAY_UNLOCK();
......@@ -235,11 +235,11 @@ void reshRemove ( cdiResH idx, resOps * ops )
arrayResources[nsp][nspT.idx].ops &&
arrayResources[nsp][nspT.idx].ops == ops );
arrayResources[nsp][nspT.idx].next = freeListHead[nsp];
arrayResources[nsp][nspT.idx].ops = NULL;
arrayResources[nsp][nspT.idx].ptr = NULL;
arrayResources[nsp][nspT.idx].sent = 0;
freeListHead [nsp] = arrayResources[nsp] + nspT.idx;
arrayResources[nsp][nspT.idx].next = freeListHead[nsp];
arrayResources[nsp][nspT.idx].ops = NULL;
arrayResources[nsp][nspT.idx].ptr = NULL;
arrayResources[nsp][nspT.idx].status = ASSIGNED;
freeListHead [nsp] = arrayResources[nsp] + nspT.idx;
ARRAY_UNLOCK();
}
......@@ -346,13 +346,13 @@ int getPackBufferSize ( MPI_Comm comm )
for ( i = 0; i < arraySizeAllocated[nsp]; i++ )
if ( arrayResources[nsp][i].ptr )
if ( ! arrayResources[nsp][i].sent )
if ( arrayResources[nsp][i].status == ASSIGNED )
{
curr = arrayResources[nsp] + i;
assert ( curr->ops );
/* message plus frame of 2 ints */
packBufferSize += curr->ops->valGetSize ( curr->ptr, comm )
packBufferSize += curr->ops->valGetPackSize ( curr->ptr, comm )
+ 2 * intpacksize;
}
......@@ -392,7 +392,7 @@ void reshPackBufferCreate ( char ** packBuffer, int * packBufferSize, MPI_Comm c
for ( i = 0; i < arraySizeAllocated[nsp]; i++ )
if ( arrayResources[nsp][i].ptr )
if ( ! arrayResources[nsp][i].sent )
if ( arrayResources[nsp][i].status == ASSIGNED )
{
curr = arrayResources[nsp] + i;
assert ( curr->ops );
......@@ -415,7 +415,7 @@ void reshPackBufferCreate ( char ** packBuffer, int * packBufferSize, MPI_Comm c
xmpi(MPI_Pack ( &sep, 1, MPI_INT, * packBuffer,
* packBufferSize, &packBufferPos, comm ));
curr->sent = 1;
curr->status = CLOSED;
}
ARRAY_UNLOCK();
......@@ -429,6 +429,76 @@ void reshPackBufferCreate ( char ** packBuffer, int * packBufferSize, MPI_Comm c
#endif
/**************************************************************/
/* for thread safety this feature would have to be integrated in reshPut */
void reshSetStatus ( cdiResH idx, resOps * ops, int status )
{
int nsp;
namespaceTuple_t nspT;
node * node;
assert ( arrayInit && ops );
ARRAY_LOCK ();
nsp = namespaceGetActive ();
nspT = namespaceIdxDecode ( idx );
assert ( arrayInit &&
nspT.nsp == nsp &&
nspT.idx >= 0 &&
nspT.idx < arraySizeAllocated[nsp] );
node = arrayResources[nsp] + nspT.idx;
assert ( node &&
node->ops == ops );
node->status = status;
ARRAY_UNLOCK ();
}
/**************************************************************/
int reshGetStatus ( cdiResH idx, resOps * ops )
{
int nsp;
namespaceTuple_t nspT;
node * node;
assert ( arrayInit && ops );
ARRAY_LOCK ();
nsp = namespaceGetActive ();
nspT = namespaceIdxDecode ( idx );
assert ( arrayInit &&
nspT.nsp == nsp &&
nspT.idx >= 0 &&
nspT.idx < arraySizeAllocated[nsp] );
node = arrayResources[nsp] + nspT.idx;
ARRAY_UNLOCK ();
assert ( node &&
node->ops == ops );
return node->status;
}
/**************************************************************/
void reshLock ()
{
ARRAY_LOCK();
......@@ -447,45 +517,59 @@ void reshUnlock ()
/**************************************************************/
void reshArrayPrint ()
void reshArrayPrint ( char * filename )
{
int i, j, temp;
node * curr;
FILE * fp;
xdebug();
fprintf ( stdout, "arrayInit=%d\n", arrayInit );
if ( ! arrayInit ) return;
if ( filename )
{
fp = fopen ( filename, "w" );
if ( ! fp )
{
xdebug ( "could not open file" );
fp = stdout;
}
}
else
fp = stdout;
temp = namespaceGetActive ();
fprintf ( stdout, "\n\n##########################################\n#\n# print " \
fprintf ( fp, "\n\n##########################################\n#\n# print " \
"global resource array \n#\n" );
for ( i = 0; i < namespaceGetNumber (); i++ )
{
namespaceSetActive ( i );
fprintf ( stdout, "\n" );
fprintf ( stdout, "##################################\n" );
fprintf ( stdout, "#\n" );
fprintf ( stdout, "# namespace=%d\n", i );
fprintf ( stdout, "#\n" );
fprintf ( stdout, "##################################\n\n" );
fprintf ( fp, "\n" );
fprintf ( fp, "##################################\n" );
fprintf ( fp, "#\n" );
fprintf ( fp, "# namespace=%d\n", i );
fprintf ( fp, "#\n" );
fprintf ( fp, "##################################\n\n" );
for ( j = 0; j < arraySizeAllocated[i]; j++)
{
curr = arrayResources[i] + j;
if ( curr->ops && curr->ptr )
{
curr->ops->valPrint (( void * ) curr->ptr );
fprintf ( stdout, "\n" );
curr->ops->valPrint (( void * ) curr->ptr, fp );
fprintf ( fp, "\n" );
}
}
}
fprintf ( stdout, "#\n# end global resource array" \
fprintf ( fp, "#\n# end global resource array" \
"\n#\n##########################################\n\n" );
fclose ( fp );
namespaceSetActive ( temp );
}
......@@ -23,30 +23,30 @@
typedef int cdiResH;
/* return 0 on equality, not 0 otherwise */
typedef int ( * valCompareFunc )( void *, void * );
typedef void ( * valDestroyFunc )( void * );
typedef void ( * valPrintFunc )( void * );
typedef int ( * valCompareFunc )( void *, void * );
typedef void ( * valDestroyFunc )( void * );
typedef void ( * valPrintFunc )( void *, FILE * );
#ifdef USE_MPI
typedef int ( * valGetSizeFunc )( void *, MPI_Comm comm );
typedef void ( * valPackFunc )( void *, void *buf, int size, int *pos,
MPI_Comm comm );
typedef int ( * valTxCodeFunc )( void * );
typedef int ( * valGetPackSizeFunc )( void *, MPI_Comm comm );
typedef void ( * valPackFunc )( void *, void *buf, int size, int *pos,
MPI_Comm comm );
typedef int ( * valTxCodeFunc )( void * );
#endif
typedef struct {
valCompareFunc valCompare;
valDestroyFunc valDestroy;
valPrintFunc valPrint;
valCompareFunc valCompare;
valDestroyFunc valDestroy;
valPrintFunc valPrint;
#ifdef USE_MPI
valGetSizeFunc valGetSize;
valPackFunc valPack;
valTxCodeFunc valTxCode;
valGetPackSizeFunc valGetPackSize;
valPackFunc valPack;
valTxCodeFunc valTxCode;
#endif
}resOps;
enum { ASSIGNED, SUSPENDED, CLOSED };
int reshPut ( void *, resOps * );
void reshRemove ( cdiResH, resOps * );
int reshCountType ( resOps * );
......@@ -60,9 +60,10 @@ void reshPackBufferCreate ( char **, int *, MPI_Comm );
void reshPackBufferDestroy ( char ** );
#endif
void reshSetStatus ( cdiResH, resOps *, int );
int reshGetStatus ( cdiResH, resOps * );
void reshLock ( void );
void reshUnlock ( void );
#endif
......@@ -1968,6 +1968,7 @@ void streamDefVlist(int streamID, int vlistID)
if ( streamptr->vlistID == CDI_UNDEFID )
{
streamptr->vlistID = vlistDuplicate(vlistID);
streamptr->vlistIDorig = vlistID ;
nvars = vlistNvars(vlistID);
for ( varID = 0; varID < nvars; varID++ )
......
......@@ -68,7 +68,7 @@ static int cdiHaveMissval = 0;
static int streamCompareP ( void * streamptr1, void * streamptr2 );
static void streamDestroyP ( void * streamptr );
static void streamPrintP ( void * streamptr );
static void streamPrintP ( void * streamptr, FILE * fp );
#ifdef USE_MPI
static int streamGetSizeP ( void * streamptr, MPI_Comm comm );
static void streamPackP ( void * streamptr, void * buff, int size,
......@@ -300,6 +300,7 @@ void streamDefaultValue ( stream_t * streamptr )
streamptr->fnames = NULL;
streamptr->gribContainers = NULL;
streamptr->vlistIDorig = UNDEFID;
}
......@@ -515,9 +516,8 @@ void streamDestroyP ( void * streamptr )
}
void streamPrintP ( void * streamptr )
void streamPrintP ( void * streamptr, FILE * fp )
{
FILE *fp = stdout;
stream_t * sp = ( stream_t * ) streamptr;
if ( !sp ) return;
......@@ -616,9 +616,11 @@ void streamPackP ( void * streamptr, void * packBuffer, int packBufferSize,
packBuffer, packBufferSize, packBufferPos, comm );
MPI_Pack ( streamP->filename, size, MPI_CHAR,
packBuffer, packBufferSize, packBufferPos, comm );
MPI_Pack ( &streamP->filetype, 1, MPI_INT,
packBuffer, packBufferSize, packBufferPos, comm );
MPI_Pack ( &streamP->vlistID, 1, MPI_INT,
MPI_Pack ( &streamP->vlistIDorig, 1, MPI_INT,
packBuffer, packBufferSize, packBufferPos, comm );
}
......
......@@ -242,6 +242,7 @@ typedef struct {
int nfiles;
char **fnames;
void *gribContainers;
int vlistIDorig;
}
stream_t;
......
......@@ -41,7 +41,7 @@ char *Timeunits[] = {
static int taxisCompareP ( void * taxisptr1, void * taxisptr2 );
static void taxisDestroyP ( void * taxisptr );
static void taxisPrintP ( void * taxisptr );