Commit 2362fef8 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

XTimstat: use cdo_task for asynchron reading

parent 6afe2329
......@@ -69,13 +69,10 @@
#include <cdi.h>
#include "cdo.h"
#include "cdo_int.h"
#include "cdo_task.h"
//#include "pstream.h"
#include "pstream_write.h"
#if defined(HAVE_LIBPTHREAD)
#include <pthread.h>
#endif
typedef struct {
int tsIDnext;
......@@ -83,6 +80,7 @@ typedef struct {
field_t **vars;
}
readarg_t;
static int num_recs = 0;
static
......@@ -262,26 +260,18 @@ void *XTimstat(void *argument)
int lparallelread = TRUE;
int ltsfirst = TRUE;
void *statusp = NULL;
#if defined(HAVE_LIBPTHREAD)
pthread_t thrID;
pthread_attr_t attr;
int rval;
void *read_task = NULL;
void *readresult = NULL;
if ( lparallelread )
{
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_attr_getstacksize(&attr, &stacksize);
if ( stacksize < 2097152 )
{
stacksize = 2097152;
pthread_attr_setstacksize(&attr, stacksize);
}
read_task = cdo_task_new();
if ( read_task == NULL )
{
lparallelread = FALSE;
cdoWarning("CDO tasks not available!");
}
}
#endif
int tsID = 0;
int otsID = 0;
......@@ -306,48 +296,35 @@ void *XTimstat(void *argument)
if ( ltsfirst || lparallelread == FALSE )
{
#if defined(HAVE_LIBPTHREAD)
if ( lparallelread )
{
rval = pthread_create(&thrID, &attr, cdoReadTimestep, &readarg);
if ( rval != 0 ) cdoAbort("pthread_create failed!");
cdo_task_start(read_task, cdoReadTimestep, &readarg);
}
else
#endif
{
statusp = cdoReadTimestep(&readarg);
readresult = cdoReadTimestep(&readarg);
}
#if defined(HAVE_LIBPTHREAD)
if ( lparallelread )
{
pthread_join(thrID, &statusp);
if ( *(int *)statusp < 0 )
cdoAbort("cdoReadTimestep error! (status = %d)", *(int *)statusp);
readresult = cdo_task_wait(read_task);
}
#endif
ltsfirst = FALSE;
}
#if defined(HAVE_LIBPTHREAD)
else
{
pthread_join(thrID, &statusp);
if ( *(int *)statusp < 0 )
cdoAbort("cdoReadTimestep error! (status = %d)", *(int *)statusp);
readresult = cdo_task_wait(read_task);
}
#endif
nrecs = *(int *)statusp;
nrecs = *(int *)readresult;
cdoUpdateVars(nvars, vlistID1, input_vars);
#if defined (HAVE_LIBPTHREAD)
if ( nrecs && lparallelread )
{
readarg.tsIDnext = tsID+1;
rval = pthread_create(&thrID, &attr, cdoReadTimestep, &readarg);
if ( rval != 0 ) cdoAbort("pthread_create failed!");
cdo_task_start(read_task, cdoReadTimestep, &readarg);
}
#endif
for ( varID = 0; varID < nvars; varID++ )
{
......
......@@ -42,7 +42,7 @@ void *cdo_task(void *task)
// cond_wait mutex must be locked before we can wait
pthread_mutex_lock(&(task_info->work_mtx));
printf("<worker> start\n");
//printf("<worker> start\n");
// ensure boss is waiting
pthread_mutex_lock(&(task_info->boss_mtx));
......@@ -71,9 +71,9 @@ void *cdo_task(void *task)
}
// do blocking task
printf("<worker> JOB start\n");
task_info->routine(task_info->arg);
printf("<worker> JOB end\n");
//printf("<worker> JOB start\n");
task_info->result = task_info->routine(task_info->arg);
//printf("<worker> JOB end\n");
// ensure boss is waiting
pthread_mutex_lock(&(task_info->boss_mtx));
......@@ -140,12 +140,22 @@ void *cdo_task_new()
#if defined(HAVE_LIBPTHREAD)
task_info = (cdo_task_t *) malloc(sizeof(cdo_task_t));
task_info->routine = NULL;
task_info->arg = NULL;
task_info->result = NULL;
task_info->state = SETUP;
pthread_attr_t attr;
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_attr_getstacksize(&attr, &stacksize);
if ( stacksize < 2097152 )
{
stacksize = 2097152;
pthread_attr_setstacksize(&attr, stacksize);
}
pthread_cond_init(&(task_info->work_cond), NULL);
pthread_mutex_init(&(task_info->work_mtx), NULL);
pthread_cond_init(&(task_info->boss_cond), NULL);
......@@ -153,7 +163,7 @@ void *cdo_task_new()
pthread_mutex_lock(&(task_info->boss_mtx));
pthread_create(&(task_info->thread), NULL, cdo_task, (void *)task_info);
pthread_create(&(task_info->thread), &attr, cdo_task, (void *)task_info);
cdo_task_wait(task_info);
#endif
......@@ -170,7 +180,7 @@ void cdo_task_delete(void *task)
// ensure the worker is waiting
pthread_mutex_lock(&(task_info->work_mtx));
printf("cdo_task_delete: send DIE to <worker>\n");
//printf("cdo_task_delete: send DIE to <worker>\n");
task_info->state = DIE;
// wake-up signal
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment