Commit 8666233d authored by Thomas Jahns

Add cdi-pio writing of netCDF via serial I/O library.

parent b7da7c4e
......@@ -33,6 +33,11 @@ extern int CDF_Debug;
static size_t ChunkSizeMin = MIN_BUF_SIZE;
*/
#if USE_MPI && !defined (HAVE_PARALLEL_NC4)
/* When in PIO mode without a parallel netcdf library, each file is
* actually opened on only one host */
extern int *cdiOpenFileMap, *cdiOpenFileCounts;
#endif
void cdf_create(const char *path, int cmode, int *ncidp)
{
......
......@@ -48,6 +48,7 @@ void streamUnpack ( char * unpackBuffer, int unpackBufferSize,
" namespaceAdaptKey(a,b)=%d",
streamID, intBuffer[0], nspTarget,
namespaceAdaptKey ( intBuffer[0], nspTarget ));
xdebug("streamID=%d\n", streamID);
xassert ( streamID >= 0 &&
namespaceAdaptKey ( intBuffer[0], nspTarget ) == streamID );
xdebug ("streamID=%d, vlistID=%d", streamID, namespaceAdaptKey ( intBuffer[4], nspTarget ));
......
......@@ -14,7 +14,9 @@ typedef enum
FINALIZE,
RESOURCES,
WINCREATE,
WRITETS
WRITETS,
COLLBUFTX,
COLLBUFNMISS,
} command;
enum
......
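The two new command values double as MPI point-to-point message tags: when the collector rank that gathered a variable is not the rank that has the netCDF file open, the missing-value count and the field data travel as two separate messages (see the pio_server.c hunk below). A minimal hedged sketch of the owner side, reusing names that appear later in this diff:

    /* owner-side sketch: ship nmiss first, then the gathered field */
    xmpi(MPI_Send(&nmiss, 1, MPI_INT, writerRank, COLLBUFNMISS, collComm));
    xmpi(MPI_Send(data, size, MPI_DOUBLE, writerRank, COLLBUFTX, collComm));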
......@@ -329,6 +329,46 @@ myVarPart(struct PPM_extent varShape[3], struct xyzDims collGrid,
PPM_uniform_partition_nd(3, varShape, collGrid.sizes,
myCollGridCoord, myPart);
}
#elif defined (HAVE_LIBNETCDF)
/* needed for writing when some files are only written to by a single
 * process; cdiPioSerialOpenFileMap(streamID) gives the writer process */
int cdiPioSerialOpenFileMap(int streamID)
{
return stream_to_pointer(streamID)->ownerRank;
}
/* for load-balancing purposes, count the number of open files per
 * process; cdiSerialOpenFileCount[rank] gives the number of files that
 * rank currently has open */
static int *cdiSerialOpenFileCount = NULL;
int cdiPioNextOpenRank()
{
xassert(cdiSerialOpenFileCount != NULL);
int commCollSize = commInqSizeColl();
int minRank = 0, minOpenCount = cdiSerialOpenFileCount[0];
for (int i = 1; i < commCollSize; ++i)
if (cdiSerialOpenFileCount[i] < minOpenCount)
{
minOpenCount = cdiSerialOpenFileCount[i];
minRank = i;
}
return minRank;
}
void cdiPioOpenFileOnRank(int rank)
{
xassert(cdiSerialOpenFileCount != NULL
&& rank >= 0 && rank < commInqSizeColl());
++(cdiSerialOpenFileCount[rank]);
}
void cdiPioCloseFileOnRank(int rank)
{
xassert(cdiSerialOpenFileCount != NULL
&& rank >= 0 && rank < commInqSizeColl());
xassert(cdiSerialOpenFileCount[rank] > 0);
--(cdiSerialOpenFileCount[rank]);
}
#endif
static
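A minimal usage sketch of the bookkeeping functions above (hypothetical calling code, not part of this commit): the collector with the fewest open files becomes the writer for a newly opened serial-netCDF stream, and the per-rank counters are updated on open and close.

    int writerRank = cdiPioNextOpenRank();  /* rank with fewest open files */
    cdiPioOpenFileOnRank(writerRank);       /* account for the new file */
    /* ... only writerRank performs the actual netCDF calls ... */
    cdiPioCloseFileOnRank(writerRank);      /* release the slot on close */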
......@@ -337,7 +377,9 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
int myCollRank = commInqRankColl();
#ifdef HAVE_NETCDF4
MPI_Comm collComm = commInqCommColl();
#endif
xdebug("%s", "START");
union winHeaderEntry *winDict
......@@ -373,9 +415,9 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
int numStreamIDs = 0, sizeStreamMap = 16;
streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
int streamIDOld = CDI_UNDEFID;
int headerIdx, oldStreamIdx = CDI_UNDEFID;
int oldStreamIdx = CDI_UNDEFID;
int filetype = CDI_UNDEFID;
for (headerIdx = 1; headerIdx < numDataEntries; ++headerIdx)
for (int headerIdx = 1; headerIdx < numDataEntries; ++headerIdx)
{
int streamID = winDict[headerIdx].dataRecord.streamID;
xassert(streamID > 0);
......@@ -422,7 +464,10 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
streamMap[oldStreamIdx].varMap[varID] = headerIdx;
}
}
double *data = NULL, *writeBuf = NULL;
double *data = NULL;
#if defined (HAVE_PARALLEL_NC4)
double *writeBuf = NULL;
#endif
int currentDataBufSize = 0;
for (streamIdx = 0; streamIdx < numStreamIDs; ++streamIdx)
{
......@@ -565,12 +610,86 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
}
}
#else
#error "serial netcdf libraries not yet supported in MPI mode"
/* determine the process which has the stream open (writer) and
 * the process which has the data for each variable (var owner);
 * three cases need to be distinguished:
 * 1) writer == owner: gather the array and write it directly,
 * 2) this rank is the writer but not the owner: receive the array
 *    from the owner, then call streamWriteVar,
 * 3) this rank is the owner but not the writer: gather the array
 *    and send it to the writer */
{
int nvars = streamMap[streamIdx].numVars;
int *varMap = streamMap[streamIdx].varMap;
int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
for (int varID = 0; varID < nvars; ++varID)
varIsWritten[varID] = ((varMap[varID] != -1)
?myCollRank+1 : 0);
xmpi(MPI_Allreduce(MPI_IN_PLACE, varIsWritten, nvars,
MPI_INT, MPI_BOR, collComm));
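          /* after the reduction varIsWritten[varID] holds rank+1 of the
           * collector that gathered this variable, or 0 if no data arrived
           * for it; the bitwise OR works because each variable is gathered
           * on at most one collector rank, so only one contribution is
           * non-zero */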
int writerRank;
if ((writerRank = cdiPioSerialOpenFileMap(streamID))
== myCollRank)
{
for (int varID = 0; varID < nvars; ++varID)
if (varIsWritten[varID])
{
int nmiss;
int size = vlistInqVarSize(vlistID, varID);
resizeVarGatherBuf(vlistID, varID, &data,
&currentDataBufSize);
int headerIdx = varMap[varID];
if (varIsWritten[varID] == myCollRank + 1)
{
/* this process has the full array and will
* write it */
xdebug("gathering varID=%d for direct writing",
varID);
gatherArray(root, nProcsModel, headerIdx,
vlistID, data, &nmiss);
}
else
{
/* another process has the array and will
* send it over */
MPI_Status stat;
xdebug("receiving varID=%d for writing from"
" process %d",
varID, varIsWritten[varID] - 1);
xmpiStat(MPI_Recv(&nmiss, 1, MPI_INT,
varIsWritten[varID] - 1,
COLLBUFNMISS,
collComm, &stat), &stat);
xmpiStat(MPI_Recv(data, size, MPI_DOUBLE,
varIsWritten[varID] - 1,
COLLBUFTX,
collComm, &stat), &stat);
}
streamWriteVar(streamID, varID, data, nmiss);
}
}
else
for (int varID = 0; varID < nvars; ++varID)
if (varIsWritten[varID] == myCollRank + 1)
{
/* this process has the full array and another
* will write it */
int nmiss;
int size = vlistInqVarSize(vlistID, varID);
resizeVarGatherBuf(vlistID, varID, &data,
&currentDataBufSize);
int headerIdx = varMap[varID];
gatherArray(root, nProcsModel, headerIdx,
vlistID, data, &nmiss);
MPI_Request req;
MPI_Status stat;
xdebug("sending varID=%d for writing to"
" process %d",
varID, writerRank);
xmpi(MPI_Isend(&nmiss, 1, MPI_INT,
writerRank, COLLBUFNMISS,
collComm, &req));
xmpi(MPI_Send(data, size, MPI_DOUBLE,
writerRank, COLLBUFTX,
collComm));
xmpiStat(MPI_Wait(&req, &stat), &stat);
}
}
#endif
break;
#endif
......@@ -672,9 +791,14 @@ void IOServer ()
if ( commInqRankNode () == commInqSpecialRankNode ())
backendFinalize ();
commCalc = commInqCommCalc ();
#ifdef HAVE_PARALLEL_NC4
numPioPrimes = PPM_prime_factorization_32((uint32_t)commInqSizeColl(),
&pioPrimes);
xt_initialize(commInqCommColl());
#elif defined (HAVE_LIBNETCDF)
cdiSerialOpenFileCount = xcalloc(sizeof (cdiSerialOpenFileCount[0]),
commInqSizeColl());
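  /* one counter per collector rank; consulted by cdiPioNextOpenRank()
   * to balance serially written netCDF files across the collectors */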
#endif
for ( ;; )
{
......
......@@ -110,29 +110,23 @@ void * pcdiXrealloc ( void *p, size_t size, const char *functionname,
/***************************************************************/
#ifdef USE_MPI
void pcdiXMPI ( int iret, const char *filename, int line )
void pcdiXMPI(int iret, const char *filename, int line)
{
char errorString[2][MPI_MAX_ERROR_STRING + 1];
int len, errorClass, rank;
if ( iret != MPI_SUCCESS )
{
MPI_Comm_rank ( MPI_COMM_WORLD, &rank );
MPI_Error_class ( iret, &errorClass );
MPI_Error_string ( errorClass, errorString[0], &len );
errorString[0][len] = '\0';
MPI_Error_string ( iret, errorString[1], &len);
errorString[1][len] = '\0';
fprintf ( stderr, "MPI ERROR, pe%d, %s, line %d,"
"errorClass: \"%s\""
"errorString: \"%s\"\n",
rank, filename, line,
errorString[0], errorString[1]);
MPI_Abort ( MPI_COMM_WORLD, iret );
}
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Error_class(iret, &errorClass);
MPI_Error_string(errorClass, errorString[0], &len);
errorString[0][len] = '\0';
MPI_Error_string(iret, errorString[1], &len);
errorString[1][len] = '\0';
fprintf(stderr, "MPI ERROR, pe%d, %s, line %d,"
"errorClass: \"%s\""
"errorString: \"%s\"\n",
rank, filename, line,
errorString[0], errorString[1]);
MPI_Abort(MPI_COMM_WORLD, iret);
}
/*****************************************************************************/
......
......@@ -144,8 +144,12 @@ void * pcdiXrealloc ( void *, size_t, const char *, const char *, int );
#define xrealloc(p,size) pcdiXrealloc(p, size, \
__FILE__, __func__, __LINE__)
void pcdiXMPI ( int, const char *, int );
#define xmpi(ret) pcdiXMPI ( ret, __FILE__, __LINE__ )
void pcdiXMPI(int iret, const char *, int);
#define xmpi(ret) do { \
int tmpIRet = (ret); \
if (tmpIRet != MPI_SUCCESS) \
pcdiXMPI(tmpIRet, __FILE__, __LINE__ ); \
} while(0)
#ifdef USE_MPI
void pcdiXMPIStat ( int, const char *, int, MPI_Status * );
......
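Since the xmpi macro above now checks for MPI_SUCCESS itself, pcdiXMPI (reworked in pio_util.c above) is only entered on failure. A hedged usage sketch with a hypothetical call site; the do { } while (0) wrapper keeps the expansion a single statement, so it nests safely in an unbraced if/else:

    if (isSender)
      xmpi(MPI_Send(buf, n, MPI_DOUBLE, dest, COLLBUFTX, comm));
    else
      xmpi(MPI_Recv(buf, n, MPI_DOUBLE, src, COLLBUFTX, comm, &stat));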
......@@ -600,6 +600,10 @@ int cdiInqContents(int streamID)
return (status);
}
#if USE_MPI && ! HAVE_PARALLEL_NC4 && defined HAVE_LIBNETCDF
extern int cdiPioNextOpenRank();
extern int cdiPioSerialOpenFileMap(int streamID);
#endif
int streamOpen(const char *filename, const char *filemode, int filetype)
{
......@@ -609,6 +613,9 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
Record *record = NULL;
stream_t *streamptr = NULL;
int hasLocalFile = namespaceHasLocalFile ( namespaceGetActive ());
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
int rank;
#endif
if ( CDI_Debug )
Message("Open %s mode %c file %s", strfiletype(filetype), (int) *filemode, filename);
......@@ -683,7 +690,15 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
fileID = cdf4Open(filename, filemode, &filetype);
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
if (commInqIOMode() == PIO_NONE
|| commInqRankColl() == (rank = cdiPioNextOpenRank()))
#endif
fileID = cdf4Open(filename, filemode, &filetype);
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
if (commInqIOMode() != PIO_NONE)
xmpi(MPI_Bcast(&fileID, 1, MPI_INT, rank, commInqCommColl()));
#endif
break;
}
#endif
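For the serial-netCDF case only one collector rank actually opens the file; the resulting fileID is broadcast so all collectors keep a consistent stream entry. The same guard recurs below for cdfClose, cdfDefTimestep and cdfDefVars. A condensed sketch of the pattern, assuming the helpers introduced in pio_server.c above:

    /* sketch: route a netCDF call to the rank that has the file open */
    if (commInqIOMode() == PIO_NONE
        || commInqRankColl() == cdiPioSerialOpenFileMap(streamID))
      cdfDefVars(streamID);   /* likewise cdfClose, cdfDefTimestep, ... */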
......@@ -731,6 +746,14 @@ int streamOpen(const char *filename, const char *filemode, int filetype)
streamptr->filename = strdupx(filename);
streamptr->fileID = fileID;
#if USE_MPI && defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
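  /* remember which collector rank opened the netCDF file so later
   * operations can be routed back to it via cdiPioSerialOpenFileMap() */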
switch (filetype)
{
case FILETYPE_NC4:
case FILETYPE_NC4C:
streamptr->ownerRank = rank;
}
#endif
if ( streamptr->filemode == 'r' )
{
vlist_t *vlistptr;
......@@ -893,7 +916,13 @@ int streamOpenA(const char *filename, const char *filemode, int filetype)
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
cdfClose(fileID);
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfClose(fileID);
break;
}
#endif
......@@ -1205,6 +1234,12 @@ void streamClose(int streamID)
case FILETYPE_NC4:
case FILETYPE_NC4C:
{
#if USE_MPI && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfClose(fileID);
break;
}
......@@ -1409,7 +1444,15 @@ int streamDefTimestep(int streamID, int tsID)
streamptr->filetype == FILETYPE_NC4 ||
streamptr->filetype == FILETYPE_NC4C)
&& vlistHasTime(vlistID))
cdfDefTimestep(streamID, tsID);
{
#if USE_MPI && defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfDefTimestep(streamID, tsID);
}
cdiCreateRecords(streamID, tsID);
......@@ -2265,7 +2308,13 @@ void streamDefVlist(int streamID, int vlistID)
streamptr->filetype == FILETYPE_NC4 ||
streamptr->filetype == FILETYPE_NC4C )
{
cdfDefVars(streamID);
#if USE_MPI && defined (HAVE_LIBNETCDF) && ! defined (HAVE_PARALLEL_NC4)
int rank, rankOpen;
if (commInqIOMode() == PIO_NONE
|| ((rank = commInqRankColl())
== (rankOpen = cdiPioSerialOpenFileMap(streamID))))
#endif
cdfDefVars(streamID);
}
else if ( streamptr->filetype == FILETYPE_GRB ||
streamptr->filetype == FILETYPE_GRB2 )
......
......@@ -534,19 +534,19 @@ int streamCompareP ( void * streamptr1, void * streamptr2 )
xassert ( s2 );
if ( s1->filetype != s2->filetype ) return differ;
if ( namespaceAdaptKey2 ( s1->vlistIDorig ) !=
if ( namespaceAdaptKey2 ( s1->vlistIDorig ) !=
namespaceAdaptKey2 ( s2->vlistIDorig )) return differ;
if ( s1->byteorder != s2->byteorder ) return differ;
if ( s1->comptype != s2->comptype ) return differ;
if ( s1->complevel != s2->complevel ) return differ;
if ( s1->complevel != s2->complevel ) return differ;
if ( s1->filename )
{
len = strlen ( s1->filename ) + 1;
if ( memcmp ( s1->filename, s2->filename, len ))
if ( memcmp ( s1->filename, s2->filename, len ))
return differ;
}
else if ( s2->filename )
else if ( s2->filename )
return differ;
return equal;
......
......@@ -248,6 +248,9 @@ typedef struct {
char **fnames;
void *gribContainers;
int vlistIDorig;
#if USE_MPI
int ownerRank; // MPI rank of owner process
#endif
}
stream_t;
......
......@@ -3,7 +3,7 @@ pio_write_args="-f nc4 -w 3"
mpi_task_num=7
LOG=pio_cksum_cdf.log
suffix=nc4
if [ "@USE_MPI@" = yes -a "@HAVE_PARALLEL_NC4@" -gt 0 ]; then
if [ "@USE_MPI@" = yes -a "@ENABLE_NETCDF@" = yes ]; then
. ./pio_write_run
else
exit 77
......