Commit d0057d94 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Handle timestep data via explicit transport of meta-data.

parent 947a2ef7
......@@ -139,7 +139,7 @@ static void modelRun(MPI_Comm commModel)
chunk = CDI_UNDEFID;
}
#ifdef USE_MPI
pioWriteTimestep ( tsID, vdate, vtime );
pioWriteTimestep();
#endif
}
}
......
......@@ -204,7 +204,7 @@ CONTAINS
#ifdef USE_MPI
! For parallel IO:
! Start transmission of all data for output in this timestep to IO server.
CALL pioWriteTimestep ( tsID, vdate, vtime )
CALL pioWriteTimestep
#endif
END DO
......
......@@ -177,7 +177,7 @@ static void modelRun(MPI_Comm commModel)
}
}
#ifdef USE_MPI
pioWriteTimestep ( tsID, vdate, vtime );
pioWriteTimestep();
#endif
}
#ifdef USE_MPI
......
......@@ -234,7 +234,7 @@ void pioFinalize ( void );
/* pioInit: initialize I/O server processes and communication */
MPI_Comm pioInit(MPI_Comm commSuper, int nProcsIO, int IOMode,
int *pioNamespace, float partInflate);
void pioWriteTimestep ( int, int, int );
void pioWriteTimestep();
void streamWriteVarPart (int streamID, int varID,
const void *data, int nmiss,
......
......@@ -389,9 +389,6 @@
EXTERNAL pioInit
! pioWriteTimestep
! (INTEGER ,
! INTEGER ,
! INTEGER )
EXTERNAL pioWriteTimestep
! streamWriteVarPart
......
......@@ -86,7 +86,7 @@ static int pioInit_fwrap(int commSuper, int nProcsIO, int IOMode, int * pioNames
return MPI_Comm_c2f(v);
}
FCALLSCFUN5 (INT, pioInit_fwrap, PIOINIT, pioinit, INT, INT, INT, PINT, FLOAT)
FCALLSCSUB3 (pioWriteTimestep, PIOWRITETIMESTEP, piowritetimestep, INT, INT, INT)
FCALLSCSUB0 (pioWriteTimestep, PIOWRITETIMESTEP, piowritetimestep)
static void streamWriteVarPart_fwrap(int streamID, int varID, const void * data, int nmiss, void * partDesc)
{
streamWriteVarPart( streamID, varID, data, nmiss, (*(Xt_idxlist *)partDesc));
......
......@@ -8,6 +8,8 @@
#include <yaxt.h>
#include "namespace.h"
#include "taxis.h"
#include "pio.h"
#include "pio_client.h"
#include "pio_comm.h"
......@@ -172,6 +174,40 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
}
}
static int
cdiPioClientStreamDefTimestep_(stream_t *streamptr, int tsID)
{
union winHeaderEntry header;
statusCode nspStatus = namespaceInqResStatus ();
int taxisID, buf_size, position;
char *buf;
MPI_Comm commCalc;
switch ( nspStatus )
{
case STAGE_DEFINITION:
break;
case STAGE_TIMELOOP:
position = 0;
taxisID = vlistInqTaxis(streamptr->vlistID);
header.funcCall
= (struct funcCallDesc){
.funcID = STREAMDEFTIMESTEP,
.funcArgs.streamNewTimestep = { streamptr->self, tsID } };
commCalc = commInqCommCalc();
buf_size = reshResourceGetPackSize(taxisID, &taxisOps, &commCalc);
buf = xmalloc((size_t)buf_size);
reshPackResource(taxisID, &taxisOps, buf, buf_size, &position,
&commCalc);
pioBufferFuncCall(header, buf, buf_size);
free(buf);
break;
case STAGE_CLEANUP:
break;
default:
xabort ( "INTERNAL ERROR" );
}
return cdiStreamDefTimestep_(streamptr, tsID);
}
void
cdiPioClientSetup(int *pioNamespace_, int *pioNamespace)
......@@ -194,6 +230,8 @@ cdiPioClientSetup(int *pioNamespace_, int *pioNamespace)
NSSW_FUNC(cdiPioClientStreamWriteVarPart));
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND,
NSSW_FUNC(cdiPioClientStreamClose));
namespaceSwitchSet(NSSWITCH_STREAM_DEF_TIMESTEP_,
NSSW_FUNC(cdiPioClientStreamDefTimestep_));
namespaceSwitchSet(NSSWITCH_STREAM_SYNC,
NSSW_FUNC(cdiPioClientStreamNOP));
#ifdef HAVE_LIBNETCDF
......
......@@ -46,7 +46,8 @@ static struct rdmaWin
const char * const funcMap[numRPCFuncs] = {
"streamOpen",
"streamDefVlist",
"streamClose"
"streamClose",
"streamDefTimestep",
};
float cdiPIOpartInflate_;
......@@ -385,7 +386,9 @@ modelWinDefBufferSizes(void)
collIndex[collID].numRPCRecords += numRPCFuncs;
txWin[collID].size +=
numRPCFuncs * sizeof (union winHeaderEntry)
+ MAXDATAFILENAME;
+ MAXDATAFILENAME
/* data part of streamDefTimestep */
+ (2 * CDI_MAX_NAME + sizeof (taxis_t));
}
}
for (collID = 0; collID < nProcsColl; ++collID)
......@@ -532,6 +535,13 @@ modelWinEnqueue(int collID,
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
else if (header.funcCall.funcID == STREAMDEFTIMESTEP)
{
header.funcCall.funcArgs.streamNewTimestep.offset
= (int)(txWin[collID].head - txWin[collID].buffer);
memcpy(txWin[collID].head, data, size);
txWin[collID].head += size;
}
}
winDict[targetEntry] = header;
}
......@@ -789,10 +799,10 @@ void pioFinalize ( void )
/************************************************************************/
void pioWriteTimestep ( int tsID, int vdate, int vtime )
void pioWriteTimestep()
{
#ifdef USE_MPI
int collID, buffer[timestepSize], iAssert = 0;
int collID, iAssert = 0;
/* int tokenEnd = END; */
int rankGlob = commInqRankGlob ();
int nProcsColl = commInqNProcsColl ();
......@@ -800,19 +810,12 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
xdebug("%s", "START");
xassert ( tsID >= 0 &&
vdate >= 0 &&
vtime >= 0 &&
txWin != NULL);
buffer[0] = tsID;
buffer[1] = vdate;
buffer[2] = vtime;
xassert(txWin != NULL);
if ( rankGlob < nProcsColl )
{
xmpi ( MPI_Send ( &buffer[0], timestepSize, MPI_INTEGER, nProcsModel,
WRITETS, commInqCommsIO ( rankGlob )));
xmpi(MPI_Send(NULL, 0, MPI_INT, nProcsModel,
WRITETS, commInqCommsIO(rankGlob)));
xdebug("%s", "SENT MESSAGE WITH TAG \"WRITETS\"");
}
......@@ -836,8 +839,7 @@ void pioWriteTimestep ( int tsID, int vdate, int vtime )
txWin[collID].postSet = 1;
}
xdebug ( "RETURN. messages sent, windows posted: tsID=%d, vdate=%d, vtime=%d",
tsID, vdate, vtime );
xdebug("%s", "RETURN. messages sent, windows posted");
#endif
}
......
......@@ -20,23 +20,19 @@ typedef enum
COLLBUFNMISS,
} command;
enum
{
timestepSize = 3
};
#define MAXWINBUFFERSIZE ((size_t)2048 * 1024 * 1024)
enum
{
numRPCFuncs = 3,
numRPCFuncs = 4,
STREAMOPEN = -1,
STREAMDEFVLIST = -2,
STREAMCLOSE = -3,
STREAMDEFTIMESTEP = -4,
HEADERSIZEMARKER = -numRPCFuncs - 1,
PARTDESCMARKER = -numRPCFuncs - 2,
};
enum { MAXDATAFILENAME = 256, MINFUNCID = STREAMCLOSE, MAXFUNCID = STREAMOPEN };
enum { MAXDATAFILENAME = 256, MINFUNCID = -numRPCFuncs, MAXFUNCID = -1 };
extern const char * const funcMap[numRPCFuncs];
struct headerSize
......@@ -59,6 +55,10 @@ struct funcCallDesc
int streamID, vlistID;
} streamChange;
struct
{
int streamID, tsID, offset;
} streamNewTimestep;
struct
{
int fnamelen, offset, filetype;
} newFile;
......
......@@ -22,6 +22,7 @@
#include "cdi.h"
#include "namespace.h"
#include "taxis.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_interface.h"
......@@ -127,7 +128,10 @@ collDefBufferSizes()
// space required for the 3 function calls streamOpen, streamDefVlist, streamClose
// once per stream and timestep for all collprocs only on the modelproc root
rxWin[root].size += numRPCFuncs * sizeof (union winHeaderEntry)
+ MAXDATAFILENAME;
/* serialized filename */
+ MAXDATAFILENAME
/* data part of streamDefTimestep */
+ (2 * CDI_MAX_NAME + sizeof (taxis_t));
rxWin[root].dictSize += numRPCFuncs;
}
free ( streamIndexList );
......@@ -225,6 +229,27 @@ readFuncCall(struct funcCallDesc *header)
funcMap[(-1 - funcID)], streamID, vlistID);
}
break;
case STREAMDEFTIMESTEP:
{
MPI_Comm commCalc = commInqCommCalc ();
int streamID = header->funcArgs.streamNewTimestep.streamID;
int nspTarget = namespaceResHDecode(streamID).nsp;
streamID = namespaceAdaptKey2(streamID);
int tsID
= header->funcArgs.streamNewTimestep.tsID;
int oldTaxisID
= vlistInqTaxis(streamInqVlist(streamID));
int position = header->funcArgs.streamNewTimestep.offset;
int changedTaxisID
= taxisUnpack((char *)rxWin[root].buffer, (int)rxWin[root].size,
&position, nspTarget, &commCalc, 0);
taxis_t *oldTaxisPtr = taxisPtr(oldTaxisID);
taxis_t *changedTaxisPtr = taxisPtr(changedTaxisID);
ptaxisCopy(oldTaxisPtr, changedTaxisPtr);
taxisDestroy(changedTaxisID);
streamDefTimestep(streamID, tsID);
}
break;
default:
xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
}
......@@ -438,8 +463,7 @@ cdiPioServerCdfDefVars(stream_t *streamptr)
#endif
static
void readGetBuffers ( int tsID, int vdate, int vtime )
static void readGetBuffers()
{
int nProcsModel = commInqNProcsModel ();
int root = commInqRootGlob ();
......@@ -543,10 +567,6 @@ void readGetBuffers ( int tsID, int vdate, int vtime )
int streamID = streamMap[streamIdx].streamID;
int vlistID = streamInqVlist(streamID);
int fileType = streamMap[streamIdx].filetype;
int taxisID = vlistInqTaxis(vlistID);
taxisDefVdate(taxisID, vdate);
taxisDefVtime(taxisID, vtime);
streamDefTimestep(streamID, tsID);
switch (fileType)
{
......@@ -792,18 +812,17 @@ void clearModelWinBuffer(int modelID)
static
void getTimeStepData ( int tsID, int vdate, int vtime )
void getTimeStepData()
{
int modelID;
char text[1024];
int nProcsModel = commInqNProcsModel ();
void *getWinBaseAddr;
int attrFound;
xdebug("%s", "START");
// todo put in correct lbs and ubs
xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
xmpi(MPI_Win_start(groupModel, 0, getWin));
xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
xassert(attrFound);
......@@ -830,8 +849,8 @@ void getTimeStepData ( int tsID, int vdate, int vtime )
rxWin[modelID].size / sizeof (double),
DATATYPE_FLT);
}
readGetBuffers ( tsID, vdate, vtime );
readGetBuffers();
xdebug("%s", "RETURN");
}
......@@ -1022,16 +1041,12 @@ void IOServer ()
case WRITETS:
{
int iBuffer[timestepSize];
xdebugMsg(tag, source, nfinished);
xmpi(MPI_Get_count(&status, MPI_INTEGER, &size));
xassert(size == timestepSize);
xmpi(MPI_Recv(iBuffer, size, MPI_INTEGER, source,
xmpi(MPI_Recv(NULL, 0, MPI_INT, source,
tag, commCalc, &status));
xdebug("RECEIVED MESSAGE WITH TAG \"WRITETS\": "
"tsID=%d, vdate=%d, vtime=%d, source=%d",
iBuffer[0], iBuffer[1], iBuffer[2], source );
getTimeStepData(iBuffer[0], iBuffer[1], iBuffer[2]);
xdebug("RECEIVED MESSAGE WITH TAG \"WRITETS\": source=%d",
source);
getTimeStepData();
}
break;
......
#ifndef _TAXIS_H
#define _TAXIS_H
#include "resource_handle.h"
typedef struct {
/* Date format YYYYMMDD */
......@@ -33,6 +34,8 @@ double cdiEncodeTimeval(int date, int time, taxis_t *taxis);
void timeval2vtime(double timevalue, taxis_t *taxis, int *vdate, int *vtime);
double vtime2timeval(int vdate, int vtime, taxis_t *taxis);
extern resOps taxisOps;
int
taxisUnpack(char * unpackBuffer, int unpackBufferSize, int * unpackBufferPos,
int nspTarget, void *context, int checkForSameID);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment