Commit ff998304 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Change multi-stream example to use explicit decomposition.

parent 1269d214
......@@ -12,7 +12,7 @@ typedef int MPI_Comm;
#include "cdi.h"
#include "pio_util.h"
void hoursPassingHack ( int * vdate, int * vtime, int hoursPassed )
static void hoursPassingHack ( int * vdate, int * vtime, int hoursPassed )
{
int sum, days, hours, oldDays;
......@@ -26,7 +26,12 @@ void hoursPassingHack ( int * vdate, int * vtime, int hoursPassed )
* vdate = * vdate + days;
}
void modelRun(MPI_Comm commModel)
#ifdef USE_MPI
static int
uniform_partition_start(int set_interval[2], int nparts, int part_idx);
#endif
static void modelRun(MPI_Comm commModel)
{
enum {
filetype = FILETYPE_GRB,
......@@ -48,14 +53,20 @@ void modelRun(MPI_Comm commModel)
int gridID, zaxisID[nStreams][nVars], taxisID;
int streamID[nStreams], vlistID[nStreams], varID[nStreams][nVars], tsID, tfID = 0;
int i, j, k, nmiss = 0;
int i, j, nmiss = 0;
double lons[nlon] = {0, 30, 60, 90, 120, 150, 180, 210, 240, 270, 300, 330};
double lats[nlat] = {-75, -45, -15, 15, 45, 75};
double levs[MAXNLEV] = {101300, 92500, 85000, 50000, 20000};
double var[nlon*nlat*MAXNLEV];
double *var;
int vdate = 19850101, vtime = 120000, hourStep = 20000;
int start = CDI_UNDEFID, chunk = CDI_UNDEFID, stop = CDI_UNDEFID;
size_t varSize[nStreams][nVars];
#if USE_MPI
int rank, comm_size;
struct var1DDeco {
int chunkSize, start;
Xt_idxlist partDesc;
} varDeco[nStreams][nVars];
#endif
xassert ( nStreams < MAXNSTREAMS );
gridID = gridCreate ( GRID_LONLAT, nlon*nlat );
......@@ -75,10 +86,44 @@ void modelRun(MPI_Comm commModel)
vlistID[i] = vlistCreate ();
}
for ( i = 0; i < nStreams; i++ )
for ( j = 0; j < nVars; j++ )
varID[i][j] = vlistDefVar ( vlistID[i], gridID, zaxisID[i][j], TIME_VARIABLE );
#if USE_MPI
xmpi ( MPI_Comm_rank ( commModel, &rank ));
xmpi ( MPI_Comm_size ( commModel, &comm_size ));
#endif
{
int maxChunkSize = 0;
for ( i = 0; i < nStreams; i++ )
for ( j = 0; j < nVars; j++ )
{
varID[i][j] = vlistDefVar(vlistID[i], gridID, zaxisID[i][j],
TIME_VARIABLE );
varSize[i][j] = nlon * nlat * nlev[i][j];
#ifdef USE_MPI
{
int start = uniform_partition_start((int [2]){ 0, varSize[i] - 1 },
comm_size, rank),
chunkSize = uniform_partition_start((int [2]){ 0, varSize[i] - 1 },
comm_size, rank + 1) - start;
if (maxChunkSize < chunkSize)
maxChunkSize = chunkSize;
fprintf(stderr, "%d: start=%d, chunkSize = %d\n", rank,
start, chunkSize);
Xt_idxlist idxlist
= xt_idxstripes_new(&(struct Xt_stripe){ .start = start,
.nstrides = chunkSize, .stride = 1 }, 1);
varDeco[i][j] = (struct var1DDeco){
.start = start,
.chunkSize = chunkSize,
.partDesc = idxlist
};
}
#else
if (maxChunkSize < varSize[i][j])
maxChunkSize = varSize[i][j];
#endif
}
var = malloc(maxChunkSize * sizeof (var[0]));
}
taxisID = taxisCreate ( TAXIS_ABSOLUTE );
for ( i = 0; i < nStreams; i++ )
vlistDefTaxis ( vlistID[i], taxisID );
......@@ -120,13 +165,15 @@ void modelRun(MPI_Comm commModel)
streamDefTimestep ( streamID[i], tsID );
for ( j = 0; j < nVars; j++ )
{
start = pioInqVarDecoOff ( vlistID[i], varID[i][j] );
chunk = pioInqVarDecoChunk ( vlistID[i], varID[i][j] );
stop = start + chunk;
for ( k = start; k < stop; k++ ) var[k] = 2.2;
streamWriteVar ( streamID[i], varID[i][j], &var[start], nmiss );
start = CDI_UNDEFID;
chunk = CDI_UNDEFID;
#ifdef USE_MPI
int start = varDeco[i][j].start;
int chunk = varDeco[i][j].chunkSize;
#else
int start = 0, chunk = varSize[i][j];
#endif
for(int k = 0; k < chunk; k++)
var[k] = 3.3;
streamWriteVar(streamID[i], varID[i][j], var, nmiss );
}
}
#ifdef USE_MPI
......@@ -140,6 +187,9 @@ void modelRun(MPI_Comm commModel)
#ifdef USE_MPI
pioEndTimestepping ();
for (int streamID = 0; streamID < nStreams; ++streamID)
for (int varID = 0; varID < nVars; ++varID)
xt_idxlist_delete(varDeco[streamID][varID].partDesc);
#endif
for ( i = 0; i < nStreams; i++ )
......@@ -152,6 +202,7 @@ void modelRun(MPI_Comm commModel)
for ( j = 0; j < nVars; j++ )
zaxisDestroy ( zaxisID[i][j] );
gridDestroy ( gridID );
free(var);
xdebug("%s", "RETURN");
}
......@@ -209,3 +260,15 @@ int main (int argc, char *argv[])
#endif
return 0;
}
#ifdef USE_MPI
static int
uniform_partition_start(int set_interval[2], int nparts, int part_idx)
{
int part_offset
= (((long long)set_interval[1] - (long long)set_interval[0] + 1LL)
* (long long)part_idx) / (long long)nparts;
int start = set_interval[0] + part_offset;
return start;
}
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment