Commit a95796cb authored by Thomas Jahns

Also communicate partition descriptions in PIO write interface.

parent 8b49fc2c
......@@ -359,7 +359,7 @@ Get all values of a Y-axis.
\begin{verbatim}
MPI_Comm pioInit (MPI_Comm commSuper, int nProcsIO, int IOMode, int nNamespaces,
-                  int *hasLocalFile);
+                  int *hasLocalFile, float partInflate);
\end{verbatim}
initialize I/O server processes and communication.
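For illustration, a hedged call sketch (the mode constant PIO_MPI and the
two-server split are example choices, not requirements of the interface):
\begin{verbatim}
int hasLocalFile[1] = { 0 };       /* one namespace, no local files */
MPI_Comm commModel = pioInit(MPI_COMM_WORLD, /* all ranks */
                             2,              /* two ranks become I/O servers */
                             PIO_MPI,        /* example I/O mode constant */
                             1,              /* one namespace */
                             hasLocalFile,
                             1.0f);          /* no partition inflation */
\end{verbatim}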
......
......@@ -359,7 +359,8 @@ Get all values of a Y-axis.
\begin{verbatim}
INTEGER FUNCTION pioInit (INTEGER commSuper, INTEGER nProcsIO, INTEGER IOMode,
-                          INTEGER nNamespaces, INTEGER hasLocalFile)
+                          INTEGER nNamespaces, INTEGER hasLocalFile,
+                          REAL partInflate)
\end{verbatim}
initialize I/O server processes and communication.
......
......@@ -161,7 +161,8 @@ int main (int argc, char *argv[])
}
}
-  commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile);
+  commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile,
+                      1.0f);
#endif
modelRun ();
......
......@@ -186,7 +186,8 @@ int main (int argc, char *argv[])
nProcsIO = nProcsIODef;
}
-  commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile);
+  commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile,
+                      1.0f);
#endif
modelRun ();
......
......@@ -236,7 +236,8 @@ int main (int argc, char *argv[])
if ( nProcsIO != 1 )
xabort ( "bad distribution of tasks on PEs" );
-  commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile);
+  commModel = pioInit(commGlob, nProcsIO, IOMode, nNamespaces, hasLocalFile,
+                      1.0f);
modelRun ( commModel );
......
......@@ -223,7 +223,7 @@ void pioEndTimestepping ( void );
void pioFinalize ( void );
/* pioInit: initialize I/O server processes and communication */
MPI_Comm pioInit(MPI_Comm commSuper, int nProcsIO, int IOMode, int nNamespaces,
-                 int *hasLocalFile);
+                 int *hasLocalFile, float partInflate);
int pioInqVarDecoChunk ( int, int );
int pioInqVarDecoOff ( int, int );
void pioNamespaceSetActive ( int );
......
......@@ -366,7 +366,8 @@
! INTEGER nProcsIO,
! INTEGER IOMode,
! INTEGER nNamespaces,
-! INTEGER hasLocalFile)
+! INTEGER hasLocalFile,
+! REAL partInflate)
EXTERNAL pioInit
INTEGER pioInqVarDecoChunk
......
......@@ -70,13 +70,13 @@
FCALLSCSUB0 (pioEndDef, PIOENDDEF, pioenddef)
FCALLSCSUB0 (pioEndTimestepping, PIOENDTIMESTEPPING, pioendtimestepping)
FCALLSCSUB0 (pioFinalize, PIOFINALIZE, piofinalize)
-static int pioInit_fwrap(int commSuper, int nProcsIO, int IOMode, int nNamespaces, int * hasLocalFile)
+static int pioInit_fwrap(int commSuper, int nProcsIO, int IOMode, int nNamespaces, int * hasLocalFile, float partInflate)
{
MPI_Comm v;
-  v = pioInit(MPI_Comm_f2c(commSuper), nProcsIO, IOMode, nNamespaces, hasLocalFile);
+  v = pioInit(MPI_Comm_f2c(commSuper), nProcsIO, IOMode, nNamespaces, hasLocalFile, partInflate);
return MPI_Comm_c2f(v);
}
-FCALLSCFUN5 (INT, pioInit_fwrap, PIOINIT, pioinit, INT, INT, INT, INT, PINT)
+FCALLSCFUN6 (INT, pioInit_fwrap, PIOINIT, pioinit, INT, INT, INT, INT, PINT, FLOAT)
FCALLSCFUN2 (INT, pioInqVarDecoChunk, PIOINQVARDECOCHUNK, pioinqvardecochunk, INT, INT)
FCALLSCFUN2 (INT, pioInqVarDecoOff, PIOINQVARDECOOFF, pioinqvardecooff, INT, INT)
FCALLSCSUB1 (pioNamespaceSetActive, PIONAMESPACESETACTIVE, pionamespacesetactive, INT)
......
......@@ -9,6 +9,7 @@
#ifdef USE_MPI
#include <mpi.h>
+#include <yaxt.h>
#endif
#include "cdi.h"
......@@ -24,6 +25,8 @@
#include "resource_handle.h"
#include "stream_int.h"
#include "vlist.h"
extern resOps streamOps;
......@@ -34,12 +37,13 @@ static struct rdmaWin
MPI_Win win;
int postSet, refuseFuncCall;
MPI_Group ioGroup;
-  int dictSize, dictDataUsed, dictRPCUsed;
+  int dictSize, dictDataUsed, dictRPCUsed, dict;
} *txWin = NULL;
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
+float cdiPIOpartInflate_;
/****************************************************/
......@@ -360,7 +364,7 @@ static void
modelWinDefBufferSizes(void)
{
int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
-  int collIDchunk = 0, sumWinBufferSize = 0;
+  int sumWinBufferSize = 0;
int nProcsColl = commInqNProcsColl ();
int rankGlob = commInqRankGlob ();
int rankModel = commInqRankModel ();
......@@ -383,12 +387,19 @@ modelWinDefBufferSizes(void)
for ( varID = 0; varID < nvars; varID++ )
{
collID = CDI_UNDEFID;
-          collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
-          collIDchunk = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
+          collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
+          int collIDchunk = (int)(cdiPIOpartInflate_
+                                  * vlistInqVarDecoChunk(vlistID, varID, rankModel));
xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
-          ++(collIndex[collID].numDataRecords);
-          txWin[collID].size += collIDchunk * sizeof (double) +
-            sizeof (union winHeaderEntry);
+          collIndex[collID].numDataRecords += 2;
+          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
+            /* re-align chunks to multiple of double size */
+            + sizeof (double) - 1
+            /* one header for data record, one for corresponding part
+             * descriptor */
+            + 2 * sizeof (union winHeaderEntry)
+            /* FIXME: heuristic for size of packed Xt_idxlist */
+            + sizeof (Xt_int) * collIDchunk * 3;
}
// memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
......@@ -409,7 +420,10 @@ modelWinDefBufferSizes(void)
txWin[collID].dictSize = numRecords;
txWin[collID].dictDataUsed = 1;
txWin[collID].dictRPCUsed = 0;
+      /* account for size header */
       txWin[collID].size += sizeof (union winHeaderEntry);
+      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
+                                             PIO_WIN_ALIGN);
sumWinBufferSize += txWin[collID].size;
}
free(collIndex);
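As an aside, the per-record accounting added above can be read as a small
helper; a sketch (hypothetical name recordWinSpace, assuming pio_rpc.h for
union winHeaderEntry and yaxt for Xt_int):

#include <stddef.h>

/* Space one variable chunk occupies in the RMA window: payload plus
 * alignment slack, two dictionary headers (data record + partition
 * descriptor), and the packed-index-list heuristic used above. */
static size_t recordWinSpace(size_t collIDchunk)
{
  return collIDchunk * sizeof (double)   /* the data itself */
    + sizeof (double) - 1                /* worst-case re-alignment slack */
    + 2 * sizeof (union winHeaderEntry)  /* data + part descriptor headers */
    + sizeof (Xt_int) * collIDchunk * 3; /* heuristic for packed Xt_idxlist */
}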
......@@ -491,10 +505,47 @@ modelWinEnqueue(int collID,
if (header.dataRecord.streamID > 0)
{
targetEntry = (txWin[collID].dictDataUsed)++;
-      header.dataRecord.offset
-        = (int)(txWin[collID].head - txWin[collID].buffer);
-      memcpy(txWin[collID].head, data, size);
-      txWin[collID].head += size;
+      int offset = header.dataRecord.offset
+        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
+                                 sizeof (double));
+      memcpy(txWin[collID].buffer + offset, data, size);
+      txWin[collID].head = txWin[collID].buffer + offset + size;
}
+  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
+    {
+      targetEntry = (txWin[collID].dictDataUsed)++;
+      Xt_uid uid = header.partDesc.uid;
+      int offset = -1;
+      /* search if same uid entry has already been enqueued */
+      for (int entry = 2; entry < targetEntry; entry += 2)
+        {
+          xassert(winDict[entry].partDesc.partDescMarker
+                  == PARTDESCMARKER);
+          if (winDict[entry].partDesc.uid == uid)
+            {
+              offset = winDict[entry].partDesc.offset;
+              break;
+            }
+        }
+      if (offset == -1)
+        {
+          /* not yet used partition descriptor, serialize at
+           * current position */
+          int position = 0;
+          MPI_Comm comm = commInqCommsIO(collID);
+          header.partDesc.offset
+            = (int)(txWin[collID].head - txWin[collID].buffer);
+          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
+          size_t remaining_size = txWin[collID].size
+            - (txWin[collID].head - txWin[collID].buffer);
+          xassert(size <= remaining_size);
+          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
+                          (int)remaining_size, &position, comm);
+          txWin[collID].head += position;
+        }
+      else
+        /* duplicate entries are copied only once per timestep */
+        header.partDesc.offset = offset;
+    }
else
{
......@@ -510,17 +561,31 @@ modelWinEnqueue(int collID,
winDict[targetEntry] = header;
}
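The partition descriptors enqueued above travel through the window as packed
bytes. A round-trip sketch of the yaxt (de)serialization calls used in this
commit (hypothetical helper; xmalloc/xassert as used throughout this code
base, yaxt/MPI assumed initialized):

/* Serialize an index list into a byte buffer and read it back. */
static void roundTrip(Xt_idxlist list, MPI_Comm comm)
{
  size_t size = xt_idxlist_get_pack_size(list, comm);
  unsigned char *buf = xmalloc(size);
  int position = 0;
  xt_idxlist_pack(list, buf, (int)size, &position, comm);
  /* ... transfer buf, e.g. via the RMA window ... */
  position = 0;
  Xt_idxlist copy = xt_idxlist_unpack(buf, (int)size, &position, comm);
  xassert(xt_idxlist_get_num_indices(copy)
          == xt_idxlist_get_num_indices(list));
  xt_idxlist_delete(copy);
  free(buf);
}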
-void pioBufferData ( const int streamID, const int varID, const double *data, int nmiss )
+void pioBufferData(int streamID, int varID,
+                   const double *data, int nmiss)
{
-  int chunk, vlistID, collID = CDI_UNDEFID;
int rankModel = commInqRankModel ();
+  int vlistID = streamInqVlist(streamID);
+  int chunk = vlistInqVarDecoChunk(vlistID, varID, rankModel);
+  int start = pioInqVarDecoOff(vlistID, varID);
+  struct Xt_stripe stripe = { .start = start, .nstrides = chunk, .stride = 1 };
+  Xt_idxlist partDesc = xt_idxstripes_new(&stripe, 1);
+  pioBufferPartData(streamID, varID, data, nmiss, partDesc);
+  xt_idxlist_delete(partDesc);
+}
+
+void
+pioBufferPartData(int streamID, int varID, const double *data,
+                  int nmiss, Xt_idxlist partDesc)
+{
+  int vlistID, collID = CDI_UNDEFID;
vlistID = streamInqVlist ( streamID );
collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
-  chunk = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
   xassert ( collID >= 0 &&
             collID < commInqNProcsColl () &&
-            chunk >= 0 &&
txWin != NULL);
if (txWin[collID].postSet)
......@@ -530,9 +595,19 @@ void pioBufferData ( const int streamID, const int varID, const double *data, in
modelWinFlushBuffer ( collID );
}
-  union winHeaderEntry header
+  Xt_int chunk = xt_idxlist_get_num_indices(partDesc);
+  xassert(chunk <= INT_MAX);
+  union winHeaderEntry dataHeader
     = { .dataRecord = { streamID, varID, -1, nmiss } };
-  modelWinEnqueue(collID, header, data, chunk * sizeof (data[0]));
+  modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0]));
+  {
+    union winHeaderEntry partHeader
+      = { .partDesc = { .partDescMarker = PARTDESCMARKER,
+                        .uid = xt_idxlist_get_uid(partDesc),
+                        .offset = 0 } };
+    modelWinEnqueue(collID, partHeader, partDesc, 0);
+  }
txWin[collID].refuseFuncCall = 1;
}
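A hypothetical caller sketch for the new entry point: a model rank that owns
two disjoint index ranges can describe them as stripes instead of one
contiguous chunk (localData, streamID, varID, nmiss are placeholders):

/* Rank owns indices [0..99] and [400..499] of the global field. */
struct Xt_stripe stripes[2] = {
  { .start = 0,   .nstrides = 100, .stride = 1 },
  { .start = 400, .nstrides = 100, .stride = 1 },
};
Xt_idxlist partDesc = xt_idxstripes_new(stripes, 2);
pioBufferPartData(streamID, varID, localData, nmiss, partDesc);
/* the descriptor is packed into the window during the call */
xt_idxlist_delete(partDesc);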
......@@ -685,12 +760,15 @@ int pioInqVarDecoOff ( int vlistID, int varID )
@param commGlob MPI communicator of all calling PEs
@param nProcsIO number of I/O PEs
+@param partInflate allow for array partitions on compute
+PEs that are at most of size \f$ partInflate \cdot \lceil arraySize /
+numComputePEs \rceil \f$; e.g. with \f$ arraySize = 10 \f$, 4 compute PEs
+and \f$ partInflate = 1.5 \f$, a partition may hold up to
+\f$ \lfloor 1.5 \cdot \lceil 10/4 \rceil \rfloor = 4 \f$ indices
@return MPI communicator of the group of compute PEs
*/
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
-        int nNamespaces, int * hasLocalFile)
+        int nNamespaces, int * hasLocalFile, float partInflate)
{
#ifdef USE_MPI
int sizeGlob;
......@@ -715,6 +793,9 @@ pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
commDefIOMode ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
commDefCommPio ();
+  xassert(partInflate >= 1.0);
+  cdiPIOpartInflate_ = partInflate;
// JUST FOR TEST CASES WITH ONLY ONE MPI TASK
if ( commInqSizeGlob () == 1 )
{
......@@ -859,10 +940,6 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
txWin[collID].postSet = 0;
modelWinFlushBuffer ( collID );
}
-      /*
-       * modelWinBufferPutAtEnd ( __func__, collID, &tokenEnd,
-       *                          sizeof ( tokenEnd ));
-       */
union winHeaderEntry header
= { .headerSize = { .sizeID = HEADERSIZEMARKER,
.numDataEntries = txWin[collID].dictDataUsed,
......
......@@ -8,10 +8,16 @@
#ifdef USE_MPI
#include <mpi.h>
+#include <yaxt.h>
-void pioBufferData ( int, int, const double *, int );
+void
+pioBufferPartData(int streamID, int varID, const double *data,
+                  int nmiss, Xt_idxlist partDesc);
+void pioBufferData (int, int, const double *, int );
void pioBufferFuncCall(int, int, ... );
+extern float cdiPIOpartInflate_;
#endif
#endif
......
......@@ -8,6 +8,7 @@
#ifdef USE_MPI
#include <mpi.h>
+#include <yaxt.h>
typedef enum
{
......@@ -48,6 +49,7 @@ enum
STREAMDEFVLIST = -2,
STREAMCLOSE = -3,
HEADERSIZEMARKER = -nFuncs - 1,
+  PARTDESCMARKER = -nFuncs - 2,
};
enum { MAXDATAFILENAME = 256, MINFUNCID = STREAMCLOSE, MAXFUNCID = STREAMOPEN };
extern char * funcMap[nFuncs];
......@@ -80,11 +82,33 @@ struct funcCallDesc
} funcArgs;
};
+/* Describes offset and ID of serialized partition descriptor.
+ * partDescMarker == PARTDESCMARKER, always. */
+struct partDescRecord
+{
+  int partDescMarker, offset;
+  Xt_uid uid;
+};
union winHeaderEntry
{
struct headerSize headerSize;
struct dataRecord dataRecord;
struct funcCallDesc funcCall;
+  struct partDescRecord partDesc;
};
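Read back on the collector side, a filled window buffer then starts with the
size header at dictionary entry 0, followed by data-record/partition-descriptor
pairs. A hypothetical reader sketch (inferred from dictDataUsed starting at 1
and the pairwise enqueue in this commit, not a separate API):

/* Walk a filled window buffer: entry 0 is the size header, then each
 * data record at odd index i is followed by its partition descriptor
 * at index i + 1. */
static void dumpWinDict(const unsigned char *buffer)
{
  const union winHeaderEntry *dict = (const union winHeaderEntry *)buffer;
  int numData = dict[0].headerSize.numDataEntries;
  for (int i = 1; i < numData; i += 2)
    xassert(dict[i].dataRecord.streamID > 0
            && dict[i + 1].partDesc.partDescMarker == PARTDESCMARKER);
}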
+/* round size to next multiple of factor */
+static inline size_t
+roundUpToMultiple(size_t size, size_t factor)
+{
+  return (size + factor - 1) / factor * factor;
+}
+
+enum
+{
+  /* align window base addresses and sizes to this value */
+  PIO_WIN_ALIGN = sizeof (double),
+};
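A quick check of the helper's arithmetic (plain C, assuming only the
definitions above; values chosen for the common sizeof (double) == 8):

#include <assert.h>
#include <stddef.h>

int main(void)
{
  /* (37 + 8 - 1) / 8 * 8 == 44 / 8 * 8 == 5 * 8 */
  assert(roundUpToMultiple(37, 8) == 40);
  assert(roundUpToMultiple(40, 8) == 40); /* exact multiples are unchanged */
  assert(roundUpToMultiple(1, 8) == 8);
  return 0;
}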
#endif
......
......@@ -17,12 +17,13 @@
#include <core/ppm_combinatorics.h>
#include <core/ppm_rectilinear.h>
#include <ppm/ppm_uniform_partition.h>
-#include <yaxt.h>
 #endif
+#include <yaxt.h>
#include "cdi.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_interface.h"
#include "pio_rpc.h"
#include "pio_util.h"
#include "stream_int.h"
......@@ -54,29 +55,25 @@ static int numPioPrimes;
static
void serverWinCleanup ()
{
-  int i;
-  int nProcsCalc = commInqNProcsModel ();
-  if ( getWin != MPI_WIN_NULL )
-    xmpi ( MPI_Win_free ( &getWin ));
+  if (getWin != MPI_WIN_NULL)
+    xmpi(MPI_Win_free(&getWin));
   if (rxWin)
     {
-      for ( i = 0; i < nProcsCalc; i++ )
-        free(rxWin[i].buffer);
+      free(rxWin[0].buffer);
free(rxWin);
}
xdebug("%s", "cleaned up mpi_win");
}
/************************************************************************/
-static
-void collDefBufferSizes ()
+static size_t
+collDefBufferSizes()
{
int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
-  int modelID, decoChunk, sumGetBufferSizes = 0;
+  int modelID;
+  size_t sumGetBufferSizes = 0;
int rankGlob = commInqRankGlob ();
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
......@@ -99,11 +96,19 @@ static
{
for ( modelID = 0; modelID < nProcsModel; modelID++ )
{
-              decoChunk = vlistInqVarDecoChunk ( vlistID, varID, modelID );
+              int decoChunk
+                = (int)(cdiPIOpartInflate_
+                        * vlistInqVarDecoChunk(vlistID, varID, modelID));
               xassert ( decoChunk > 0 );
               rxWin[modelID].size += decoChunk * sizeof (double)
-                + sizeof (union winHeaderEntry);
-              ++(rxWin[modelID].dictSize);
+                /* re-align chunks to multiple of double size */
+                + sizeof (double) - 1
+                /* one header for data record, one for
+                 * corresponding part descriptor */
+                + 2 * sizeof (union winHeaderEntry)
+                /* FIXME: heuristic for size of packed Xt_idxlist */
+                + sizeof (Xt_int) * decoChunk * 3;
+              rxWin[modelID].dictSize += 2;
}
}
}
......@@ -120,9 +125,12 @@ static
/* account for size header */
rxWin[modelID].dictSize += 1;
rxWin[modelID].size += sizeof (union winHeaderEntry);
-      sumGetBufferSizes += rxWin[modelID].size;
+      rxWin[modelID].size = roundUpToMultiple(rxWin[modelID].size,
+                                              PIO_WIN_ALIGN);
+      sumGetBufferSizes += (size_t)rxWin[modelID].size;
}
xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
+  return sumGetBufferSizes;
}
/************************************************************************/
......@@ -144,10 +152,14 @@ static
xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));
rxWin = xcalloc(nProcsModel, sizeof (rxWin[0]));
-  collDefBufferSizes ();
-  for ( modelID = 0; modelID < nProcsModel; modelID++ )
-    rxWin[modelID].buffer = xmalloc(rxWin[modelID].size);
+  size_t totalBufferSize = collDefBufferSizes();
+  rxWin[0].buffer = xmalloc(totalBufferSize);
+  size_t ofs = 0;
+  for ( modelID = 1; modelID < nProcsModel; modelID++ )
+    {
+      ofs += rxWin[modelID - 1].size;
+      rxWin[modelID].buffer = rxWin[0].buffer + ofs;
+    }
xdebug("%s", "created mpi_win, allocated getBuffer");
}
......@@ -214,32 +226,110 @@ resizeVarGatherBuf(int vlistID, int varID, double **buf, int *bufSize)
*buf = xrealloc(*buf, (*bufSize = size) * sizeof (buf[0][0]));
}
static void
queryVarDims(int varShape[3], int vlistID, int varID)
{
int gridID = vlistInqVarGrid(vlistID, varID);
int zaxisID = vlistInqVarZaxis(vlistID, varID);
int gridType = gridInqType(gridID);
switch (gridType)
{
case GRID_LONLAT:
case GRID_GAUSSIAN:
varShape[0] = gridInqXsize(gridID);
varShape[1] = gridInqYsize(gridID);
break;
case GRID_GENERIC:
case GRID_LCC:
case GRID_SPECTRAL:
case GRID_GME:
case GRID_CURVILINEAR:
case GRID_UNSTRUCTURED:
case GRID_REFERENCE:
xabort("unimplemented grid type: %d", gridType);
break;
}
varShape[2] = zaxisInqSize(zaxisID);
}
static void
gatherArray(int root, int nProcsModel, int headerIdx,
int vlistID,
double *gatherBuf, int *nmiss)
{
-  double *p = gatherBuf;
union winHeaderEntry *winDict
= (union winHeaderEntry *)rxWin[root].buffer;
int streamID = winDict[headerIdx].dataRecord.streamID;
int varID = winDict[headerIdx].dataRecord.varID;
*nmiss = 0;
int varShape[3] = { 0, 0, 0 };
queryVarDims(varShape, vlistID, varID);
Xt_int varShapeXt[3];
static const Xt_int origin[3] = { 0, 0, 0 };
for (unsigned i = 0; i < 3; ++i)
varShapeXt[i] = varShape[i];
int varSize = varShape[0] * varShape[1] * varShape[2];
int *partOfs = xmalloc(2 * varSize * sizeof (partOfs[0])),
*gatherOfs = partOfs + varSize;
Xt_idxlist *part = xmalloc(nProcsModel * sizeof (part[0]));
MPI_Comm commCalc = commInqCommCalc();
{
int nmiss_ = 0, partOfsOfs = 0;
for (int modelID = 0; modelID < nProcsModel; modelID++)
{
struct dataRecord *dataHeader
= &((union winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx].dataRecord;
struct partDescRecord *partHeader
= &((union winHeaderEntry *)
rxWin[modelID].buffer)[headerIdx + 1].partDesc;
int position = partHeader->offset;
xassert(dataHeader->streamID == streamID
&& dataHeader->varID == varID
&& partHeader->partDescMarker == PARTDESCMARKER
&& position > 0
&& ((size_t)position
>= sizeof (union winHeaderEntry) * rxWin[modelID].dictSize)
&& ((size_t)position < rxWin[modelID].size));
part[modelID] = xt_idxlist_unpack(rxWin[modelID].buffer,
(int)rxWin[modelID].size,
&position, commCalc);
Xt_int partSize = xt_idxlist_get_num_indices(part[modelID]);
size_t charOfs = (rxWin[modelID].buffer + dataHeader->offset)
- rxWin[0].buffer;
xassert(charOfs % sizeof (double) == 0
&& charOfs / sizeof (double) + partSize <= INT_MAX);
int elemOfs = charOfs / sizeof (double);
for (int i = 0; i < (int)partSize; ++i)
partOfs[partOfsOfs + i] = elemOfs + i;
partOfsOfs += partSize;
nmiss_ += dataHeader->nmiss;
}
*nmiss = nmiss_;
}
Xt_idxlist srcList = xt_idxlist_collection_new(part, nProcsModel);
for (int modelID = 0; modelID < nProcsModel; modelID++)
-    {
-      struct dataRecord *dataHeader
-        = (struct dataRecord *)rxWin[modelID].buffer
-        + headerIdx;
-      xassert(dataHeader->streamID == streamID
-              && dataHeader->varID == varID);
-      int chunk = vlistInqVarDecoChunk(vlistID, varID,
-                                       modelID);
-      memcpy(p, rxWin[modelID].buffer
-             + dataHeader->offset, chunk * sizeof (p[0]));
-      p += chunk;
-      *nmiss = dataHeader->nmiss;
-    }
+    xt_idxlist_delete(part[modelID]);
free(part);
Xt_xmap gatherXmap;
{
Xt_idxlist dstList
= xt_idxsection_new(0, 3, varShapeXt, varShapeXt, origin);
struct Xt_com_list full = { .list = dstList, .rank = 0 };
gatherXmap = xt_xmap_intersection_new(1, &full, 1, &full, srcList, dstList,
MPI_COMM_SELF);
xt_idxlist_delete(dstList);
}
xt_idxlist_delete(srcList);
for (int i = 0; i < varSize; ++i)
gatherOfs[i] = i;
Xt_redist gatherRedist
= xt_redist_p2p_off_new(gatherXmap, partOfs, gatherOfs, MPI_DOUBLE);
xt_xmap_delete(gatherXmap);
xt_redist_s_exchange(gatherRedist, (void **)&rxWin[0].buffer, 1,
(void **)&gatherBuf, 1);
free(partOfs);
xt_redist_delete(gatherRedist);
}
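To see the offset-based redistribution in isolation, a single-rank toy sketch
(illustrative only; it mirrors the yaxt call signatures used in gatherArray
above and assumes xt_initialize has been called): six doubles stored in
partition order are gathered into global index order.

static void toyGather(void)
{
  /* source buffer holds global indices 0,2,4 then 1,3,5 back to back */
  Xt_int srcIndices[6] = { 0, 2, 4, 1, 3, 5 };
  Xt_int dstIndices[6] = { 0, 1, 2, 3, 4, 5 };
  int srcOfs[6] = { 0, 1, 2, 3, 4, 5 }, dstOfs[6] = { 0, 1, 2, 3, 4, 5 };
  Xt_idxlist srcList = xt_idxvec_new(srcIndices, 6),
    dstList = xt_idxvec_new(dstIndices, 6);
  struct Xt_com_list full = { .list = dstList, .rank = 0 };
  Xt_xmap xmap = xt_xmap_intersection_new(1, &full, 1, &full,
                                          srcList, dstList, MPI_COMM_SELF);
  Xt_redist redist = xt_redist_p2p_off_new(xmap, srcOfs, dstOfs, MPI_DOUBLE);
  double src[6] = { 0., 2., 4., 1., 3., 5. }, dst[6], *srcP = src, *dstP = dst;
  xt_redist_s_exchange(redist, (void **)&srcP, 1, (void **)&dstP, 1);
  /* dst now reads 0., 1., 2., 3., 4., 5. */
  xt_redist_delete(redist);
  xt_xmap_delete(xmap);
  xt_idxlist_delete(srcList);
  xt_idxlist_delete(dstList);
}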
struct xyzDims
......@@ -255,32 +345,15 @@ xyzGridSize(struct xyzDims dims)
#ifdef HAVE_PARALLEL_NC4
static void
queryVarDims(struct PPM_extent varShape[3], int vlistID, int varID)