Commit 2b53f896 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

cdo_task: refactored to class.

parent 0851fbe7
Pipeline #2559 passed with stages
in 16 minutes and 24 seconds
......@@ -64,7 +64,7 @@ struct RARG
struct Control *globs;
};
void *afterReadTask = nullptr;
CdoTask *afterReadTask = nullptr;
static bool lstdout = true;
......@@ -594,15 +594,7 @@ after_control(struct Control *globs, struct Variable *vars)
int code;
RARG rarg;
if (afterReadAsync)
{
afterReadTask = cdo_task_new();
if (afterReadTask == nullptr)
{
afterReadAsync = false;
cdoWarning("CDO tasks not available!");
}
}
if (afterReadAsync) afterReadTask = new CdoTask;
for (code = 0; code < MaxCodes; code++) vars[code].needed0 = vars[code].needed;
......@@ -667,7 +659,7 @@ after_control(struct Control *globs, struct Variable *vars)
{
if (afterReadAsync)
{
cdo_task_start(afterReadTask, after_readTimestep, &rarg);
afterReadTask->start(after_readTimestep, &rarg);
}
else
{
......@@ -678,14 +670,14 @@ after_control(struct Control *globs, struct Variable *vars)
if (afterReadAsync)
{
status = *(int *) cdo_task_wait(afterReadTask);
status = *(int *) afterReadTask->wait();
if (status < 0) cdoAbort("after_readTimestep error! (status = %d)", status);
}
tsFirst = false;
}
else
{
status = *(int *) cdo_task_wait(afterReadTask);
status = *(int *) afterReadTask->wait();
if (status < 0) cdoAbort("after_readTimestep error! (status = %d)", status);
}
......@@ -698,7 +690,7 @@ after_control(struct Control *globs, struct Variable *vars)
if (nrecs && afterReadAsync)
{
cdo_task_start(afterReadTask, after_readTimestep, &rarg);
afterReadTask->start(after_readTimestep, &rarg);
}
after_setEndOfInterval(globs, nrecs);
......@@ -731,7 +723,7 @@ after_control(struct Control *globs, struct Variable *vars)
globs->OldDate = globs->NewDate;
}
if (afterReadTask) cdo_task_delete(afterReadTask);
if (afterReadTask) delete afterReadTask;
}
static void
......@@ -1875,8 +1867,6 @@ Afterburner(void *process)
{
cdoInitialize(process);
CDO_task = true;
lstdout = !Options::silentMode;
struct Control globs = {};
......
......@@ -74,7 +74,7 @@ struct ensstat_arg_t
static void *
ensstat_func(void *ensarg)
{
if (CDO_task) cdo_omp_set_num_threads(Threading::ompNumThreads);
if (Options::CDO_task) cdo_omp_set_num_threads(Threading::ompNumThreads);
ensstat_arg_t *arg = (ensstat_arg_t *) ensarg;
const int t = arg->t;
......@@ -155,7 +155,7 @@ addOperators(void)
void *
Ensstat(void *process)
{
void *task = CDO_task ? cdo_task_new() : nullptr;
CdoTask *task = Options::CDO_task ? new CdoTask : nullptr;
int nrecs0;
cdoInitialize(process);
......@@ -222,7 +222,7 @@ Ensstat(void *process)
for (int fileID = 0; fileID < nfiles; fileID++)
{
ef[fileID].array[0].resize(gridsizemax);
if (CDO_task) ef[fileID].array[1].resize(gridsizemax);
if (Options::CDO_task) ef[fileID].array[1].resize(gridsizemax);
}
Varray<double> array2(gridsizemax);
......@@ -321,10 +321,10 @@ Ensstat(void *process)
ensstat_arg.efData = ef.data();
ensstat_arg.varID[t] = varID;
ensstat_arg.levelID[t] = levelID;
if (CDO_task)
if (Options::CDO_task)
{
cdo_task_start(task, ensstat_func, &ensstat_arg);
cdo_task_wait(task);
task->start(ensstat_func, &ensstat_arg);
task->wait();
// t = !t;
}
else
......@@ -346,7 +346,7 @@ CLEANUP:
cdoStreamClose(streamID2);
if (task) cdo_task_delete(task);
if (task) delete task;
cdoFinish();
......
......@@ -330,18 +330,10 @@ XTimstat(void *process)
bool lparallelread = Options::CDO_Parallel_Read > 0;
bool ltsfirst = true;
void *read_task = nullptr;
CdoTask *read_task = nullptr;
void *readresult = nullptr;
if (lparallelread)
{
read_task = cdo_task_new();
if (read_task == nullptr)
{
lparallelread = false;
cdoWarning("CDO tasks not available!");
}
}
if (lparallelread) read_task = new CdoTask;
int tsID = 0;
int otsID = 0;
......@@ -380,7 +372,7 @@ XTimstat(void *process)
}
else
{
readresult = cdo_task_wait(read_task);
readresult = read_task->wait();
}
nrecs = *(int *) readresult;
......@@ -391,7 +383,7 @@ XTimstat(void *process)
{
readarg.vars = input_vars[curFirst];
readarg.tsIDnext = tsID + 1;
cdo_task_start(read_task, cdoReadTimestep, &readarg);
read_task->start(cdoReadTimestep, &readarg);
}
if (nsets == 0)
......@@ -597,6 +589,8 @@ XTimstat(void *process)
cdoStreamClose(streamID1);
#endif
if (read_task) delete read_task;
cdoFinish();
return nullptr;
......
......@@ -244,9 +244,9 @@ template <typename... Args>
void
afterAbort(const std::string &format, Args const &... args)
{
extern void *afterReadTask;
extern CdoTask *afterReadTask;
extern bool afterReadAsync;
if (afterReadAsync) cdo_task_wait(afterReadTask);
if (afterReadAsync && afterReadTask) afterReadTask->wait();
cdoAbort(format, args...);
}
......
......@@ -1344,7 +1344,7 @@ parseOptionsLong(int argc, char *argv[])
case 'P': numThreads = parameter2int(CDO_optarg); break;
case 'p':
Options::CDO_Parallel_Read = true;
CDO_task = true;
Options::CDO_task = true;
break;
case 'Q': cdiDefGlobal("SORTNAME", true); break;
case 'R':
......
......@@ -61,6 +61,7 @@ bool CDO_Parallel_Read = false;
int CDO_Reduce_Dim = false;
int CDO_Append_History = true;
bool CDO_Reset_History = false;
bool CDO_task = false;
unsigned Random_Seed = 1;
......
......@@ -60,6 +60,7 @@ extern bool cdoDiag;
extern MemType CDO_Memtype;
extern bool CDO_Parallel_Read;
extern bool CDO_task;
extern int CDO_Reduce_Dim;
extern int CDO_Append_History;
......
......@@ -14,90 +14,66 @@
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_LIBPTHREAD
#include <pthread.h>
#endif
#include <stdlib.h>
#include <stdio.h>
#include "cdo_task.h"
bool CDO_task = false;
#include <cstdlib>
#include <cstdio>
enum cdo_pt_state
{
SETUP,
IDLE,
JOB,
DIE
};
struct cdo_task_t
{
void *(*routine)(void *);
void *arg;
void *result;
enum cdo_pt_state state;
#ifdef HAVE_LIBPTHREAD
pthread_t thread;
pthread_cond_t work_cond;
pthread_mutex_t work_mtx;
pthread_cond_t boss_cond;
pthread_mutex_t boss_mtx;
#endif
};
#ifdef HAVE_LIBPTHREAD
static void *
cdo_task(void *task)
{
cdo_task_t *task_info = (cdo_task_t *) task;
CdoTask *task_info = (CdoTask *) task;
// cond_wait mutex must be locked before we can wait
pthread_mutex_lock(&(task_info->work_mtx));
{
// cond_wait mutex must be locked before we can wait
pthread_mutex_lock(&(task_info->work_mtx));
// printf("<worker> start\n");
// printf("<worker> start\n");
// ensure boss is waiting
pthread_mutex_lock(&(task_info->boss_mtx));
// ensure boss is waiting
pthread_mutex_lock(&(task_info->boss_mtx));
// signal to boss that setup is complete
task_info->state = IDLE;
// signal to boss that setup is complete
task_info->state = CdoTaskState::IDLE;
// wake-up signal
pthread_cond_signal(&(task_info->boss_cond));
pthread_mutex_unlock(&(task_info->boss_mtx));
// wake-up signal
pthread_cond_signal(&(task_info->boss_cond));
pthread_mutex_unlock(&(task_info->boss_mtx));
while (1)
{
pthread_cond_wait(&(task_info->work_cond), &(task_info->work_mtx));
while (1)
{
pthread_cond_wait(&(task_info->work_cond), &(task_info->work_mtx));
if (DIE == task_info->state) break; // kill thread
if (CdoTaskState::DIE == task_info->state) break; // kill thread
if (IDLE == task_info->state) continue; // accidental wake-up
if (CdoTaskState::IDLE == task_info->state) continue; // accidental wake-up
// do blocking task
// printf("<worker> JOB start\n");
task_info->result = task_info->routine(task_info->arg);
// printf("<worker> JOB end\n");
// do blocking task
// printf("<worker> JOB start\n");
task_info->result = task_info->routine(task_info->arg);
// printf("<worker> JOB end\n");
// ensure boss is waiting
pthread_mutex_lock(&(task_info->boss_mtx));
// ensure boss is waiting
pthread_mutex_lock(&(task_info->boss_mtx));
// indicate that job is done
task_info->state = IDLE;
// indicate that job is done
task_info->state = CdoTaskState::IDLE;
// wake-up signal
pthread_cond_signal(&(task_info->boss_cond));
pthread_mutex_unlock(&(task_info->boss_mtx));
}
// wake-up signal
pthread_cond_signal(&(task_info->boss_cond));
pthread_mutex_unlock(&(task_info->boss_mtx));
}
pthread_mutex_unlock(&(task_info->work_mtx));
}
pthread_mutex_unlock(&(task_info->work_mtx));
pthread_exit(nullptr);
return nullptr;
......@@ -105,138 +81,105 @@ cdo_task(void *task)
#endif
void
cdo_task_start(void *task, void *(*task_routine)(void *), void *task_arg)
CdoTask::start(void *(*task_routine)(void *), void *task_arg)
{
if (!task) return;
cdo_task_t *task_info = (cdo_task_t *) task;
// ensure worker is waiting
// ensure worker is waiting
#ifdef HAVE_LIBPTHREAD
if (CDO_task) pthread_mutex_lock(&(task_info->work_mtx));
pthread_mutex_lock(&(this->work_mtx));
#endif
// set job information & state
task_info->routine = task_routine;
task_info->arg = task_arg;
task_info->state = JOB;
this->routine = task_routine;
this->arg = task_arg;
this->state = CdoTaskState::JOB;
bool run_task = !CDO_task;
#if !defined(HAVE_LIBPTHREAD)
run_task = true;
#ifndef HAVE_LIBPTHREAD
this->result = this->routine(this->arg);
#endif
if (run_task) task_info->result = task_info->routine(task_info->arg);
// wake-up signal
// wake-up signal
#ifdef HAVE_LIBPTHREAD
if (CDO_task)
{
pthread_cond_signal(&(task_info->work_cond));
pthread_mutex_unlock(&(task_info->work_mtx));
}
pthread_cond_signal(&(this->work_cond));
pthread_mutex_unlock(&(this->work_mtx));
#endif
}
void *
cdo_task_wait(void *task)
CdoTask::wait()
{
if (!task) return nullptr;
cdo_task_t *task_info = (cdo_task_t *) task;
#ifdef HAVE_LIBPTHREAD
if (CDO_task)
while (1)
{
while (1)
{
if (IDLE == task_info->state) break;
if (CdoTaskState::IDLE == this->state) break;
pthread_cond_wait(&(task_info->boss_cond), &(task_info->boss_mtx));
pthread_cond_wait(&(this->boss_cond), &(this->boss_mtx));
// if ( IDLE == task_info->state ) break;
}
// if (CdoTaskState::IDLE == task_info->state) break;
}
#endif
return task_info->result;
return this->result;
}
void *
cdo_task_new()
CdoTask::CdoTask()
{
cdo_task_t *task_info = nullptr;
task_info = (cdo_task_t *) malloc(sizeof(cdo_task_t));
task_info->routine = nullptr;
task_info->arg = nullptr;
task_info->result = nullptr;
task_info->state = SETUP;
this->routine = nullptr;
this->arg = nullptr;
this->result = nullptr;
this->state = CdoTaskState::SETUP;
#ifdef HAVE_LIBPTHREAD
if (CDO_task)
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
size_t stacksize;
pthread_attr_getstacksize(&attr, &stacksize);
if (stacksize < 2097152)
{
pthread_attr_t attr;
size_t stacksize;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_attr_getstacksize(&attr, &stacksize);
if (stacksize < 2097152)
{
stacksize = 2097152;
pthread_attr_setstacksize(&attr, stacksize);
}
pthread_cond_init(&(task_info->work_cond), nullptr);
pthread_mutex_init(&(task_info->work_mtx), nullptr);
pthread_cond_init(&(task_info->boss_cond), nullptr);
pthread_mutex_init(&(task_info->boss_mtx), nullptr);
pthread_mutex_lock(&(task_info->boss_mtx));
pthread_create(&(task_info->thread), &attr, cdo_task, (void *) task_info);
cdo_task_wait(task_info);
stacksize = 2097152;
pthread_attr_setstacksize(&attr, stacksize);
}
#endif
return (void *) task_info;
pthread_cond_init(&(this->work_cond), nullptr);
pthread_mutex_init(&(this->work_mtx), nullptr);
pthread_cond_init(&(this->boss_cond), nullptr);
pthread_mutex_init(&(this->boss_mtx), nullptr);
pthread_mutex_lock(&(this->boss_mtx));
pthread_create(&(this->thread), &attr, cdo_task, (void *) this);
this->wait();
#endif
}
void
cdo_task_delete(void *task)
CdoTask::~CdoTask()
{
cdo_task_t *task_info = (cdo_task_t *) task;
#ifdef HAVE_LIBPTHREAD
if (CDO_task)
{
// ensure the worker is waiting
pthread_mutex_lock(&(task_info->work_mtx));
// ensure the worker is waiting
pthread_mutex_lock(&(this->work_mtx));
// printf("cdo_task_delete: send DIE to <worker>\n");
task_info->state = DIE;
// printf("CdoTask::finish: send DIE to <worker>\n");
this->state = CdoTaskState::DIE;
// wake-up signal
pthread_cond_signal(&(task_info->work_cond));
pthread_mutex_unlock(&(task_info->work_mtx));
// wake-up signal
pthread_cond_signal(&(this->work_cond));
pthread_mutex_unlock(&(this->work_mtx));
// wait for thread to exit
pthread_join(task_info->thread, nullptr);
// wait for thread to exit
pthread_join(this->thread, nullptr);
pthread_mutex_destroy(&(task_info->work_mtx));
pthread_cond_destroy(&(task_info->work_cond));
pthread_mutex_destroy(&(this->work_mtx));
pthread_cond_destroy(&(this->work_cond));
pthread_mutex_unlock(&(task_info->boss_mtx));
pthread_mutex_destroy(&(task_info->boss_mtx));
pthread_cond_destroy(&(task_info->boss_cond));
}
pthread_mutex_unlock(&(this->boss_mtx));
pthread_mutex_destroy(&(this->boss_mtx));
pthread_cond_destroy(&(this->boss_cond));
#endif
if (task_info) free(task_info);
}
#ifdef TEST_CDO_TASK
// g++ -DTEST_CDO_TASK -DHAVE_LIBPTHREAD cdo_task.cc
// g++ -g -DTEST_CDO_TASK -DHAVE_LIBPTHREAD cdo_task.cc
void *
mytask(void *arg)
......@@ -248,23 +191,18 @@ mytask(void *arg)
int
main(int argc, char **argv)
{
CDO_task = true;
CdoTask cdo_task;
void *task = cdo_task_new();
printf("Init done\n");
void *myarg = nullptr;
void *myresult;
cdo_task_start(task, mytask, myarg);
myresult = cdo_task_wait(task);
cdo_task.start(mytask, myarg);
cdo_task_start(task, mytask, myarg);
myresult = cdo_task.wait();
myresult = cdo_task_wait(task);
cdo_task.start(mytask, myarg);
cdo_task_delete(task);
myresult = cdo_task.wait();
return 0;
}
......
......@@ -14,14 +14,45 @@
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#ifndef CDO_TASK_H
#define CDO_TASK_H
extern bool CDO_task;
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_LIBPTHREAD
#include <pthread.h>
#endif
enum class CdoTaskState
{
SETUP,
IDLE,
JOB,
DIE
};
class CdoTask
{
public:
void *(*routine)(void *);
void *arg;
void *result;
enum CdoTaskState state;
#ifdef HAVE_LIBPTHREAD
pthread_t thread;
pthread_cond_t work_cond;
pthread_mutex_t work_mtx;
pthread_cond_t boss_cond;
pthread_mutex_t boss_mtx;
#endif
void cdo_task_start(void *task, void *(*task_routine)(void *), void *task_arg);
void *cdo_task_wait(void *task);
void *cdo_task_new();
void cdo_task_delete(void *task);
CdoTask();
~CdoTask();
void start(void *(*task_routine)(void *), void *task_arg);
void *wait();
};
#endif /* CDO_TASK_H */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment