Commit 69001163 authored by Uwe Schulzweida
Browse files

Make async worker manager stream local.

parent 7fa1cf0a
......@@ -72,12 +72,11 @@ struct AsyncJob {
int result;
};
typedef struct AsyncManager {
struct AsyncManager {
int workerCount, idleWorkerCount;
AsyncJob *communicators;
} AsyncManager;
};
static AsyncManager *gManager = NULL;
static
void *workerMain(void *arg)
......@@ -117,7 +116,7 @@ void startWorker(AsyncJob *communicator)
if (pthread_detach(worker)) xabort("pthread_detach() failed");
}
int AsyncWorker_init(int threadCount)
int AsyncWorker_init(AsyncManager **jobManager, int threadCount)
{
if (threadCount <= 0)
{
......@@ -125,41 +124,41 @@ int AsyncWorker_init(int threadCount)
return CDI_EINVAL; // TODO: discover CPU core count, and set threadCount to a sensible positive value
}
if (gManager) return CDI_NOERR;
if (*jobManager) return CDI_NOERR;
gManager = malloc(sizeof*gManager);
if(!gManager) return CDI_ESYSTEM;
gManager->workerCount = threadCount;
gManager->communicators = malloc(threadCount*sizeof*gManager->communicators);
if (!gManager->communicators) xabort("memory allocation failure");
*jobManager = malloc(sizeof(AsyncManager));
if(!*jobManager) return CDI_ESYSTEM;
(*jobManager)->workerCount = threadCount;
(*jobManager)->communicators = malloc(threadCount*sizeof(AsyncJob));
if (!(*jobManager)->communicators) xabort("memory allocation failure");
for (int i = 0; i < threadCount; i++) startWorker(&gManager->communicators[i]);
gManager->idleWorkerCount = threadCount;
for (int i = 0; i < threadCount; i++) startWorker(&((*jobManager)->communicators[i]));
(*jobManager)->idleWorkerCount = threadCount;
return CDI_NOERR;
}
AsyncJob *AsyncWorker_requestWork(int (*work)(void *data), void *data)
AsyncJob *AsyncWorker_requestWork(AsyncManager *jobManager, int (*work)(void *data), void *data)
{
if (!gManager) xabort("AsyncWorker_requestWork() called without calling AsyncWorker_init() first");
if (!jobManager) xabort("AsyncWorker_requestWork() called without calling AsyncWorker_init() first");
if (!work) xabort("AsyncWorker_requestWork() called without a valid function pointer"); //need to catch this condition to stop users from terminating our worker threads
// find an unused worker
if(!gManager->idleWorkerCount) return NULL;
if(!jobManager->idleWorkerCount) return NULL;
AsyncJob *worker = NULL;
for (int i = 0; i < gManager->workerCount; i++)
for (int i = 0; i < jobManager->workerCount; i++)
{
if(!gManager->communicators[i].inUse)
if (!jobManager->communicators[i].inUse)
{
worker = &gManager->communicators[i];
worker = &jobManager->communicators[i];
break;
}
}
if (!worker) xabort("internal error: idleWorkerCount is not in sync with the worker states, please report this bug");
// pass the request to that worker
gManager->idleWorkerCount--;
jobManager->idleWorkerCount--;
worker->inUse = true;
worker->work = work;
worker->data = data;
......@@ -169,11 +168,11 @@ AsyncJob *AsyncWorker_requestWork(int (*work)(void *data), void *data)
return worker;
}
int AsyncWorker_wait(AsyncJob *job)
int AsyncWorker_wait(AsyncManager *jobManager, AsyncJob *job)
{
if (!gManager) xabort("AsyncWorker_wait() called without calling AsyncWorker_init() first");
if (job < gManager->communicators) return CDI_EINVAL;
if (job >= gManager->communicators + gManager->workerCount) return CDI_EINVAL;
if (!jobManager) xabort("AsyncWorker_wait() called without calling AsyncWorker_init() first");
if (job < jobManager->communicators) return CDI_EINVAL;
if (job >= jobManager->communicators + jobManager->workerCount) return CDI_EINVAL;
if (!job->inUse) return CDI_EINVAL;
while (sema_wait(&job->completion)) ;
......@@ -184,31 +183,31 @@ int AsyncWorker_wait(AsyncJob *job)
job->data = NULL;
job->result = 0;
job->inUse = false;
gManager->idleWorkerCount++;
jobManager->idleWorkerCount++;
return result;
}
int AsyncWorker_availableWorkers()
int AsyncWorker_availableWorkers(AsyncManager *jobManager)
{
if (!gManager) return 0;
return gManager->idleWorkerCount;
if (!jobManager) return 0;
return jobManager->idleWorkerCount;
}
int AsyncWorker_finalize()
int AsyncWorker_finalize(AsyncManager *jobManager)
{
int result = CDI_NOERR;
if (!gManager) return CDI_NOERR;
if (!jobManager) return CDI_NOERR;
for (int i = 0; i < gManager->workerCount; i++)
for (int i = 0; i < jobManager->workerCount; i++)
{
AsyncJob *curWorker = &gManager->communicators[i];
AsyncJob *curWorker = &jobManager->communicators[i];
// finish any pending job
if(curWorker->inUse)
if (curWorker->inUse)
{
AsyncWorker_wait(curWorker);
if(curWorker->result) result = curWorker->result;
AsyncWorker_wait(jobManager, curWorker);
if (curWorker->result) result = curWorker->result;
}
// send the teardown signal
......@@ -219,12 +218,12 @@ int AsyncWorker_finalize()
if (sema_post(&curWorker->request)) xabort("sema_post() failed");
// wait for the worker to exit
AsyncWorker_wait(curWorker);
AsyncWorker_wait(jobManager, curWorker);
}
free(gManager->communicators);
free(gManager);
gManager = NULL;
free(jobManager->communicators);
free(jobManager);
jobManager = NULL;
return result;
}
......@@ -2,20 +2,21 @@
#define ASYNC_WORKER_H
typedef struct AsyncJob AsyncJob;
typedef struct AsyncManager AsyncManager;
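// threadCount must be positive; the planned meaning of a negative threadCount (the number of cores that should remain unused by the worker threads) is not implemented yet, so threadCount <= 0 currently yields CDI_EINVAL; returns an error code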
int AsyncWorker_init(int threadCount);
int AsyncWorker_init(AsyncManager **jobManager, int threadCount);
// executes work(data) in a worker thread, must be followed by a call to AsyncWorker_wait()
AsyncJob *AsyncWorker_requestWork(int (*work)(void *data), void *data);
AsyncJob *AsyncWorker_requestWork(AsyncManager *jobManager, int (*work)(void *data), void *data);
// waits for the async job to finish and returns its result (or some other error code)
int AsyncWorker_wait(AsyncJob *job);
int AsyncWorker_wait(AsyncManager *jobManager, AsyncJob *job);
// return the number of workers that are currently idle
int AsyncWorker_availableWorkers();
int AsyncWorker_availableWorkers(AsyncManager *jobManager);
// waits for all pending jobs to finish, stops all workers, returns a non-zero error code from a pending job if there were any
int AsyncWorker_finalize();
int AsyncWorker_finalize(AsyncManager *jobManager);
#endif
......@@ -299,6 +299,7 @@ typedef struct {
int nextRecID;
int cachedTsID;
void *jobs;
void *jobManager;
void *gh; // grib handle
}
......
......@@ -174,17 +174,17 @@ typedef struct JobDescriptor {
} JobDescriptor;
static
void JobDescriptor_startJob(JobDescriptor *me, stream_t *streamptr, int recID, int memtype, bool resetFilePos)
void JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int recID, int memtype, bool resetFilePos)
{
me->args = grb_read_raw_data(streamptr, recID, memtype, NULL, NULL, resetFilePos);
me->job = AsyncWorker_requestWork(grb_decode_record, &me->args);
me->job = AsyncWorker_requestWork(jobManager, grb_decode_record, &me->args);
if (!me->job) xabort("error while trying to send job to worker thread");
}
static
void JobDescriptor_finishJob(JobDescriptor *me, void *data, size_t *nmiss)
void JobDescriptor_finishJob(AsyncManager *jobManager, JobDescriptor *me, void *data, size_t *nmiss)
{
if (AsyncWorker_wait(me->job)) xabort("error executing job in worker thread");
if (AsyncWorker_wait(jobManager, me->job)) xabort("error executing job in worker thread");
memcpy(data, me->args.data, me->args.gridsize*(me->args.memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
*nmiss = me->args.nmiss;
......@@ -201,6 +201,7 @@ void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *dat
const int workerCount = streamptr->numWorker;
if (workerCount > 0)
{
AsyncManager *jobManager = (AsyncManager *) streamptr->jobManager;
JobDescriptor *jobs = (JobDescriptor *) streamptr->jobs;
// if this is the first call, init and start worker threads
......@@ -211,7 +212,8 @@ void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *dat
jobs = malloc(workerCount*sizeof*jobs);
streamptr->jobs = jobs;
for (int i = 0; i < workerCount; i++) jobs[i].args.recID = -1;
if (AsyncWorker_init(workerCount)) xabort("error while trying to start worker threads");
if (AsyncWorker_init(&jobManager, workerCount)) xabort("error while trying to start worker threads");
streamptr->jobManager = jobManager;
}
if (recID == 0) streamptr->nextRecID = 0;
......@@ -225,7 +227,7 @@ void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *dat
JobDescriptor *jd = &jobs[i];
if (jd->args.recID < 0)
{
JobDescriptor_startJob(jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
}
}
......@@ -236,10 +238,10 @@ void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *dat
if (jd->args.recID == recID)
{
jobFound = true;
JobDescriptor_finishJob(jd, data, nmiss);
JobDescriptor_finishJob(jobManager, jd, data, nmiss);
if (streamptr->nextRecID < timestep->nrecs)
{
JobDescriptor_startJob(jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
}
}
}
......
......@@ -30,6 +30,7 @@
#include "resource_handle.h"
#include "resource_unpack.h"
#include "namespace.h"
#include "async_worker.h"
static stream_t *stream_new_entry(int resH);
......@@ -921,6 +922,7 @@ void streamDefaultValue ( stream_t * streamptr )
streamptr->nextRecID = 0;
streamptr->cachedTsID = -1;
streamptr->jobs = NULL;
streamptr->jobManager = NULL;
}
static
......@@ -1056,7 +1058,7 @@ void streamClose(int streamID)
if ( CDI_Debug )
Message("streamID = %d filename = %s", streamID, streamptr->filename);
int vlistID = streamptr->vlistID;
const int vlistID = streamptr->vlistID;
void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted)
= (void (*)(stream_t *, int))
......@@ -1078,7 +1080,7 @@ void streamClose(int streamID)
for ( int index = 0; index < streamptr->nvars; index++ )
{
sleveltable_t *pslev = streamptr->vars[index].recordTable;
unsigned nsub = streamptr->vars[index].subtypeSize >= 0
const unsigned nsub = streamptr->vars[index].subtypeSize >= 0
? (unsigned)streamptr->vars[index].subtypeSize : 0U;
for (size_t isub=0; isub < nsub; isub++)
{
......@@ -1109,6 +1111,7 @@ void streamClose(int streamID)
}
if (streamptr->jobs) free(streamptr->jobs);
if (streamptr->jobManager) AsyncWorker_finalize(streamptr->jobManager);
stream_delete_entry(streamptr);
}
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment