Commit 4eabf6f4 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

Fix: --without-threads failed (use HAVE_LIBPTHREAD in async_worker.c).

parent 482bf465
2020-04-08 Uwe Schulzweida
* Fix: --without-threads failed (use HAVE_LIBPTHREAD in async_worker.c)
2020-03-05 Uwe Schulzweida 2020-03-05 Uwe Schulzweida
* vlistCopyFlag: init mlevelID with levID2 (bug fix) * vlistCopyFlag: init mlevelID with levID2 (bug fix)
......
...@@ -165,6 +165,7 @@ void usage(void) ...@@ -165,6 +165,7 @@ void usage(void)
fprintf(stderr, " Options:\n"); fprintf(stderr, " Options:\n");
fprintf(stderr, " -d Print debugging information\n"); fprintf(stderr, " -d Print debugging information\n");
fprintf(stderr, " -f <format> Format of the output file. (grb, grb2, nc, nc2, nc4, nc4c, nc5, srv, ext or ieg)\n"); fprintf(stderr, " -f <format> Format of the output file. (grb, grb2, nc, nc2, nc4, nc4c, nc5, srv, ext or ieg)\n");
fprintf(stderr, " -i <num> Number of worker to decode/decompress GRIB records\n");
fprintf(stderr, " -s give short information if ofile is missing\n"); fprintf(stderr, " -s give short information if ofile is missing\n");
fprintf(stderr, " -t <table> Parameter table name/file\n"); fprintf(stderr, " -t <table> Parameter table name/file\n");
fprintf(stderr, " Predefined tables: "); fprintf(stderr, " Predefined tables: ");
......
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_LIBPTHREAD
#include <pthread.h>
#endif
#include "async_worker.h" #include "async_worker.h"
#include "cdi.h" #include "cdi.h"
#include "error.h" #include "error.h"
#include <pthread.h>
#include <stdbool.h> #include <stdbool.h>
#ifdef HAVE_LIBPTHREAD
#ifdef __APPLE__ #ifdef __APPLE__
#include <dispatch/dispatch.h> #include <dispatch/dispatch.h>
#else #else
...@@ -20,8 +28,24 @@ typedef struct sema { ...@@ -20,8 +28,24 @@ typedef struct sema {
sem_t sem; sem_t sem;
#endif #endif
} sema_t; } sema_t;
#endif
struct AsyncJob {
bool inUse;
#ifdef HAVE_LIBPTHREAD
sema_t request, completion;
#endif
int (*work)(void *data);
void *data;
int result;
};
struct AsyncManager {
int workerCount, idleWorkerCount;
AsyncJob *communicators;
};
#ifdef HAVE_LIBPTHREAD
static inline int static inline int
sema_init(sema_t *s, int pshared, uint32_t value) sema_init(sema_t *s, int pshared, uint32_t value)
{ {
...@@ -63,21 +87,6 @@ sema_post(sema_t *s) ...@@ -63,21 +87,6 @@ sema_post(sema_t *s)
return 0; return 0;
} }
struct AsyncJob {
bool inUse;
sema_t request, completion;
int (*work)(void *data);
void *data;
int result;
};
struct AsyncManager {
int workerCount, idleWorkerCount;
AsyncJob *communicators;
};
static static
void *workerMain(void *arg) void *workerMain(void *arg)
{ {
...@@ -115,6 +124,7 @@ void startWorker(AsyncJob *communicator) ...@@ -115,6 +124,7 @@ void startWorker(AsyncJob *communicator)
if (pthread_create(&worker, NULL, workerMain, communicator)) xabort("pthread_create() failed"); if (pthread_create(&worker, NULL, workerMain, communicator)) xabort("pthread_create() failed");
if (pthread_detach(worker)) xabort("pthread_detach() failed"); if (pthread_detach(worker)) xabort("pthread_detach() failed");
} }
#endif
int AsyncWorker_init(AsyncManager **jobManager, int threadCount) int AsyncWorker_init(AsyncManager **jobManager, int threadCount)
{ {
...@@ -126,6 +136,7 @@ int AsyncWorker_init(AsyncManager **jobManager, int threadCount) ...@@ -126,6 +136,7 @@ int AsyncWorker_init(AsyncManager **jobManager, int threadCount)
if (*jobManager) return CDI_NOERR; if (*jobManager) return CDI_NOERR;
#ifdef HAVE_LIBPTHREAD
*jobManager = malloc(sizeof(AsyncManager)); *jobManager = malloc(sizeof(AsyncManager));
if(!*jobManager) return CDI_ESYSTEM; if(!*jobManager) return CDI_ESYSTEM;
(*jobManager)->workerCount = threadCount; (*jobManager)->workerCount = threadCount;
...@@ -134,7 +145,11 @@ int AsyncWorker_init(AsyncManager **jobManager, int threadCount) ...@@ -134,7 +145,11 @@ int AsyncWorker_init(AsyncManager **jobManager, int threadCount)
for (int i = 0; i < threadCount; i++) startWorker(&((*jobManager)->communicators[i])); for (int i = 0; i < threadCount; i++) startWorker(&((*jobManager)->communicators[i]));
(*jobManager)->idleWorkerCount = threadCount; (*jobManager)->idleWorkerCount = threadCount;
#else
Error("pthread support not compiled in!");
#endif
return CDI_NOERR; return CDI_NOERR;
} }
...@@ -163,8 +178,9 @@ AsyncJob *AsyncWorker_requestWork(AsyncManager *jobManager, int (*work)(void *da ...@@ -163,8 +178,9 @@ AsyncJob *AsyncWorker_requestWork(AsyncManager *jobManager, int (*work)(void *da
worker->work = work; worker->work = work;
worker->data = data; worker->data = data;
worker->result = 0; worker->result = 0;
#ifdef HAVE_LIBPTHREAD
if (sema_post(&worker->request)) xabort("sema_post() failed"); if (sema_post(&worker->request)) xabort("sema_post() failed");
#endif
return worker; return worker;
} }
...@@ -175,7 +191,9 @@ int AsyncWorker_wait(AsyncManager *jobManager, AsyncJob *job) ...@@ -175,7 +191,9 @@ int AsyncWorker_wait(AsyncManager *jobManager, AsyncJob *job)
if (job >= jobManager->communicators + jobManager->workerCount) return CDI_EINVAL; if (job >= jobManager->communicators + jobManager->workerCount) return CDI_EINVAL;
if (!job->inUse) return CDI_EINVAL; if (!job->inUse) return CDI_EINVAL;
#ifdef HAVE_LIBPTHREAD
while (sema_wait(&job->completion)) ; while (sema_wait(&job->completion)) ;
#endif
int result = job->result; int result = job->result;
// reset the communicator // reset the communicator
...@@ -215,8 +233,9 @@ int AsyncWorker_finalize(AsyncManager *jobManager) ...@@ -215,8 +233,9 @@ int AsyncWorker_finalize(AsyncManager *jobManager)
curWorker->work = NULL; curWorker->work = NULL;
curWorker->data = NULL; curWorker->data = NULL;
curWorker->result = 0; curWorker->result = 0;
#ifdef HAVE_LIBPTHREAD
if (sema_post(&curWorker->request)) xabort("sema_post() failed"); if (sema_post(&curWorker->request)) xabort("sema_post() failed");
#endif
// wait for the worker to exit // wait for the worker to exit
AsyncWorker_wait(jobManager, curWorker); AsyncWorker_wait(jobManager, curWorker);
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment