Commit 8db485dc authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Fix locking problem.

parent 361a8ee1
......@@ -254,7 +254,6 @@ typedef struct {
#else
void *gribContainers;
#endif
int vlistIDorig;
void *gh; // grib handle
}
......@@ -344,9 +343,9 @@ void streamDefineTaxis(int streamID);
int streamsNewEntry(int filetype);
void streamsInitEntry(int streamID);
void cdiStreamSetupVlist(stream_t *streamptr, int vlistID, int vlistIDorig);
void cdiStreamSetupVlist(stream_t *streamptr, int vlistID);
/* default implementation of the overridable function */
void cdiStreamSetupVlist_(stream_t *streamptr, int vlistID, int vlistIDorig);
void cdiStreamSetupVlist_(stream_t *streamptr, int vlistID);
int stream_new_var(stream_t *streamptr, int gridID, int zaxisID, int tilesetID);
int tstepsNewEntry(stream_t *streamptr);
......@@ -403,7 +402,7 @@ char *cdiUnescapeSpaces(const char *string, const char **outStringEnd);
struct streamAssoc
{
int streamID, vlistID, vlistIDorig;
int streamID, vlistID;
};
struct streamAssoc
......
......@@ -428,27 +428,6 @@ void *reshGetValue(const char * caller, const char* expressionString, cdiResH re
return reshGetElem(caller, expressionString, resH, ops)->res.v.val;
}
int
reshEntryExists(cdiResH resH, const resOps * ops)
{
int nsp;
namespaceTuple_t nspT;
listElem_t *listElem = NULL;
xassert ( ops );
LIST_INIT(1);
LIST_LOCK();
nsp = namespaceGetActive ();
nspT = namespaceResHDecode ( resH );
LIST_UNLOCK();
int found = 0;
if (nspT.nsp == nsp &&
nspT.idx < resHList[nsp].size)
listElem = resHList[nsp].resources + nspT.idx;
LIST_UNLOCK();
if ( listElem && (listElem->res.v.ops == ops) ) found = 1;
return found;
}
/**************************************************************/
void reshGetResHListOfType(unsigned numIDs, int resHs[], const resOps *ops)
......
......@@ -62,9 +62,6 @@ unsigned reshCountType(const resOps *resTypeOps);
void * reshGetValue(const char* caller, const char* expressionString, cdiResH id, const resOps* ops);
#define reshGetVal(resH, ops) reshGetValue(__func__, #resH, resH, ops)
int reshEntryExists(cdiResH id, const resOps* ops);
#define reshExists(resH, ops) reshEntryExists(resH, ops)
void reshGetResHListOfType(unsigned numIDs, int IDs[], const resOps *ops);
enum cdiApplyRet {
......
......@@ -93,8 +93,8 @@ int reshUnpackResources(char * unpackBuffer, int unpackBufferSize,
for (int i = 0; i < numAssociations; ++i)
{
cdiStreamSetupVlist(stream_to_pointer(associations[i].streamID),
namespaceAdaptKey(associations[i].vlistID, originNamespace),
namespaceAdaptKey(associations[i].vlistIDorig, originNamespace));
namespaceAdaptKey(associations[i].vlistID,
originNamespace));
}
Free(associations);
return unpackBufferPos;
......
......@@ -868,7 +868,6 @@ void streamDefaultValue ( stream_t * streamptr )
for ( i = 0; i < MAX_GRIDS_PS; i++ ) streamptr->ncavarID[i] = CDI_UNDEFID;
streamptr->gribContainers = NULL;
streamptr->vlistIDorig = CDI_UNDEFID;
}
......@@ -983,7 +982,6 @@ The function @func{streamClose} closes an open dataset.
void streamClose(int streamID)
{
int index;
int vlistID;
stream_t *streamptr = stream_to_pointer(streamID);
stream_check_ptr(__func__, streamptr);
......@@ -991,7 +989,7 @@ void streamClose(int streamID)
if ( CDI_Debug )
Message("streamID = %d filename = %s", streamID, streamptr->filename);
vlistID = streamptr->vlistID;
int vlistID = streamptr->vlistID;
void (*streamCloseDelegate)(stream_t *streamptr, int recordBufIsToBeDeleted)
= (void (*)(stream_t *, int))
......@@ -1045,17 +1043,8 @@ void streamClose(int streamID)
taxisDestroy(vlistInqTaxis(vlistID));
}
vlist_unlock(vlistID);
vlistDestroy(vlistID);
/* decrease lock counter of the original vlist by 1 */
if ( streamptr->vlistIDorig != CDI_UNDEFID ) {
/* Note: Here we have to check if the original vlist still
* exists. If, for example, the garbage collection routine
* reshListDestruct takes care of the destruction of objects,
* then the original vlist might have been destroyed
* beforehand. */
if (reshExists(streamptr->vlistIDorig, &vlistOps) != 0)
vlist_unlock(streamptr->vlistIDorig);
}
}
stream_delete_entry(streamptr);
......@@ -2035,7 +2024,7 @@ cdiStreamDefVlist_(int streamID, int vlistID)
stream_check_ptr(__func__, streamptr);
if ( streamptr->vlistID == CDI_UNDEFID )
cdiStreamSetupVlist(streamptr, vlistDuplicate(vlistID), vlistID);
cdiStreamSetupVlist(streamptr, vlistDuplicate(vlistID));
else
Warning("vlist already defined for %s!", streamptr->filename);
}
......@@ -2150,29 +2139,21 @@ int streamTxCode(void)
return STREAM;
}
void cdiStreamSetupVlist(stream_t *streamptr, int vlistID, int vlistIDorig)
void
cdiStreamSetupVlist(stream_t *streamptr, int vlistID)
{
void (*myStreamSetupVlist)(stream_t *streamptr, int vlistID, int vlistIDorig)
= (void (*)(stream_t *, int, int))
void (*myStreamSetupVlist)(stream_t *streamptr, int vlistID)
= (void (*)(stream_t *, int))
namespaceSwitchGet(NSSWITCH_STREAM_SETUP_VLIST).func;
myStreamSetupVlist(streamptr, vlistID, vlistIDorig);
myStreamSetupVlist(streamptr, vlistID);
}
void
cdiStreamSetupVlist_(stream_t *streamptr, int vlistID, int vlistIDorig)
cdiStreamSetupVlist_(stream_t *streamptr, int vlistID)
{
vlist_lock(vlistID);
int nvars = vlistNvars(vlistID);
streamptr->vlistID = vlistID;
streamptr->vlistIDorig = vlistIDorig;
/* increase lock counter of the original vlist by 1 */
if ( CDI_Debug ) Message("attach vlist=%d to stream, stream vlist=%d", vlistIDorig, vlistID);
vlist_lock(vlistIDorig);
vlist_unlock(vlistID); // why is this vlistID sometimes locked?
// without this line "cdo splitmon" gives
// Destroying of a locked object (vlistID=xx) failed
for (int varID = 0; varID < nvars; varID++ )
{
int gridID = vlistInqVarGrid(vlistID, varID);
......@@ -2233,8 +2214,6 @@ static int streamCompareP(void * streamptr1, void * streamptr2)
xassert ( s2 );
if ( s1->filetype != s2->filetype ) return differ;
if ( namespaceAdaptKey2 ( s1->vlistIDorig ) !=
namespaceAdaptKey2 ( s2->vlistIDorig )) return differ;
if ( s1->byteorder != s2->byteorder ) return differ;
if ( s1->comptype != s2->comptype ) return differ;
if ( s1->complevel != s2->complevel ) return differ;
......@@ -2307,7 +2286,7 @@ void streamPrintP ( void * streamptr, FILE * fp )
}
enum {
streamNint = 11,
streamNint = 10,
};
static int
......@@ -2335,13 +2314,12 @@ streamPack(void * streamptr, void * packBuffer, int packBufferSize,
intBuffer[1] = streamP->filetype;
intBuffer[2] = (int)strlen(streamP->filename) + 1;
intBuffer[3] = streamP->vlistID;
intBuffer[4] = streamP->vlistIDorig;
intBuffer[5] = streamP->byteorder;
intBuffer[6] = streamP->comptype;
intBuffer[7] = streamP->complevel;
intBuffer[8] = streamP->unreduced;
intBuffer[9] = streamP->sortname;
intBuffer[10] = streamP->have_missval;
intBuffer[4] = streamP->byteorder;
intBuffer[5] = streamP->comptype;
intBuffer[6] = streamP->complevel;
intBuffer[7] = streamP->unreduced;
intBuffer[8] = streamP->sortname;
intBuffer[9] = streamP->have_missval;
serializePack(intBuffer, streamNint, DATATYPE_INT, packBuffer, packBufferSize, packBufferPos, context);
uint32_t d = cdiCheckSum(DATATYPE_INT, streamNint, intBuffer);
......@@ -2377,14 +2355,14 @@ streamUnpack(char * unpackBuffer, int unpackBufferSize,
int targetStreamID = namespaceAdaptKey(intBuffer[0], originNamespace),
streamID = streamOpenID(filename, 'w', intBuffer[1], targetStreamID);
xassert(streamID >= 0 && targetStreamID == streamID);
streamDefByteorder(streamID, intBuffer[5]);
streamDefCompType(streamID, intBuffer[6]);
streamDefCompLevel(streamID, intBuffer[7]);
streamDefByteorder(streamID, intBuffer[4]);
streamDefCompType(streamID, intBuffer[5]);
streamDefCompLevel(streamID, intBuffer[6]);
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->unreduced = intBuffer[8];
streamptr->sortname = intBuffer[9];
streamptr->have_missval = intBuffer[10];
struct streamAssoc retval = { streamID, intBuffer[3], intBuffer[4] };
streamptr->unreduced = intBuffer[7];
streamptr->sortname = intBuffer[8];
streamptr->have_missval = intBuffer[9];
struct streamAssoc retval = { streamID, intBuffer[3] };
return retval;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment