diff --git a/src/grb_read.c b/src/grb_read.c index 77843acd7b73b0eb62fdc598f17842b526b400b9..9570246fc693c469c9a530d882fbe3c4272f0acc 100644 --- a/src/grb_read.c +++ b/src/grb_read.c @@ -123,79 +123,18 @@ int grb_decode_record(void *untypedArgs) return 0; } -#ifdef HAVE_LIBFDB5 -static -int fdb_request_add1(fdb_request_t *req, const char *param, const char *value) -{ - return fdb_request_add(req, param, &value, 1); -} - -static -void listitem_to_request(const char *item, fdb_request_t *request) -{ - char *tmpItem = strdup(item); - int itemArgc = 0; - char *itemArgv[32]; - int len = strlen(tmpItem); - int start = (*tmpItem == '{'); - itemArgv[0] = tmpItem + start; - for (int i = start; i < len; i++) - { - if (tmpItem[i] == ',') - { - tmpItem[i] = 0; - itemArgc++; - itemArgv[itemArgc] = &tmpItem[++i]; - } - else if (tmpItem[i] == '}') - { - tmpItem[i] = 0; - itemArgc++; - if (tmpItem[i + 1] == '{') - { - itemArgv[itemArgc] = &tmpItem[i + 2]; - i += 2; - } - } - } - - //for (int i = 0; i < itemArgc; i++) printf("%d <%s>\n", i, itemArgv[i]); - - for (int i = 0; i < itemArgc; i++) - { - char *value = NULL; - char *keyValue = itemArgv[i]; - len = strlen(keyValue); - for (int k = 0; k < len; k++) - if (keyValue[k] == '=') - { - keyValue[k] = 0; - value = &keyValue[k + 1]; - break; - } - - //printf("key <%s> value <%s>\n", keyValue, value); - fdb_request_add1(request, keyValue, value); - } - - free(tmpItem); -} -#endif - static DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos) { const int vlistID = streamptr->vlistID; const int tsID = streamptr->curTsID; // FIXME: This should be looked up from the given recID const int varID = streamptr->tsteps[tsID].records[recID].varID; - const size_t recsize = streamptr->tsteps[tsID].records[recID].size; + size_t recsize = streamptr->tsteps[tsID].records[recID].size; const int gridID = vlistInqVarGrid(vlistID, varID); const size_t gridsize = gridInqSize(gridID); if (CDI_Debug) Message("gridID = %d gridsize = %zu", gridID, gridsize); - if (recsize == 0) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1); - void *cgribexp = (gribbuffer && streamptr->record->cgribexp) ? streamptr->record->cgribexp : NULL; if (!gribbuffer) gribbuffer = Malloc(streamptr->record->buffersize); if (!data) data = Malloc(gridsize * (memtype == MEMTYPE_FLOAT ? sizeof(float) : sizeof(double))); @@ -203,33 +142,14 @@ DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void * if (streamptr->fdbRetrieve) { void *fdbItem = streamptr->tsteps[tsID].records[recID].fdbItem; - printf("fdbItem = %s\n", (char*)fdbItem); + if (!fdbItem) Error("fdbItem not available!"); #ifdef HAVE_LIBFDB5 fdb_handle_t *fdb; fdb_new_handle(&fdb); - fdb_datareader_t *dr; - fdb_new_datareader(&dr); - fdb_request_t *singleRequest; - fdb_new_request(&singleRequest); - listitem_to_request(fdbItem, singleRequest); - int status = fdb_retrieve(fdb, singleRequest, dr); - fdb_delete_request(singleRequest); - if (status != FDB_SUCCESS) Error("fdb_retrieve failed!"); - - long drRecsize; - fdb_datareader_open(dr, &drRecsize); - if (drRecsize == 0) Error("fdb_datareader empty!"); - if (drRecsize > recsize) Error("internal problem: wrong recsize"); - - long read = 0; - fdb_datareader_read(dr, gribbuffer, drRecsize, &read); - // printf("fdb_datareader_read: size=%ld/%ld\n", recsize, read); - if (read != drRecsize) Error("fdb_datareader_read failed!"); - - fdb_datareader_close(dr); - fdb_delete_datareader(dr); + long fdb_read_record(fdb_handle_t *fdb, char *item, size_t *buffersize, void **gribbuffer); + recsize = fdb_read_record(fdb, fdbItem, &(streamptr->record->buffersize), &gribbuffer); fdb_delete_handle(fdb); #endif @@ -239,6 +159,8 @@ DecodeArgs grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void * const int fileID = streamptr->fileID; const off_t recpos = streamptr->tsteps[tsID].records[recID].position; + if (recsize == 0) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID+1, tsID+1); + if (resetFilePos) { const off_t currentfilepos = fileGetPos(fileID); diff --git a/src/stream.c b/src/stream.c index 8815310f90ef0ba1a5f101fd7600645336c5d14c..2d47e40fa27e1d9b2c5cb8d7613a8bebb4653c40 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1011,8 +1011,8 @@ void cdiStreamCloseDefaultDelegate(stream_t *streamptr, int recordBufIsToBeDelet { if (streamptr->fdbStore || streamptr->fdbRetrieve) return; - int fileID = streamptr->fileID; - int filetype = streamptr->filetype; + const int fileID = streamptr->fileID; + const int filetype = streamptr->filetype; if (fileID == CDI_UNDEFID) Warning("File %s not open!", streamptr->filename); @@ -1151,22 +1151,22 @@ void streamClose(int streamID) Free(streamptr->vars); streamptr->vars = NULL; - for ( int index = 0; index < streamptr->ntsteps; ++index ) + for (int index = 0; index < streamptr->ntsteps; ++index) { - if ( streamptr->tsteps[index].records ) Free(streamptr->tsteps[index].records); - if ( streamptr->tsteps[index].recIDs ) Free(streamptr->tsteps[index].recIDs); - taxisDestroyKernel(&streamptr->tsteps[index].taxis); + tsteps_t *tstep = &(streamptr->tsteps[index]); + if (tstep->records->fdbItem) free(tstep->records->fdbItem); + if (tstep->records) Free(tstep->records); + if (tstep->recIDs) Free(tstep->recIDs); + taxisDestroyKernel(&(tstep->taxis)); } - if ( streamptr->tsteps ) Free(streamptr->tsteps); + if (streamptr->tsteps) Free(streamptr->tsteps); - if ( streamptr->basetime.timevar_cache ) Free(streamptr->basetime.timevar_cache); + if (streamptr->basetime.timevar_cache) Free(streamptr->basetime.timevar_cache); - if ( vlistID != -1 ) + if (vlistID != -1) { - if ( streamptr->filemode != 'w' && vlistInqTaxis(vlistID) != -1 ) - taxisDestroy(vlistInqTaxis(vlistID)); - + if (streamptr->filemode != 'w' && vlistInqTaxis(vlistID) != -1) taxisDestroy(vlistInqTaxis(vlistID)); cdiVlistDestroy_(vlistID); } diff --git a/src/stream_grb.c b/src/stream_grb.c index 57fc327cf3b940473e7691c22a5f5f4aed9e5cb0..b9d6a19151ddf7a44d6675be4bc76173d8b745d5 100644 --- a/src/stream_grb.c +++ b/src/stream_grb.c @@ -338,35 +338,17 @@ int grbInqContents(stream_t * streamptr) int fdbInqContents(stream_t * streamptr) { streamptr->curTsID = 0; - - int status = fdbScanTimestep1(streamptr); - if (status == 0 && streamptr->ntsteps == -1) status = fdbScanTimestep2(streamptr); - - // const int fileID = streamptr->fileID; - // fileSetPos(fileID, 0, SEEK_SET); - + int status = fdbScanTimesteps(streamptr); return status; } #endif int fdbInqTimestep(stream_t *streamptr, int tsID) { - if (tsID == 0 && streamptr->rtsteps == 0) - Error("Call to cdiInqContents missing!"); + if (tsID == 0 && streamptr->rtsteps == 0) Error("Call to cdiInqContents missing!"); if (CDI_Debug) Message("tsid = %d rtsteps = %d", tsID, streamptr->rtsteps); - int ntsteps = CDI_UNDEFID; - while ((tsID + 1) > streamptr->rtsteps && ntsteps == CDI_UNDEFID) - { - ntsteps = fdbScanTimestep(streamptr); - if (ntsteps == CDI_EUFSTRUCT) - { - streamptr->ntsteps = streamptr->rtsteps; - break; - } - } - int nrecs; if (tsID >= streamptr->ntsteps && streamptr->ntsteps != CDI_UNDEFID) { diff --git a/src/stream_gribapi.c b/src/stream_gribapi.c index 14e37cf0300d233cbe3446d852776b348c69accb..c82b662af333ce8367eb0260b57c5b2b9c8fef8c 100644 --- a/src/stream_gribapi.c +++ b/src/stream_gribapi.c @@ -1076,35 +1076,35 @@ int fdb_fill_itemlist(fdb_handle_t *fdb, fdb_request_t *request, char ***itemLis return numItems; } -static + long fdb_read_record(fdb_handle_t *fdb, char *item, size_t *buffersize, void **gribbuffer) { - Message("item = %s", item); + //Message("%s", item); - fdb_datareader_t *dr = NULL; - fdb_new_datareader(&dr); + fdb_datareader_t *dataReader = NULL; + fdb_new_datareader(&dataReader); fdb_request_t *singleRequest = NULL; fdb_new_request(&singleRequest); fdbitem_to_request(item, singleRequest); - int status = fdb_retrieve(fdb, singleRequest, dr); + int status = fdb_retrieve(fdb, singleRequest, dataReader); fdb_delete_request(singleRequest); if (status != FDB_SUCCESS) Error("fdb_retrieve failed!"); - long recsize = 0; - fdb_datareader_open(dr, &recsize); - if (recsize == 0) Error("fdb_datareader empty!"); + long recordSize = 0; + fdb_datareader_open(dataReader, &recordSize); + if (recordSize == 0) Error("fdb_datareader empty!"); - ensureBufferSize(recsize, buffersize, gribbuffer); + ensureBufferSize(recordSize, buffersize, gribbuffer); - long read = 0; - fdb_datareader_read(dr, *gribbuffer, recsize, &read); - // printf("fdb_datareader_read: size=%ld/%ld\n", recsize, read); - if (read != recsize) Error("fdb_datareader_read failed!"); + long readSize = 0; + fdb_datareader_read(dataReader, *gribbuffer, recordSize, &readSize); + // printf("fdb_datareader_read: size=%ld/%ld\n", recordSize, readSize); + if (readSize != recordSize) Error("fdb_datareader_read failed!"); - fdb_datareader_close(dr); - fdb_delete_datareader(dr); + fdb_datareader_close(dataReader); + fdb_delete_datareader(dataReader); - return recsize; + return recordSize; } static @@ -1271,7 +1271,7 @@ int remove_duplicate_timesteps(RecordInfoEntry *recordInfoList, int numRecords, #endif -int fdbScanTimestep1(stream_t *streamptr) +int fdbScanTimesteps(stream_t *streamptr) { #ifdef HAVE_LIBFDB5 void *gribbuffer = NULL; @@ -1314,7 +1314,6 @@ int fdbScanTimestep1(stream_t *streamptr) Message("numRecords=%d numTimesteps=%d", numRecords, numTimesteps); for (int i = 0; i < numItems; ++i) if (keyValueList[i].item) free(keyValueList[i].item); - free(recordInfoList); free(keyValueList); if (numRecords == 0) return CDI_EUFSTRUCT; @@ -1336,7 +1335,7 @@ int fdbScanTimestep1(stream_t *streamptr) for (int recID = 0; recID < numRecords; recID++) { - long recsize = fdb_read_record(fdb, fdbItemList[recID], &buffersize, &gribbuffer); + const long recsize = fdb_read_record(fdb, fdbItemList[recID], &buffersize, &gribbuffer); int datatype, comptype = 0; gh = gribapiGetDiskRepresentation(recsize, &buffersize, &gribbuffer, &datatype, &comptype); @@ -1385,6 +1384,7 @@ int fdbScanTimestep1(stream_t *streamptr) int recpos = 0; gribapiAddRecord(streamptr, param, gh, recsize, recpos, datatype, comptype, varname, leveltype1, leveltype2, lbounds, level1, level2, level_sf, level_unit, &scanKeys, ptiles, 1, fdbItemList[recID]); + fdbItemList[recID] = NULL; grib_handle_delete(gh); gh = NULL; @@ -1394,9 +1394,7 @@ int fdbScanTimestep1(stream_t *streamptr) fdb_delete_handle(fdb); - if (fdbItemList) free(fdbItemList); - - streamptr->rtsteps = 1; + //streamptr->rtsteps = 1; cdi_generate_vars(streamptr); @@ -1411,27 +1409,60 @@ int fdbScanTimestep1(stream_t *streamptr) streamptr->record->buffer = gribbuffer; streamptr->record->buffersize = buffersize; - streamptr->ntsteps = 1; - //streamScanTsFixNtsteps(streamptr, recpos); + if (numTimesteps == 1) streamptr->ntsteps = 1; streamScanTimeConstAdjust(streamptr, taxis); - if (timestepRecordOffset) free(timestepRecordOffset); -#endif + for (int tsID = 1; tsID < numTimesteps; tsID++) + { + const int recordOffset = timestepRecordOffset[tsID]; + const int vdate = recordInfoList[recordOffset].date; + const int vtime = recordInfoList[recordOffset].time * 100; + // printf("timestep=%d recOffset=%d date=%d time=%d\n", tsID + 1, recordOffset, vdate, vtime); - return 0; -} + const int tsIDnew = tstepsNewEntry(streamptr); + if (tsIDnew != tsID) Error("Internal error. tsID = %d", tsID); + streamptr->tsteps[tsID-1].next = true; + streamptr->tsteps[tsID].position = 0; -int fdbScanTimestep2(stream_t *streamptr) -{ - Error("Function not implemented!"); - return 0; -} + taxis = &streamptr->tsteps[tsID].taxis; + cdi_create_records(streamptr, tsID); + record_t *records = streamptr->tsteps[tsID].records; + + int nrecs = (tsID == 1) ? streamScanInitRecords2(streamptr) : streamScanInitRecords(streamptr, tsID); + if (nrecs != numRecords) Error("Internal error. nrecs = %d", nrecs); + + taxis->vdate = vdate; + taxis->vtime = vtime; + + int rindex = 0; + for (int recID = 0; recID < numRecords; recID++) + { + records[recID].used = true; + streamptr->tsteps[tsID].recIDs[rindex] = recID; + rindex++; + + records[recID].position = 0; + records[recID].size = 0; + records[recID].fdbItem = fdbItemList[recordOffset + recID]; + fdbItemList[recordOffset + recID] = NULL; + } + + if (tsID == 1) streamptr->tsteps[1].nrecs = numRecords; + } + + streamptr->rtsteps = numTimesteps; + streamptr->ntsteps = numTimesteps; + + for (int i = 0; i < numItems; ++i) + if (fdbItemList[i]) free(fdbItemList[i]); + + if (fdbItemList) free(fdbItemList); + if (recordInfoList) free(recordInfoList); + if (timestepRecordOffset) free(timestepRecordOffset); +#endif -int fdbScanTimestep(stream_t *streamptr) -{ - Error("Function not implemented!"); return 0; } @@ -1768,7 +1799,7 @@ int gribapiScanTimestep(stream_t *streamptr) int nrecs = 0; int vlistID = streamptr->vlistID; - int tsID = streamptr->rtsteps; + int tsID = streamptr->rtsteps; taxis_t *taxis = &streamptr->tsteps[tsID].taxis; if (streamptr->tsteps[tsID].recordSize == 0) @@ -1785,7 +1816,7 @@ int gribapiScanTimestep(stream_t *streamptr) fileSetPos(fileID, streamptr->tsteps[tsID].position, SEEK_SET); - int nrecs_scanned = streamptr->tsteps[0].nallrecs + streamptr->tsteps[1].nrecs*(tsID-1); //Only used for debug output. + int nrecs_scanned = streamptr->tsteps[0].nallrecs + streamptr->tsteps[1].nrecs*(tsID-1); // Only used for debug output. int rindex = 0; off_t recpos = 0; DateTime datetime0 = { LONG_MIN, LONG_MIN }; diff --git a/src/stream_gribapi.h b/src/stream_gribapi.h index 404808ee41c8cfb952c999cbfc27920141116118..d3cdbe54a3115c24820f5affae7c4406d1e5ff33 100644 --- a/src/stream_gribapi.h +++ b/src/stream_gribapi.h @@ -5,9 +5,7 @@ #include "gribapi.h" -int fdbScanTimestep1(stream_t *streamptr); -int fdbScanTimestep2(stream_t *streamptr); -int fdbScanTimestep(stream_t *streamptr); +int fdbScanTimesteps(stream_t *streamptr); int gribapiScanTimestep1(stream_t *streamptr); int gribapiScanTimestep2(stream_t *streamptr);