Commit 7fa1cf0a authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

async_worker cleanup.

parent f1c310e5
......@@ -29,6 +29,7 @@ sema_init(sema_t *s, int pshared, uint32_t value)
#ifdef __APPLE__
dispatch_semaphore_t *sem = &s->sem;
(void)pshared;
*sem = dispatch_semaphore_create(value);
#else
status = sem_init(&s->sem, pshared, value);
......@@ -66,14 +67,14 @@ sema_post(sema_t *s)
struct AsyncJob {
bool inUse;
sema_t request, completion;
int (*work)(void* data);
void* data;
int (*work)(void *data);
void *data;
int result;
};
typedef struct AsyncManager {
int workerCount, idleWorkerCount;
AsyncJob* communicators;
AsyncJob *communicators;
} AsyncManager;
static AsyncManager *gManager = NULL;
......@@ -81,18 +82,19 @@ static AsyncManager *gManager = NULL;
static
void *workerMain(void *arg)
{
AsyncJob* communicator = arg;
while(true)
AsyncJob *communicator = arg;
while (true)
{
while(sema_wait(&communicator->request)) ;
if(communicator->work)
while (sema_wait(&communicator->request)) ;
if (communicator->work)
{
communicator->result = communicator->work(communicator->data);
if(sema_post(&communicator->completion)) xabort("sema_post() failed");
if (sema_post(&communicator->completion)) xabort("sema_post() failed");
}
else
{
if(sema_post(&communicator->completion)) xabort("sema_post() failed");
if (sema_post(&communicator->completion)) xabort("sema_post() failed");
break;
}
}
......@@ -107,16 +109,17 @@ void startWorker(AsyncJob *communicator)
communicator->work = NULL;
communicator->data = NULL;
communicator->result = 0;
if(sema_init(&communicator->request, 0, 0)) xabort("sema_init() failed");
if(sema_init(&communicator->completion, 0, 0)) xabort("sema_init() failed");
if (sema_init(&communicator->request, 0, 0)) xabort("sema_init() failed");
if (sema_init(&communicator->completion, 0, 0)) xabort("sema_init() failed");
pthread_t worker;
if(pthread_create(&worker, NULL, workerMain, communicator)) xabort("pthread_create() failed");
if(pthread_detach(worker)) xabort("pthread_detach() failed");
if (pthread_create(&worker, NULL, workerMain, communicator)) xabort("pthread_create() failed");
if (pthread_detach(worker)) xabort("pthread_detach() failed");
}
int AsyncWorker_init(int threadCount)
{
if(threadCount <= 0)
if (threadCount <= 0)
{
xabort("CPU core count discovery not implemented yet");
return CDI_EINVAL; // TODO: discover CPU core count, and set threadCount to a sensible positive value
......@@ -128,22 +131,24 @@ int AsyncWorker_init(int threadCount)
if(!gManager) return CDI_ESYSTEM;
gManager->workerCount = threadCount;
gManager->communicators = malloc(threadCount*sizeof*gManager->communicators);
if(!gManager->communicators) xabort("memory allocation failure");
for(int i = 0; i < threadCount; i++) startWorker(&gManager->communicators[i]);
if (!gManager->communicators) xabort("memory allocation failure");
for (int i = 0; i < threadCount; i++) startWorker(&gManager->communicators[i]);
gManager->idleWorkerCount = threadCount;
return CDI_NOERR;
}
AsyncJob *AsyncWorker_requestWork(int (*work)(void* data), void* data)
AsyncJob *AsyncWorker_requestWork(int (*work)(void *data), void *data)
{
if(!gManager) xabort("AsyncWorker_requestWork() called without calling AsyncWorker_init() first");
if(!work) xabort("AsyncWorker_requestWork() called without a valid function pointer"); //need to catch this condition to stop users from terminating our worker threads
if (!gManager) xabort("AsyncWorker_requestWork() called without calling AsyncWorker_init() first");
if (!work) xabort("AsyncWorker_requestWork() called without a valid function pointer"); //need to catch this condition to stop users from terminating our worker threads
//find an unused worker
// find an unused worker
if(!gManager->idleWorkerCount) return NULL;
AsyncJob* worker = NULL;
for(int i = 0; i < gManager->workerCount; i++)
AsyncJob *worker = NULL;
for (int i = 0; i < gManager->workerCount; i++)
{
if(!gManager->communicators[i].inUse)
{
......@@ -151,30 +156,30 @@ AsyncJob *AsyncWorker_requestWork(int (*work)(void* data), void* data)
break;
}
}
if(!worker) xabort("internal error: idleWorkerCount is not in sync with the worker states, please report this bug");
if (!worker) xabort("internal error: idleWorkerCount is not in sync with the worker states, please report this bug");
//pass the request to that worker
// pass the request to that worker
gManager->idleWorkerCount--;
worker->inUse = true;
worker->work = work;
worker->data = data;
worker->result = 0;
if(sema_post(&worker->request)) xabort("sema_post() failed");
if (sema_post(&worker->request)) xabort("sema_post() failed");
return worker;
}
int AsyncWorker_wait(AsyncJob *job)
{
if(!gManager) xabort("AsyncWorker_wait() called without calling AsyncWorker_init() first");
if(job < gManager->communicators) return CDI_EINVAL;
if(job >= gManager->communicators + gManager->workerCount) return CDI_EINVAL;
if(!job->inUse) return CDI_EINVAL;
if (!gManager) xabort("AsyncWorker_wait() called without calling AsyncWorker_init() first");
if (job < gManager->communicators) return CDI_EINVAL;
if (job >= gManager->communicators + gManager->workerCount) return CDI_EINVAL;
if (!job->inUse) return CDI_EINVAL;
while(sema_wait(&job->completion)) ;
while (sema_wait(&job->completion)) ;
int result = job->result;
//reset the communicator
// reset the communicator
job->work = NULL;
job->data = NULL;
job->result = 0;
......@@ -186,35 +191,37 @@ int AsyncWorker_wait(AsyncJob *job)
int AsyncWorker_availableWorkers()
{
if(!gManager) return 0;
if (!gManager) return 0;
return gManager->idleWorkerCount;
}
int AsyncWorker_finalize()
{
int result = CDI_NOERR;
if(!gManager) return CDI_NOERR;
for(int i = 0; i < gManager->workerCount; i++)
if (!gManager) return CDI_NOERR;
for (int i = 0; i < gManager->workerCount; i++)
{
AsyncJob* curWorker = &gManager->communicators[i];
AsyncJob *curWorker = &gManager->communicators[i];
//finish any pending job
// finish any pending job
if(curWorker->inUse)
{
AsyncWorker_wait(curWorker);
if(curWorker->result) result = curWorker->result;
}
//send the teardown signal
// send the teardown signal
curWorker->inUse = true;
curWorker->work = NULL;
curWorker->data = NULL;
curWorker->result = 0;
if(sema_post(&curWorker->request)) xabort("sema_post() failed");
if (sema_post(&curWorker->request)) xabort("sema_post() failed");
//wait for the worker to exit
// wait for the worker to exit
AsyncWorker_wait(curWorker);
}
free(gManager->communicators);
free(gManager);
gManager = NULL;
......
......@@ -6,11 +6,11 @@ typedef struct AsyncJob AsyncJob;
// a negative threadCount gives the number of cores that should remain unused by the worker threads, returns an error code
int AsyncWorker_init(int threadCount);
//executes work(data) in a worker thread, must be followed by a call to AsyncWorker_wait()
AsyncJob* AsyncWorker_requestWork(int (*work)(void* data), void* data);
// executes work(data) in a worker thread, must be followed by a call to AsyncWorker_wait()
AsyncJob *AsyncWorker_requestWork(int (*work)(void *data), void *data);
// waits for the async job to finish and returns its result (or some other error code)
int AsyncWorker_wait(AsyncJob* job);
int AsyncWorker_wait(AsyncJob *job);
// return the number of workers that are currently idle
int AsyncWorker_availableWorkers();
......
......@@ -182,7 +182,7 @@ void JobDescriptor_startJob(JobDescriptor *me, stream_t *streamptr, int recID, i
}
static
void JobDescriptor_finishJob(JobDescriptor* me, void *data, size_t *nmiss)
void JobDescriptor_finishJob(JobDescriptor *me, void *data, size_t *nmiss)
{
if (AsyncWorker_wait(me->job)) xabort("error executing job in worker thread");
memcpy(data, me->args.data, me->args.gridsize*(me->args.memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment