Commit 785bafaf authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

added cdoUpdateVars()

parent 744963a5
......@@ -659,7 +659,7 @@ void after_control(struct Control *globs, struct Variable *vars)
else
{
rval = pthread_create(&thrID, &attr, after_readTimestep, &rarg);
if ( rval != 0 ) Error( "pthread_create failed!");
if ( rval != 0 ) Error("pthread_create failed!");
}
#endif
......@@ -673,7 +673,7 @@ void after_control(struct Control *globs, struct Variable *vars)
{
pthread_join(thrID, &statusp);
if ( *(int *)statusp < 0 )
Error( "after_readTimestep error! (status = %d)", *(int *)statusp);
Error("after_readTimestep error! (status = %d)", *(int *)statusp);
}
#endif
tsFirst = FALSE;
......@@ -683,7 +683,7 @@ void after_control(struct Control *globs, struct Variable *vars)
{
pthread_join(thrID, &statusp);
if ( *(int *)statusp < 0 )
Error( "after_readTimestep error! (status = %d)", *(int *)statusp);
Error("after_readTimestep error! (status = %d)", *(int *)statusp);
}
#endif
nrecs = *(int *)statusp;
......@@ -697,7 +697,7 @@ void after_control(struct Control *globs, struct Variable *vars)
if ( nrecs && ParallelRead )
{
rval = pthread_create(&thrID, &attr, after_readTimestep, &rarg);
if ( rval != 0 ) Error( "pthread_create failed!");
if ( rval != 0 ) Error("pthread_create failed!");
}
#endif
......
......@@ -69,10 +69,16 @@
#include <cdi.h>
#include "cdo.h"
#include "cdo_int.h"
#include "pstream.h"
//#include "pstream.h"
#include "pstream_write.h"
#if defined(HAVE_LIBPTHREAD)
#include <pthread.h>
#endif
typedef struct {
int tsIDnext;
int streamID, nrecs;
field_t **vars;
}
......@@ -86,18 +92,37 @@ void *cdoReadTimestep(void *rarg)
readarg_t *readarg = (readarg_t *) rarg;
field_t **input_vars = readarg->vars;
int streamID = readarg->streamID;
int tsIDnext = readarg->tsIDnext;
int nrecs = readarg->nrecs;
for ( int recID = 0; recID < nrecs; ++recID )
{
streamInqRecord(streamID, &varID, &levelID);
streamReadRecord(streamID, input_vars[varID][levelID].ptr, &nmiss);
input_vars[varID][levelID].nmiss = nmiss;
streamReadRecord(streamID, input_vars[varID][levelID].ptr2, &nmiss);
input_vars[varID][levelID].nmiss2 = nmiss;
}
num_recs = streamInqTimestep(streamID, tsIDnext);
return ((void *) &num_recs);
}
static
void cdoUpdateVars(int nvars, int vlistID, field_t **vars)
{
for ( int varID = 0; varID < nvars; varID++ )
{
int nlevels = zaxisInqSize(vlistInqVarZaxis(vlistID, varID));
for ( int levelID = 0; levelID < nlevels; levelID++ )
{
double *tmp = vars[varID][levelID].ptr;
vars[varID][levelID].ptr = vars[varID][levelID].ptr2;
vars[varID][levelID].ptr2 = tmp;
vars[varID][levelID].nmiss = vars[varID][levelID].nmiss2;
}
}
}
void *XTimstat(void *argument)
{
......@@ -167,7 +192,8 @@ void *XTimstat(void *argument)
int cmplen = DATE_LEN - comparelen;
int streamID1 = streamOpenRead(cdoStreamName(0));
// int streamID1 = streamOpenRead(cdoStreamName(0));
int streamID1 = streamOpenRead(cdoStreamName(0)->args);
int vlistID1 = streamInqVlist(streamID1);
int vlistID2 = vlistDuplicate(vlistID1);
......@@ -224,7 +250,7 @@ void *XTimstat(void *argument)
gridsize = vlistGridsizeMax(vlistID1);
if ( vlistNumber(vlistID1) != CDI_REAL ) gridsize *= 2;
field_t **input_vars = field_malloc(vlistID1, FIELD_PTR);
field_t **input_vars = field_malloc(vlistID1, FIELD_PTR | FIELD_PTR2);
field_t **vars1 = field_malloc(vlistID1, FIELD_PTR);
field_t **samp1 = field_malloc(vlistID1, FIELD_NONE);
field_t **vars2 = NULL;
......@@ -234,16 +260,37 @@ void *XTimstat(void *argument)
readarg.streamID = streamID1;
readarg.vars = input_vars;
int lparallelread = FALSE;
int lparallelread = TRUE;
int ltsfirst = TRUE;
void *statusp = NULL;
#if defined(HAVE_LIBPTHREAD)
pthread_t thrID;
pthread_attr_t attr;
int rval;
if ( lparallelread )
{
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_attr_getstacksize(&attr, &stacksize);
if ( stacksize < 2097152 )
{
stacksize = 2097152;
pthread_attr_setstacksize(&attr, stacksize);
}
}
#endif
int tsID = 0;
int otsID = 0;
nrecs = streamInqTimestep(streamID1, tsID);
tsID++;
while ( TRUE )
{
nsets = 0;
while ( (nrecs = streamInqTimestep(streamID1, tsID)) )
while ( nrecs > 0 )
{
dtlist_taxisInqTimestep(dtlist, taxisID1, nsets);
vdate = dtlist_get_vdate(dtlist, nsets);
......@@ -254,12 +301,16 @@ void *XTimstat(void *argument)
if ( DATE_IS_NEQ(indate1, indate2, cmplen) ) break;
readarg.tsIDnext = tsID;
readarg.nrecs = nrecs;
if ( ltsfirst = 0 || lparallelread == FALSE )
if ( ltsfirst || lparallelread == FALSE )
{
#if defined(HAVE_LIBPTHREAD)
if ( lparallelread )
{
rval = pthread_create(&thrID, &attr, cdoReadTimestep, &readarg);
if ( rval != 0 ) cdoAbort("pthread_create failed!");
}
else
#endif
......@@ -267,11 +318,34 @@ void *XTimstat(void *argument)
statusp = cdoReadTimestep(&readarg);
}
#if defined(HAVE_LIBPTHREAD)
if ( lparallelread )
{
pthread_join(thrID, &statusp);
if ( *(int *)statusp < 0 )
cdoAbort("cdoReadTimestep error! (status = %d)", *(int *)statusp);
}
#endif
ltsfirst = FALSE;
}
#if defined(HAVE_LIBPTHREAD)
else
{
pthread_join(thrID, &statusp);
if ( *(int *)statusp < 0 )
cdoAbort("cdoReadTimestep error! (status = %d)", *(int *)statusp);
}
#endif
nrecs = *(int *)statusp;
cdoUpdateVars(nvars, vlistID1, input_vars);
#if defined (HAVE_LIBPTHREAD)
if ( nrecs && lparallelread )
{
readarg.tsIDnext = tsID+1;
rval = pthread_create(&thrID, &attr, cdoReadTimestep, &readarg);
if ( rval != 0 ) cdoAbort("pthread_create failed!");
}
#endif
......@@ -469,8 +543,8 @@ void *XTimstat(void *argument)
dtlist_delete(dtlist);
if ( cdoDiag ) streamClose(streamID3);
streamClose(streamID2);
if ( cdoDiag ) pstreamClose(streamID3);
pstreamClose(streamID2);
streamClose(streamID1);
cdoFinish();
......
......@@ -60,9 +60,11 @@ typedef struct {
size_t size;
int nsamp;
int nmiss;
int nmiss2;
double missval;
double *weight;
double *ptr;
double *ptr2;
}
field_t;
......
......@@ -39,8 +39,10 @@ field_t **field_allocate(int vlistID, int ptype, int init)
field[varID][levelID].grid = gridID;
field[varID][levelID].nsamp = 0;
field[varID][levelID].nmiss = 0;
field[varID][levelID].nmiss2 = 0;
field[varID][levelID].missval = missval;
field[varID][levelID].ptr = NULL;
field[varID][levelID].ptr2 = NULL;
field[varID][levelID].weight = NULL;
if ( ptype & FIELD_PTR )
......@@ -49,6 +51,12 @@ field_t **field_allocate(int vlistID, int ptype, int init)
if ( init ) memset(field[varID][levelID].ptr, 0, nwpv*gridsize*sizeof(double));
}
if ( ptype & FIELD_PTR2 )
{
field[varID][levelID].ptr2 = (double*) Malloc(nwpv*gridsize*sizeof(double));
if ( init ) memset(field[varID][levelID].ptr2, 0, nwpv*gridsize*sizeof(double));
}
if ( ptype & FIELD_WGT )
{
field[varID][levelID].weight = (double*) Malloc(nwpv*gridsize*sizeof(double));
......@@ -82,6 +90,7 @@ void field_free(field_t **field, int vlistID)
for ( int levelID = 0; levelID < nlevel; ++levelID )
{
if ( field[varID][levelID].ptr ) Free(field[varID][levelID].ptr);
if ( field[varID][levelID].ptr2 ) Free(field[varID][levelID].ptr2);
if ( field[varID][levelID].weight ) Free(field[varID][levelID].weight);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment