Commit eb04683a authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Change copying of resources to better preserve IDs.

parent a94856f3
......@@ -659,6 +659,47 @@ void streamGetIndexList ( int nstreams, int * streamIndexList )
reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
}
void
cdiStreamSetupVlist(stream_t *streamptr, int vlistID, int vlistIDorig)
{
int nvars = vlistNvars(vlistID);
int streamID = streamptr->self;
streamptr->vlistID = vlistID;
streamptr->vlistIDorig = vlistIDorig;
for (int varID = 0; varID < nvars; varID++ )
{
int gridID = vlistInqVarGrid(vlistID, varID);
int zaxisID = vlistInqVarZaxis(vlistID, varID);
stream_new_var(streamptr, gridID, zaxisID);
if ( streamptr->have_missval )
vlistDefVarMissval(vlistID, varID,
vlistInqVarMissval(vlistID, varID));
}
if (namespaceHasLocalFile(namespaceGetActive())
&& streamptr->filemode == 'w' )
{
if ( streamptr->filetype == FILETYPE_NC ||
streamptr->filetype == FILETYPE_NC2 ||
streamptr->filetype == FILETYPE_NC4 ||
streamptr->filetype == FILETYPE_NC4C )
{
#if USE_MPI && defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfDefVars(streamptr);
}
else if ( streamptr->filetype == FILETYPE_GRB ||
streamptr->filetype == FILETYPE_GRB2 )
{
gribContainersNew(streamptr);
}
}
}
#ifdef USE_MPI
static int
......
......@@ -295,6 +295,7 @@ void streamDefineTaxis(int streamID);
int streamsNewEntry(int filetype);
void streamsInitEntry(int streamID);
void cdiStreamSetupVlist(stream_t *streamptr, int vlistID, int vlistIDorig);
int stream_new_var(stream_t *streamptr, int gridID, int zaxisID);
int tstepsNewEntry(stream_t *streamptr);
......
......@@ -9,6 +9,7 @@
#include "pio_util.h"
#include "institution.h"
#include "model.h"
#include "cdi_int.h"
#include "vlist.h"
#include "namespace.h"
......@@ -16,15 +17,19 @@
extern void gridUnpack ( char *, int, int *, int, MPI_Comm );
extern void zaxisUnpack ( char *, int, int *, int, MPI_Comm );
extern void taxisUnpack ( char *, int, int *, int, MPI_Comm );
extern double cdiDefaultMissval;
extern int streamNint;
/*****************************************************************************/
static
void streamUnpack ( char * unpackBuffer, int unpackBufferSize,
int * unpackBufferPos, int nspTarget, MPI_Comm comm )
struct streamAssoc
{
int streamID, vlistID, vlistIDorig;
};
static struct streamAssoc
streamUnpack ( char * unpackBuffer, int unpackBufferSize,
int * unpackBufferPos, int nspTarget, MPI_Comm comm )
{
int intBuffer[streamNint], streamID;
double d;
......@@ -53,16 +58,14 @@ void streamUnpack ( char * unpackBuffer, int unpackBufferSize,
namespaceAdaptKey ( intBuffer[0], nspTarget ) == streamID );
xdebug ("streamID=%d, vlistID=%d", streamID, namespaceAdaptKey ( intBuffer[4], nspTarget ));
streamDefVlist ( streamID, namespaceAdaptKey ( intBuffer[4], nspTarget ));
xassert ( streamInqVlist ( streamID ) ==
namespaceAdaptKey ( intBuffer[3], nspTarget ));
streamDefByteorder ( streamID, intBuffer[5] );
streamDefCompType ( streamID, intBuffer[6] );
streamDefCompLevel ( streamID, intBuffer[7] );
cdiDefGlobal ( "REGULARGRID", intBuffer[8] );
cdiDefGlobal ( "SORTNAME", intBuffer[9] );
cdiDefGlobal ( "HAVE_MISSVAL", intBuffer[10] );
struct streamAssoc retval = { streamID, intBuffer[3], intBuffer[4] };
return retval;
}
......@@ -74,6 +77,9 @@ void rpcUnpackResources ( char * unpackBuffer, int unpackBufferSize,
{
int token1, token2, nspTarget;
int unpackBufferPos = 0;
int numAssociations = 0, sizeAssociations = 16;
struct streamAssoc *associations
= xmalloc(sizeof (associations[0]) * sizeAssociations);
xdebug("%s", "START");
......@@ -81,11 +87,11 @@ void rpcUnpackResources ( char * unpackBuffer, int unpackBufferSize,
{
xmpi ( MPI_Unpack ( unpackBuffer, unpackBufferSize, &unpackBufferPos,
&token1, 1, MPI_INT, comm ));
if (token1 == END)
break;
switch ( token1 )
{
case END:
return;
case START:
xmpi ( MPI_Unpack ( unpackBuffer, unpackBufferSize, &unpackBufferPos,
&nspTarget, 1, MPI_INT, comm ));
......@@ -111,8 +117,14 @@ void rpcUnpackResources ( char * unpackBuffer, int unpackBufferSize,
nspTarget, comm);
break;
case STREAM:
streamUnpack ( unpackBuffer, unpackBufferSize, &unpackBufferPos,
nspTarget, comm );
if (sizeAssociations == numAssociations)
associations
= xrealloc(associations,
sizeof (associations[0]) * (sizeAssociations *= 2));
associations[numAssociations]
= streamUnpack(unpackBuffer, unpackBufferSize, &unpackBufferPos,
nspTarget, comm );
++numAssociations;
break;
case VLIST:
vlistUnpack (unpackBuffer, unpackBufferSize, &unpackBufferPos,
......@@ -126,6 +138,15 @@ void rpcUnpackResources ( char * unpackBuffer, int unpackBufferSize,
&token2, 1, MPI_INT, comm ));
xassert ( token2 == SEPARATOR );
}
for (int i = 0; i < numAssociations; ++i)
{
cdiStreamSetupVlist(stream_to_pointer(associations[i].streamID),
namespaceAdaptKey(associations[i].vlistID,
nspTarget),
namespaceAdaptKey(associations[i].vlistIDorig,
nspTarget));
}
free(associations);
xdebug("%s", "RETURN");
}
......
......@@ -2356,8 +2356,6 @@ The function @func{streamDefVlist} defines the variable list of a stream.
*/
void streamDefVlist(int streamID, int vlistID)
{
int nvars, varID;
int gridID, zaxisID;
stream_t *streamptr;
streamptr = stream_to_pointer(streamID);
......@@ -2386,42 +2384,7 @@ void streamDefVlist(int streamID, int vlistID)
if ( streamptr->vlistID == CDI_UNDEFID )
{
streamptr->vlistID = vlistDuplicate(vlistID);
streamptr->vlistIDorig = vlistID ;
nvars = vlistNvars(vlistID);
for ( varID = 0; varID < nvars; varID++ )
{
gridID = vlistInqVarGrid(vlistID, varID);
zaxisID = vlistInqVarZaxis(vlistID, varID);
stream_new_var(streamptr, gridID, zaxisID);
if ( streamptr->have_missval )
vlistDefVarMissval(streamptr->vlistID, varID,
vlistInqVarMissval(vlistID, varID));
}
if (namespaceHasLocalFile(namespaceGetActive())
&& streamptr->filemode == 'w' )
{
if ( streamptr->filetype == FILETYPE_NC ||
streamptr->filetype == FILETYPE_NC2 ||
streamptr->filetype == FILETYPE_NC4 ||
streamptr->filetype == FILETYPE_NC4C )
{
#if USE_MPI && defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfDefVars(streamptr);
}
else if ( streamptr->filetype == FILETYPE_GRB ||
streamptr->filetype == FILETYPE_GRB2 )
{
gribContainersNew(streamptr);
}
}
cdiStreamSetupVlist(streamptr, vlistDuplicate(vlistID), vlistID);
}
else
{
......
......@@ -392,9 +392,6 @@ int vlistDuplicate(int vlistID)
vlistCopy(vlistIDnew, vlistID);
if ( ! namespaceHasLocalFile ( namespaceGetActive ()))
reshSetStatus ( vlistIDnew, &vlist_ops, SUSPENDED );
return (vlistIDnew);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment