Commit bd74635b authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

afterburner: replace pthread by cdo_task for parallel read.

parent 0877630d
......@@ -19,6 +19,7 @@
#if defined(CDO)
#include "cdo.h"
#include "cdo_int.h"
#include "cdo_task.h"
#include "pstream_write.h"
#define streamOpenWrite pstreamOpenWrite
#define streamDefVlist pstreamDefVlist
......@@ -34,10 +35,6 @@
#include "compare.h"
#include "vct_l191.h"
#if defined (HAVE_LIBPTHREAD)
#include <pthread.h>
#endif
#if defined (_OPENMP)
#include <omp.h>
#endif
......@@ -105,11 +102,7 @@ static int oVertID = -1;
static int Lhybrid2pressure = FALSE;
static int TsID;
#if defined (HAVE_LIBPTHREAD)
static bool ParallelRead = true;
#else
static bool ParallelRead = false;
#endif
static bool lparallelread = true;
#define TIMESTEP_INTERVAL -1
#define MONTHLY_INTERVAL 0
......@@ -646,25 +639,17 @@ void after_control(struct Control *globs, struct Variable *vars)
int code;
RARG rarg;
void *statusp = NULL;
#if defined (HAVE_LIBPTHREAD)
pthread_t thrID;
pthread_attr_t attr;
int rval;
void *read_task = NULL;
if ( ParallelRead )
if ( lparallelread )
{
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int status = pthread_attr_getstacksize(&attr, &stacksize);
if ( status && stacksize < 2097152 )
{
stacksize = 2097152;
pthread_attr_setstacksize(&attr, stacksize);
}
read_task = cdo_task_new();
if ( read_task == NULL )
{
lparallelread = false;
cdoWarning("CDO tasks not available!");
}
}
#endif
for ( code = 0; code < MaxCodes; code++ )
vars[code].needed0 = vars[code].needed;
......@@ -724,43 +709,34 @@ void after_control(struct Control *globs, struct Variable *vars)
rarg.vars = vars;
rarg.globs = globs;
if ( tsFirst || ParallelRead == false )
if ( tsFirst || lparallelread == false )
{
if ( ParallelRead == false )
if ( lparallelread == false )
{
statusp = after_readTimestep(&rarg);
}
#if defined(HAVE_LIBPTHREAD)
else
{
rval = pthread_create(&thrID, &attr, after_readTimestep, &rarg);
if ( rval != 0 ) Error("pthread_create failed!");
cdo_task_start(read_task, after_readTimestep, &rarg);
}
#endif
if ( tsFirst )
{
if ( globs->Type > 0 ) after_legini_setup(globs, vars);
}
if ( tsFirst && globs->Type > 0 ) after_legini_setup(globs, vars);
#if defined(HAVE_LIBPTHREAD)
if ( ParallelRead )
if ( lparallelread )
{
pthread_join(thrID, &statusp);
statusp = cdo_task_wait(read_task);
if ( *(int *)statusp < 0 )
Error("after_readTimestep error! (status = %d)", *(int *)statusp);
}
#endif
tsFirst = false;
}
#if defined(HAVE_LIBPTHREAD)
else
{
pthread_join(thrID, &statusp);
statusp = cdo_task_wait(read_task);
if ( *(int *)statusp < 0 )
Error("after_readTimestep error! (status = %d)", *(int *)statusp);
}
#endif
nrecs = *(int *)statusp;
globs->MeanCount0 = globs->MeanCount;
......@@ -768,13 +744,10 @@ void after_control(struct Control *globs, struct Variable *vars)
after_moveTimestep(vars);
#if defined (HAVE_LIBPTHREAD)
if ( nrecs && ParallelRead )
if ( nrecs && lparallelread )
{
rval = pthread_create(&thrID, &attr, after_readTimestep, &rarg);
if ( rval != 0 ) Error("pthread_create failed!");
cdo_task_start(read_task, after_readTimestep, &rarg);
}
#endif
after_setEndOfInterval(globs, nrecs);
......@@ -806,12 +779,7 @@ void after_control(struct Control *globs, struct Variable *vars)
globs->OldDate = globs->NewDate;
}
#if defined (HAVE_LIBPTHREAD)
if ( ParallelRead )
{
pthread_attr_destroy(&attr);
}
#endif
if ( read_task ) cdo_task_delete(read_task);
}
static
......@@ -2332,7 +2300,7 @@ int afterburner(int argc, char *argv[])
case 'b': Message( "option -b not longer needed!"); break;
case 'c': after_printCodes(); break;
case 'd': globs->Debug = 1; break;
case 'p': ParallelRead = true; break;
case 'p': lparallelread = true; break;
case 'P': numThreads = atoi(optarg); break;
case 'V': after_version(); break;
case 'v': Vctfile = optarg; break;
......@@ -2341,8 +2309,6 @@ int afterburner(int argc, char *argv[])
}
#if defined (_OPENMP)
/* ParallelRead = true; */
lprintf(stdout);
if ( numThreads <= 0 ) numThreads = 1;
omp_set_num_threads(numThreads);
......@@ -2357,15 +2323,7 @@ int afterburner(int argc, char *argv[])
}
#endif
if ( ParallelRead )
{
#if defined (HAVE_LIBPTHREAD)
fprintf(stdout, " Parallel read enabled\n");
#else
fprintf(stdout, " Parallel read disabled\n");
ParallelRead = false;
#endif
}
if ( lparallelread ) fprintf(stdout, " Parallel read enabled\n");
fargc0 = optind;
fargcn = argc;
......@@ -2453,6 +2411,8 @@ void *Afterburner(void *argument)
{
cdoInitialize(argument);
CDO_task = true;
lstdout = !cdoSilentMode;
struct Control *globs = (struct Control *) Malloc(sizeof(struct Control));
......
......@@ -295,7 +295,7 @@ void *XTimstat(void *argument)
readarg.streamID = streamID1;
readarg.vars = input_vars;
int lparallelread = CDO_Parallel_Read;
bool lparallelread = CDO_Parallel_Read > 0;
bool ltsfirst = true;
void *read_task = NULL;
void *readresult = NULL;
......@@ -305,7 +305,7 @@ void *XTimstat(void *argument)
read_task = cdo_task_new();
if ( read_task == NULL )
{
lparallelread = FALSE;
lparallelread = false;
cdoWarning("CDO tasks not available!");
}
}
......@@ -335,7 +335,7 @@ void *XTimstat(void *argument)
readarg.nrecs = nrecs;
readarg.recinfo = recinfo;
if ( ltsfirst || lparallelread == FALSE )
if ( ltsfirst || lparallelread == false )
{
ltsfirst = false;
readresult = cdoReadTimestep(&readarg);
......
......@@ -8,4 +8,4 @@ void *cdo_task_wait(void *task);
void *cdo_task_new();
void cdo_task_delete(void *task);
#endif // CDO_TASK_H
#endif /* CDO_TASK_H */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment