Skip to content
Snippets Groups Projects
Commit 30e77551 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

gribapi float interface works only for packed data

parent 14c8d749
No related branches found
No related tags found
2 merge requests!42Develop,!41M214003/develop
Pipeline #32758 failed
......@@ -19,7 +19,7 @@
#include "cgribex.h" /* gribZip gribGetZip gribGinfo */
static int
grb_decode(int filetype, int memType, void *cgribexp, void *gribbuffer, size_t gribsize, void *data, size_t datasize, int unreduced,
grb_decode(int filetype, int memType, int datatype, void *cgribexp, void *gribbuffer, size_t gribsize, void *data, size_t datasize, int unreduced,
size_t *nmiss, double missval)
{
int status = 0;
......@@ -38,12 +38,13 @@ grb_decode(int filetype, int memType, void *cgribexp, void *gribbuffer, size_t g
#ifdef HAVE_LIBGRIB_API
{
void *datap = data;
int memTypeX = have_gribapi_float_interface() ? memType : MEMTYPE_DOUBLE;
if (!have_gribapi_float_interface() && memType == MEMTYPE_FLOAT) datap = Malloc(datasize * sizeof(double));
bool useFloatInterface = (have_gribapi_float_interface() && datatype != CDI_DATATYPE_FLT32 && datatype != CDI_DATATYPE_FLT64);
int memTypeX = useFloatInterface ? memType : MEMTYPE_DOUBLE;
if (!useFloatInterface && memType == MEMTYPE_FLOAT) datap = Malloc(datasize * sizeof(double));
status = gribapiDecode(memTypeX, gribbuffer, gribsize, datap, datasize, unreduced, nmiss, missval);
if (!have_gribapi_float_interface() && memType == MEMTYPE_FLOAT)
if (!useFloatInterface&& memType == MEMTYPE_FLOAT)
{
float *dataf = (float *) data;
double *datad = (double *) datap;
......@@ -107,7 +108,7 @@ grib1_unzip_record(void *gribbuffer, size_t *gribsize)
typedef struct JobArgs
{
int recID, tsID, *outZip, filetype, memtype, unreduced;
int recID, tsID, *outZip, filetype, memType, datatype, unreduced;
void *cgribexp, *gribbuffer, *data;
size_t recsize, gridsize, nmiss;
double missval;
......@@ -118,13 +119,13 @@ grb_decode_record(void *untypedArgs)
{
JobArgs *args = (JobArgs *) untypedArgs;
*args->outZip = grib1_unzip_record(args->gribbuffer, &args->recsize);
grb_decode(args->filetype, args->memtype, args->cgribexp, args->gribbuffer, args->recsize, args->data, args->gridsize,
grb_decode(args->filetype, args->memType, args->datatype, args->cgribexp, args->gribbuffer, args->recsize, args->data, args->gridsize,
args->unreduced, &args->nmiss, args->missval);
return 0;
}
static JobArgs
grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos)
grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memType, void *gribbuffer, void *data, bool resetFilePos)
{
int vlistID = streamptr->vlistID;
int varID = streamptr->tsteps[tsID].records[recID].varID;
......@@ -136,7 +137,7 @@ grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memtype, void *g
void *cgribexp = (gribbuffer && streamptr->record->objectp) ? streamptr->record->objectp : NULL;
if (!gribbuffer) gribbuffer = Malloc(streamptr->record->buffersize);
if (!data) data = Malloc(gridsize * ((memtype == MEMTYPE_FLOAT) ? sizeof(float) : sizeof(double)));
if (!data) data = Malloc(gridsize * ((memType == MEMTYPE_FLOAT) ? sizeof(float) : sizeof(double)));
if (streamptr->protocol == CDI_PROTOCOL_FDB)
{
......@@ -167,7 +168,7 @@ grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memtype, void *g
.tsID = tsID,
.outZip = &streamptr->tsteps[tsID].records[recID].zip,
.filetype = streamptr->filetype,
.memtype = memtype,
.memType = memType,
.unreduced = streamptr->unreduced,
.cgribexp = cgribexp,
.gribbuffer = gribbuffer,
......@@ -176,13 +177,14 @@ grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memtype, void *g
.gridsize = gridsize,
.nmiss = 0,
.missval = vlistInqVarMissval(vlistID, varID),
.datatype = vlistInqVarDatatype(vlistID, varID),
};
}
static size_t
grb_read_and_decode_record(stream_t *streamptr, int recID, int memtype, void *data, bool resetFilePos)
grb_read_and_decode_record(stream_t *streamptr, int recID, int memType, void *data, bool resetFilePos)
{
JobArgs args = grb_read_raw_data(streamptr, streamptr->curTsID, recID, memtype, streamptr->record->buffer, data, resetFilePos);
JobArgs args = grb_read_raw_data(streamptr, streamptr->curTsID, recID, memType, streamptr->record->buffer, data, resetFilePos);
grb_decode_record(&args);
return args.nmiss;
}
......@@ -194,9 +196,9 @@ typedef struct JobDescriptor
} JobDescriptor;
static void
JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int tsID, int recID, int memtype)
JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int tsID, int recID, int memType)
{
me->args = grb_read_raw_data(streamptr, tsID, recID, memtype, NULL, NULL, false);
me->args = grb_read_raw_data(streamptr, tsID, recID, memType, NULL, NULL, false);
me->job = AsyncWorker_requestWork(jobManager, grb_decode_record, &me->args);
if (!me->job) xabort("error while trying to send job to worker thread");
}
......@@ -205,7 +207,7 @@ static void
JobDescriptor_finishJob(AsyncManager *jobManager, JobDescriptor *me, void *data, size_t *nmiss)
{
if (AsyncWorker_wait(jobManager, me->job)) xabort("error executing job in worker thread");
memcpy(data, me->args.data, me->args.gridsize * ((me->args.memtype == MEMTYPE_FLOAT) ? sizeof(float) : sizeof(double)));
memcpy(data, me->args.data, me->args.gridsize * ((me->args.memType == MEMTYPE_FLOAT) ? sizeof(float) : sizeof(double)));
*nmiss = me->args.nmiss;
Free(me->args.gribbuffer);
......@@ -247,17 +249,17 @@ get_local_step_and_recId(stream_t *streamptr, long globalRecId, int *tsID, int *
}
static void
read_next_record(AsyncManager *jobManager, JobDescriptor *jd, stream_t *streamptr, int memtype)
read_next_record(AsyncManager *jobManager, JobDescriptor *jd, stream_t *streamptr, int memType)
{
int tsId = -1, recId = -1;
get_local_step_and_recId(streamptr, streamptr->nextGlobalRecId, &tsId, &recId);
int xRecId = streamptr->tsteps[tsId].recIDs[recId];
JobDescriptor_startJob(jobManager, jd, streamptr, tsId, xRecId, memtype);
JobDescriptor_startJob(jobManager, jd, streamptr, tsId, xRecId, memType);
streamptr->nextGlobalRecId++;
}
static void
grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, size_t *nmiss)
grb_read_next_record(stream_t *streamptr, int recID, int memType, void *data, size_t *nmiss)
{
bool jobFound = false;
......@@ -283,7 +285,7 @@ grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, si
for (int i = 0; streamptr->nextGlobalRecId < streamptr->maxGlobalRecs && i < workerCount; i++)
{
JobDescriptor *jd = &jobs[i];
if (jd->args.recID < 0 && jd->args.tsID < 0) read_next_record(jobManager, jd, streamptr, memtype);
if (jd->args.recID < 0 && jd->args.tsID < 0) read_next_record(jobManager, jd, streamptr, memType);
}
}
......@@ -295,36 +297,36 @@ grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, si
{
jobFound = true;
JobDescriptor_finishJob(jobManager, jd, data, nmiss);
if (streamptr->nextGlobalRecId < streamptr->maxGlobalRecs) read_next_record(jobManager, jd, streamptr, memtype);
if (streamptr->nextGlobalRecId < streamptr->maxGlobalRecs) read_next_record(jobManager, jd, streamptr, memType);
}
}
}
// perform the work synchronously if we didn't start a job for it yet
if (!jobFound) *nmiss = grb_read_and_decode_record(streamptr, recID, memtype, data, false);
if (!jobFound) *nmiss = grb_read_and_decode_record(streamptr, recID, memType, data, false);
}
void
grb_read_record(stream_t *streamptr, int memtype, void *data, size_t *nmiss)
grb_read_record(stream_t *streamptr, int memType, void *data, size_t *nmiss)
{
int tsID = streamptr->curTsID;
int vrecID = streamptr->tsteps[tsID].curRecID;
int recID = streamptr->tsteps[tsID].recIDs[vrecID];
grb_read_next_record(streamptr, recID, memtype, data, nmiss);
grb_read_next_record(streamptr, recID, memType, data, nmiss);
}
void
grb_read_var_slice(stream_t *streamptr, int varID, int levelID, int memtype, void *data, size_t *nmiss)
grb_read_var_slice(stream_t *streamptr, int varID, int levelID, int memType, void *data, size_t *nmiss)
{
int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
*nmiss = grb_read_and_decode_record(streamptr, recID, memtype, data, true);
*nmiss = grb_read_and_decode_record(streamptr, recID, memType, data, true);
}
void
grb_read_var(stream_t *streamptr, int varID, int memtype, void *data, size_t *nmiss)
grb_read_var(stream_t *streamptr, int varID, int memType, void *data, size_t *nmiss)
{
int vlistID = streamptr->vlistID;
int fileID = streamptr->fileID;
......@@ -344,9 +346,9 @@ grb_read_var(stream_t *streamptr, int varID, int memtype, void *data, size_t *nm
{
int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
size_t offset = levelID * gridsize;
void *datap = (memtype == MEMTYPE_FLOAT) ? (void *) ((float *) data + offset) : (void *) ((double *) data + offset);
void *datap = (memType == MEMTYPE_FLOAT) ? (void *) ((float *) data + offset) : (void *) ((double *) data + offset);
*nmiss += grb_read_and_decode_record(streamptr, recID, memtype, datap, false);
*nmiss += grb_read_and_decode_record(streamptr, recID, memType, datap, false);
}
fileSetPos(fileID, currentfilepos, SEEK_SET);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment