Commit 744963a5 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

added cdoReadTimestep()

parent 992e5915
...@@ -655,7 +655,7 @@ void after_control(struct Control *globs, struct Variable *vars) ...@@ -655,7 +655,7 @@ void after_control(struct Control *globs, struct Variable *vars)
{ {
statusp = after_readTimestep(&rarg); statusp = after_readTimestep(&rarg);
} }
#if defined (HAVE_LIBPTHREAD) #if defined(HAVE_LIBPTHREAD)
else else
{ {
rval = pthread_create(&thrID, &attr, after_readTimestep, &rarg); rval = pthread_create(&thrID, &attr, after_readTimestep, &rarg);
...@@ -668,7 +668,7 @@ void after_control(struct Control *globs, struct Variable *vars) ...@@ -668,7 +668,7 @@ void after_control(struct Control *globs, struct Variable *vars)
if ( globs->Type > 0 ) after_legini_setup(globs, vars); if ( globs->Type > 0 ) after_legini_setup(globs, vars);
} }
#if defined (HAVE_LIBPTHREAD) #if defined(HAVE_LIBPTHREAD)
if ( ParallelRead ) if ( ParallelRead )
{ {
pthread_join(thrID, &statusp); pthread_join(thrID, &statusp);
...@@ -678,7 +678,7 @@ void after_control(struct Control *globs, struct Variable *vars) ...@@ -678,7 +678,7 @@ void after_control(struct Control *globs, struct Variable *vars)
#endif #endif
tsFirst = FALSE; tsFirst = FALSE;
} }
#if defined (HAVE_LIBPTHREAD) #if defined(HAVE_LIBPTHREAD)
else else
{ {
pthread_join(thrID, &statusp); pthread_join(thrID, &statusp);
......
...@@ -382,12 +382,12 @@ void *Timstat(void *argument) ...@@ -382,12 +382,12 @@ void *Timstat(void *argument)
for ( varID = 0; varID < nvars; varID++ ) for ( varID = 0; varID < nvars; varID++ )
{ {
if ( vlistInqVarTsteptype(vlistID1, varID) == TSTEP_CONSTANT ) continue; if ( vlistInqVarTsteptype(vlistID1, varID) == TSTEP_CONSTANT ) continue;
nwpv = vars1[varID][levelID].nwpv; nlevel = zaxisInqSize(vlistInqVarZaxis(vlistID1, varID));
gridsize = gridInqSize(vars1[varID][levelID].grid);
nlevel = zaxisInqSize(vlistInqVarZaxis(vlistID1, varID));
for ( levelID = 0; levelID < nlevel; levelID++ ) for ( levelID = 0; levelID < nlevel; levelID++ )
{ {
missval = vars1[varID][levelID].missval; nwpv = vars1[varID][levelID].nwpv;
gridsize = gridInqSize(vars1[varID][levelID].grid);
missval = vars1[varID][levelID].missval;
if ( samp1[varID][levelID].ptr ) if ( samp1[varID][levelID].ptr )
{ {
int irun = 0; int irun = 0;
......
...@@ -72,6 +72,33 @@ ...@@ -72,6 +72,33 @@
#include "pstream.h" #include "pstream.h"
typedef struct {
int streamID, nrecs;
field_t **vars;
}
readarg_t;
static int num_recs = 0;
static
void *cdoReadTimestep(void *rarg)
{
int varID, levelID, nmiss;
readarg_t *readarg = (readarg_t *) rarg;
field_t **input_vars = readarg->vars;
int streamID = readarg->streamID;
int nrecs = readarg->nrecs;
for ( int recID = 0; recID < nrecs; ++recID )
{
streamInqRecord(streamID, &varID, &levelID);
streamReadRecord(streamID, input_vars[varID][levelID].ptr, &nmiss);
input_vars[varID][levelID].nmiss = nmiss;
}
return ((void *) &num_recs);
}
void *XTimstat(void *argument) void *XTimstat(void *argument)
{ {
int timestat_date = TIMESTAT_MEAN; int timestat_date = TIMESTAT_MEAN;
...@@ -79,7 +106,7 @@ void *XTimstat(void *argument) ...@@ -79,7 +106,7 @@ void *XTimstat(void *argument)
int vdate = 0, vtime = 0; int vdate = 0, vtime = 0;
int vdate0 = 0, vtime0 = 0; int vdate0 = 0, vtime0 = 0;
int nrecs; int nrecs;
int varID, levelID, recID; int varID, levelID;
long nsets; long nsets;
int i; int i;
int streamID3 = -1; int streamID3 = -1;
...@@ -203,6 +230,14 @@ void *XTimstat(void *argument) ...@@ -203,6 +230,14 @@ void *XTimstat(void *argument)
field_t **vars2 = NULL; field_t **vars2 = NULL;
if ( lvarstd ) vars2 = field_malloc(vlistID1, FIELD_PTR); if ( lvarstd ) vars2 = field_malloc(vlistID1, FIELD_PTR);
readarg_t readarg;
readarg.streamID = streamID1;
readarg.vars = input_vars;
int lparallelread = FALSE;
int ltsfirst = TRUE;
void *statusp = NULL;
int tsID = 0; int tsID = 0;
int otsID = 0; int otsID = 0;
while ( TRUE ) while ( TRUE )
...@@ -219,12 +254,26 @@ void *XTimstat(void *argument) ...@@ -219,12 +254,26 @@ void *XTimstat(void *argument)
if ( DATE_IS_NEQ(indate1, indate2, cmplen) ) break; if ( DATE_IS_NEQ(indate1, indate2, cmplen) ) break;
for ( recID = 0; recID < nrecs; recID++ ) readarg.nrecs = nrecs;
{ if ( ltsfirst = 0 || lparallelread == FALSE )
streamInqRecord(streamID1, &varID, &levelID); {
streamReadRecord(streamID1, input_vars[varID][levelID].ptr, &nmiss); #if defined(HAVE_LIBPTHREAD)
input_vars[varID][levelID].nmiss = nmiss; if ( lparallelread )
{
}
else
#endif
{
statusp = cdoReadTimestep(&readarg);
}
ltsfirst = FALSE;
} }
#if defined(HAVE_LIBPTHREAD)
else
{
}
#endif
for ( varID = 0; varID < nvars; varID++ ) for ( varID = 0; varID < nvars; varID++ )
{ {
...@@ -350,12 +399,12 @@ void *XTimstat(void *argument) ...@@ -350,12 +399,12 @@ void *XTimstat(void *argument)
for ( varID = 0; varID < nvars; varID++ ) for ( varID = 0; varID < nvars; varID++ )
{ {
if ( vlistInqVarTsteptype(vlistID1, varID) == TSTEP_CONSTANT ) continue; if ( vlistInqVarTsteptype(vlistID1, varID) == TSTEP_CONSTANT ) continue;
nwpv = vars1[varID][levelID].nwpv; nlevels = zaxisInqSize(vlistInqVarZaxis(vlistID1, varID));
gridsize = gridInqSize(vars1[varID][levelID].grid);
nlevels = zaxisInqSize(vlistInqVarZaxis(vlistID1, varID));
for ( levelID = 0; levelID < nlevels; levelID++ ) for ( levelID = 0; levelID < nlevels; levelID++ )
{ {
missval = vars1[varID][levelID].missval; nwpv = vars1[varID][levelID].nwpv;
gridsize = gridInqSize(vars1[varID][levelID].grid);
missval = vars1[varID][levelID].missval;
if ( samp1[varID][levelID].ptr ) if ( samp1[varID][levelID].ptr )
{ {
int irun = 0; int irun = 0;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment