Commit 0be56cd5 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

cdo_task: added namespace cdo.

parent d105b581
Pipeline #2560 passed with stages
in 16 minutes and 30 seconds
......@@ -64,7 +64,7 @@ struct RARG
struct Control *globs;
};
CdoTask *afterReadTask = nullptr;
cdo::Task *afterReadTask = nullptr;
static bool lstdout = true;
......@@ -594,7 +594,7 @@ after_control(struct Control *globs, struct Variable *vars)
int code;
RARG rarg;
if (afterReadAsync) afterReadTask = new CdoTask;
if (afterReadAsync) afterReadTask = new cdo::Task;
for (code = 0; code < MaxCodes; code++) vars[code].needed0 = vars[code].needed;
......
......@@ -155,7 +155,7 @@ addOperators(void)
void *
Ensstat(void *process)
{
CdoTask *task = Options::CDO_task ? new CdoTask : nullptr;
cdo::Task *task = Options::CDO_task ? new cdo::Task : nullptr;
int nrecs0;
cdoInitialize(process);
......
......@@ -330,10 +330,10 @@ XTimstat(void *process)
bool lparallelread = Options::CDO_Parallel_Read > 0;
bool ltsfirst = true;
CdoTask *read_task = nullptr;
cdo::Task *read_task = nullptr;
void *readresult = nullptr;
if (lparallelread) read_task = new CdoTask;
if (lparallelread) read_task = new cdo::Task;
int tsID = 0;
int otsID = 0;
......
......@@ -244,7 +244,7 @@ template <typename... Args>
void
afterAbort(const std::string &format, Args const &... args)
{
extern CdoTask *afterReadTask;
extern cdo::Task *afterReadTask;
extern bool afterReadAsync;
if (afterReadAsync && afterReadTask) afterReadTask->wait();
cdoAbort(format, args...);
......
......@@ -29,7 +29,7 @@
static void *
cdo_task(void *task)
{
CdoTask *task_info = (CdoTask *) task;
cdo::Task *task_info = (cdo::Task *) task;
{
// cond_wait mutex must be locked before we can wait
......@@ -41,7 +41,7 @@ cdo_task(void *task)
pthread_mutex_lock(&(task_info->boss_mtx));
// signal to boss that setup is complete
task_info->state = CdoTaskState::IDLE;
task_info->state = cdo::TaskState::IDLE;
// wake-up signal
pthread_cond_signal(&(task_info->boss_cond));
......@@ -51,9 +51,9 @@ cdo_task(void *task)
{
pthread_cond_wait(&(task_info->work_cond), &(task_info->work_mtx));
if (CdoTaskState::DIE == task_info->state) break; // kill thread
if (cdo::TaskState::DIE == task_info->state) break; // kill thread
if (CdoTaskState::IDLE == task_info->state) continue; // accidental wake-up
if (cdo::TaskState::IDLE == task_info->state) continue; // accidental wake-up
// do blocking task
// printf("<worker> JOB start\n");
......@@ -64,7 +64,7 @@ cdo_task(void *task)
pthread_mutex_lock(&(task_info->boss_mtx));
// indicate that job is done
task_info->state = CdoTaskState::IDLE;
task_info->state = cdo::TaskState::IDLE;
// wake-up signal
pthread_cond_signal(&(task_info->boss_cond));
......@@ -80,8 +80,10 @@ cdo_task(void *task)
}
#endif
namespace cdo
{
void
CdoTask::start(void *(*task_routine)(void *), void *task_arg)
Task::start(void *(*task_routine)(void *), void *task_arg)
{
// ensure worker is waiting
#ifdef HAVE_LIBPTHREAD
......@@ -91,7 +93,7 @@ CdoTask::start(void *(*task_routine)(void *), void *task_arg)
// set job information & state
this->routine = task_routine;
this->arg = task_arg;
this->state = CdoTaskState::JOB;
this->state = cdo::TaskState::JOB;
#ifndef HAVE_LIBPTHREAD
this->result = this->routine(this->arg);
......@@ -105,28 +107,28 @@ CdoTask::start(void *(*task_routine)(void *), void *task_arg)
}
void *
CdoTask::wait()
Task::wait()
{
#ifdef HAVE_LIBPTHREAD
while (1)
{
if (CdoTaskState::IDLE == this->state) break;
if (cdo::TaskState::IDLE == this->state) break;
pthread_cond_wait(&(this->boss_cond), &(this->boss_mtx));
// if (CdoTaskState::IDLE == task_info->state) break;
// if (cdo::TaskState::IDLE == task_info->state) break;
}
#endif
return this->result;
}
CdoTask::CdoTask()
Task::Task()
{
this->routine = nullptr;
this->arg = nullptr;
this->result = nullptr;
this->state = CdoTaskState::SETUP;
this->state = cdo::TaskState::SETUP;
#ifdef HAVE_LIBPTHREAD
pthread_attr_t attr;
......@@ -153,14 +155,14 @@ CdoTask::CdoTask()
#endif
}
CdoTask::~CdoTask()
Task::~Task()
{
#ifdef HAVE_LIBPTHREAD
// ensure the worker is waiting
pthread_mutex_lock(&(this->work_mtx));
// printf("CdoTask::finish: send DIE to <worker>\n");
this->state = CdoTaskState::DIE;
// printf("Task::delete: send DIE to <worker>\n");
this->state = cdo::TaskState::DIE;
// wake-up signal
pthread_cond_signal(&(this->work_cond));
......@@ -177,6 +179,7 @@ CdoTask::~CdoTask()
pthread_cond_destroy(&(this->boss_cond));
#endif
}
} // namespace cdo
#ifdef TEST_CDO_TASK
// g++ -g -DTEST_CDO_TASK -DHAVE_LIBPTHREAD cdo_task.cc
......@@ -191,18 +194,18 @@ mytask(void *arg)
int
main(int argc, char **argv)
{
CdoTask cdo_task;
cdo::Task task;
void *myarg = nullptr;
void *myresult;
cdo_task.start(mytask, myarg);
task.start(mytask, myarg);
myresult = cdo_task.wait();
myresult = task.wait();
cdo_task.start(mytask, myarg);
task.start(mytask, myarg);
myresult = cdo_task.wait();
myresult = task.wait();
return 0;
}
......
......@@ -26,7 +26,9 @@
#include <pthread.h>
#endif
enum class CdoTaskState
namespace cdo
{
enum class TaskState
{
SETUP,
IDLE,
......@@ -34,13 +36,13 @@ enum class CdoTaskState
DIE
};
class CdoTask
class Task
{
public:
void *(*routine)(void *);
void *arg;
void *result;
enum CdoTaskState state;
enum TaskState state;
#ifdef HAVE_LIBPTHREAD
pthread_t thread;
pthread_cond_t work_cond;
......@@ -49,10 +51,11 @@ class CdoTask
pthread_mutex_t boss_mtx;
#endif
CdoTask();
~CdoTask();
Task();
~Task();
void start(void *(*task_routine)(void *), void *task_arg);
void *wait();
};
} // namespace cdo
#endif /* CDO_TASK_H */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment