Commit 96e5cf35 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

GRIB decoding parallelization.

parent e6726fab
......@@ -3,6 +3,10 @@
* using CGRIBEX library version 1.9.3
* Version 1.9.7 released
2019-05-30 Uwe Schulzweida
* GRIB decoding parallelization [patch from Nathanael Huebbe]
2019-05-27 Uwe Schulzweida
* GRIB: Support of typeOfStatisticalProcessing = 11 (Summation) [Feature #9055]
......
......@@ -70,6 +70,7 @@ case "${HOSTNAME}" in
--enable-iso-c-interface \
--enable-swig \
--enable-python \
--with-szlib=$HOME/local/lib \
--with-eccodes=$HOME/local/eccodes-2.3.0 \
--with-netcdf=$HOME/local \
LDFLAGS="-Wl,-rpath,$HOME/local/eccodes-2.3.0/lib" \
......
......@@ -75,10 +75,10 @@ typedef struct AsyncManager {
AsyncJob* communicators;
} AsyncManager;
static AsyncManager* gManager = NULL;
static AsyncManager *gManager = NULL;
static
void* workerMain(void* arg)
void *workerMain(void *arg)
{
AsyncJob* communicator = arg;
while(true)
......@@ -95,11 +95,12 @@ void* workerMain(void* arg)
break;
}
}
return NULL;
}
static
void startWorker(AsyncJob* communicator)
void startWorker(AsyncJob *communicator)
{
communicator->inUse = false;
communicator->work = NULL;
......@@ -117,9 +118,10 @@ int AsyncWorker_init(int threadCount)
if(threadCount <= 0)
{
xabort("CPU core count discovery not implemented yet");
return CDI_EINVAL; //TODO: discover CPU core count, and set threadCount to a sensible positive value
return CDI_EINVAL; // TODO: discover CPU core count, and set threadCount to a sensible positive value
}
if(gManager) return CDI_NOERR;
if (gManager) return CDI_NOERR;
gManager = malloc(sizeof*gManager);
if(!gManager) return CDI_ESYSTEM;
......@@ -132,7 +134,7 @@ int AsyncWorker_init(int threadCount)
return CDI_NOERR;
}
AsyncJob* AsyncWorker_requestWork(int (*work)(void* data), void* data)
AsyncJob *AsyncWorker_requestWork(int (*work)(void* data), void* data)
{
if(!gManager) xabort("AsyncWorker_requestWork() called without calling AsyncWorker_init() first");
if(!work) xabort("AsyncWorker_requestWork() called without a valid function pointer"); //need to catch this condition to stop users from terminating our worker threads
......@@ -161,7 +163,7 @@ AsyncJob* AsyncWorker_requestWork(int (*work)(void* data), void* data)
return worker;
}
int AsyncWorker_wait(AsyncJob* job)
int AsyncWorker_wait(AsyncJob *job)
{
if(!gManager) xabort("AsyncWorker_wait() called without calling AsyncWorker_init() first");
if(job < gManager->communicators) return CDI_EINVAL;
......@@ -215,5 +217,6 @@ int AsyncWorker_finalize()
free(gManager->communicators);
free(gManager);
gManager = NULL;
return result;
}
......@@ -255,7 +255,6 @@ typedef struct {
int accesstype; // TYPE_REC or TYPE_VAR
int accessmode;
int filetype;
int numWorker;
int byteorder;
int fileID;
int filemode;
......@@ -296,6 +295,11 @@ typedef struct {
void *gribContainers;
#endif
int numWorker;
int nextRecID;
int cachedTsID;
void *jobs;
void *gh; // grib handle
}
stream_t;
......
......@@ -4,6 +4,7 @@
#ifdef HAVE_LIBGRIB
#include "async_worker.h"
#include "dmemory.h"
#include "cdi.h"
#include "cdi_int.h"
......@@ -16,7 +17,7 @@
static
int grbDecode(int filetype, int memtype, void *cgribexp, void *gribbuffer, size_t gribsize, void *data, size_t datasize,
int unreduced, size_t *nmiss, double missval, int vlistID, int varID)
int unreduced, size_t *nmiss, double missval)
{
int status = 0;
......@@ -36,7 +37,7 @@ int grbDecode(int filetype, int memtype, void *cgribexp, void *gribbuffer, size_
{
void *datap = (memtype == MEMTYPE_FLOAT) ? Malloc(datasize*sizeof(double)) : data;
status = gribapiDecode(gribbuffer, gribsize, datap, datasize, unreduced, nmiss, missval, vlistID, varID);
status = gribapiDecode(gribbuffer, gribsize, datap, datasize, unreduced, nmiss, missval);
if ( memtype == MEMTYPE_FLOAT )
{
......@@ -48,7 +49,6 @@ int grbDecode(int filetype, int memtype, void *cgribexp, void *gribbuffer, size_
}
#else
{
(void)vlistID; (void)varID;
Error("ecCodes support not compiled in!");
}
#endif
......@@ -56,43 +56,36 @@ int grbDecode(int filetype, int memtype, void *cgribexp, void *gribbuffer, size_
return status;
}
// Decompresses the grib data in gribbuffer.
static
int grbUnzipRecord(void *gribbuffer, size_t *gribsize)
{
int zip = 0;
int izip;
size_t unzipsize;
size_t igribsize = *gribsize;
const size_t igribsize = *gribsize;
size_t ogribsize = *gribsize;
int izip;
size_t unzipsize;
if ( (izip = gribGetZip(igribsize, (unsigned char *)gribbuffer, &unzipsize)) > 0 )
{
zip = izip;
if ( izip == 128 ) /* szip */
{
unsigned char *itmpbuffer = NULL;
size_t itmpbuffersize = 0;
if ( unzipsize < igribsize )
{
fprintf(stderr, "Decompressed size smaller than compressed size (in %zu; out %zu)!\n", igribsize, unzipsize);
return 0;
}
if ( itmpbuffersize < igribsize )
{
itmpbuffersize = igribsize;
itmpbuffer = (unsigned char *) Realloc(itmpbuffer, itmpbuffersize);
}
memcpy(itmpbuffer, gribbuffer, itmpbuffersize);
unzipsize += 100; /* need 0 to 1 bytes for rounding of bds */
ogribsize = (size_t)gribUnzip((unsigned char *)gribbuffer, (long)unzipsize, itmpbuffer, (long)igribsize);
void *buffer = Malloc(igribsize);
memcpy(buffer, gribbuffer, igribsize);
ogribsize = (size_t)gribUnzip(gribbuffer, (long)unzipsize, buffer, (long)igribsize);
Free(itmpbuffer);
Free(buffer);
if ( ogribsize <= 0 ) Error("Decompression problem!");
}
......@@ -108,74 +101,202 @@ int grbUnzipRecord(void *gribbuffer, size_t *gribsize)
}
void grb_read_record(stream_t * streamptr, int memtype, void *data, size_t *nmiss)
typedef struct DecodeArgs {
int recID, *outZip, filetype, memtype, unreduced;
void *cgribexp, *gribbuffer, *data;
size_t recsize, gridsize, nmiss;
double missval;
} DecodeArgs;
static
int grb_decode_record(void *untypedArgs)
{
int filetype = streamptr->filetype;
DecodeArgs *args = untypedArgs;
*args->outZip = grbUnzipRecord(args->gribbuffer, &args->recsize);
grbDecode(args->filetype, args->memtype, args->cgribexp, args->gribbuffer, args->recsize, args->data, args->gridsize,
args->unreduced, &args->nmiss, args->missval);
return 0;
}
void *gribbuffer = streamptr->record->buffer;
void *cgribexp = streamptr->record->cgribexp;
static
DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos)
{
const int vlistID = streamptr->vlistID;
const int fileID = streamptr->fileID;
const int tsID = streamptr->curTsID; // FIXME: This should be looked up from the given recID
const off_t recpos = streamptr->tsteps[tsID].records[recID].position;
const size_t recsize = streamptr->tsteps[tsID].records[recID].size;
const int varID = streamptr->tsteps[tsID].records[recID].varID;
const int gridID = vlistInqVarGrid(vlistID, varID);
const size_t gridsize = gridInqSize(gridID);
if ( CDI_Debug ) Message("gridID = %d gridsize = %zu", gridID, gridsize);
if ( recsize == 0 ) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1);
if (!gribbuffer) gribbuffer = Malloc(recsize);
if (!data) data = Malloc(gridsize*(memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
if (resetFilePos)
{
const off_t currentfilepos = fileGetPos(fileID);
fileSetPos(fileID, recpos, SEEK_SET);
if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record");
fileSetPos(fileID, currentfilepos, SEEK_SET);
}
else
{
fileSetPos(fileID, recpos, SEEK_SET);
if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record");
streamptr->numvals += gridsize;
}
return (DecodeArgs){
.recID = recID,
.outZip = &streamptr->tsteps[tsID].records[recID].zip,
.filetype = streamptr->filetype,
.memtype = memtype,
.cgribexp = streamptr->record->cgribexp,
.gribbuffer = gribbuffer,
.recsize = recsize,
.data = data,
.gridsize = gridsize,
.unreduced = streamptr->unreduced,
.nmiss = 0,
.missval = vlistInqVarMissval(vlistID, varID),
};
}
typedef struct JobDescriptor {
DecodeArgs args;
AsyncJob *job;
} JobDescriptor;
static
void JobDescriptor_startJob(JobDescriptor *me, stream_t *streamptr, int recID, int memtype, bool resetFilePos)
{
me->args = grb_read_raw_data(streamptr, recID, memtype, NULL, NULL, resetFilePos);
me->job = AsyncWorker_requestWork(grb_decode_record, &me->args);
if (!me->job) xabort("error while trying to send job to worker thread");
}
static
void JobDescriptor_finishJob(JobDescriptor* me, void *data, size_t *nmiss)
{
if(AsyncWorker_wait(me->job)) xabort("error executing job in worker thread");
memcpy(data, me->args.data, me->args.gridsize*(me->args.memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double)));
*nmiss = me->args.nmiss;
int vlistID = streamptr->vlistID;
int fileID = streamptr->fileID;
int tsID = streamptr->curTsID;
int vrecID = streamptr->tsteps[tsID].curRecID;
int recID = streamptr->tsteps[tsID].recIDs[vrecID];
off_t recpos = streamptr->tsteps[tsID].records[recID].position;
size_t recsize = streamptr->tsteps[tsID].records[recID].size;
int varID = streamptr->tsteps[tsID].records[recID].varID;
Free(me->args.gribbuffer);
Free(me->args.data);
me->args.recID = -1; // mark as inactive
}
static
void grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, size_t *nmiss, bool resetFilePos)
{
bool jobFound = false;
const int workerCount = streamptr->numWorker;
if (workerCount > 0)
{
JobDescriptor *jobs = (JobDescriptor *) streamptr->jobs;
int gridID = vlistInqVarGrid(vlistID, varID);
size_t gridsize = gridInqSize(gridID);
// if this is the first call, init and start worker threads
tsteps_t *timestep = &streamptr->tsteps[streamptr->curTsID];
streamptr->numvals += gridsize;
if (!jobs)
{
jobs = malloc(workerCount*sizeof*jobs);
streamptr->jobs = jobs;
for (int i = 0; i < workerCount; i++) jobs[i].args.recID = -1;
if (AsyncWorker_init(workerCount)) xabort("error while trying to start worker threads");
}
fileSetPos(fileID, recpos, SEEK_SET);
if (recID == 0) streamptr->nextRecID = 0;
if (recID == 0) streamptr->cachedTsID = streamptr->curTsID; // no active workers -> we may start processing records of a new timestep
if (streamptr->cachedTsID == streamptr->curTsID)
{
// Start as many new jobs as possible.
for (int i = 0; streamptr->nextRecID < timestep->nrecs && i < workerCount; i++)
{
JobDescriptor *jd = &jobs[i];
if (jd->args.recID < 0)
{
JobDescriptor_startJob(jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
}
}
// search for a job descriptor with the given recID, and use its results if it exists
for (int i = 0; !jobFound && i < workerCount; i++)
{
JobDescriptor *jd = &jobs[i];
if (jd->args.recID == recID)
{
jobFound = true;
JobDescriptor_finishJob(jd, data, nmiss);
if (streamptr->nextRecID < timestep->nrecs)
{
JobDescriptor_startJob(jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype, resetFilePos);
}
}
}
}
}
if (fileRead(fileID, gribbuffer, recsize) != recsize)
Error("Failed to read GRIB record");
// perform the work synchronously if we didn't start a job for it yet
if (!jobFound)
{
DecodeArgs args = grb_read_raw_data(streamptr, recID, memtype, streamptr->record->buffer, data, resetFilePos);
grb_decode_record(&args);
*nmiss = args.nmiss;
}
}
double missval = vlistInqVarMissval(vlistID, varID);
streamptr->tsteps[tsID].records[recID].zip = grbUnzipRecord(gribbuffer, &recsize);
void grb_read_record(stream_t *streamptr, int memtype, void *data, size_t *nmiss)
{
const int tsID = streamptr->curTsID;
const int vrecID = streamptr->tsteps[tsID].curRecID;
const int recID = streamptr->tsteps[tsID].recIDs[vrecID];
grbDecode(filetype, memtype, cgribexp, gribbuffer, recsize, data, gridsize, streamptr->unreduced, nmiss, missval, vlistID, varID);
grb_read_next_record(streamptr, recID, memtype, data, nmiss, false);
}
void grb_read_var(stream_t * streamptr, int varID, int memtype, void *data, size_t *nmiss)
void grb_read_var(stream_t *streamptr, int varID, int memtype, void *data, size_t *nmiss)
{
int filetype = streamptr->filetype;
const int filetype = streamptr->filetype;
void *gribbuffer = streamptr->record->buffer;
void *cgribexp = streamptr->record->cgribexp;
int vlistID = streamptr->vlistID;
int fileID = streamptr->fileID;
int tsID = streamptr->curTsID;
const int vlistID = streamptr->vlistID;
const int fileID = streamptr->fileID;
const int tsID = streamptr->curTsID;
const int gridID = vlistInqVarGrid(vlistID, varID);
const size_t gridsize = gridInqSize(gridID);
int gridID = vlistInqVarGrid(vlistID, varID);
size_t gridsize = gridInqSize(gridID);
const off_t currentfilepos = fileGetPos(fileID);
off_t currentfilepos = fileGetPos(fileID);
const int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
const int nlevs = streamptr->vars[varID].recordTable[0].nlevs;
int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
int nlevs = streamptr->vars[varID].recordTable[0].nlevs;
if ( CDI_Debug ) Message("nlevs = %d gridID = %d gridsize = %zu", nlevs, gridID, gridsize);
*nmiss = 0;
for (int levelID = 0; levelID < nlevs; levelID++ )
{
int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
off_t recpos = streamptr->tsteps[tsID].records[recID].position;
const int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
const off_t recpos = streamptr->tsteps[tsID].records[recID].position;
size_t recsize = streamptr->tsteps[tsID].records[recID].size;
fileSetPos(fileID, recpos, SEEK_SET);
fileRead(fileID, gribbuffer, recsize);
double missval = vlistInqVarMissval(vlistID, varID);
size_t imiss;
streamptr->tsteps[tsID].records[recID].zip = grbUnzipRecord(gribbuffer, &recsize);
void *datap = NULL;
......@@ -184,9 +305,9 @@ void grb_read_var(stream_t * streamptr, int varID, int memtype, void *data, size
else
datap = (double*)data + levelID*gridsize;
grbDecode(filetype, memtype, cgribexp, gribbuffer, recsize, datap, gridsize,
streamptr->unreduced, &imiss, missval, vlistID, varID);
const double missval = vlistInqVarMissval(vlistID, varID);
size_t imiss;
grbDecode(filetype, memtype, cgribexp, gribbuffer, recsize, datap, gridsize, streamptr->unreduced, &imiss, missval);
*nmiss += imiss;
}
......@@ -196,40 +317,10 @@ void grb_read_var(stream_t * streamptr, int varID, int memtype, void *data, size
void grb_read_var_slice(stream_t *streamptr, int varID, int levelID, int memtype, void *data, size_t *nmiss)
{
int filetype = streamptr->filetype;
void *gribbuffer = streamptr->record->buffer;
void *cgribexp = streamptr->record->cgribexp;
int vlistID = streamptr->vlistID;
int gridID = vlistInqVarGrid(vlistID, varID);
size_t gridsize = gridInqSize(gridID);
int tsID = streamptr->curTsID;
if ( CDI_Debug ) Message("gridID = %d gridsize = %zu", gridID, gridsize);
const int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
const int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
int fileID = streamptr->fileID;
off_t currentfilepos = fileGetPos(fileID);
int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
off_t recpos = streamptr->tsteps[tsID].records[recID].position;
size_t recsize = streamptr->tsteps[tsID].records[recID].size;
if ( recsize == 0 )
Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1);
fileSetPos(fileID, recpos, SEEK_SET);
fileRead(fileID, gribbuffer, recsize);
streamptr->tsteps[tsID].records[recID].zip = grbUnzipRecord(gribbuffer, &recsize);
double missval = vlistInqVarMissval(vlistID, varID);
grbDecode(filetype, memtype, cgribexp, gribbuffer, recsize, data, gridsize, streamptr->unreduced, nmiss, missval, vlistID, varID);
fileSetPos(fileID, currentfilepos, SEEK_SET);
grb_read_next_record(streamptr, recID, memtype, data, nmiss, true);
}
#endif
......@@ -234,8 +234,11 @@ int getByteswap(int byteorder)
void streamDefNumWorker(int streamID, int numWorker)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->numWorker = numWorker;
if (numWorker > 0)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->numWorker = numWorker;
}
}
......@@ -865,7 +868,6 @@ void streamDefaultValue ( stream_t * streamptr )
streamptr->accesstype = CDI_UNDEFID;
streamptr->accessmode = 0;
streamptr->filetype = CDI_FILETYPE_UNDEF;
streamptr->numWorker = 0;
streamptr->byteorder = CDI_UNDEFID;
streamptr->fileID = 0;
streamptr->filemode = 0;
......@@ -914,6 +916,11 @@ void streamDefaultValue ( stream_t * streamptr )
#endif
streamptr->gribContainers = NULL;
streamptr->numWorker = 0;
streamptr->nextRecID = 0;
streamptr->cachedTsID = -1;
streamptr->jobs = NULL;
}
static
......@@ -1101,6 +1108,8 @@ void streamClose(int streamID)
cdiVlistDestroy_(vlistID);
}
if (streamptr->jobs) free(streamptr->jobs);
stream_delete_entry(streamptr);
}
......
......@@ -1464,13 +1464,10 @@ int gribapiScanTimestep(stream_t * streamptr)
#endif
int gribapiDecode(void *gribbuffer, size_t gribsize, double *data, size_t gridsize,
int unreduced, size_t *nmiss, double missval, int vlistID, int varID)
int unreduced, size_t *nmiss, double missval)
{
int status = 0;
UNUSED(vlistID);
UNUSED(varID);
if ( unreduced )
{
static bool lwarn = true;
......
......@@ -10,7 +10,7 @@ int gribapiScanTimestep2(stream_t * streamptr);
int gribapiScanTimestep(stream_t * streamptr);
int gribapiDecode(void *gribbuffer, size_t gribsize, double *data, size_t datasize,
int unreduced, size_t *nmiss, double missval, int vlistID, int varID);
int unreduced, size_t *nmiss, double missval);
size_t gribapiEncode(int varID, int levelID, int vlistID, int gridID, int zaxisID,
int64_t vdate, int vtime, int tsteptype, int numavg,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment