Commit b0ea73f4 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

change fixed size of MAX_STREAMS=4096 to MIN_STREAMS=1024 and MAX_STREAMS=65536

parent 3c49f8ed
2010-04-16 Uwe Schulzweida <Uwe.Schulzweida@zmaw.de>
* change fixed size of MAX_STREAMS=4096 to MIN_STREAMS=1024 and MAX_STREAMS=65536
2010-04-13 Uwe Schulzweida <Uwe.Schulzweida@zmaw.de> 2010-04-13 Uwe Schulzweida <Uwe.Schulzweida@zmaw.de>
* cgribexDefTime: bug fix for GRIB time range 10 * cgribexDefTime: bug fix for GRIB time range 10
......
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include "cdi.h" #include "cdi.h"
#define NOPEN 8096 #define NOPEN 2048
#define FILENAME "example.grb" #define FILENAME "example.grb"
int main(void) int main(void)
{ {
int iopen; int iopen;
int idel;
int taxisID, vlistID, streamID; int taxisID, vlistID, streamID;
int streamIDs[NOPEN]; int streamIDs[NOPEN];
...@@ -31,6 +33,46 @@ int main(void) ...@@ -31,6 +33,46 @@ int main(void)
streamClose(streamID); streamClose(streamID);
} }
for ( iopen = 0; iopen < NOPEN; ++iopen )
streamIDs[iopen] = -1;
for ( iopen = 0; iopen < NOPEN; ++iopen )
{
fprintf(stderr, "simultaneous open %d\n", iopen+1);
/* Open the dataset */
streamID = streamOpenRead(FILENAME);
if ( streamID < 0 )
{
fprintf(stderr, "%s\n", cdiStringError(streamID));
return(1);
}
/* Get the variable list of the dataset */
vlistID = streamInqVlist(streamID);
/* Get the Time axis from the variable list */
taxisID = vlistInqTaxis(vlistID);
streamIDs[iopen] = streamID;
if ( iopen%2 )
{
idel = (int) ((iopen+1) * (rand() / (RAND_MAX + 1.0)));
if ( streamIDs[idel] >= 0 )
{
streamClose(streamIDs[idel]);
streamIDs[idel] = -1;
fprintf(stderr, "randomly closed %d\n", idel);
}
}
}
for ( iopen = 0; iopen < NOPEN; ++iopen )
{
/* Close the input stream */
if ( streamIDs[iopen] >= 0 ) streamClose(streamIDs[iopen]);
}
for ( iopen = 0; iopen < NOPEN; ++iopen ) for ( iopen = 0; iopen < NOPEN; ++iopen )
{ {
fprintf(stderr, "simultaneous open %d\n", iopen+1); fprintf(stderr, "simultaneous open %d\n", iopen+1);
...@@ -54,7 +96,7 @@ int main(void) ...@@ -54,7 +96,7 @@ int main(void)
for ( iopen = 0; iopen < NOPEN; ++iopen ) for ( iopen = 0; iopen < NOPEN; ++iopen )
{ {
/* Close the input stream */ /* Close the input stream */
streamClose(streamIDs[iopen]); if ( streamIDs[iopen] >= 0 ) streamClose(streamIDs[iopen]);
} }
return 0; return 0;
......
...@@ -28,6 +28,7 @@ extern "C" { ...@@ -28,6 +28,7 @@ extern "C" {
#define CDI_ELIBNAVAIL -22 /* xxx library not available */ #define CDI_ELIBNAVAIL -22 /* xxx library not available */
#define CDI_EUFSTRUCT -23 /* Unsupported file structure */ #define CDI_EUFSTRUCT -23 /* Unsupported file structure */
#define CDI_EUNC4 -24 /* Unsupported netCDF4 structure */ #define CDI_EUNC4 -24 /* Unsupported netCDF4 structure */
#define CDI_ELIMIT -99 /* Internal limits exceeded */
/* File types */ /* File types */
......
...@@ -6,14 +6,15 @@ ...@@ -6,14 +6,15 @@
#include <errno.h> #include <errno.h>
#include "cdi.h" #include "cdi.h"
static char UnknownError[] = "Unknown Error";
static char _EUFTYPE[] = "Unsupported file type";
static char _ELIBNAVAIL[] = "Unsupported file type (library support not compiled in)";
static char _EUFSTRUCT[] = "Unsupported file structure";
static char _EUNC4[] = "Unsupported netCDF4 structure";
char *cdiStringError(int cdiErrno) char *cdiStringError(int cdiErrno)
{ {
static char UnknownError[] = "Unknown Error";
static char _EUFTYPE[] = "Unsupported file type";
static char _ELIBNAVAIL[] = "Unsupported file type (library support not compiled in)";
static char _EUFSTRUCT[] = "Unsupported file structure";
static char _EUNC4[] = "Unsupported netCDF4 structure";
static char _ELIMIT[] = "Internal limits exceeded";
switch (cdiErrno) { switch (cdiErrno) {
case CDI_ESYSTEM: case CDI_ESYSTEM:
{ {
...@@ -25,6 +26,7 @@ char *cdiStringError(int cdiErrno) ...@@ -25,6 +26,7 @@ char *cdiStringError(int cdiErrno)
case CDI_ELIBNAVAIL: return _ELIBNAVAIL; case CDI_ELIBNAVAIL: return _ELIBNAVAIL;
case CDI_EUFSTRUCT: return _EUFSTRUCT; case CDI_EUFSTRUCT: return _EUFSTRUCT;
case CDI_EUNC4: return _EUNC4; case CDI_EUNC4: return _EUNC4;
case CDI_ELIMIT: return _ELIMIT;
} }
return UnknownError; return UnknownError;
......
#ifndef _CDI_LIMITS_H #ifndef _CDI_LIMITS_H
#define _CDI_LIMITS_H #define _CDI_LIMITS_H
#define MAX_STREAMS 4096 /* maximum number of streams */ #define MIN_STREAMS 1024 /* minimum number of streams */
#define MAX_STREAMS 65536 /* maximum number of streams */
#define MIN_VLISTS 4096 /* minimum number of vlists */
#define MAX_VLISTS 4096 /* maximum number of vlists */ #define MAX_VLISTS 4096 /* maximum number of vlists */
#define MAX_GRIDS 1024 /* maximum number of grids */ #define MAX_GRIDS 1024 /* maximum number of grids */
#define MAX_ZAXIS 1024 /* maximum number of zaxis */ #define MAX_ZAXIS 1024 /* maximum number of zaxis */
......
...@@ -684,7 +684,9 @@ int streamOpen(const char *filename, const char *filemode, int filetype) ...@@ -684,7 +684,9 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
else else
{ {
streamptr = stream_new_entry(); streamptr = stream_new_entry();
streamID = streamptr->self; streamID = streamptr->self;
if ( streamID < 0 ) return(CDI_ELIMIT);
streamptr->record = record; streamptr->record = record;
streamptr->filetype = filetype; streamptr->filetype = filetype;
...@@ -695,7 +697,11 @@ int streamOpen(const char *filename, const char *filemode, int filetype) ...@@ -695,7 +697,11 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
if ( streamptr->filemode == 'r' ) if ( streamptr->filemode == 'r' )
{ {
vlist_t *vlistptr; vlist_t *vlistptr;
streamptr->vlistID = vlistCreate(); int vlistID;
vlistID = vlistCreate();
if ( vlistID < 0 ) return(CDI_ELIMIT);
streamptr->vlistID = vlistID;
/* cdiReadByteorder(streamID); */ /* cdiReadByteorder(streamID); */
status = cdiInqContents(streamID); status = cdiInqContents(streamID);
if ( status < 0 ) return (status); if ( status < 0 ) return (status);
......
...@@ -199,6 +199,7 @@ char *strfiletype(int filetype) ...@@ -199,6 +199,7 @@ char *strfiletype(int filetype)
static int STREAM_Debug = 0; /* If set to 1, debugging */ static int STREAM_Debug = 0; /* If set to 1, debugging */
static int _stream_min = MIN_STREAMS;
static int _stream_max = MAX_STREAMS; static int _stream_max = MAX_STREAMS;
static void stream_initialize(void); static void stream_initialize(void);
...@@ -228,13 +229,13 @@ static pthread_mutex_t _stream_mutex; ...@@ -228,13 +229,13 @@ static pthread_mutex_t _stream_mutex;
typedef struct _streamPtrToIdx { typedef struct _streamPtrToIdx {
int idx; int idx;
int next;
stream_t *ptr; stream_t *ptr;
struct _streamPtrToIdx *next;
} streamPtrToIdx; } streamPtrToIdx;
static streamPtrToIdx *_streamList = NULL; static streamPtrToIdx *_streamList = NULL;
static streamPtrToIdx *_streamAvail = NULL; static int _streamAvail = -1;
static static
void stream_list_new(void) void stream_list_new(void)
...@@ -243,7 +244,7 @@ void stream_list_new(void) ...@@ -243,7 +244,7 @@ void stream_list_new(void)
assert(_streamList == NULL); assert(_streamList == NULL);
_streamList = (streamPtrToIdx *) malloc(_stream_max*sizeof(streamPtrToIdx)); _streamList = (streamPtrToIdx *) malloc(_stream_min*sizeof(streamPtrToIdx));
} }
static static
...@@ -257,18 +258,49 @@ void stream_list_delete(void) ...@@ -257,18 +258,49 @@ void stream_list_delete(void)
static static
void stream_init_pointer(void) void stream_init_pointer(void)
{ {
int i; int i;
for ( i = 0; i < _stream_max; i++ ) for ( i = 0; i < _stream_min; ++i )
{ {
_streamList[i].next = _streamList + i + 1;
_streamList[i].idx = i; _streamList[i].idx = i;
_streamList[i].next = i + 1;
_streamList[i].ptr = NULL; _streamList[i].ptr = NULL;
} }
_streamList[_stream_max-1].next = 0; _streamList[_stream_min-1].next = -1;
_streamAvail = 0;
}
static
void stream_list_extend(void)
{
static char func[] = "stream_list_extend";
int nstreams;
int i;
assert(_streamList != NULL);
nstreams = _stream_min + MIN_STREAMS;
_streamAvail = _streamList; if ( nstreams <= _stream_max)
{
_streamList = (streamPtrToIdx *) realloc(_streamList, nstreams*sizeof(streamPtrToIdx));
for ( i = _stream_min; i < nstreams; ++i )
{
_streamList[i].idx = i;
_streamList[i].next = i + 1;
_streamList[i].ptr = NULL;
}
_streamAvail = _stream_min;
_streamList[_stream_min-1].next = _stream_min;
_stream_min = nstreams;
_streamList[_stream_min-1].next = -1;
}
else
Warning(func, "Too many open streams (limit is %d)!", _stream_max);
} }
...@@ -279,7 +311,7 @@ stream_t *stream_to_pointer(int idx) ...@@ -279,7 +311,7 @@ stream_t *stream_to_pointer(int idx)
STREAM_INIT(); STREAM_INIT();
if ( idx >= 0 && idx < _stream_max ) if ( idx >= 0 && idx < _stream_min )
{ {
STREAM_LOCK(); STREAM_LOCK();
...@@ -304,21 +336,21 @@ int stream_from_pointer(stream_t *ptr) ...@@ -304,21 +336,21 @@ int stream_from_pointer(stream_t *ptr)
{ {
STREAM_LOCK(); STREAM_LOCK();
if ( _streamAvail ) if ( _streamAvail < 0 ) stream_list_extend();
if ( _streamAvail >= 0 )
{ {
streamPtrToIdx *newptr; streamPtrToIdx *newptr;
newptr = _streamAvail; newptr = &_streamList[_streamAvail];
_streamAvail = _streamAvail->next; _streamAvail = newptr->next;
newptr->next = 0; newptr->next = -1;
idx = newptr->idx; idx = newptr->idx;
newptr->ptr = ptr; newptr->ptr = ptr;
if ( STREAM_Debug ) if ( STREAM_Debug )
Message(func, "Pointer %p has idx %d from stream list", ptr, idx); Message(func, "Pointer %p has idx %d from stream list", ptr, idx);
} }
else
Warning(func, "Too many open streams (limit is %d)!", _stream_max);
STREAM_UNLOCK(); STREAM_UNLOCK();
} }
...@@ -420,7 +452,7 @@ void stream_delete_entry(stream_t *streamptr) ...@@ -420,7 +452,7 @@ void stream_delete_entry(stream_t *streamptr)
_streamList[idx].next = _streamAvail; _streamList[idx].next = _streamAvail;
_streamList[idx].ptr = 0; _streamList[idx].ptr = 0;
_streamAvail = &_streamList[idx]; _streamAvail = idx;
STREAM_UNLOCK(); STREAM_UNLOCK();
...@@ -471,7 +503,7 @@ int streamSize(void) ...@@ -471,7 +503,7 @@ int streamSize(void)
STREAM_LOCK(); STREAM_LOCK();
for ( i = 0; i < _stream_max; i++ ) for ( i = 0; i < _stream_min; i++ )
if ( _streamList[i].ptr ) streamsize++; if ( _streamList[i].ptr ) streamsize++;
STREAM_UNLOCK(); STREAM_UNLOCK();
......
...@@ -276,7 +276,7 @@ int vlistCreate(void) ...@@ -276,7 +276,7 @@ int vlistCreate(void)
vlistID = vlistptr->self; vlistID = vlistptr->self;
vlistLock(vlistID); if ( vlistID >= 0 ) vlistLock(vlistID);
return (vlistID); return (vlistID);
} }
...@@ -327,6 +327,7 @@ void vlistDestroy(int vlistID) ...@@ -327,6 +327,7 @@ void vlistDestroy(int vlistID)
} }
} }
int vlistNlock(int vlistID) int vlistNlock(int vlistID)
{ {
static char func[] = "vlistNlock"; static char func[] = "vlistNlock";
...@@ -339,6 +340,7 @@ int vlistNlock(int vlistID) ...@@ -339,6 +340,7 @@ int vlistNlock(int vlistID)
return (vlistptr->nlock); return (vlistptr->nlock);
} }
void vlistLock(int vlistID) void vlistLock(int vlistID)
{ {
static char func[] = "vlistLock"; static char func[] = "vlistLock";
...@@ -352,6 +354,7 @@ void vlistLock(int vlistID) ...@@ -352,6 +354,7 @@ void vlistLock(int vlistID)
/* Message(func, "vlistID %d nlock %d", vlistID, vlistptr->nlock); */ /* Message(func, "vlistID %d nlock %d", vlistID, vlistptr->nlock); */
} }
void vlistUnlock(int vlistID) void vlistUnlock(int vlistID)
{ {
static char func[] = "vlistUnlock"; static char func[] = "vlistUnlock";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment