diff --git a/src/cdi_int.h b/src/cdi_int.h index 0cf232ff87d1e4e9a971102bdbf9221c4cd87382..335e8789bea6927a180c61f4f5aa32cd242bd33d 100644 --- a/src/cdi_int.h +++ b/src/cdi_int.h @@ -189,6 +189,7 @@ typedef struct char varname[32]; // needed for grib decoding with GRIB_API VarScanKeys scanKeys; var_tile_t tiles; // tile-related meta-data, currently for GRIB-API only. + void *fdbItem; } record_t; @@ -300,7 +301,9 @@ typedef struct { int complevel; // compression level bool sortname; bool sortparam; + void *gribContainers; + void *gh; // grib handle int numWorker; int nextRecID; @@ -308,8 +311,6 @@ typedef struct { void *jobs; void *jobManager; - void *gh; // grib handle - bool fdbRetrieve; bool fdbStore; } diff --git a/src/grb_read.c b/src/grb_read.c index d7de3608a3e11f81b65931ef6dc905c8b251cf66..77843acd7b73b0eb62fdc598f17842b526b400b9 100644 --- a/src/grb_read.c +++ b/src/grb_read.c @@ -4,6 +4,10 @@ #ifdef HAVE_LIBGRIB +#ifdef HAVE_LIBFDB5 +#include <fdb5/api/fdb_c.h> +#endif + #include "async_worker.h" #include "dmemory.h" #include "cdi.h" @@ -119,38 +123,135 @@ int grb_decode_record(void *untypedArgs) return 0; } +#ifdef HAVE_LIBFDB5 +static +int fdb_request_add1(fdb_request_t *req, const char *param, const char *value) +{ + return fdb_request_add(req, param, &value, 1); +} + +static +void listitem_to_request(const char *item, fdb_request_t *request) +{ + char *tmpItem = strdup(item); + int itemArgc = 0; + char *itemArgv[32]; + int len = strlen(tmpItem); + int start = (*tmpItem == '{'); + itemArgv[0] = tmpItem + start; + for (int i = start; i < len; i++) + { + if (tmpItem[i] == ',') + { + tmpItem[i] = 0; + itemArgc++; + itemArgv[itemArgc] = &tmpItem[++i]; + } + else if (tmpItem[i] == '}') + { + tmpItem[i] = 0; + itemArgc++; + if (tmpItem[i + 1] == '{') + { + itemArgv[itemArgc] = &tmpItem[i + 2]; + i += 2; + } + } + } + + //for (int i = 0; i < itemArgc; i++) printf("%d <%s>\n", i, itemArgv[i]); + + for (int i = 0; i < itemArgc; i++) + { + char *value = NULL; + char *keyValue = itemArgv[i]; + len = strlen(keyValue); + for (int k = 0; k < len; k++) + if (keyValue[k] == '=') + { + keyValue[k] = 0; + value = &keyValue[k + 1]; + break; + } + + //printf("key <%s> value <%s>\n", keyValue, value); + fdb_request_add1(request, keyValue, value); + } + + free(tmpItem); +} +#endif + static DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos) { const int vlistID = streamptr->vlistID; - const int fileID = streamptr->fileID; const int tsID = streamptr->curTsID; // FIXME: This should be looked up from the given recID - const off_t recpos = streamptr->tsteps[tsID].records[recID].position; - const size_t recsize = streamptr->tsteps[tsID].records[recID].size; const int varID = streamptr->tsteps[tsID].records[recID].varID; + const size_t recsize = streamptr->tsteps[tsID].records[recID].size; const int gridID = vlistInqVarGrid(vlistID, varID); const size_t gridsize = gridInqSize(gridID); - if ( CDI_Debug ) Message("gridID = %d gridsize = %zu", gridID, gridsize); + if (CDI_Debug) Message("gridID = %d gridsize = %zu", gridID, gridsize); - if ( recsize == 0 ) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1); + if (recsize == 0) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1); void *cgribexp = (gribbuffer && streamptr->record->cgribexp) ? streamptr->record->cgribexp : NULL; if (!gribbuffer) gribbuffer = Malloc(streamptr->record->buffersize); - if (!data) data = Malloc(gridsize*(memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double))); + if (!data) data = Malloc(gridsize * (memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double))); - if (resetFilePos) + if (streamptr->fdbRetrieve) { - const off_t currentfilepos = fileGetPos(fileID); - fileSetPos(fileID, recpos, SEEK_SET); - if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record"); - fileSetPos(fileID, currentfilepos, SEEK_SET); + void *fdbItem = streamptr->tsteps[tsID].records[recID].fdbItem; + printf("fdbItem = %s\n", (char*)fdbItem); + +#ifdef HAVE_LIBFDB5 + fdb_handle_t *fdb; + fdb_new_handle(&fdb); + + fdb_datareader_t *dr; + fdb_new_datareader(&dr); + fdb_request_t *singleRequest; + fdb_new_request(&singleRequest); + listitem_to_request(fdbItem, singleRequest); + int status = fdb_retrieve(fdb, singleRequest, dr); + fdb_delete_request(singleRequest); + if (status != FDB_SUCCESS) Error("fdb_retrieve failed!"); + + long drRecsize; + fdb_datareader_open(dr, &drRecsize); + if (drRecsize == 0) Error("fdb_datareader empty!"); + if (drRecsize > recsize) Error("internal problem: wrong recsize"); + + long read = 0; + fdb_datareader_read(dr, gribbuffer, drRecsize, &read); + // printf("fdb_datareader_read: size=%ld/%ld\n", recsize, read); + if (read != drRecsize) Error("fdb_datareader_read failed!"); + + fdb_datareader_close(dr); + fdb_delete_datareader(dr); + + fdb_delete_handle(fdb); +#endif } else { - fileSetPos(fileID, recpos, SEEK_SET); - if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record"); - streamptr->numvals += gridsize; + const int fileID = streamptr->fileID; + const off_t recpos = streamptr->tsteps[tsID].records[recID].position; + + if (resetFilePos) + { + const off_t currentfilepos = fileGetPos(fileID); + fileSetPos(fileID, recpos, SEEK_SET); + if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!"); + fileSetPos(fileID, currentfilepos, SEEK_SET); + } + else + { + fileSetPos(fileID, recpos, SEEK_SET); + if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!"); + streamptr->numvals += gridsize; + } } return (DecodeArgs){ diff --git a/src/stream.c b/src/stream.c index 39d732302978ead1e8b8cd770442cdbdcaf18f91..8815310f90ef0ba1a5f101fd7600645336c5d14c 100644 --- a/src/stream.c +++ b/src/stream.c @@ -472,6 +472,11 @@ int cdiStreamOpenDefaultDelegate(const char *filename, char filemode, int filety streamptr->fdbRetrieve = true; #endif streamptr->filetype = filetype; + if (recordBufIsToBeCreated) + { + streamptr->record = (Record *) Malloc(sizeof(Record)); + streamptr->record->buffer = NULL; + } return 88; } else @@ -506,7 +511,7 @@ int cdiStreamOpenDefaultDelegate(const char *filename, char filemode, int filety case CDI_FILETYPE_GRB: { fileID = gribOpen(filename, temp); - if ( fileID < 0 ) return CDI_ESYSTEM; + if (fileID < 0) return CDI_ESYSTEM; if (recordBufIsToBeCreated) { streamptr->record = (Record *) Malloc(sizeof(Record)); @@ -523,7 +528,7 @@ int cdiStreamOpenDefaultDelegate(const char *filename, char filemode, int filety case CDI_FILETYPE_GRB2: { fileID = gribOpen(filename, temp); - if ( fileID < 0 ) return CDI_ESYSTEM; + if (fileID < 0) return CDI_ESYSTEM; if (recordBufIsToBeCreated) { streamptr->record = (Record *) Malloc(sizeof(Record)); @@ -970,6 +975,7 @@ void streamDefaultValue(stream_t * streamptr) #endif streamptr->gribContainers = NULL; + streamptr->gh = NULL; streamptr->numWorker = 0; streamptr->nextRecID = 0; diff --git a/src/stream_gribapi.c b/src/stream_gribapi.c index c7f62b57f129b518d231fd1e33e39dcb083285f2..14e37cf0300d233cbe3446d852776b348c69accb 100644 --- a/src/stream_gribapi.c +++ b/src/stream_gribapi.c @@ -657,11 +657,11 @@ static void gribapiAddRecord(stream_t *streamptr, int param, grib_handle *gh, size_t recsize, off_t position, int datatype, int comptype, const char *varname, int leveltype1, int leveltype2, int lbounds, int level1, int level2, int level_sf, int level_unit, - VarScanKeys *scanKeys, const var_tile_t *tiles, int lread_additional_keys) + VarScanKeys *scanKeys, const var_tile_t *tiles, int lread_additional_keys, void *fdbItem) { const int vlistID = streamptr->vlistID; - const int tsID = streamptr->curTsID; - const int recID = recordNewEntry(streamptr, tsID); + const int tsID = streamptr->curTsID; + const int recID = recordNewEntry(streamptr, tsID); record_t *record = &streamptr->tsteps[tsID].records[recID]; const int tsteptype = gribapiGetTsteptype(gh); @@ -679,6 +679,7 @@ void gribapiAddRecord(stream_t *streamptr, int param, grib_handle *gh, record->gridsize = gribapiGetGridsize(gh); record->scanKeys = *scanKeys; record->tiles = tiles ? *tiles : dummy_tiles; + record->fdbItem = fdbItem; strncpy(record->varname, varname, sizeof(record->varname)-1); record->varname[sizeof(record->varname) - 1] = 0; @@ -944,6 +945,22 @@ while(0) #ifdef HAVE_LIBFDB5 +typedef struct +{ + char *item; + char *keys[32]; + char *values[32]; + int numKeys; +} KeyValueEntry; + +typedef struct +{ + int date; + int time; + int param; + int levtype; +} RecordInfoEntry; + static int fdb_request_add1(fdb_request_t *req, const char *param, const char *value) { @@ -951,152 +968,455 @@ int fdb_request_add1(fdb_request_t *req, const char *param, const char *value) } static -void listitem_to_request(const char *item, fdb_request_t *request) -{ - char *tmpItem = strdup(item); - int itemArgc = 0; - char *itemArgv[32]; - int len = strlen(tmpItem); - int start = (*tmpItem == '{'); - itemArgv[0] = tmpItem + start; +void decode_fdbitem(const char *fdbItem, KeyValueEntry *keyValue) +{ + keyValue->item = strdup(fdbItem); + char *pItem = keyValue->item; + int numKeys = 0; + char **itemKeys = keyValue->keys; + char **itemValues = keyValue->values; + int len = strlen(pItem); + int start = (*pItem == '{'); + itemKeys[0] = pItem + start; for (int i = start; i < len; i++) { - if (tmpItem[i] == ',') + if (pItem[i] == ',') { - tmpItem[i] = 0; - itemArgc++; - itemArgv[itemArgc] = &tmpItem[++i]; + pItem[i] = 0; + numKeys++; + itemKeys[numKeys] = &pItem[++i]; } - else if (tmpItem[i] == '}') + else if (pItem[i] == '}') { - tmpItem[i] = 0; - itemArgc++; - if (tmpItem[i + 1] == '{') + pItem[i] = 0; + numKeys++; + if (pItem[i + 1] == '{') { - itemArgv[itemArgc] = &tmpItem[i + 2]; + itemKeys[numKeys] = &pItem[i + 2]; i += 2; } } } - //for (int i = 0; i < itemArgc; i++) printf("%d <%s>\n", i, itemArgv[i]); + keyValue->numKeys = numKeys; + //for (int i = 0; i < numKeys; i++) printf("%d <%s>\n", i, itemKeys[i]); - for (int i = 0; i < itemArgc; i++) + for (int i = 0; i < numKeys; i++) { - char *value = NULL; - char *keyValue = itemArgv[i]; + char *keyValue = itemKeys[i]; len = strlen(keyValue); for (int k = 0; k < len; k++) if (keyValue[k] == '=') { keyValue[k] = 0; - value = &keyValue[k + 1]; + itemValues[i] = &keyValue[k + 1]; break; } - //printf("key <%s> value <%s>\n", keyValue, value); - fdb_request_add1(request, keyValue, value); + //printf("key <%s> value <%s>\n", itemKeys[i], itemValues[i]); + } +} + +static +void fdbitem_to_request(const char *fdbItem, fdb_request_t *request) +{ + KeyValueEntry keyValue; + keyValue.item = NULL; + decode_fdbitem(fdbItem, &keyValue); + + int numKeys = keyValue.numKeys; + for (int i = 0; i < numKeys; i++) + { + // printf("key <%s> value <%s>\n", keyValue.keys[i], keyValue.values[i]); + fdb_request_add1(request, keyValue.keys[i], keyValue.values[i]); } - free(tmpItem); + if (keyValue.item) free(keyValue.item); } + +static +int fdb_fill_itemlist(fdb_handle_t *fdb, fdb_request_t *request, char ***itemList) +{ + const char **item = (const char **) malloc(sizeof(const char*)); + + fdb_listiterator_t *it; + fdb_new_listiterator(&it); + + fdb_list(fdb, request, it); + + int numItems = 0; + while (true) + { + bool exist; + fdb_listiterator_next(it, &exist, item); + if (!exist) break; + + numItems++; + } + Message("numItems = %d", numItems); + + if (*itemList == NULL) *itemList = (char **) malloc(numItems * sizeof(char*)); + + fdb_list(fdb, request, it); + + int itemNum = 0; + while (true) + { + bool exist; + fdb_listiterator_next(it, &exist, item); + if (!exist) break; + + (*itemList)[itemNum++] = strdup(*item); + } + + fdb_delete_listiterator(it); + + free(item); + + return numItems; +} + +static +long fdb_read_record(fdb_handle_t *fdb, char *item, size_t *buffersize, void **gribbuffer) +{ + Message("item = %s", item); + + fdb_datareader_t *dr = NULL; + fdb_new_datareader(&dr); + fdb_request_t *singleRequest = NULL; + fdb_new_request(&singleRequest); + fdbitem_to_request(item, singleRequest); + int status = fdb_retrieve(fdb, singleRequest, dr); + fdb_delete_request(singleRequest); + if (status != FDB_SUCCESS) Error("fdb_retrieve failed!"); + + long recsize = 0; + fdb_datareader_open(dr, &recsize); + if (recsize == 0) Error("fdb_datareader empty!"); + + ensureBufferSize(recsize, buffersize, gribbuffer); + + long read = 0; + fdb_datareader_read(dr, *gribbuffer, recsize, &read); + // printf("fdb_datareader_read: size=%ld/%ld\n", recsize, read); + if (read != recsize) Error("fdb_datareader_read failed!"); + + fdb_datareader_close(dr); + fdb_delete_datareader(dr); + + return recsize; +} + +static +int check_numKey(const char *key, int numKeys, int numItems) +{ + if (numKeys == 0) + { + Warning("Key %s is missing in all of the FDB records!", key); + return -1; + } + else if (numKeys < numItems) + { + Warning("Key %s is missing in some of the FDB records!", key); + return -2; + } + + return 0; +} + +static +int check_keyvalueList(int numItems, KeyValueEntry *keyValueList) +{ + const char *searchKeys[] = {"date", "time", "param", "levtype"}; + const int numSearchKeys = sizeof(searchKeys) / sizeof(searchKeys[0]); + int searchKeysCount[numSearchKeys]; + for (int k = 0; k < numSearchKeys; k++) searchKeysCount[k] = 0; + + for (int i = 0; i < numItems; i++) + { + int numKeys = keyValueList[i].numKeys; + char **itemKeys = keyValueList[i].keys; + for (int k = 0; k < numSearchKeys; k++) + { + for (int j = 0; j < numKeys; j++) + { + if (strcmp(itemKeys[j], searchKeys[k]) == 0) + { + searchKeysCount[k]++; + break; + } + } + } + } + + int status = 0; + for (int k = 0; k < numSearchKeys; k++) + if (check_numKey(searchKeys[k], searchKeysCount[k], numItems) != 0) + status = -1; + + return status; +} + +typedef struct +{ + int date, time, param, levtype; +} CmpKeys; + +static +CmpKeys set_cmpkeys(RecordInfoEntry *recordInfoList) +{ + CmpKeys cmpKeys; + cmpKeys.date = recordInfoList->date; + cmpKeys.time = recordInfoList->time; + cmpKeys.param = recordInfoList->param; + cmpKeys.levtype = recordInfoList->levtype; + return cmpKeys; +} + +static +int compare_cmpkeys(const CmpKeys *cmpKeys1, const CmpKeys *cmpKeys2) +{ + if (cmpKeys1->date == cmpKeys2->date && + cmpKeys1->time == cmpKeys2->time && + cmpKeys1->param == cmpKeys2->param && + cmpKeys1->levtype == cmpKeys2->levtype) + return 0; + + return -1; +} + +static +int get_num_records(int numItems, RecordInfoEntry *recordInfoList) +{ + const int date = recordInfoList[0].date; + const int time = recordInfoList[0].time; + + int numRecords = 0; + for (int i = 0; i < numItems; i++) + { + if (date == recordInfoList[i].date && time == recordInfoList[i].time) + numRecords++; + else + break; + } + + CmpKeys cmpKeys0 = set_cmpkeys(&recordInfoList[0]); + for (int i = 1; i < numRecords; i++) + { + CmpKeys cmpKeys = set_cmpkeys(&recordInfoList[i]); + if (compare_cmpkeys(&cmpKeys0, &cmpKeys) == 0) + { + numRecords = i; + break; + } + } + + return numRecords; +} + +enum {levTypeUndef = 0, levTypeSFC, levTypeML, levTypePL}; + +static +int get_ilevtype(const char *levtype) +{ + int ilevtype = levTypeUndef; + + if (strcmp(levtype, "sfc") == 0) ilevtype = levTypeSFC; + else if (strcmp(levtype, "ml") == 0) ilevtype = levTypeML; + else if (strcmp(levtype, "pl") == 0) ilevtype = levTypeML; + + return ilevtype; +} + +static +void decode_keyvalue(KeyValueEntry *keyValue, RecordInfoEntry *recordInfo) +{ + char **itemKeys = keyValue->keys; + char **itemValues = keyValue->values; + int numKeys = keyValue->numKeys; + for (int i = 0; i < numKeys; i++) + { + //printf("key <%s> value <%s>\n", itemKeys[i], itemValues[i]); + if (strcmp(itemKeys[i], "date") == 0) recordInfo->date = atoi(itemValues[i]); + else if (strcmp(itemKeys[i], "time") == 0) recordInfo->time = atoi(itemValues[i]); + else if (strcmp(itemKeys[i], "param") == 0) recordInfo->param = atoi(itemValues[i]); + else if (strcmp(itemKeys[i], "levtype") == 0) recordInfo->levtype = get_ilevtype(itemValues[i]); + } +} + +static +int remove_duplicate_timesteps(RecordInfoEntry *recordInfoList, int numRecords, int numTimesteps, int *timestepRecordOffset) +{ + int numTimestepsNew = numTimesteps; + + int date = recordInfoList[0].date; + int time = recordInfoList[0].time; + + for (int i = 1; i < numTimesteps; ++i) + { + int index = i * numRecords; + if (date == recordInfoList[index].date && time == recordInfoList[index].time) + { + Message("Skip timestep %d", i + 1); + numTimestepsNew--; + for (int j = i; j < numTimestepsNew; j++) timestepRecordOffset[j] = timestepRecordOffset[j + 1]; + } + + date = recordInfoList[index].date; + time = recordInfoList[index].time; + } + + return numTimestepsNew; +} + #endif int fdbScanTimestep1(stream_t *streamptr) { #ifdef HAVE_LIBFDB5 - const bool readRecords= true; - long gribbuffersize = 0; - unsigned char *gribbuffer = NULL; + void *gribbuffer = NULL; + size_t buffersize = 0; + grib_handle *gh = NULL; - fdb_handle_t *fdb; + fdb_handle_t *fdb = NULL; fdb_new_handle(&fdb); - fdb_request_t *request; + fdb_request_t *request = NULL; fdb_new_request(&request); fdb_request_add1(request, "class", "ea"); fdb_request_add1(request, "expver", "0001"); fdb_request_add1(request, "stream", "oper"); fdb_request_add1(request, "domain", "g"); - //fdb_request_add1(request, "date", "20180601"); - //fdb_request_add1(request, "time", "0000"); + fdb_request_add1(request, "date", "20180601"); + // fdb_request_add1(request, "time", "1800"); fdb_request_add1(request, "type", "an"); fdb_request_add1(request, "levtype", "sfc"); fdb_request_add1(request, "step", "0"); - // fdb_request_add1(request, "param", "129"); + fdb_request_add1(request, "param", "139"); // fdb_request_add1(request, "levelist", "300"); - const char **item = (const char **) malloc(sizeof(const char*)); + char **fdbItemList = NULL; + int numItems = fdb_fill_itemlist(fdb, request, &fdbItemList); + fdb_delete_request(request); + for (int i = 0; i < numItems; ++i) printf("item[%d] = %s\n", i, fdbItemList[i]); - fdb_listiterator_t *it; - fdb_new_listiterator(&it); + KeyValueEntry *keyValueList = (KeyValueEntry *) malloc(numItems * sizeof(KeyValueEntry)); + for (int i = 0; i < numItems; ++i) keyValueList[i].item = NULL; - fdb_list(fdb, request, it); + RecordInfoEntry *recordInfoList = (RecordInfoEntry *) malloc(numItems * sizeof(RecordInfoEntry)); - Next: + for (int i = 0; i < numItems; ++i) decode_fdbitem(fdbItemList[i], &keyValueList[i]); + if (check_keyvalueList(numItems, keyValueList) != 0) return 1; + for (int i = 0; i < numItems; ++i) decode_keyvalue(&keyValueList[i], &recordInfoList[i]); - { - bool exist; - fdb_listiterator_next(it, &exist, item); - if (!exist) goto Cleanup; - } + int numRecords = get_num_records(numItems, recordInfoList); + int numTimesteps = numItems / numRecords; + Message("numRecords=%d numTimesteps=%d", numRecords, numTimesteps); + + for (int i = 0; i < numItems; ++i) if (keyValueList[i].item) free(keyValueList[i].item); + free(recordInfoList); + free(keyValueList); + + if (numRecords == 0) return CDI_EUFSTRUCT; - printf("fdb_listiterator_next: item = %s\n", *item); + int *timestepRecordOffset = (int *) malloc(numTimesteps * sizeof(int)); + for (int i = 0; i < numTimesteps; i++) timestepRecordOffset[i] = i * numRecords; + numTimesteps = remove_duplicate_timesteps(recordInfoList, numRecords, numTimesteps, timestepRecordOffset); + Message("numRecords=%d numTimesteps=%d", numRecords, numTimesteps); + + DateTime datetime0 = { .date = 10101, .time = 0 }; + int fcast = 0; + + streamptr->curTsID = 0; + + const int tsID = tstepsNewEntry(streamptr); + if (tsID != 0) Error("Internal problem! tstepsNewEntry returns %d", tsID); - if (readRecords) + taxis_t *taxis = &streamptr->tsteps[tsID].taxis; + + for (int recID = 0; recID < numRecords; recID++) { - fdb_datareader_t *dr; - fdb_new_datareader(&dr); - fdb_request_t *singleRequest; - fdb_new_request(&singleRequest); - listitem_to_request(*item, singleRequest); - int status = fdb_retrieve(fdb, singleRequest, dr); - fdb_delete_request(singleRequest); - if (status != FDB_SUCCESS) - { - printf("fdb_retrieve failed!\n"); - return 1; - } + long recsize = fdb_read_record(fdb, fdbItemList[recID], &buffersize, &gribbuffer); - long size; - fdb_datareader_open(dr, &size); - if (size == 0) - { - printf("fdb_datareader empty!\n"); - return 1; - } + int datatype, comptype = 0; + gh = gribapiGetDiskRepresentation(recsize, &buffersize, &gribbuffer, &datatype, &comptype); + + GRIB_CHECK(my_grib_set_double(gh, "missingValue", CDI_Default_Missval), 0); + + const int param = gribapiGetParam(gh); + int leveltype1 = -1, leveltype2 = -1, lbounds, level_sf, level_unit; + var_tile_t tiles = dummy_tiles; + int level1 = 0, level2 = 0; + gribGetLevel(gh, &leveltype1, &leveltype2, &lbounds, &level1, &level2, &level_sf, &level_unit, &tiles); + + char varname[256]; + gribapiGetString(gh, "shortName", varname, sizeof(varname)); + + int64_t vdate, sdate; + int vtime, stime; + gribapiGetValidityDateTime(gh, &vdate, &vtime, &sdate, &stime); + DateTime datetime = { .date = vdate, .time = vtime }; - if (gribbuffersize < size) + VarScanKeys scanKeys = gribapiGetScanKeys(gh); + + if (recID == 0) { - gribbuffersize = size; - gribbuffer = (unsigned char *) realloc(gribbuffer, gribbuffersize); + datetime0 = datetime; + gribapiGetDataDateTime(gh, &(taxis->rdate), &(taxis->rtime)); + fcast = gribapiTimeIsFC(gh); + if (fcast) taxis->unit = gribapiGetTimeUnits(gh); + taxis->fdate = taxis->rdate; + taxis->ftime = taxis->rtime; + taxis->sdate = sdate; + taxis->stime = stime; + taxis->vdate = vdate; + taxis->vtime = vtime; } - long read = 0; - fdb_datareader_read(dr, gribbuffer, size, &read); - // printf("fdb_datareader_read: size=%ld/%ld\n", size, read); - if (read != size) + if (CDI_Debug) { - printf("fdb_datareader_read failed!\n"); - return 1; + char paramstr[32]; + cdiParamToString(param, paramstr, sizeof(paramstr)); + Message("%4d name=%s id=%s ltype=%d lev1=%d lev2=%d vdate=%lld vtime=%d", + recID + 1, varname, paramstr, leveltype1, level1, level2, vdate, vtime); } - // irecsize = size; - fdb_datareader_close(dr); - fdb_delete_datareader(dr); + var_tile_t *ptiles = memcmp(&tiles, &dummy_tiles, sizeof(var_tile_t)) ? &tiles : NULL; + int recpos = 0; + gribapiAddRecord(streamptr, param, gh, recsize, recpos, datatype, comptype, varname, leveltype1, leveltype2, + lbounds, level1, level2, level_sf, level_unit, &scanKeys, ptiles, 1, fdbItemList[recID]); + + grib_handle_delete(gh); + gh = NULL; } - goto Next; - Cleanup: + if (gh) grib_handle_delete(gh); - fdb_delete_listiterator(it); - fdb_delete_request(request); fdb_delete_handle(fdb); - free(item); -#endif + if (fdbItemList) free(fdbItemList); - Error("Function not implemented!"); + streamptr->rtsteps = 1; + + cdi_generate_vars(streamptr); + + taxis->type = fcast ? TAXIS_RELATIVE : TAXIS_ABSOLUTE; + int taxisID = taxisCreate(taxis->type); + // printf("1: %d %6d %d %6d %d %6d\n", taxis->rdate, taxis->rtime, taxis->sdate, taxis->stime, taxis->vdate, taxis->vtime); + + vlistDefTaxis(streamptr->vlistID, taxisID); + + streamScanResizeRecords1(streamptr); + + streamptr->record->buffer = gribbuffer; + streamptr->record->buffersize = buffersize; + + streamptr->ntsteps = 1; + //streamScanTsFixNtsteps(streamptr, recpos); + streamScanTimeConstAdjust(streamptr, taxis); + + if (timestepRecordOffset) free(timestepRecordOffset); +#endif return 0; } @@ -1122,7 +1442,7 @@ int gribapiScanTimestep1(stream_t *streamptr) void *gribbuffer = NULL; size_t buffersize = 0; DateTime datetime0 = { .date = 10101, .time = 0 }; - int nrecs_scanned = 0; //Only used for debug output. + int nrecs_scanned = 0; // Only used for debug output. bool warn_time = true; int fcast = 0; grib_handle *gh = NULL; @@ -1130,7 +1450,7 @@ int gribapiScanTimestep1(stream_t *streamptr) streamptr->curTsID = 0; const int tsID = tstepsNewEntry(streamptr); - if ( tsID != 0 ) Error("Internal problem! tstepsNewEntry returns %d", tsID); + if (tsID != 0) Error("Internal problem! tstepsNewEntry returns %d", tsID); taxis_t *taxis = &streamptr->tsteps[tsID].taxis; @@ -1139,7 +1459,6 @@ int gribapiScanTimestep1(stream_t *streamptr) unsigned nrecs = 0; while (true) { - int level1 = 0, level2 = 0; const size_t recsize = gribGetSize(fileID); recpos = fileGetPos(fileID); @@ -1164,6 +1483,7 @@ int gribapiScanTimestep1(stream_t *streamptr) const int param = gribapiGetParam(gh); int leveltype1 = -1, leveltype2 = -1, lbounds, level_sf, level_unit; var_tile_t tiles = dummy_tiles; + int level1 = 0, level2 = 0; gribGetLevel(gh, &leveltype1, &leveltype2, &lbounds, &level1, &level2, &level_sf, &level_unit, &tiles); char varname[256]; @@ -1181,7 +1501,7 @@ int gribapiScanTimestep1(stream_t *streamptr) datetime0 = datetime; gribapiGetDataDateTime(gh, &(taxis->rdate), &(taxis->rtime)); fcast = gribapiTimeIsFC(gh); - if ( fcast ) taxis->unit = gribapiGetTimeUnits(gh); + if (fcast) taxis->unit = gribapiGetTimeUnits(gh); taxis->fdate = taxis->rdate; taxis->ftime = taxis->rtime; taxis->sdate = sdate; @@ -1224,8 +1544,8 @@ int gribapiScanTimestep1(stream_t *streamptr) } var_tile_t *ptiles = memcmp(&tiles, &dummy_tiles, sizeof(var_tile_t)) ? &tiles : NULL; - gribapiAddRecord(streamptr, param, gh, recsize, recpos, datatype, comptype, varname, - leveltype1, leveltype2, lbounds, level1, level2, level_sf, level_unit, &scanKeys, ptiles, 1); + gribapiAddRecord(streamptr, param, gh, recsize, recpos, datatype, comptype, varname, leveltype1, leveltype2, + lbounds, level1, level2, level_sf, level_unit, &scanKeys, ptiles, 1, NULL); grib_handle_delete(gh); gh = NULL; diff --git a/src/stream_record.c b/src/stream_record.c index 2d9d588eadeb3ebb9c2a79f0ca00350f2eb464f7..66fd9d04e222b7dbd71df28fb59a5206eaad4c97 100644 --- a/src/stream_record.c +++ b/src/stream_record.c @@ -31,6 +31,7 @@ void recordInitEntry(record_t *record) memset(record->varname, 0, sizeof(record->varname)); varScanKeysInit(&record->scanKeys); memset(&record->tiles, 0, sizeof(record->tiles)); + record->fdbItem = NULL; }