pio_interface.c 29.2 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
26
#include "pio_server.h"
27
#include "resource_handle.h"
28
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
29
#include "vlist.h"
30

Deike Kleberg's avatar
Deike Kleberg committed
31
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
32

33

34
static struct rdmaWin
35
36
{
  size_t size;
37
  unsigned char *buffer, *head;
38
  MPI_Win win;
39
  int postSet, refuseFuncCall;
40
  MPI_Group ioGroup;
41
  int dictSize, dictDataUsed, dictRPCUsed, dict;
42
43
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
44

Deike Kleberg's avatar
Deike Kleberg committed
45
/* printable names of the buffered function calls, used for debug output;
 * indexed by (-1 - funcID) */
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
Deike Kleberg's avatar
Deike Kleberg committed
46

47
/* factor multiplied onto each variable's per-process chunk size when the
 * RMA window buffer sizes are computed in modelWinDefBufferSizes */
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
48

Deike Kleberg's avatar
Deike Kleberg committed
49
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
50

51
/* qsort comparator for arrays of pointers to int: compares the pointed-to
 * values and orders larger values first (descending order).
 * Returns 1 if *a < *b, -1 if *a > *b, 0 if both are equal. */
static int cmp ( const void * va, const void * vb )
{
  const int *const *a = va;
  const int *const *b = vb;

  return (( **a < **b ) - ( **a > **b ));
}

/****************************************************/
62

63
64
65
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
66
67
68
69
70
71
72
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
73
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
74
  int i, j;
75

Deike Kleberg's avatar
Deike Kleberg committed
76
77
78
79
80
81
82
83
84
85
86

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
87
88

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
89
90
91
92
93
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
94

Deike Kleberg's avatar
Deike Kleberg committed
95
      for ( j = 0; j < nWriter; j++ )
96
97
98
	{
	  if ( !i ) buckets[j] = 0.0;
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
99

100
101
102
103
104
105
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
106
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
107
108
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
109

110
111
112
  xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
  xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                nProblems, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
113
114
  xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
  xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
115
  xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
116
117
}

118
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
119
120
121
122
123

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams

   @param sSizes array with number of var_t for all stream_t 's

   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
   in order of vSizes

   @param nStreams number of stream_t 's

   @param nodeSizes array of number of I/O PEs on each physical node, increasing
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs

   @return nothing, the result is stored in varMapping
*/
139

140
141
142
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
143
{
144

Deike Kleberg's avatar
Deike Kleberg committed
145
146
147
148
149
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
150

Deike Kleberg's avatar
Deike Kleberg committed
151
152
153
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
154
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
155

156
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
157
158
159
160
161
162

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
163
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
164
165
    }

Deike Kleberg's avatar
Deike Kleberg committed
166
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
167
168
169
170
171
172
173
174
175
176
177
178
179
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
180
	if ( * ( streamMapping + j ) == i )
181
182
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
183
184
185
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
186
187
188
189
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
190
191
192
193
194
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
195
196

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
197
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
198

199
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
200
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
201
202
203
204
205

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
206
207
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
208
	    * ( varMapping + offset ++ ) =
209
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
210
                                    summandRank );
211
212
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
213
214
215
216
217
218
219

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
220

221
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
222
    {
Deike Kleberg's avatar
Deike Kleberg committed
223
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
224
      for ( i = 0; i < nProcsColl; i++ )
225
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
226
      for ( i = 0; i < nVars; i ++ )
227
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
228
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
229
    }
230
}
Deike Kleberg's avatar
Deike Kleberg committed
231

232
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
233

234
235
static void
defVarDeco(int vlistID, int varID)
Deike Kleberg's avatar
Deike Kleberg committed
236
237
{
  int varSize, cRank, lChunk, rem, lOffset;
238
239
  int nProcsModel = commInqNProcsModel ();
  deco_t deco[nProcsModel];
240

Deike Kleberg's avatar
Deike Kleberg committed
241
242
  varSize = vlistInqVarSize ( vlistID, varID );

243
  for ( cRank = 0; cRank < nProcsModel; cRank++ )
Deike Kleberg's avatar
Deike Kleberg committed
244
    {
245
      lChunk = varSize / nProcsModel;
Deike Kleberg's avatar
Deike Kleberg committed
246
      lOffset = cRank * lChunk;
247
      rem = varSize % nProcsModel;
248
      if ( cRank < rem )
Deike Kleberg's avatar
Deike Kleberg committed
249
250
251
252
253
254
        {
          lChunk++;
          lOffset += cRank;
        }
      else
        lOffset += rem;
255

Deike Kleberg's avatar
Deike Kleberg committed
256
257
258
259
      deco[cRank].rank   = cRank;
      deco[cRank].offset = lOffset;
      deco[cRank].chunk  = lChunk;
    }
260
  vlistDefVarDeco ( vlistID, varID, nProcsModel, &deco[0] );
Deike Kleberg's avatar
Deike Kleberg committed
261
262
}

263
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
264

Deike Kleberg's avatar
Deike Kleberg committed
265

266
267
/* For every variable of every stream: defines the decomposition over the
 * model processes and the I/O rank responsible for collecting it, for both
 * the stream's vlist and its original vlist.  Aborts if any collector
 * process ends up without a variable.
 *
 * nNodes:    number of entries in nodeSizes
 * nodeSizes: number of I/O PEs per node, passed on to varMapGen */
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* must start zeroed: an entry is only ever set to 1 below and is
   * compared with 0 afterwards */
  collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      /* plain assignment: the array is freshly allocated, there is
       * nothing to accumulate onto */
      varSizes[k++] = vlistInqVarSize ( streamInqVlist ( resHs[i] ), j );

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      {
        defVarDeco ( streamInqVlist ( resHs[i] ), j );
        defVarDeco ( streamInqVlistIDorig ( resHs[i] ), j );
        vlistDefVarIOrank ( streamInqVlist ( resHs[i] ), j,
                            varMapping[k] );
        vlistDefVarIOrank ( streamInqVlistIDorig ( resHs[i] ), j,
                            varMapping[k] );
        collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
      }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

330
static
Deike Kleberg's avatar
Deike Kleberg committed
331
332
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
333
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
334

335
  xdebug("%s", "START");
336
  if (txWin != NULL)
337
338
339
340
341
342
343
344
345
346
347
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
348

349
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
350
351
352
}

/************************************************************************/
353

354
355
/* Per-collector counters of the dictionary entries needed for data
 * records and for function call (RPC) records; filled in
 * modelWinDefBufferSizes. */
struct collDesc
{
  int numDataRecords, numRPCRecords;
};

359
360
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
361
{
362
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
363
  int sumWinBufferSize = 0;
364
365
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
366
  int rankModel   = commInqRankModel ();
367
  int root = commInqRootGlob ();
368
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
369

370
  xdebug("%s", "START");
371
  xassert(txWin != NULL);
372

Deike Kleberg's avatar
Deike Kleberg committed
373
  nstreams = reshCountType ( &streamOps );
374
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
375
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
376
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
377
378
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
379
380
381
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
382
383
384
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
Deike Kleberg's avatar
Deike Kleberg committed
385
          collID = CDI_UNDEFID;
386
387
388
          collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk = cdiPIOpartInflate_
            * vlistInqVarDecoChunk(vlistID, varID, rankModel);
Deike Kleberg's avatar
Deike Kleberg committed
389
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
390
391
392
393
394
395
396
397
398
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
399
        }
400

401
      // memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
402
      // once per stream and timestep for all collprocs only on the modelproc root
403
404
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
405
          {
406
            collIndex[collID].numRPCRecords += 3;
407
408
409
410
            txWin[collID].size +=
              3 * sizeof (union winHeaderEntry)
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
411
    }
412
  for (collID = 0; collID < nProcsColl; ++collID)
413
    {
414
415
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
416
      txWin[collID].dictSize = numRecords;
417
418
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
419
      /* account for size header */
420
      txWin[collID].size += sizeof (union winHeaderEntry);
421
422
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
423
      sumWinBufferSize += txWin[collID].size;
424
    }
425
426
427
  free(collIndex);
  free ( streamIndexList );

428
429
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
430
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
431
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
432
}
433

Deike Kleberg's avatar
Deike Kleberg committed
434
435
436
437

/************************************************************************/


438
static
Deike Kleberg's avatar
Deike Kleberg committed
439
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
440
{
Deike Kleberg's avatar
Deike Kleberg committed
441
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
442

Deike Kleberg's avatar
Deike Kleberg committed
443
444
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
445
            txWin[collID].buffer     != NULL      &&
446
447
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
448
            txWin[collID].size <= MAXWINBUFFERSIZE);
449
  memset(txWin[collID].buffer, 0, txWin[collID].size);
450
451
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
452
  txWin[collID].refuseFuncCall = 0;
453
454
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
455
456
457
}


Deike Kleberg's avatar
Deike Kleberg committed
458
459
460
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
461
462
463
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
464
  int collID, ranks[1];
465
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
466

467
  xdebug("%s", "START");
468
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
469

Deike Kleberg's avatar
Deike Kleberg committed
470
  modelWinDefBufferSizes ();
471
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
472

Deike Kleberg's avatar
Deike Kleberg committed
473
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
474
    {
475
      xassert(txWin[collID].size > 0);
476
      txWin[collID].buffer = NULL;
477
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
478
479
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
480
481
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
482
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
483
                          MPI_INFO_NULL, commInqCommsIO(collID),
484
                          &txWin[collID].win));
485
486
487
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
488
    }
489
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
490
491
492
493
}

/************************************************************************/

494
static void
495
496
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
497
{
498
499
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
500
  int targetEntry;
501
502
  if (header.dataRecord.streamID > 0)
    {
503
      targetEntry = (txWin[collID].dictDataUsed)++;
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
545
    }
546
  else
547
    {
548
549
550
551
552
553
554
555
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
556
    }
557
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
558
559
}

560
561
void pioBufferData(int streamID, int varID,
                   const double *data, int nmiss)
Deike Kleberg's avatar
Deike Kleberg committed
562
{
563
  int rankModel = commInqRankModel ();
Deike Kleberg's avatar
Deike Kleberg committed
564

565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
  int vlistID = streamInqVlist(streamID);
  int chunk = vlistInqVarDecoChunk(vlistID, varID, rankModel);
  int start = pioInqVarDecoOff(vlistID, varID);

  struct Xt_stripe stripe = { .start = start, .nstrides = chunk, .stride = 1 };
  Xt_idxlist partDesc = xt_idxstripes_new(&stripe, 1);
  pioBufferPartData(streamID, varID, data, nmiss, partDesc);
  xt_idxlist_delete(partDesc);
}

/* Enqueues data and its part descriptor partDesc for variable varID of
 * stream streamID in the transmit buffer of the collector assigned to
 * that variable.  A pending epoch of that buffer (postSet) is completed
 * and the buffer flushed first.  Afterwards function calls are refused
 * for this buffer until the next flush.
 *
 * data:     values, one per index of partDesc
 * nmiss:    number of missing values, stored in the data record header
 * partDesc: index list describing which part of the variable data holds */
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int vlistID, collID = CDI_UNDEFID;

  vlistID  = streamInqVlist ( streamID );
  collID   = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
  xassert ( collID         >= 0                    &&
            collID         <  commInqNProcsColl () &&
            txWin != NULL);

  if (txWin[collID].postSet)
    {
      xmpi(MPI_Win_wait(txWin[collID].win));
      txWin[collID].postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int chunk = xt_idxlist_get_num_indices(partDesc);
  xassert(chunk <= INT_MAX);

  /* the offset (-1 here) is filled in by modelWinEnqueue */
  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data,
                  (size_t)chunk * sizeof (data[0]));
  {
    union winHeaderEntry partHeader
      = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                        .uid = xt_idxlist_get_uid(partDesc),
                        .offset = 0 } };
    modelWinEnqueue(collID, partHeader, partDesc, 0);
  }

  txWin[collID].refuseFuncCall = 1;
}

611
612
/************************************************************************/

613
void pioBufferFuncCall(int funcID, int argc, ... )
614
615
616
617
618
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
619

620
621
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
622

Deike Kleberg's avatar
Deike Kleberg committed
623
624
  if ( rankGlob != root ) return;

625
626
627
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
628

Deike Kleberg's avatar
Deike Kleberg committed
629
  va_start ( ap, argc );
630

Thomas Jahns's avatar
Thomas Jahns committed
631
632
633
634
635
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

636
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
637
    {
638
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
639
640
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
641
      {
Thomas Jahns's avatar
Thomas Jahns committed
642
643
644
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
645
          {
Thomas Jahns's avatar
Thomas Jahns committed
646
647
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
648
          }
Thomas Jahns's avatar
Thomas Jahns committed
649
650
651
652
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
653
      }
654
      break;
Deike Kleberg's avatar
Deike Kleberg committed
655
656
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
657
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
658
659
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
660
661
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
662
663
664
665
666
667
668
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
669
      }
670
      break;
Thomas Jahns's avatar
Thomas Jahns committed
671
672
673
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
674

Thomas Jahns's avatar
Thomas Jahns committed
675
676
  for (collID = 0; collID < nProcsColl; ++collID)
    {
677
678
      xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
              > txWin[collID].dictDataUsed);
Thomas Jahns's avatar
Thomas Jahns committed
679
680
681
682
683
684
685
686
687
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
688

Thomas Jahns's avatar
Thomas Jahns committed
689
690
691
692
693
694
695
696
697
698
699
700
701
702
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
703
      break;
704
    }
705

Thomas Jahns's avatar
Thomas Jahns committed
706
  va_end(ap);
707
  xdebug("%s", "RETURN");
708
709
}

710
711
712
713
#endif

/*****************************************************************************/

714
715
716
717
/* Returns the chunk size stored in the decomposition of variable varID of
 * vlistID: with USE_MPI the one of the calling model rank, otherwise the
 * one queried with rank CDI_UNDEFID. */
int pioInqVarDecoChunk ( int vlistID, int varID )
{
#ifdef USE_MPI
   int rankModel = commInqRankModel ();
   xassert ( rankModel != CDI_UNDEFID );
   return vlistInqVarDecoChunk ( vlistID, varID, rankModel );
#else
   return vlistInqVarDecoChunk ( vlistID, varID, CDI_UNDEFID );
#endif
}

/*****************************************************************************/

int pioInqVarDecoOff ( int vlistID, int varID )
{
#ifdef USE_MPI
   int rankModel = commInqRankModel ();
Deike Kleberg's avatar
Deike Kleberg committed
730
   xassert ( rankModel != CDI_UNDEFID );
731
   return vlistInqVarDecoOff ( vlistID, varID, rankModel );
Thomas Jahns's avatar
Thomas Jahns committed
732
#else
733
   return vlistInqVarDecoOff ( vlistID, varID, CDI_UNDEFID );
Thomas Jahns's avatar
Thomas Jahns committed
734
#endif
735
736
}

737
/*****************************************************************************/
738
739
740
741
/* pioInit definition must currently compile even in non-MPI configurations:
 * when MPI_VERSION is not defined, MPI_Comm is replaced by int so that the
 * pioInit signature below stays valid */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif
742
743
744
/**
   @brief initializes the MPI_Communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
745
746
747
748
749
750
751
752
753
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]:
754
755

  Collective call
756
757
758

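  @param commGlob MPI communicator of all calling PEs
  @param nProcsIO number of I/O PEs
  @param IOMode I/O mode, must lie within PIO_MINIOMODE..PIO_MAXIOMODE
  @param pioNamespace on return, holds the namespace created for
  parallel I/O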
759
760
761
  @param partInflate allow for array partitions on compute
  PE that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$
762
763
764
  @return communicator of the calculating PEs (the global communicator
  when only a single MPI task is running); I/O PEs do not return from
  this call
*/

Thomas Jahns's avatar
Thomas Jahns committed
765
766
#ifdef USE_MPI
/* namespace created by pioInit for parallel I/O; -1 until pioInit sets it */
static int pioNamespace_ = -1;
/* non-zero when pioInit had to call xt_initialize itself, i.e. yaxt was
   either not yet initialized or already finalized at that point */
static int xtInitByCDI = 0;
#endif

Thomas Jahns's avatar
Thomas Jahns committed
770
771
772
773
774
775
776
#if defined USE_MPI && defined HAVE_LIBNETCDF
/* Deliberately empty handler: pioInit registers it under
   NSSWITCH_CDF_DEF_TIMESTEP for processes that are not I/O servers.
   Both arguments are ignored. */
static void
cdiPioCdfDefTimestepNOP(stream_t *streamptr, int tsID)
{
  (void)streamptr;
  (void)tsID;
}
#endif

777
778
/* Sets up the communicator bookkeeping for parallel I/O.  Processes for
   which commInqIsProcIO() holds run IOServer() and terminate inside this
   call; every other process receives a communicator and, through
   pioNamespace, the namespace created for parallel I/O.  Without USE_MPI
   the function aborts. */
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
        int *pioNamespace, float partInflate)
{
#ifdef USE_MPI
  /* argument checks that do not need any communicator information */
  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  /* record whether yaxt has to be brought up by this call */
  xtInitByCDI = (!xt_initialized() || xt_finalized());
  if (xtInitByCDI)
    xt_initialize(commGlob);

  commInit ();
  commDefCommGlob ( commGlob );
  int sizeGlob = commInqSizeGlob ();

  /* PIO_NONE requires exactly one I/O process; every other mode needs at
     least one I/O process and at least one process left over */
  int distributionInvalid = (IOMode == PIO_NONE)
    ? (nProcsIO != 1)
    : (nProcsIO <= 0 || nProcsIO > sizeGlob - 1);
  if (distributionInvalid)
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  if ( commInqSizeGlob () == 1 )
    {
      int soloNamespace = namespaceNew(0);
      *pioNamespace = soloNamespace;
      pioNamespace_ = soloNamespace;
      return commInqCommGlob ();
    }

  if ( commInqIsProcIO ())
    {
      /* I/O server: serve until IOServer() returns, then shut everything
         down and leave the program without returning to the caller */
      serializeSetMPI();
      namespaceSwitchSet(NSSWITCH_ABORT, cdiAbortC_MPI);
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, pioFileOpen);
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, pioFileClose);
      IOServer ();
      namespaceDelete(0);
      commDestroy ();
      xt_finalize();
      MPI_Finalize ();
      exit ( EXIT_SUCCESS );
    }

  /* only non-I/O processes get here */
  commEvalPhysNodes ();
  commDefCommsIO ();

  int modelNamespace = namespaceNew(0);
  *pioNamespace = modelNamespace;
  pioNamespace_ = modelNamespace;

  /* install the PIO hooks in the new namespace, then switch back to the
     namespace that was active when pioInit was called */
  int callerCDINamespace = namespaceGetActive();
  pioNamespaceSetActive(pioNamespace_);
  serializeSetMPI();
  namespaceSwitchSet(NSSWITCH_STREAM_OPEN_BACKEND, cdiPioStreamOpen);
  namespaceSwitchSet(NSSWITCH_STREAM_DEF_VLIST_, cdiPioStreamDefVlist_);
  namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_, cdiPioStreamWriteVar_);
  namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
                     cdiPioStreamWriteVarChunk_);
  namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND, cdiPioStreamClose);
#ifdef HAVE_LIBNETCDF
  namespaceSwitchSet(NSSWITCH_CDF_DEF_TIMESTEP, cdiPioCdfDefTimestepNOP);
#endif
  pioNamespaceSetActive(callerCDINamespace);

  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
  return commInqCommModel ();
#else
  abort();
#endif
}
858

859
860
/* remove the int stand-in for MPI_Comm that was only needed to compile the
   pioInit definition when MPI_VERSION is not defined */
#ifndef MPI_VERSION
#  undef MPI_Comm
#endif

863
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
864

865
void  pioEndDef ( void )
866
867
{
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
868
  char   * buffer;
869
  int bufferSize;
870
  int rankGlob = commInqRankGlob ();
871

872
  xdebug("%s", "START");
873

Deike Kleberg's avatar
Deike Kleberg committed
874
  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());
Deike Kleberg's avatar
Deike Kleberg committed
875

876
  if ( rankGlob < commInqNProcsColl ())
Deike Kleberg's avatar
Deike Kleberg committed
877
    {