Commit d9a8362f authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Make namespaces more dynamic.

* This results in an API change for CDI-pio initialization. Users are no longer
  required to specify the number of namespaces in advance.
parent 6ed6f698
......@@ -367,8 +367,8 @@ Get all values of a Y-axis.
\section*{\tt \htmlref{pioInit}{pioInit}}
\begin{verbatim}
MPI_Comm pioInit (MPI_Comm commSuper, int nProcsIO, int IOMode, int nNamespaces,
int *hasLocalFile, float partInflate);
MPI_Comm pioInit (MPI_Comm commSuper, int nProcsIO, int IOMode,
int *pioNamespace, float partInflate);
\end{verbatim}
initialize I/O server processes and communication.
......
......@@ -368,8 +368,7 @@ Get all values of a Y-axis.
\begin{verbatim}
INTEGER FUNCTION pioInit (INTEGER commSuper, INTEGER nProcsIO, INTEGER IOMode,
INTEGER nNamespaces, INTEGER hasLocalFile,
REAL partInflate)
INTEGER pioNamespace, REAL partInflate)
\end{verbatim}
initialize I/O server processes and communication.
......
......@@ -119,11 +119,6 @@ struct {
int main (int argc, char *argv[])
{
enum {
nNamespaces = 1 };
static int hasLocalFile[nNamespaces] = { 0 };
MPI_Comm commModel = MPI_COMM_NULL;
#ifdef USE_MPI
MPI_Comm commGlob;
......@@ -168,8 +163,8 @@ int main (int argc, char *argv[])
}
}
commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile,
1.0f);
commModel = pioInit(commGlob, nProcsIO, IOMode, &pioNamespace, 1.0f);
pioNamespaceSetActive(pioNamespace);
#endif
modelRun(commModel);
......
......@@ -159,16 +159,14 @@ int main (int argc, char *argv[])
IOModeDef = PIO_FPGUARD,
//IOModeDef = PIO_ASYNCH,
//IOModeDef = PIO_WRITER,
nNamespaces = 1 };
static int hasLocalFile[nNamespaces] = { 0 };
};
MPI_Comm commModel = MPI_COMM_NULL;
#ifdef USE_MPI
MPI_Comm commGlob;
int sizeGlob;
int rankGlob;
int IOMode, nProcsIO, count;
int IOMode, nProcsIO, pioNamespace;
xmpi ( MPI_Init ( &argc, &argv));
commGlob = MPI_COMM_WORLD;
......@@ -192,8 +190,8 @@ int main (int argc, char *argv[])
nProcsIO = nProcsIODef;
}
commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile,
1.0f);
commModel = pioInit(commGlob, nProcsIO, IOMode, &pioNamespace, 1.0f);
pioNamespaceSetActive(pioNamespace);
#endif
modelRun(commModel);
......
......@@ -19,15 +19,12 @@ extern void arrayDestroy ( void );
enum {
IOMode = PIO_NONE,
nProcsIO = 1,
nNamespaces = 2,
DOUBLE_PRECISION = 8,
nlon = 12,
nlat = 6,
nlev = 5,
ntsteps = 3 };
static int hasLocalFile[nNamespaces] = { 0, 0 };
double lons[nlon] = {0, 30, 60, 90, 120, 150, 180, 210, 240, 270, 300, 330};
double lats[nlat] = {-75, -45, -15, 15, 45, 75};
double levs[nlev] = {101300, 92500, 85000, 50000, 20000};
......@@ -220,7 +217,7 @@ void modelRun ( MPI_Comm comm )
int main (int argc, char *argv[])
{
#ifdef USE_MPI
int sizeGlob;
int sizeGlob, pioNamespace;
MPI_Comm commGlob, commModel;
MPI_Init(&argc, &argv);
......@@ -235,8 +232,8 @@ int main (int argc, char *argv[])
if ( nProcsIO != 1 )
xabort ( "bad distribution of tasks on PEs" );
commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile,
1.0f);
commModel = pioInit(commGlob, nProcsIO, IOMode, &pioNamespace, 1.0f);
pioNamespaceSetActive(pioNamespace);
modelRun ( commModel );
......
......@@ -232,8 +232,8 @@ void pioEndDef ( void );
void pioEndTimestepping ( void );
void pioFinalize ( void );
/* pioInit: initialize I/O server processes and communication */
MPI_Comm pioInit(MPI_Comm commSuper, int nProcsIO, int IOMode, int nNamespaces,
int *hasLocalFile, float partInflate);
MPI_Comm pioInit(MPI_Comm commSuper, int nProcsIO, int IOMode,
int *pioNamespace, float partInflate);
void pioWriteTimestep ( int, int, int );
void streamWriteVarPart (int streamID, int varID, int memtype,
......
......@@ -382,8 +382,7 @@
! (INTEGER commSuper,
! INTEGER nProcsIO,
! INTEGER IOMode,
! INTEGER nNamespaces,
! INTEGER hasLocalFile,
! INTEGER pioNamespace,
! REAL partInflate)
EXTERNAL pioInit
......
......@@ -79,13 +79,13 @@
FCALLSCSUB0 (pioEndDef, PIOENDDEF, pioenddef)
FCALLSCSUB0 (pioEndTimestepping, PIOENDTIMESTEPPING, pioendtimestepping)
FCALLSCSUB0 (pioFinalize, PIOFINALIZE, piofinalize)
static int pioInit_fwrap(int commSuper, int nProcsIO, int IOMode, int nNamespaces, int * hasLocalFile, float partInflate)
static int pioInit_fwrap(int commSuper, int nProcsIO, int IOMode, int * pioNamespace, float partInflate)
{
MPI_Comm v;
v = pioInit(MPI_Comm_f2c(commSuper), nProcsIO, IOMode, nNamespaces, hasLocalFile, partInflate);
v = pioInit(MPI_Comm_f2c(commSuper), nProcsIO, IOMode, pioNamespace, partInflate);
return MPI_Comm_c2f(v);
}
FCALLSCFUN6 (INT, pioInit_fwrap, PIOINIT, pioinit, INT, INT, INT, INT, PINT, FLOAT)
FCALLSCFUN5 (INT, pioInit_fwrap, PIOINIT, pioinit, INT, INT, INT, PINT, FLOAT)
FCALLSCSUB3 (pioWriteTimestep, PIOWRITETIMESTEP, piowritetimestep, INT, INT, INT)
static void streamWriteVarPart_fwrap(int streamID, int varID, int memtype, const void * data, int nmiss, void * partDesc)
{
......
......@@ -94,9 +94,8 @@ void instituteDefaultEntries ( void )
resH[11] = institutDef( 99, 0, "KNMI", "Royal Netherlands Meteorological Institute");
/* (void) institutDef( 0, 0, "IPSL", "IPSL (Institut Pierre Simon Laplace, Paris, France)"); */
if ( ! namespaceHasLocalFile ( namespaceGetActive ()))
for ( i = 0; i < 12 ; i++ )
reshSetStatus ( resH[i], &instituteOps, SUSPENDED );
for ( i = 0; i < 12 ; i++ )
reshSetStatus(resH[i], &instituteOps, SUSPENDED);
}
static
......
......@@ -107,9 +107,8 @@ void modelDefaultEntries ( void )
instID = institutInq( 0, 1, "NCEP", NULL);
resH[9] = modelDef(instID, 80, "T62L28MRF");
if ( ! namespaceHasLocalFile ( namespaceGetActive ()))
for ( i = 0; i < 10 ; i++ )
reshSetStatus ( resH[i], &modelOps, SUSPENDED );
for ( i = 0; i < 10 ; i++ )
reshSetStatus(resH[i], &modelOps, SUSPENDED);
}
static
......
......@@ -3,9 +3,9 @@
#include <stdio.h>
#include "cdi.h"
#include "namespace.h"
#include "resource_handle.h"
#include "pio_util.h"
static int nNamespaces = 1;
static int activeNamespace = 0;
......@@ -17,6 +17,24 @@ struct namespace
struct namespace *namespaces = &initialNamespace;
static int namespacesSize = 1;
#if defined (HAVE_LIBPTHREAD)
# include <pthread.h>
static pthread_mutex_t namespaceMutex = PTHREAD_MUTEX_INITIALIZER;
# define NAMESPACE_LOCK() pthread_mutex_lock(&namespaceMutex)
# define NAMESPACE_UNLOCK() pthread_mutex_unlock(&namespaceMutex)
#else
# define NAMESPACE_LOCK()
# define NAMESPACE_UNLOCK()
#endif
enum {
intbits = sizeof(int) * CHAR_BIT,
nspbits = 4,
......@@ -73,32 +91,69 @@ namespaceTuple_t namespaceResHDecode ( int resH )
return tin;
}
void namespaceInit ( int nspn, int * argHasLocalFile )
int
namespaceNew(int hasLocalFiles)
{
/* FIXME: this should not be PIO-only */
#ifdef USE_MPI
xassert(nspn <= NUM_NAMESPACES && nspn >= 1 );
nNamespaces = nspn;
if ( nspn >= 1 )
int newNamespaceID = -1;
NAMESPACE_LOCK();
if (namespacesSize > nNamespaces)
{
namespaces = xmalloc(nspn * sizeof (namespaces[0]));
for (int nspID = 0; nspID < nspn; ++nspID)
{
namespaces[nspID].hasLocalFiles = argHasLocalFile[nspID];
namespaces[nspID].resStage = STAGE_DEFINITION;
}
/* namespace is already available and only needs reinitialization */
for (int i = 0; i < namespacesSize; ++i)
if (namespaces[i].resStage == STAGE_UNUSED)
{
newNamespaceID = i;
break;
}
}
#endif
else if (namespacesSize == 1)
{
/* make room for additional namespace */
struct namespace *newNameSpaces
= xmalloc((namespacesSize + 1) * sizeof (namespaces[0]));
memcpy(newNameSpaces, namespaces, sizeof (namespaces[0]));
namespaces = newNameSpaces;
++namespacesSize;
newNamespaceID = 1;
}
else if (namespacesSize < NUM_NAMESPACES)
{
/* make room for additional namespace */
newNamespaceID = namespacesSize;
namespaces
= xrealloc(namespaces, (namespacesSize + 1) * sizeof (namespaces[0]));
++namespacesSize;
}
else /* implicit: namespacesSize >= NUM_NAMESPACES */
{
NAMESPACE_UNLOCK();
return -1;
}
xassert(newNamespaceID >= 0 && newNamespaceID < NUM_NAMESPACES);
++nNamespaces;
namespaces[newNamespaceID].hasLocalFiles = hasLocalFiles;
namespaces[newNamespaceID].resStage = STAGE_DEFINITION;
reshListCreate(newNamespaceID);
NAMESPACE_UNLOCK();
return newNamespaceID;
}
void
namespaceDelete(int namespaceID)
{
NAMESPACE_LOCK();
xassert(namespaceID < namespacesSize && nNamespaces);
reshListDestruct(namespaceID);
namespaces[namespaceID].resStage = STAGE_UNUSED;
--nNamespaces;
NAMESPACE_UNLOCK();
}
void namespaceCleanup ( void )
{
if ( nNamespaces > 1 )
{
initialNamespace = namespaces[0];
free(namespaces);
namespaces = &initialNamespace;
nNamespaces = 1;
......@@ -114,12 +169,9 @@ int namespaceGetNumber ()
void pioNamespaceSetActive ( int nId )
{
/* FIXME: this should not be PIO-only */
#ifdef USE_MPI
xassert ( nId < nNamespaces && nId >= 0 );
xassert(nId < namespacesSize && nId >= 0
&& namespaces[nId].resStage != STAGE_UNUSED);
activeNamespace = nId;
#endif
}
......@@ -131,7 +183,7 @@ int namespaceGetActive ()
int namespaceHasLocalFile ( int nId )
{
xassert ( nId < nNamespaces && nId >= 0 );
xassert(nId < namespacesSize && nId >= 0);
return namespaces ? namespaces[nId].hasLocalFiles : 0;
}
......@@ -184,6 +236,18 @@ statusCode namespaceInqResStatus ( void )
return namespaces[nsp].resStage;
}
void cdiReset(void)
{
NAMESPACE_LOCK();
for (int namespaceID = 0; namespaceID < namespacesSize; ++namespaceID)
namespaceDelete(namespaceID);
namespaces = &initialNamespace;
namespacesSize = 1;
nNamespaces = 1;
activeNamespace = 0;
NAMESPACE_UNLOCK();
}
/*
* Local Variables:
* c-file-style: "Java"
......
......@@ -4,7 +4,8 @@
typedef enum {
STAGE_DEFINITION = 0,
STAGE_TIMELOOP = 1,
STAGE_CLEANUP = 2
STAGE_CLEANUP = 2,
STAGE_UNUSED = 3,
} statusCode;
typedef struct {
......@@ -12,8 +13,9 @@ typedef struct {
int nsp;
} namespaceTuple_t;
int namespaceNew(int hasLocalFiles);
void namespaceDelete(int namespaceID);
void namespaceCleanup ( void );
void namespaceInit ( int, int * );
int namespaceGetNumber ( void );
int namespaceGetActive ( void );
int namespaceIdxEncode ( namespaceTuple_t );
......@@ -25,6 +27,7 @@ int namespaceAdaptKey2 ( int );
void namespaceDefResStatus ( statusCode );
statusCode namespaceInqResStatus ( void );
#endif
/*
* Local Variables:
......
......@@ -766,9 +766,13 @@ int pioInqVarDecoOff ( int vlistID, int varID )
@return int indicating whether the calling PE is a calculator (1) or not (0)
*/
#ifdef USE_MPI
static int pioNamespace_ = -1;
#endif
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
int nNamespaces, int * hasLocalFile, float partInflate)
int *pioNamespace, float partInflate)
{
#ifdef USE_MPI
int sizeGlob;
......@@ -799,13 +803,14 @@ pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
// JUST FOR TEST CASES WITH ONLY ONE MPI TASK
if ( commInqSizeGlob () == 1 )
{
namespaceInit ( nNamespaces, hasLocalFile );
pioNamespace_ = *pioNamespace = namespaceNew(0);
return commInqCommGlob ();
}
if ( commInqIsProcIO ())
{
IOServer ();
namespaceDelete(0);
commDestroy ();
MPI_Finalize ();
exit ( EXIT_SUCCESS );
......@@ -814,7 +819,7 @@ pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
{
commEvalPhysNodes ();
commDefCommsIO ();
namespaceInit ( nNamespaces, hasLocalFile );
pioNamespace_ = *pioNamespace = namespaceNew(0);
}
xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
......@@ -890,7 +895,7 @@ void pioFinalize ( void )
#ifdef USE_MPI
int collID, ibuffer = 1111;
xdebug("%s", "START");
namespaceCleanup ();
namespaceDelete(pioNamespace_);
for ( collID = 0; collID < commInqNProcsColl (); collID++ )
{
xmpi ( MPI_Send ( &ibuffer, 1, MPI_INT, commInqNProcsModel (),
......
......@@ -867,7 +867,6 @@ void IOServer ()
#ifdef HAVE_PARALLEL_NC4
numPioPrimes = PPM_prime_factorization_32((uint32_t)commInqSizeColl(),
&pioPrimes);
xt_initialize(commInqCommColl());
#elif defined (HAVE_LIBNETCDF)
cdiSerialOpenFileCount = xcalloc(sizeof (cdiSerialOpenFileCount[0]),
commInqSizeColl());
......
......@@ -69,15 +69,19 @@ static struct
listElem_t *resources;
} *resHList;
static int resHListSize = 0;
/**************************************************************/
static void
listInitResources(int nsp)
{
xassert(nsp < namespaceGetNumber() && nsp >= 0);
xassert(nsp < resHListSize && nsp >= 0);
int size = resHList[nsp].size = MIN_LIST_SIZE;
xassert(resHList[nsp].resources == NULL);
resHList[nsp].resources = xcalloc(MIN_LIST_SIZE, sizeof (listElem_t));
listElem_t *p = resHList[nsp].resources;
int size = resHList[nsp].size;
for (int i = 0; i < size; i++ )
{
p[i].resH = namespaceIdxEncode2(nsp, i);
......@@ -91,51 +95,65 @@ listInitResources(int nsp)
resHList[nsp].freeHead = 0;
}
static
void listNew ( void )
static inline void
reshListClearEntry(int i)
{
int nnsp;
nnsp = namespaceGetNumber ();
resHList = xcalloc ( nnsp, sizeof (resHList[0]));
resHList[i].size = 0;
resHList[i].resources = NULL;
resHList[i].freeHead = -1;
}
for (int i = 0; i < nnsp; i++ )
void
reshListCreate(int namespaceID)
{
LIST_LOCK();
if (resHListSize <= namespaceID)
{
resHList[i].size = MIN_LIST_SIZE;
xassert ( resHList[i].resources == NULL);
resHList[i].resources = xcalloc ( MIN_LIST_SIZE, sizeof (listElem_t));
listInitResources(i);
resHList = xrealloc(resHList, (namespaceID + 1) * sizeof (resHList[0]));
for (int i = resHListSize; i <= namespaceID; ++i)
reshListClearEntry(i);
resHListSize = namespaceID + 1;
}
listInitResources(namespaceID);
LIST_UNLOCK();
}
/**************************************************************/
static void listDestroy ( void )
void
reshListDestruct(int namespaceID)
{
if (resHList)
LIST_LOCK();
xassert(resHList && namespaceID >= 0 && namespaceID < resHListSize);
int callerNamespaceID = namespaceGetActive();
pioNamespaceSetActive(namespaceID);
if (resHList[namespaceID].resources)
{
int nnsp = namespaceGetNumber ();
for (int i = 0; i < nnsp; ++i)
{
pioNamespaceSetActive(i);
if (resHList[i].resources)
{
for ( int j = 0; j < resHList[i].size; j++ )
{
listElem_t *listElem = resHList[i].resources + j;
if (listElem->val)
listElem->ops->valDestroy(listElem->val);
}
free(resHList[i].resources);
resHList[i].resources = NULL;
}
}
free(resHList);
resHList = NULL;
for ( int j = 0; j < resHList[namespaceID].size; j++ )
{
listElem_t *listElem = resHList[namespaceID].resources + j;
if (listElem->val)
listElem->ops->valDestroy(listElem->val);
}
free(resHList[namespaceID].resources);
reshListClearEntry(namespaceID);
}
if (resHList[callerNamespaceID].resources)
pioNamespaceSetActive(callerNamespaceID);
LIST_UNLOCK();
}
static void listDestroy ( void )
{
LIST_LOCK();
for (int i = 0; i < resHListSize; ++i)
if (resHList[i].resources)
namespaceDelete(i);
free(resHList);
resHList = NULL;
LIST_UNLOCK();
}
/**************************************************************/
......@@ -151,10 +169,8 @@ void listInitialize ( void )
pthread_mutex_init ( &listMutex, &ma);
pthread_mutexattr_destroy(&ma);
#endif
LIST_LOCK();
listNew ();
LIST_UNLOCK();
// create default namespace
reshListCreate(0);
/* file is special and has its own table, which needs to be
* created, before we register the listDestroy exit handler */
{
......@@ -278,8 +294,8 @@ void *reshGetValue(const char * caller, cdiResH resH, resOps * ops)
else
{
LIST_UNLOCK();
xabortC(caller, "Invalid namespace %d or index %d for resource handle %d!",
nspT.nsp, nspT.idx, (int)resH);
xabortC(caller, "Invalid namespace %d or index %d for resource handle %d when using namespace %d of size %d!",
nspT.nsp, nspT.idx, (int)resH, nsp, resHList[nsp].size);
}
if ( !(listElem && listElem->ops == ops) )
......@@ -536,7 +552,7 @@ int reshListCompare ( int nsp0, int nsp1 )
LIST_INIT();
LIST_LOCK();
xassert(namespaceGetNumber () > xmaxInt ( nsp0, nsp1 ) &&
xassert(resHListSize > xmaxInt ( nsp0, nsp1 ) &&
xminInt ( nsp0, nsp1 ) >= 0 );
for ( i = 0; i < resHList[nsp0].size; i++ )
......@@ -659,16 +675,6 @@ void reshListPrint ( char * filename )
}
void cdiReset(void)
{
LIST_LOCK();
listDestroy();
listNew ();
LIST_UNLOCK();
}
/*
* Local Variables:
* c-file-style: "Java"
......
......@@ -42,6 +42,8 @@ typedef struct {
enum { RESH_UNDEFID, ASSIGNED, SUSPENDED, CLOSED };
void reshListCreate(int namespaceID);
void reshListDestruct(int namespaceID);
int reshPut ( void *, resOps * );
void reshRemove ( cdiResH, resOps * );
......
......@@ -378,12 +378,6 @@ search_iomode_str(const char *modestr)
int main (int argc, char *argv[])
{
#ifdef USE_MPI
enum {
nNamespaces = 2 };
static int hasLocalFile[nNamespaces] = { 0, 1 };
#endif