Commit 92031e6e authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Also serialize resource deletions.

* This is part of the bits needed to facilitate multiple/incremental
  resource updates on I/O servers.
parent 82390dc6
......@@ -130,7 +130,7 @@ reshListCreate(int namespaceID)
LIST_LOCK();
if (resHListSize <= namespaceID)
{
resHList = (struct resHList_t*) xrealloc(resHList, (namespaceID + 1) * sizeof (resHList[0]));
resHList = (struct resHList_t *)xrealloc(resHList, (namespaceID + 1) * sizeof (resHList[0]));
for (int i = resHListSize; i <= namespaceID; ++i)
reshListClearEntry(i);
resHListSize = namespaceID + 1;
......@@ -209,26 +209,26 @@ void listSizeExtend()
{
int nsp = namespaceGetActive ();
int oldSize = resHList[nsp].size;
int newListSize = oldSize + MIN_LIST_SIZE;
size_t newListSize = (size_t)oldSize + MIN_LIST_SIZE;
resHList[nsp].resources = (listElem_t*) xrealloc(resHList[nsp].resources,
newListSize * sizeof(listElem_t));
listElem_t *r = resHList[nsp].resources;
for (int i = oldSize; i < newListSize; ++i)
for (size_t i = (size_t)oldSize; i < newListSize; ++i)
{
r[i].res.free.next = i + 1;
r[i].res.free.prev = i - 1;
r[i].res.free.next = (int)i + 1;
r[i].res.free.prev = (int)i - 1;
r[i].status = RESH_UNUSED;
}
if (resHList[nsp].freeHead != -1)
r[resHList[nsp].freeHead].res.free.next
= newListSize - 1;
= (int)newListSize - 1;
r[newListSize-1].res.free.next = resHList[nsp].freeHead;
r[oldSize].res.free.prev = -1;
resHList[nsp].freeHead = oldSize;
resHList[nsp].size = newListSize;
resHList[nsp].size = (int)newListSize;
}
/**************************************************************/
......@@ -280,12 +280,32 @@ reshRemove_(int nsp, int idx)
resHList[nsp].freeHead = idx;
}
void reshRemove ( cdiResH resH, const resOps * ops )
void reshDestroy(cdiResH resH)
{
int nsp;
namespaceTuple_t nspT;
LIST_INIT(1);
LIST_LOCK();
nsp = namespaceGetActive ();
nspT = namespaceResHDecode ( resH );
xassert ( nspT.nsp == nsp
&& nspT.idx >= 0
&& nspT.idx < resHList[nsp].size
&& resHList[nsp].resources[nspT.idx].res.v.ops);
if (resHList[nsp].resources[nspT.idx].status & RESH_IN_USE_BIT)
reshRemove_(nsp, nspT.idx);
LIST_UNLOCK();
}
void reshRemove ( cdiResH resH, const resOps * ops )
{
int nsp;
namespaceTuple_t nspT;
LIST_LOCK();
......@@ -495,7 +515,7 @@ static int getPackBufferSize(void *context)
{
if (r[i].status == RESH_DESYNC_DELETED)
{
/* FIXME: pack resource deletion */
packBufferSize += 3 * intpacksize;
}
else if (r[i].status == RESH_DESYNC_IN_USE)
{
......@@ -533,7 +553,7 @@ void reshPackBufferCreate(char **packBuffer, int *packBufferSize, void *context)
int nsp = namespaceGetActive ();
int pBSize = *packBufferSize = getPackBufferSize(context);
char *pB = *packBuffer = (char*) xcalloc(1, *packBufferSize);
char *pB = *packBuffer = (char *)xcalloc(1, (size_t)pBSize);
{
int header[3] = { start, nsp, sep };
......@@ -546,7 +566,10 @@ void reshPackBufferCreate(char **packBuffer, int *packBufferSize, void *context)
{
if (r[i].status == RESH_DESYNC_DELETED)
{
/* FIXME: pack resource deletion */
enum { del_ints = 3 };
int temp[del_ints] = { RESH_DELETE, namespaceIdxEncode2(nsp, i), SEPARATOR };
serializePack(temp, del_ints, DATATYPE_INT,
pB, pBSize, &packBufferPos, context);
}
else
{
......@@ -554,11 +577,12 @@ void reshPackBufferCreate(char **packBuffer, int *packBufferSize, void *context)
xassert ( curr->res.v.ops );
type = curr->res.v.ops->valTxCode ();
if ( ! type ) continue;
serializePack(&type, 1, DATATYPE_INT, * packBuffer,
serializePack(&type, 1, DATATYPE_INT, pB,
pBSize, &packBufferPos, context);
curr->res.v.ops->valPack(curr->res.v.val,
pB, pBSize, &packBufferPos, context);
serializePack(&sep, 1, DATATYPE_INT, pB, pBSize, &packBufferPos, context);
serializePack(&sep, 1, DATATYPE_INT,
pB, pBSize, &packBufferPos, context);
}
r[i].status &= ~RESH_SYNC_BIT;
}
......@@ -665,8 +689,8 @@ int reshListCompare ( int nsp0, int nsp1 )
*resources1 = resHList[nsp1].resources;
for (i = 0; i < listSizeMin; i++)
{
int occupied0 = resources0[i].status & RESH_IN_USE_BIT != 0,
occupied1 = resources1[i].status & RESH_IN_USE_BIT != 0;
int occupied0 = (resources0[i].status & RESH_IN_USE_BIT) != 0,
occupied1 = (resources1[i].status & RESH_IN_USE_BIT) != 0;
/* occupation mismatch ? */
int diff = occupied0 ^ occupied1;
valCompare |= (diff << cdiResHListOccupationMismatch);
......
......@@ -54,6 +54,8 @@ void reshListDestruct(int namespaceID);
int reshPut ( void *, const resOps * );
void reshReplace(cdiResH resH, void *p, const resOps *ops);
void reshRemove ( cdiResH, const resOps * );
/*> doesn't check resource type */
void reshDestroy(cdiResH);
int reshCountType ( const resOps * );
......
......@@ -11,6 +11,7 @@
#include "vlist.h"
#include "namespace.h"
#include "serialize.h"
#include "resource_handle.h"
#include "resource_unpack.h"
#include "taxis.h"
#include "zaxis.h"
......@@ -73,6 +74,11 @@ void reshUnpackResources(char * unpackBuffer, int unpackBufferSize,
vlistUnpack(unpackBuffer, unpackBufferSize, &unpackBufferPos,
originNamespace, context, 1);
break;
case RESH_DELETE:
serializeUnpack(unpackBuffer, unpackBufferSize, &unpackBufferPos,
&token2, 1, DATATYPE_INT, context);
reshDestroy(namespaceAdaptKey(token2, originNamespace));
break;
default:
xabort ( "TOKEN MAPS NO VALID DATATYPE" );
}
......
......@@ -13,6 +13,7 @@ enum
MODEL = 5,
STREAM = 6,
VLIST = 7,
RESH_DELETE,
START = 55555555,
SEPARATOR = 66666666,
END = 99999999
......
......@@ -160,6 +160,8 @@ int defineVlist ( int gridID, int zaxisID, int taxisID )
int vlistID2 = vlistCreate();
vlistDefVar(vlistID2, gridID, zaxisID, TIME_VARIABLE);
vlistCopy(vlistID2, vlistID);
vlistDestroy(vlistID);
vlistID = vlistID2;
return vlistID;
}
......@@ -205,13 +207,13 @@ int modelRun(MPI_Comm comm)
defineStream ( streamID, vlistID );
reshPackBufferCreate ( &sendBuffer, &bufferSize, &comm );
recvBuffer = xmalloc(bufferSize);
recvBuffer = xmalloc((size_t)bufferSize);
#ifdef MPI_MARSHALLING
xmpi(MPI_Sendrecv(sendBuffer, bufferSize, MPI_PACKED, 0, 0,
recvBuffer, bufferSize, MPI_PACKED, 0, 0,
MPI_COMM_SELF, MPI_STATUS_IGNORE));
#else
memcpy(recvBuffer, sendBuffer, bufferSize);
memcpy(recvBuffer, sendBuffer, (size_t)bufferSize);
#endif
namespaceSetActive(destNamespace);
reshUnpackResources(recvBuffer, bufferSize, &comm);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment