Commit 9571cd67 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

Adapted stream to global resource array.

parent 4b7acb74
......@@ -136,9 +136,6 @@ void modelRun ()
// Assign the Time axis to the variable list
vlistDefTaxis(vlistID, taxisID);
reshArrayPrint ();
// Create a dataset in netCDF fromat
......@@ -171,6 +168,8 @@ void modelRun ()
streamWriteVar(streamID, varID1, var1, nmiss);
streamWriteVar(streamID, varID2, var2, nmiss);
}
reshArrayPrint ();
// Close the output stream
streamClose(streamID);
......
......@@ -364,8 +364,7 @@ libcdi_la_SOURCES = \
vlist_var.c \
zaxis.c \
stream.c \
swap.c \
temp.h
swap.c
LOCALTARGETS = $(am__append_1)
#
......@@ -752,6 +751,7 @@ uninstall-am: uninstall-includeHEADERS uninstall-libLTLIBRARIES
tags uninstall uninstall-am uninstall-includeHEADERS \
uninstall-libLTLIBRARIES
#libcdi_la_CPPFLAGS = @CPPFLAGS@
#libcdi_la_LIBADD = @LDFLAGS@
#
......
......@@ -4,6 +4,7 @@
#include <stdio.h>
#include "cdi.h"
#include "namespace.h"
#include "resource_handle.h"
static int nNamespaces = 1;
static int activeNamespace;
......@@ -80,7 +81,9 @@ void namespaceSetActive ( int nId )
if ( nId >= nNamespaces || nId < 0 )
abort ();
reshLock ();
activeNamespace = nId;
reshUnlock ();
}
......
......@@ -18,6 +18,9 @@ static void arrayInitialize(void);
static int arrayInit = 0;
// ATTENTION: not thread safe yet, namespaces are set in model!
#if defined (HAVE_LIBPTHREAD)
# include <pthread.h>
......@@ -54,6 +57,24 @@ static node **freeListHead;
/**************************************************************/
void reshLock ()
{
ARRAY_LOCK();
}
/**************************************************************/
void reshUnlock ()
{
ARRAY_UNLOCK();
}
/**************************************************************/
static
void arrayNew ( void )
{
......@@ -155,6 +176,8 @@ void *reshGetVal ( cdiResH idx, resOps * ops )
namespaceTuple_t nspT;
node * node;
ARRAY_LOCK ();
nsp = namespaceGetActive ();
nspT = namespaceIdxDecode ( idx );
......@@ -164,7 +187,9 @@ void *reshGetVal ( cdiResH idx, resOps * ops )
nspT.idx >= 0 &&
nspT.idx < arraySizeAllocated[nsp] );
node = &arrayResources[nsp][nspT.idx];
node = arrayResources[nsp] + nspT.idx;
ARRAY_UNLOCK ();
assert ( node->ops == ops );
......@@ -215,10 +240,10 @@ int reshPut ( void *p, resOps *ops )
ARRAY_INIT();
nsp = namespaceGetActive ();
ARRAY_LOCK();
nsp = namespaceGetActive ();
if ( !freeListHead[nsp] ) arraySizeExtend();
newNode = freeListHead[nsp];
......@@ -240,14 +265,13 @@ int reshPut ( void *p, resOps *ops )
void reshRemove ( cdiResH idx, resOps * ops )
{
int nsp;
node * node;
assert ( arrayInit );
nsp = namespaceGetActive ();
ARRAY_LOCK();
nsp = namespaceGetActive ();
assert ( arrayResources[nsp][idx].ops == ops );
arrayResources[nsp][idx].next = freeListHead[nsp];
......@@ -267,10 +291,10 @@ int reshCountType ( resOps * ops )
int i, nsp, countType = 0;
if ( !( arrayInit && ops )) return 0;
nsp = namespaceGetActive ();
ARRAY_LOCK();
nsp = namespaceGetActive ();
for ( i = 0; i < arraySizeAllocated[nsp]; i++ )
if ( arrayResources[nsp][i].ptr )
......@@ -310,7 +334,7 @@ void reshArrayPrint ()
for ( j = 0; j < arraySizeAllocated[i]; j++)
{
curr = arrayResources[i]+j;
curr = arrayResources[i] + j;
if ( curr->ops && curr->ptr )
{
curr->ops->valPrint (( void * ) curr->ptr );
......@@ -323,14 +347,3 @@ void reshArrayPrint ()
namespaceSetActive ( temp );
}
void reshLock ()
{
ARRAY_LOCK ();
}
void reshUnlock ()
{
ARRAY_UNLOCK ();
}
......@@ -31,7 +31,7 @@ void reshRemove ( cdiResH, resOps * );
int reshCountType ( resOps * );
void reshLock ();
void reshUnlock ();
void reshLock ( void );
void reshUnlock ( void );
#endif
......@@ -15,6 +15,7 @@
#include "stream_int.h"
#include "pio_util.h"
#include "namespace.h"
#include "resource_handle.h"
#if defined (HAVE_LIBCGRIBEX)
#include "cgribex.h"
......@@ -56,6 +57,7 @@ char *Filetypes[] = {
int CDI_Debug = 0; /* If set to 1, debugging */
static int STREAM_Debug = 0; /* If set to 1, debugging */
int cdiDefaultLeveltype = -1;
static int cdiDataUnreduced = 0;
......@@ -63,6 +65,13 @@ static int cdiSortName = 0;
static int cdiHaveMissval = 0;
int streamCompareP ( void * streamptr1, void * streamptr2 );
void streamDestroyP ( void * streamptr );
void streamPrintP ( void * streamptr );
resOps streamOps = { streamCompareP, streamDestroyP, streamPrintP };
long cdiGetenvInt(char *envName)
{
char *envString;
......@@ -195,6 +204,9 @@ void cdiInitialize(void)
envString = getenv("PARTAB_PATH");
if ( envString ) cdiPartabPath = strdup(envString);
envString = getenv("STREAM_DEBUG");
if ( envString ) STREAM_Debug = atoi(envString);
}
}
......@@ -213,224 +225,18 @@ char *strfiletype(int filetype)
}
static int STREAM_Debug = 0; /* If set to 1, debugging */
static int *_streamListSize_allocated;
static int _stream_max = MAX_STREAMS;
static void stream_initialize(void);
static int _stream_init = FALSE;
#if defined (HAVE_LIBPTHREAD)
# include <pthread.h>
static pthread_once_t _stream_init_thread = PTHREAD_ONCE_INIT;
static pthread_mutex_t _stream_mutex;
# define STREAM_LOCK() pthread_mutex_lock(&_stream_mutex)
# define STREAM_UNLOCK() pthread_mutex_unlock(&_stream_mutex)
# define STREAM_INIT() \
if ( _stream_init == FALSE ) pthread_once(&_stream_init_thread, stream_initialize)
#else
# define STREAM_LOCK()
# define STREAM_UNLOCK()
# define STREAM_INIT() \
if ( _stream_init == FALSE ) stream_initialize()
#endif
typedef struct _streamPtrToIdx {
int idx;
int next;
stream_t *ptr;
} streamPtrToIdx;
static streamPtrToIdx **_streamList = NULL;
static int *_streamAvail;
static
void stream_list_new(void)
{
int nnsp, i;
nnsp = namespaceGetNumber ();
_streamListSize_allocated = xcalloc(nnsp, sizeof(_streamListSize_allocated[0]));
_streamList = xcalloc(nnsp, sizeof(_streamList[0]));
_streamAvail = xcalloc(nnsp, sizeof(_streamAvail[0]));
for ( i = 0; i < nnsp; i++ )
{
_streamListSize_allocated[i] = MIN_STREAMS;
assert(_streamList[i] == NULL);
_streamList[i] =
(streamPtrToIdx *) xcalloc(_streamListSize_allocated[i], sizeof(streamPtrToIdx));
_streamAvail[i] = -1;
}
}
static
void stream_list_delete(void)
{
int nnsp, i;
nnsp = namespaceGetNumber ();
if ( _streamAvail ) free ( _streamAvail );
if ( _streamList )
{
for ( i = 0; i < nnsp; i++ )
if ( _streamList[i] ) free (_streamList[i] );
free ( _streamList );
}
if ( _streamListSize_allocated ) free ( _streamListSize_allocated );
}
static
void stream_init_pointer(void)
{
int nnsp, i, j, code;
namespaceTuple_t nspt;
nnsp = namespaceGetNumber ();
for ( i = 0; i < nnsp; i++ )
{
for ( j = 0; j < _streamListSize_allocated[i]; ++j )
{
nspt.nsp = i;
nspt.idx = j;
code = namespaceIdxEncode ( nspt );
_streamList[i][j].idx = code;
_streamList[i][j].next = j + 1;
_streamList[i][j].ptr = NULL;
}
_streamList[i][_streamListSize_allocated[i]-1].next = -1;
_streamAvail[i] = 0;
}
}
static
void stream_list_extend(void)
{
int newListSize;
int i, nsp, code;
namespaceTuple_t nspt;
nsp = namespaceGetActive ();
nspt.nsp = nsp;
assert(_streamList[nsp] != NULL);
newListSize = _streamListSize_allocated[nsp] + MIN_STREAMS;
if ( newListSize <= _stream_max)
{
_streamList[nsp] =
(streamPtrToIdx *) xrealloc(_streamList[nsp],
newListSize*sizeof(streamPtrToIdx));
for ( i = _streamListSize_allocated[nsp]; i < newListSize; ++i )
{
nspt.idx = i;
code = namespaceIdxEncode ( nspt );
_streamList[nsp][i].idx = code;
_streamList[nsp][i].next = i + 1;
_streamList[nsp][i].ptr = NULL;
}
_streamAvail[nsp] = _streamListSize_allocated[nsp];
_streamList[nsp][_streamListSize_allocated[nsp]-1].next = _streamListSize_allocated[nsp];
_streamListSize_allocated[nsp] = newListSize;
_streamList[nsp][_streamListSize_allocated[nsp]-1].next = -1;
}
else
Warning("Too many open streams (limit is %d)!", _stream_max);
}
stream_t *stream_to_pointer(int idx)
{
stream_t *streamptr = NULL;
int nsp;
namespaceTuple_t nspT;
nsp = namespaceGetActive ();
nspT = namespaceIdxDecode ( idx );
if ( nspT.nsp != nsp )
Error ( "idx %d from namespace %d not available in active namespace %d !\n",
idx, nspT.nsp, nsp );
STREAM_INIT();
if ( idx >= 0 && nspT.idx < _streamListSize_allocated[nsp] )
{
STREAM_LOCK();
streamptr = _streamList[nsp][nspT.idx].ptr;
STREAM_UNLOCK();
}
else
Error("stream index %d undefined!", idx);
return (streamptr);
return ( stream_t *) reshGetVal ( idx, &streamOps );
}
/* Create an index from a pointer (add the pointer to the stream list) */
static
int stream_from_pointer(stream_t *ptr)
{
int idx = -1;
int nsp;
nsp = namespaceGetActive ();
if ( ptr )
{
STREAM_LOCK();
if ( _streamAvail[nsp] < 0 ) stream_list_extend();
if ( _streamAvail[nsp] >= 0 )
{
streamPtrToIdx *newptr;
newptr = &_streamList[nsp][_streamAvail[nsp]];
_streamAvail[nsp] = newptr->next;
newptr->next = -1;
idx = newptr->idx;
newptr->ptr = ptr;
if ( STREAM_Debug )
Message("Pointer %p has idx %d from stream list", ptr, idx);
}
STREAM_UNLOCK();
}
else
Error("Internal problem (pointer %p undefined)", ptr);
return (idx);
}
static
void stream_init_entry(stream_t *streamptr)
void stream_init_ptr ( stream_t * streamptr )
{
int i;
streamptr->self = stream_from_pointer(streamptr);
streamptr->self = -1;
streamptr->accesstype = UNDEFID;
streamptr->accessmode = 0;
......@@ -486,14 +292,21 @@ void stream_init_entry(stream_t *streamptr)
}
static
void stream_init_entry ( stream_t * streamptr )
{
stream_init_ptr ( streamptr );
streamptr->self = reshPut (( void * ) streamptr, &streamOps );
}
stream_t *stream_new_entry(void)
{
stream_t *streamptr;
cdiInitialize(); /* ***************** make MT version !!! */
STREAM_INIT();
streamptr = (stream_t *) malloc(sizeof(stream_t));
if ( streamptr ) stream_init_entry(streamptr);
......@@ -505,53 +318,18 @@ stream_t *stream_new_entry(void)
void stream_delete_entry(stream_t *streamptr)
{
int idx;
int nsp;
nsp = namespaceGetActive ();
assert ( streamptr );
idx = streamptr->self;
STREAM_LOCK();
free(streamptr);
_streamList[nsp][idx].next = _streamAvail[nsp];
_streamList[nsp][idx].ptr = 0;
_streamAvail[nsp] = idx;
STREAM_UNLOCK();
reshRemove ( idx, &streamOps );
if ( STREAM_Debug )
Message("Removed idx %d from stream list", idx);
}
static
void stream_initialize(void)
{
char *env;
#if defined (HAVE_LIBPTHREAD)
/* initialize global API mutex lock */
pthread_mutex_init(&_stream_mutex, NULL);
#endif
env = getenv("STREAM_DEBUG");
if ( env ) STREAM_Debug = atoi(env);
stream_list_new();
atexit(stream_list_delete);
STREAM_LOCK();
stream_init_pointer();
STREAM_UNLOCK();
_stream_init = TRUE;
}
void stream_check_ptr(const char *caller, stream_t *streamptr)
{
if ( streamptr == NULL )
......@@ -559,26 +337,6 @@ void stream_check_ptr(const char *caller, stream_t *streamptr)
}
int streamSize(void)
{
int streamListSize = 0;
int i;
int nsp;
nsp = namespaceGetActive ();
STREAM_INIT();
STREAM_LOCK();
for ( i = 0; i < _streamListSize_allocated[nsp]; i++ )
if ( _streamList[nsp][i].ptr ) streamListSize++;
STREAM_UNLOCK();
return streamListSize;
}
void cdiDefGlobal(const char *string, int val)
{
......@@ -737,3 +495,76 @@ int cdiInqAccesstype(int streamID)
return (streamptr->accesstype);
}
int streamCompareP ( void * streamptr1, void * streamptr2 )
{
fprintf ( stdout, "############### streamCompareP () is NOT IMPLEMENTED\n" );
return 0;
}
void streamDestroyP ( void * streamptr )
{
fprintf ( stdout, "############### streamDestroyP () is NOT IMPLEMENTED\n" );
}
void streamPrintP ( void * streamptr )
{
FILE *fp = stdout;
stream_t * sp = ( stream_t * ) streamptr;
if ( !sp ) return;
fprintf ( fp, "#\n");
fprintf ( fp, "# streamID %d\n", sp->self);
fprintf ( fp, "#\n");
fprintf ( fp, "self = %d\n", sp->self );
fprintf ( fp, "accesstype = %d\n", sp->accesstype );
fprintf ( fp, "accessmode = %d\n", sp->accessmode );
fprintf ( fp, "filetype = %d\n", sp->filetype );
fprintf ( fp, "byteorder = %d\n", sp->byteorder );
fprintf ( fp, "fileID = %d\n", sp->fileID );
fprintf ( fp, "dimgroupID = %d\n", sp->dimgroupID );
fprintf ( fp, "filemode = %d\n", sp->filemode );
fprintf ( fp, "//off_t numvals;\n" );
fprintf ( fp, "filename = %s\n", sp->filename );
fprintf ( fp, "//Record *record;\n" );
fprintf ( fp, "nrecs = %d\n", sp->nrecs );
fprintf ( fp, "nvars = %d\n", sp->nvars );
fprintf ( fp, "varlocked = %d\n", sp->varlocked );
fprintf ( fp, "//SVARINFO *vars;\n" );
fprintf ( fp, "varsAllocated = %d\n", sp->varsAllocated );
fprintf ( fp, "varinit = %d\n", sp->varinit );
fprintf ( fp, "curTsID = %d\n", sp->curTsID );
fprintf ( fp, "rtsteps = %d\n", sp->rtsteps );
fprintf ( fp, "//long ntsteps;\n" );
fprintf ( fp, "numTimestep = %d\n", sp->numTimestep );
fprintf ( fp, "// TSTEPS *tsteps;\n" );
fprintf ( fp, "tstepsTableSize= %d\n", sp->tstepsTableSize );
fprintf ( fp, "tstepsNextID = %d\n", sp->tstepsNextID );
fprintf ( fp, "//BaseTime basetime;\n" );
fprintf ( fp, "ncmode = %d\n", sp->ncmode );
fprintf ( fp, "vlistID = %d\n", sp->vlistID );
fprintf ( fp, "// int xdimID[MAX_GRIDS_PS];\n" );
fprintf ( fp, "// int ydimID[MAX_GRIDS_PS];\n" );
fprintf ( fp, "// int zaxisID[MAX_ZAXES_PS];\n" );
fprintf ( fp, "// int ncxvarID[MAX_GRIDS_PS];\n" );
fprintf ( fp, "// int ncyvarID[MAX_GRIDS_PS];\n" );
fprintf ( fp, "// int ncavarID[MAX_GRIDS_PS];\n" );
fprintf ( fp, "historyID = %d\n", sp->historyID );
fprintf ( fp, "globalatts = %d\n", sp->globalatts );
fprintf ( fp, "localatts = %d\n", sp->localatts );
fprintf ( fp, "// VCT vct;\n" );
fprintf ( fp, "unreduced = %d\n", sp->unreduced );
fprintf ( fp, "sortname = %d\n", sp->sortname );
fprintf ( fp, "have_missval = %d\n", sp->have_missval );
fprintf ( fp, "ztype = %d\n", sp->ztype );
fprintf ( fp, "zlevel = %d\n", sp->zlevel );
fprintf ( fp, "curfile = %d\n", sp->curfile );
fprintf ( fp, "nfiles = %d\n", sp->nfiles );
fprintf ( fp, "// char **fnames;\n" );
fprintf ( fp, "// void *gribContainers;\n" );
}