Commit 3734ed2e authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

merged changes from branches/cdi_fileDrivenInput

parents e1b4fdc2 247f3a60
#ifndef INCLUDE_GUARD_CDI_GRIBAPI_UTILITIES_H
#define INCLUDE_GUARD_CDI_GRIBAPI_UTILITIES_H
#ifdef HAVE_LIBGRIB_API
#include "grid.h"
#include <grib_api.h>
#include <stdbool.h>
char* gribCopyString(grib_handle* gribHandle, const char* key);
bool gribCheckString(grib_handle* gribHandle, const char* key, const char* expectedValue);
bool gribCheckLong(grib_handle* gribHandle, const char* key, long expectedValue);
long gribGetLong(grib_handle* gh, const char* key);
long gribGetLongDefault(grib_handle* gribHandle, const char* key, long defaultValue);
double gribGetDouble(grib_handle* gh, const char* key);
double gribGetDoubleDefault(grib_handle* gribHandle, const char* key, double defaultValue);
size_t gribGetArraySize(grib_handle* gribHandle, const char* key);
void gribGetDoubleArray(grib_handle* gribHandle, const char* key, double* array); //The caller is responsible to ensure a sufficiently large buffer.
void gribGetLongArray(grib_handle* gribHandle, const char* key, long* array); //The caller is responsible to ensure a sufficiently large buffer.
long gribEditionNumber(grib_handle* gh);
char* gribMakeTimeString(grib_handle* gh, bool getEndTime); //For statistical fields, setting getEndTime produces the time of the end of the integration period, otherwise the time of the start of the integration period is returned. Returns NULL if getEndTime is set and the field does not have an integration period.
int gribapiTimeIsFC(grib_handle *gh);
int gribapiGetTsteptype(grib_handle *gh);
int gribGetDatatype(grib_handle* gribHandle);
int gribapiGetParam(grib_handle *gh);
int gribapiGetGridType(grib_handle *gh);
void gribapiGetGrid(grib_handle *gh, grid_t *grid);
#endif
#endif
#include "input_file.h"
#include "cdi.h"
#include "dmemory.h"
#include "proprietarySystemWorkarounds.h"
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
static void cdiInputFile_destruct(CdiInputFile* me);
//For an explanation of the condestruct() pattern, see the comment in iterator_grib.c
//path != NULL -> construction
//path = NULL -> destruction
static CdiInputFile* cdiInputFile_condestruct(CdiInputFile* me, const char* path)
{
#define super() (&me->super)
if(!path) goto destruct;
cdiRefObject_construct(super());
me->path = myStrDup(path);
if(!me->path) goto destructSuper;
do
{
me->fileDescriptor = open(me->path, O_RDONLY);
}
while(me->fileDescriptor == -1 && (errno == EINTR || errno == EAGAIN));
if(me->fileDescriptor == -1) goto freePath;
//construction successfull, now we can set our own destructor
super()->destructor = (void(*)(CdiReferencedObject*))cdiInputFile_destruct;
goto success;
// ^ constructor code ^
// | |
// v destructor/error-cleanup code v
destruct:
close(me->fileDescriptor);
freePath:
free(me->path);
destructSuper:
cdiRefObject_destruct(super());
me = NULL;
success:
return me;
#undef super
}
static CdiInputFile** openFileList = NULL;
static size_t openFileCount = 0, openFileListSize = 0;
static pthread_mutex_t openFileListLock = PTHREAD_MUTEX_INITIALIZER;
//This either returns a new object, or retains and returns a preexisting open file.
CdiInputFile* cdiInputFile_make(const char* path)
{
CdiInputFile* result = NULL;
xassert(path);
int error = pthread_mutex_lock(&openFileListLock);
xassert(!error);
{
//Check the list of open files for the given path.
for(size_t i = openFileCount; i-- && !result; )
{
if(!strcmp(path, openFileList[i]->path)) result = openFileList[i];
}
//If no open file was found, we open one, otherwise we just retain the existing one one more time.
if(result)
{
cdiRefObject_retain(&result->super);
}
else
{
result = xmalloc(sizeof(*result));
if(!cdiInputFile_condestruct(result, path))
{
//An error occured during construction, avoid a memory leak.
free(result);
result = NULL;
}
else
{
//Add the new file to the list of open files.
if(openFileCount == openFileListSize)
{
openFileListSize *= 2;
if(openFileListSize < 16) openFileListSize = 16;
openFileList = xrealloc(openFileList, openFileListSize);
}
xassert(openFileCount < openFileListSize);
openFileList[openFileCount++] = result;
}
}
}
error = pthread_mutex_unlock(&openFileListLock);
xassert(!error);
return result;
}
int cdiInputFile_read(const CdiInputFile* me, off_t readPosition, size_t readSize, size_t* outActualReadSize, void* buffer)
{
char* byteBuffer = buffer;
size_t trash;
if(!outActualReadSize) outActualReadSize = &trash;
*outActualReadSize = 0;
while(readSize)
{
ssize_t bytesRead = pread(me->fileDescriptor, byteBuffer, readSize, readPosition);
if(bytesRead == -1) return (errno == EINVAL) ? CDI_EINVAL : CDI_ESYSTEM;
if(bytesRead == 0) return CDI_EEOF;
byteBuffer += bytesRead;
readPosition += bytesRead;
readSize -= bytesRead;
*outActualReadSize += bytesRead;
}
return CDI_NOERR;
}
char* cdiInputFile_copyPath(const CdiInputFile* me)
{
return myStrDup(me->path);
}
void cdiInputFile_destruct(CdiInputFile* me)
{
int error = pthread_mutex_lock(&openFileListLock);
xassert(!error);
{
//Find the position of me in the list of open files.
ssize_t position;
for(position = openFileCount; position--; ) if(openFileList[position] == me) break;
xassert(position != -1);
//Remove me from the list
openFileList[position] = openFileList[--openFileCount];
}
error = pthread_mutex_unlock(&openFileListLock);
xassert(!error);
cdiInputFile_condestruct(me, NULL);
}
#ifndef INCLUDE_GUARD_CDI_GRIB_FILE_H
#define INCLUDE_GUARD_CDI_GRIB_FILE_H
#include "referenceCounting.h"
/*
CdiInputFile is a file abstraction that allows accessing an input file through any number of channels:
It is reference counted, so that it is closed at the right place,
and it is stateless, so that accesses from different callers cannot interfere with each other.
Once the reference counting code is threadsafe, CdiInputFile will also be threadsafe.
*/
typedef struct CdiInputFile {
//public:
CdiReferencedObject super;
//private:
char* path;
int fileDescriptor;
} CdiInputFile;
//Final class, the constructor is private and not defined here.
CdiInputFile* cdiInputFile_make(const char* path); //The caller is responsible to call cdiRefObject_release() on the returned object.
int cdiInputFile_read(const CdiInputFile* me, off_t readPosition, size_t readSize, size_t* outActualReadSize, void* buffer); //Returns one of CDI_EINVAL, CDI_ESYSTEM, CDI_EEOF, OR CDI_NOERR.
char* cdiInputFile_copyPath(const CdiInputFile* me); //Returns a malloc'ed string, don't forget to free() it.
//Destructor is private as well.
#endif
......@@ -232,7 +232,7 @@ const char *institutInqNamePtr(int instID)
}
char *institutInqLongnamePtr(int instID)
const char *institutInqLongnamePtr(int instID)
{
institute_t * instituteptr = NULL;
......
This diff is collapsed.
/*
* This file is for the use of iterator.c and the CdiIterator subclasses only.
*/
#ifndef INCLUDE_GUARD_CDI_ITERATOR_INT_H
#define INCLUDE_GUARD_CDI_ITERATOR_INT_H
#include "cdi.h"
#include <stdbool.h>
/*
class CdiIterator
An iterator is an object that identifies the position of one record in a file, where a record is defined as the data belonging to one level, timestep, and variable.
Using iterators to read a file can be significantly faster than using streams, because they can avoid building an index of the file.
For file formats like grib that do not provide an index within the file, this makes the difference between reading the file once or reading the file twice.
CdiIterator is an abstract base class. Which derived class is used depends on the type of the file. The class hierarchy currently looks like this:
CdiIterator <|--+-- CdiFallbackIterator
|
+-- CdiGribIterator
The fallback implementation currently uses the stream interface of CDI under the hood to provide full functionality for all filetypes for which no iterator implementation exists yet.
*/
//TODO[NH]: Debug messages, print function.
struct CdiIterator {
int filetype; //This is used to dispatch calls to the correct subclass.
bool isAdvanced; //Used to catch inquiries before the first call to CdiIteratorNextField(). //XXX: Advanced is probably not a good word (initialized?)
//The metadata that can be accessed by the inquiry calls.
//While theoretically redundant, these fields allow the handling of most inquiry calls within the base class.
//Only the name is excempted because it needs an allocation.
//These fields are set by the subclasses in the xxxIterNextField() method.
int datatype, timesteptype;
int gridId;
CdiParam param;
//The status information for reading/advancing is added in the subclasses.
};
void baseIterConstruct(CdiIterator* me, int filetype);
const char* baseIter_constructFromString(CdiIterator* me, const char* description); //Returns a pointer past the end of the parsed portion of the description string.
void baseIterDestruct(CdiIterator* me);
#endif
#include "iterator_fallback.h"
#include "cdi.h"
#include "cdi_int.h"
#include "dmemory.h"
#include "proprietarySystemWorkarounds.h"
#include "vlist.h" //Required for vlist_t, which we require because there is no safe function available to access a variable name.
#include <assert.h>
#include <stdlib.h>
//For more information on the condestruct() pattern, see comment in src/iterator_grib.c
static CdiFallbackIterator* cdiFallbackIterator_condestruct(CdiFallbackIterator* me, const char* path, int filetype)
{
if(me) goto destruct;
me = xmalloc(sizeof(*me));
baseIterConstruct(&me->super, filetype);
me->streamId = streamOpenRead(path);
if(me->streamId == CDI_UNDEFID) goto destructSuper;
me->vlistId = streamInqVlist(me->streamId);
if(me->vlistId == CDI_UNDEFID) goto closeStream;
me->variableCount = vlistNvars(me->vlistId);
if(me->variableCount <= 0) goto closeStream;
me->curLevelCount = -1; //Will be set in cdiFallbackIterator_nextField()
//These values are chosen so that the natural increment at the start of cdiFallbackIterator_nextField() will correctly position us at the first slice.
me->curTimestep = 0;
if(streamInqTimestep(me->streamId, me->curTimestep) <= 0) goto closeStream;
me->curVariable = 0;
me->curLevel = -1;
me->path = myStrDup(path);
if(!me->path) goto closeStream;
return me;
// ^ constructor code ^
// | |
// v destructor/error-cleanup code v
destruct:
free(me->path);
closeStream:
streamClose(me->streamId);
destructSuper:
baseIterDestruct(&me->super);
free(me);
return NULL;
}
CdiIterator* cdiFallbackIterator_new(const char* path, int filetype)
{
return &cdiFallbackIterator_condestruct(NULL, path, filetype)->super;
}
//Fetches the info that is published by the variables in the base class from the current field.
static void fetchSuperInfo(CdiFallbackIterator* me)
{
me->super.datatype = vlistInqVarDatatype(me->vlistId, me->curVariable);
me->super.timesteptype = vlistInqVarTsteptype(me->vlistId, me->curVariable);
me->super.gridId = vlistInqVarGrid(me->vlistId, me->curVariable);
int param = vlistInqVarParam(me->vlistId, me->curVariable);
cdiDecodeParam(param, &me->super.param.number, &me->super.param.category, &me->super.param.discipline);
}
CdiFallbackIterator* cdiFallbackIterator_clone(CdiIterator* super)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
//Make another stream for this file. This yields an unadvanced iterator.
CdiFallbackIterator* clone = cdiFallbackIterator_condestruct(NULL, me->path, me->super.filetype);
if(!clone) return NULL;
//Point the clone to the same position in the file.
clone->variableCount = me->variableCount;
clone->curVariable = me->curVariable;
clone->curLevelCount = me->curLevelCount;
clone->curLevel = me->curLevel;
clone->curTimestep = me->curTimestep;
clone->super.isAdvanced = super->isAdvanced;
if(super->isAdvanced) fetchSuperInfo(clone);
return clone;
}
char* cdiFallbackIterator_serialize(CdiIterator* super)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
char* escapedPath = cdiEscapeSpaces(me->path);
char* result = myAsprintf("%s %d %d %d %d %d", escapedPath, me->variableCount, me->curVariable, me->curLevelCount, me->curLevel, me->curTimestep);
free(escapedPath);
return result;
}
CdiFallbackIterator* cdiFallbackIterator_deserialize(const char* description)
{
CdiFallbackIterator* me = xmalloc(sizeof(*me));
if(!me) goto fail;
description = baseIter_constructFromString(&me->super, description);
while(*description == ' ') description++;
me->path = cdiUnescapeSpaces(description, &description);
if(!me->path) goto destructSuper;
me->streamId = streamOpenRead(me->path);
if(me->streamId == CDI_UNDEFID) goto freePath;
me->vlistId = streamInqVlist(me->streamId);
if(me->vlistId == CDI_UNDEFID) goto closeStream;
//This reads one variable from the description string, does error checking, and advances the given string pointer.
#define decodeValue(variable, description) do \
{ \
const char* savedStart = description; \
long long decodedValue = strtoll(description, (char**)&description, 0); /*The cast is a workaround for the wrong signature of strtoll().*/ \
variable = (int)decodedValue; \
if(savedStart == description) goto closeStream; \
if((long long)decodedValue != (long long)variable) goto closeStream; \
} while(0)
decodeValue(me->variableCount, description);
decodeValue(me->curVariable, description);
decodeValue(me->curLevelCount, description);
decodeValue(me->curLevel, description);
decodeValue(me->curTimestep, description);
#undef decodeValue
if(streamInqTimestep(me->streamId, me->curTimestep) <= 0) goto closeStream;
if(me->super.isAdvanced) fetchSuperInfo(me);
return me;
closeStream:
streamClose(me->streamId);
freePath:
free(me->path);
destructSuper:
baseIterDestruct(&me->super);
free(me);
fail:
return NULL;
}
static int advance(CdiFallbackIterator* me)
{
me->curLevel++;
if(me->curLevel == me->curLevelCount)
{
me->curLevel = 0;
me->curVariable++;
if(me->curVariable == me->variableCount)
{
me->curVariable = 0;
me->curTimestep++;
if(streamInqTimestep(me->streamId, me->curTimestep) <= 0) return CDI_EEOF;
}
}
return CDI_NOERR;
}
int cdiFallbackIterator_nextField(CdiIterator* super)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
int result = advance(me);
if(result) return result;
if(!me->curLevel)
{ //Fetch the information that may have changed (we are processing a new variable/timestep if this point is reached).
fetchSuperInfo(me);
me->curLevelCount = zaxisInqSize(vlistInqVarZaxis(me->vlistId, me->curVariable));
}
return CDI_NOERR;
}
char* cdiFallbackIterator_inqTime(CdiIterator* super, bool getEndTime)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
if(getEndTime) return NULL; //The stream interface does not export the start/end times of statistical fields, so we treat all data as point of time data, returning the validity time as the start time.
int taxisId = vlistInqTaxis(me->vlistId);
int date = taxisInqVdate(taxisId);
int time = taxisInqVtime(taxisId);
int year, month, day, hour, minute, second;
cdiDecodeDate(date, &year, &month, &day);
cdiDecodeTime(time, &hour, &minute, &second);
return myAsprintf("%04d-%02d-%02dT%02d:%02d:%02d.000", year, month, day, hour, minute, second);
}
int cdiFallbackIterator_levelType(CdiIterator* super, int levelSelector, char** outName, char** outLongName, char** outStdName, char** outUnit)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
int zaxisId = vlistInqVarZaxis(me->vlistId, me->curVariable);
#define copyString(outPointer, function) do \
{ \
if(outPointer) \
{ \
char tempBuffer[CDI_MAX_NAME]; \
function(zaxisId, tempBuffer); \
*outPointer = myStrDup(tempBuffer); \
} \
} \
while(0)
copyString(outName, zaxisInqName); //FIXME: zaxisInqName is unsafe.
copyString(outLongName, zaxisInqLongname); //FIXME: zaxisInqLongname is unsafe.
copyString(outStdName, zaxisInqStdname); //FIXME: zaxisInqStdname is unsafe.
copyString(outUnit, zaxisInqUnits); //FIXME: zaxisInqUnits is unsafe.
#undef copyString
return zaxisInqLtype(zaxisId);
}
int cdiFallbackIterator_level(CdiIterator* super, int levelSelector, double* outValue1, double* outValue2)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
int zaxisId = vlistInqVarZaxis(me->vlistId, me->curVariable);
//handle NULL pointers once and for all
double trash;
if(!outValue1) outValue1 = &trash;
if(!outValue2) outValue2 = &trash;
//get the level value
if(levelSelector)
{
*outValue1 = (zaxisInqLbounds(zaxisId, NULL))
? zaxisInqLbound(zaxisId, me->curLevel)
: zaxisInqLevel(zaxisId, me->curLevel);
}
else
{
*outValue1 = (zaxisInqUbounds(zaxisId, NULL))
? zaxisInqUbound(zaxisId, me->curLevel)
: zaxisInqLevel(zaxisId, me->curLevel);
}
*outValue2 = 0.0;
//if this is a hybrid zaxis, lookup the coordinates in the vertical coordinate table
ssize_t intLevel = (ssize_t)(2**outValue1);
if(0 <= intLevel && intLevel < zaxisInqVctSize(zaxisId) - 1)
{
const double* coordinateTable = zaxisInqVctPtr(zaxisId);
*outValue1 = coordinateTable[intLevel];
*outValue2 = coordinateTable[intLevel + 1];
}
return CDI_NOERR;
}
int cdiFallbackIterator_zaxisUuid(CdiIterator* super, int* outVgridNumber, int* outLevelCount, unsigned char (*outUuid)[16])
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
int zaxisId = vlistInqVarZaxis(me->vlistId, me->curVariable);
if(zaxisInqLtype(zaxisId) != ZAXIS_HYBRID) return CDI_EINVAL;
if(outVgridNumber) *outVgridNumber = zaxisInqNumber(zaxisId);
if(outLevelCount) *outLevelCount = zaxisInqNlevRef(zaxisId);
if(outUuid) zaxisInqUUID(zaxisId, *outUuid);
return CDI_NOERR;
}
char* cdiFallbackIterator_copyVariableName(CdiIterator* super)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
return vlistCopyVarName(me->vlistId, me->curVariable);
}
void cdiFallbackIterator_readField(CdiIterator* super, double* buffer, size_t* nmiss)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
int missingValues = 0;
streamReadVarSlice(me->streamId, me->curVariable, me->curLevel, buffer, &missingValues);
if(nmiss) *nmiss = missingValues;
}
void cdiFallbackIterator_readFieldF(CdiIterator* super, float* buffer, size_t* nmiss)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
int missingValues = 0;
streamReadVarSliceF(me->streamId, me->curVariable, me->curLevel, buffer, &missingValues);
if(nmiss) *nmiss = missingValues;
}
void cdiFallbackIterator_delete(CdiIterator* super)
{
CdiFallbackIterator* me = (CdiFallbackIterator*)super;
cdiFallbackIterator_condestruct(me, NULL, 0);
}
/*
* A fallback implementation of the iterator interface that opens a stream under the hood.
*
* This implementation is mainly available to provide iterator access to file formats that don't support iterator access natively,
* nevertheless, it allows the file to dictate the order in which data is read, possibly providing performance benefits.
*/
#ifndef INCLUDE_GUARD_CDI_ITERATOR_FALLBACK_H
#define INCLUDE_GUARD_CDI_ITERATOR_FALLBACK_H
#include "iterator.h"
typedef struct CdiFallbackIterator {
CdiIterator super;
int streamId, vlistId;
char* path; //needed for clone() & serialize()
int variableCount, curVariable;
int curLevelCount, curLevel;
int curTimestep;
} CdiFallbackIterator;
CdiIterator* cdiFallbackIterator_new(const char* path, int filetype);
CdiFallbackIterator* cdiFallbackIterator_clone(CdiIterator* me);
char* cdiFallbackIterator_serialize(CdiIterator* me);
CdiFallbackIterator* cdiFallbackIterator_deserialize(const char* me);
int cdiFallbackIterator_nextField(CdiIterator* me);
char* cdiFallbackIterator_inqTime(CdiIterator* me, bool getEndTime);
int cdiFallbackIterator_levelType(CdiIterator* me, int levelSelector, char** outName, char** outLongName, char** outStdName, char** outUnit);
int cdiFallbackIterator_level(CdiIterator* me, int levelSelector, double* outValue1, double* outValue2);
int cdiFallbackIterator_zaxisUuid(CdiIterator* me, int* outVgridNumber, int* outLevelCount, unsigned char (*outUuid)[16]);
char* cdiFallbackIterator_copyVariableName(CdiIterator* me);
void cdiFallbackIterator_readField(CdiIterator* me, double* buffer, size_t* nmiss);
void cdiFallbackIterator_readFieldF(CdiIterator* me, float* buffer, size_t* nmiss);
void cdiFallbackIterator_delete(CdiIterator* super);
#endif
This diff is collapsed.