Commit 82390dc6 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Change resource handles to allow not block resource updates.

* Instead the update is tagged to be synchronized later.
* Secondary updates to resources are not yet handled by the PIO code
  it's meant for.
parent 3120badb
This diff is collapsed.
......@@ -80,7 +80,7 @@ void instituteDefaultEntries ( void )
/* (void) institutDef( 0, 0, "IPSL", "IPSL (Institut Pierre Simon Laplace, Paris, France)"); */
for ( i = 0; i < n ; i++ )
reshSetStatus(resH[i], &instituteOps, RESH_PRE_ASSIGNED);
reshSetStatus(resH[i], &instituteOps, RESH_IN_USE);
}
......
......@@ -113,8 +113,9 @@ void modelDefaultEntries ( void )
instID = institutInq( 0, 1, "NCEP", NULL);
resH[9] = modelDef(instID, 80, "T62L28MRF");
/* pre-defined models are not synchronized */
for ( i = 0; i < nDefModels ; i++ )
reshSetStatus(resH[i], &modelOps, RESH_PRE_ASSIGNED);
reshSetStatus(resH[i], &modelOps, RESH_IN_USE);
}
static
......
......@@ -154,7 +154,7 @@ reshListDestruct(int namespaceID)
for ( int j = 0; j < resHList[namespaceID].size; j++ )
{
listElem_t *listElem = resHList[namespaceID].resources + j;
if (listElem->status != RESH_UNUSED)
if (listElem->status & RESH_IN_USE_BIT)
listElem->res.v.ops->valDestroy(listElem->res.v.val);
}
free(resHList[namespaceID].resources);
......@@ -243,7 +243,7 @@ reshPut_(int nsp, int entry, void *p, const resOps *ops)
resHList[nsp].freeHead = next;
newListElem->res.v.val = p;
newListElem->res.v.ops = ops;
newListElem->status = RESH_ASSIGNED;
newListElem->status = RESH_DESYNC_IN_USE;
}
int reshPut ( void *p, const resOps *ops )
......@@ -276,7 +276,7 @@ reshRemove_(int nsp, int idx)
r[idx].res.free.next = curFree;
if (curFree != -1)
r[curFree].res.free.prev = idx;
r[idx].status = RESH_UNUSED;
r[idx].status = RESH_DESYNC_DELETED;
resHList[nsp].freeHead = idx;
}
......@@ -296,7 +296,7 @@ void reshRemove ( cdiResH resH, const resOps * ops )
xassert ( nspT.nsp == nsp
&& nspT.idx >= 0
&& nspT.idx < resHList[nsp].size
&& resHList[nsp].resources[nspT.idx].status != RESH_UNUSED
&& (resHList[nsp].resources[nspT.idx].status & RESH_IN_USE_BIT)
&& resHList[nsp].resources[nspT.idx].res.v.ops
&& resHList[nsp].resources[nspT.idx].res.v.ops == ops );
......@@ -317,7 +317,7 @@ void reshReplace(cdiResH resH, void *p, const resOps *ops)
while (resHList[nsp].size <= nspT.idx)
listSizeExtend();
listElem_t *q = resHList[nsp].resources + nspT.idx;
if (q->status != RESH_UNUSED)
if (q->status & RESH_IN_USE_BIT)
{
q->res.v.ops->valDestroy(q->res.v.val);
reshRemove_(nsp, nspT.idx);
......@@ -384,7 +384,7 @@ void reshGetResHListOfType ( int c, int * resHs, const resOps * ops )
nsp = namespaceGetActive ();
for ( i = 0; i < resHList[nsp].size && j < c; i++ )
if (resHList[nsp].resources[i].status != RESH_UNUSED
if ((resHList[nsp].resources[i].status & RESH_IN_USE_BIT)
&& resHList[nsp].resources[i].res.v.ops == ops)
resHs[j++] = namespaceIdxEncode2(nsp, i);
......@@ -404,7 +404,7 @@ cdiResHApply(enum cdiApplyRet (*func)(int id, void *res, const resOps *p,
int nsp = namespaceGetActive ();
enum cdiApplyRet ret = CDI_APPLY_GO_ON;
for (int i = 0; i < resHList[nsp].size && ret > 0; ++i)
if (resHList[nsp].resources[i].status != RESH_UNUSED)
if (resHList[nsp].resources[i].status & RESH_IN_USE_BIT)
ret = func(namespaceIdxEncode2(nsp, i),
resHList[nsp].resources[i].res.v.val,
resHList[nsp].resources[i].res.v.ops, data);
......@@ -428,7 +428,7 @@ cdiResHFilterApply(const resOps *p,
enum cdiApplyRet ret = CDI_APPLY_GO_ON;
listElem_t *r = resHList[nsp].resources;
for (int i = 0; i < resHList[nsp].size && ret > 0; ++i)
if (r[i].status != RESH_UNUSED && r[i].res.v.ops == p)
if ((r[i].status & RESH_IN_USE_BIT) && r[i].res.v.ops == p)
ret = func(namespaceIdxEncode2(nsp, i), r[i].res.v.val,
data);
LIST_UNLOCK();
......@@ -454,7 +454,7 @@ int reshCountType ( const resOps * ops )
listElem_t *r = resHList[nsp].resources;
for ( i = 0; i < resHList[nsp].size; i++ )
countType += (r[i].status != RESH_UNUSED && r[i].res.v.ops == ops);
countType += ((r[i].status & RESH_IN_USE_BIT) && r[i].res.v.ops == ops);
LIST_UNLOCK();
......@@ -481,25 +481,30 @@ reshPackResource(int resH, const resOps *ops,
static int getPackBufferSize(void *context)
{
int nsp, i;
int intpacksize, packBufferSize = 0;
nsp = namespaceGetActive ();
int nsp = namespaceGetActive ();
/* pack start marker, namespace and sererator marker */
packBufferSize += 3 * (intpacksize = serializeGetSize(1, DATATYPE_INT, context));
/* pack resources, type marker and seperator marker */
listElem_t *r = resHList[nsp].resources;
for ( i = 0; i < resHList[nsp].size; i++)
if (r[i].status == RESH_ASSIGNED)
for ( int i = 0; i < resHList[nsp].size; i++)
if (r[i].status & RESH_SYNC_BIT)
{
xassert ( r[i].res.v.ops );
/* message plus frame of 2 ints */
packBufferSize +=
r[i].res.v.ops->valGetPackSize(r[i].res.v.val, context)
+ 2 * intpacksize;
if (r[i].status == RESH_DESYNC_DELETED)
{
/* FIXME: pack resource deletion */
}
else if (r[i].status == RESH_DESYNC_IN_USE)
{
xassert ( r[i].res.v.ops );
/* message plus frame of 2 ints */
packBufferSize +=
r[i].res.v.ops->valGetPackSize(r[i].res.v.val, context)
+ 2 * intpacksize;
}
}
/* end marker */
packBufferSize += intpacksize;
......@@ -537,24 +542,25 @@ void reshPackBufferCreate(char **packBuffer, int *packBufferSize, void *context)
listElem_t *r = resHList[nsp].resources;
for ( i = 0; i < resHList[nsp].size; i++ )
if ( r[i].status == RESH_ASSIGNED)
if (r[i].status & RESH_SYNC_BIT)
{
listElem_t * curr = r + i;
xassert ( curr->res.v.ops );
type = curr->res.v.ops->valTxCode ();
if ( ! type ) continue;
serializePack( &type, 1, DATATYPE_INT, * packBuffer,
* packBufferSize, &packBufferPos, context);
curr->res.v.ops->valPack(curr->res.v.val,
pB, pBSize, &packBufferPos, context);
serializePack(&sep, 1, DATATYPE_INT, pB, pBSize, &packBufferPos, context);
curr->status = RESH_CLOSED;
if (r[i].status == RESH_DESYNC_DELETED)
{
/* FIXME: pack resource deletion */
}
else
{
listElem_t * curr = r + i;
xassert ( curr->res.v.ops );
type = curr->res.v.ops->valTxCode ();
if ( ! type ) continue;
serializePack(&type, 1, DATATYPE_INT, * packBuffer,
pBSize, &packBufferPos, context);
curr->res.v.ops->valPack(curr->res.v.val,
pB, pBSize, &packBufferPos, context);
serializePack(&sep, 1, DATATYPE_INT, pB, pBSize, &packBufferPos, context);
}
r[i].status &= ~RESH_SYNC_BIT;
}
LIST_UNLOCK();
......@@ -572,7 +578,7 @@ void reshSetStatus ( cdiResH resH, const resOps * ops, int status )
namespaceTuple_t nspT;
listElem_t * listElem;
xassert(ops && status != RESH_UNUSED);
xassert(ops && (status & RESH_IN_USE_BIT));
LIST_INIT(1);
......@@ -659,8 +665,8 @@ int reshListCompare ( int nsp0, int nsp1 )
*resources1 = resHList[nsp1].resources;
for (i = 0; i < listSizeMin; i++)
{
int occupied0 = resources0[i].status != RESH_UNUSED,
occupied1 = resources1[i].status != RESH_UNUSED;
int occupied0 = resources0[i].status & RESH_IN_USE_BIT != 0,
occupied1 = resources1[i].status & RESH_IN_USE_BIT != 0;
/* occupation mismatch ? */
int diff = occupied0 ^ occupied1;
valCompare |= (diff << cdiResHListOccupationMismatch);
......@@ -682,11 +688,11 @@ int reshListCompare ( int nsp0, int nsp1 )
}
/* find resources in nsp 0 beyond end of nsp 1 */
for (int j = listSizeMin; j < resHList[nsp0].size; ++j)
valCompare |= ((resources0[j].status != RESH_UNUSED)
valCompare |= (((resources0[j].status & RESH_IN_USE_BIT) != 0)
<< cdiResHListOccupationMismatch);
/* find resources in nsp 1 beyond end of nsp 0 */
for (; i < resHList[nsp1].size; ++i)
valCompare |= ((resources1[i].status != RESH_UNUSED)
valCompare |= (((resources1[i].status & RESH_IN_USE_BIT) != 0)
<< cdiResHListOccupationMismatch);
LIST_UNLOCK();
......@@ -725,7 +731,7 @@ void reshListPrint(FILE *fp)
for ( j = 0; j < resHList[i].size; j++ )
{
curr = resHList[i].resources + j;
if (curr->status != RESH_UNUSED)
if (!(curr->status & RESH_IN_USE_BIT))
{
curr->res.v.ops->valPrint(curr->res.v.val, fp);
fprintf ( fp, "\n" );
......
......@@ -34,10 +34,19 @@ typedef struct {
}resOps;
enum {
RESH_UNUSED, /* resource holds no value */
RESH_ASSIGNED, /* resource is user-assigned */
RESH_PRE_ASSIGNED, /* resource is pre-assigned by library */
RESH_CLOSED, /* resource is marked immutable */
RESH_IN_USE_BIT = 1 << 0,
RESH_SYNC_BIT = 1 << 1,
/* resource holds no value */
RESH_UNUSED = 0,
/* resource was deleted and needs to be synced */
RESH_DESYNC_DELETED
= RESH_SYNC_BIT,
/* resource is synchronized */
RESH_IN_USE
= RESH_IN_USE_BIT,
/* resource is in use, desynchronized and needs to be synced */
RESH_DESYNC_IN_USE
= RESH_IN_USE_BIT | RESH_SYNC_BIT,
};
void reshListCreate(int namespaceID);
......
......@@ -443,12 +443,6 @@ void streamDefByteorder(int streamID, int byteorder)
stream_check_ptr(__func__, streamptr);
if ( reshGetStatus ( streamID, &streamOps ) == RESH_CLOSED )
{
Warning("%s", "Operation not executed.");
return;
}
streamptr->byteorder = byteorder;
filetype = streamptr->filetype;
......@@ -482,6 +476,7 @@ void streamDefByteorder(int streamID, int byteorder)
}
#endif
}
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
/*
......@@ -2223,37 +2218,29 @@ int streamInqVlistIDorig(int streamID)
void streamDefCompType(int streamID, int comptype)
{
stream_t *streamptr;
if ( reshGetStatus ( streamID, &streamOps ) == RESH_CLOSED )
{
Warning("%s", "Operation not executed.");
return;
}
streamptr = stream_to_pointer(streamID);
stream_t *streamptr = stream_to_pointer(streamID);
stream_check_ptr(__func__, streamptr);
streamptr->comptype = comptype;
if (streamptr->comptype != comptype)
{
streamptr->comptype = comptype;
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
}
void streamDefCompLevel(int streamID, int complevel)
{
stream_t *streamptr;
if ( reshGetStatus ( streamID, &streamOps ) == RESH_CLOSED )
{
Warning("%s", "Operation not executed.");
return;
}
streamptr = stream_to_pointer(streamID);
stream_t *streamptr = stream_to_pointer(streamID);
stream_check_ptr(__func__, streamptr);
streamptr->complevel = complevel;
if (streamptr->complevel != complevel)
{
streamptr->complevel = complevel;
reshSetStatus(streamID, &streamOps, RESH_DESYNC_IN_USE);
}
}
......
......@@ -294,13 +294,15 @@ int taxisDuplicate(int taxisID1)
void taxisDefType(int taxisID, int type)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->type = type;
if (taxisptr->type != type)
{
taxisptr->type = type;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -319,11 +321,15 @@ The function @func{taxisDefVdate} defines the verification date of a Time axis.
*/
void taxisDefVdate(int taxisID, int vdate)
{
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
taxisptr->vdate = vdate;
if (taxisptr->vdate != vdate)
{
taxisptr->vdate = vdate;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -342,11 +348,15 @@ The function @func{taxisDefVtime} defines the verification time of a Time axis.
*/
void taxisDefVtime(int taxisID, int vtime)
{
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
taxisptr->vtime = vtime;
if (taxisptr->vtime != vtime)
{
taxisptr->vtime = vtime;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -365,13 +375,15 @@ The function @func{taxisDefRdate} defines the reference date of a Time axis.
*/
void taxisDefRdate(int taxisID, int rdate)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->rdate = rdate;
if (taxisptr->rdate != rdate)
{
taxisptr->rdate = rdate;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -390,13 +402,15 @@ The function @func{taxisDefRtime} defines the reference time of a Time axis.
*/
void taxisDefRtime(int taxisID, int rtime)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->rtime = rtime;
if (taxisptr->rtime != rtime)
{
taxisptr->rtime = rtime;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -415,13 +429,15 @@ The function @func{taxisDefFdate} defines the forecast reference date of a Time
*/
void taxisDefFdate(int taxisID, int fdate)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->fdate = fdate;
if (taxisptr->fdate != fdate)
{
taxisptr->fdate = fdate;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -440,13 +456,15 @@ The function @func{taxisDefFtime} defines the forecast reference time of a Time
*/
void taxisDefFtime(int taxisID, int ftime)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->ftime = ftime;
if (taxisptr->ftime != ftime)
{
taxisptr->ftime = ftime;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -467,61 +485,71 @@ The function @func{taxisDefCalendar} defines the calendar of a Time axis.
*/
void taxisDefCalendar(int taxisID, int calendar)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->calendar = calendar;
if (taxisptr->calendar != calendar)
{
taxisptr->calendar = calendar;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
void taxisDefTunit(int taxisID, int unit)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->unit = unit;
if (taxisptr->unit != unit)
{
taxisptr->unit = unit;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
void taxisDefForecastTunit(int taxisID, int unit)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
taxisptr->fc_unit = unit;
if (taxisptr->fc_unit != unit)
{
taxisptr->fc_unit = unit;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
void taxisDefForecastPeriod(int taxisID, double fc_period)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->fc_period = fc_period;
if (taxisptr->fc_period != fc_period)
{
taxisptr->fc_period = fc_period;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
void taxisDefNumavg(int taxisID, int numavg)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->numavg = numavg;
if (taxisptr->numavg != numavg)
{
taxisptr->numavg = numavg;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
/*
......@@ -530,7 +558,7 @@ The valid CDI time types are TAXIS_ABSOLUTE and TAXIS_RELATIVE.
*/
int taxisInqType(int taxisID)
{
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
......@@ -540,7 +568,7 @@ int taxisInqType(int taxisID)
int taxisHasBounds(int taxisID)
{
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
......@@ -550,25 +578,27 @@ int taxisHasBounds(int taxisID)
void taxisDeleteBounds(int taxisID)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);
taxisptr->has_bounds = FALSE;
if (taxisptr->has_bounds != FALSE)
{
taxisptr->has_bounds = FALSE;
reshSetStatus(taxisID, &taxisOps, RESH_DESYNC_IN_USE);
}
}
void taxisCopyTimestep(int taxisID2, int taxisID1)
{
taxis_t *taxisptr1 = ( taxis_t * ) reshGetVal ( taxisID1, &taxisOps );
taxis_t *taxisptr2 = ( taxis_t * ) reshGetVal ( taxisID2, &taxisOps );
taxis_t *taxisptr1 = (taxis_t *)reshGetVal(taxisID1, &taxisOps),
*taxisptr2 = (taxis_t *)reshGetVal(taxisID2, &taxisOps);
taxis_check_ptr(__func__, taxisptr1);
taxis_check_ptr(__func__, taxisptr2);
reshLock ();
reshLock();
taxisptr2->rdate = taxisptr1->rdate;
taxisptr2->rtime = taxisptr1->rtime;
......@@ -590,7 +620,8 @@ void taxisCopyTimestep(int taxisID2, int taxisID1)
taxisptr2->fc_unit = taxisptr1->fc_unit;
taxisptr2->fc_period = taxisptr1->fc_period;
reshUnlock ();
reshSetStatus(taxisID2, &taxisOps, RESH_DESYNC_IN_USE);
reshUnlock();
}
/*
......@@ -611,7 +642,7 @@ The function @func{taxisInqVdate} returns the verification date of a Time axis.
*/
int taxisInqVdate(int taxisID)
{
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
......@@ -621,7 +652,7 @@ int taxisInqVdate(int taxisID)
void taxisInqVdateBounds(int taxisID, int *vdate_lb, int *vdate_ub)
{
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_t *taxisptr = (taxis_t *)reshGetVal(taxisID, &taxisOps);
taxis_check_ptr(__func__, taxisptr);
......@@ -632,16 +663,19 @@ void taxisInqVdateBounds(int taxisID, int *vdate_lb, int *vdate_ub)
void taxisDefVdateBounds(int taxisID, int vdate_lb, int vdate_ub)
{
RETURN_IFCLOSED_TAXIS(taxisID);
taxis_t *taxisptr = ( taxis_t * ) reshGetVal ( taxisID, &taxisOps );
taxis_check_ptr(__func__, taxisptr);