pio_interface.c 24.6 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
26
#include "pio_client.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_server.h"
28
#include "resource_handle.h"
29
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist.h"
31

Deike Kleberg's avatar
Deike Kleberg committed
32
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
33

34

35
static struct rdmaWin
36
37
{
  size_t size;
38
  unsigned char *buffer, *head;
39
  MPI_Win win;
40
  int postSet, refuseFuncCall;
41
  MPI_Group ioGroup;
42
  int dictSize, dictDataUsed, dictRPCUsed, dict;
43
44
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
45

46
47
48
const char * const funcMap[numRPCFuncs] = {
  "streamOpen",
  "streamDefVlist",
49
50
  "streamClose",
  "streamDefTimestep",
51
};
Deike Kleberg's avatar
Deike Kleberg committed
52

53
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
54

Deike Kleberg's avatar
Deike Kleberg committed
55
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
56

57
/* qsort comparator for an array of `int *`: orders the pointers by the
 * value they point to, largest value first.
 * Returns 1 if *va's target is smaller, -1 if larger, 0 if equal. */
static int cmp ( const void * va, const void * vb )
{
  const int *lhs = *(const int * const *)va;
  const int *rhs = *(const int * const *)vb;

  return (*rhs > *lhs) - (*rhs < *lhs);
}

/****************************************************/
68

69
70
71
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
72
73
74
75
76
77
78
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
79
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
80
  int i, j;
81

Deike Kleberg's avatar
Deike Kleberg committed
82
83
84
85
86
87
88
89
90
91
92

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
93
94

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
95
96
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

Thomas Jahns's avatar
Thomas Jahns committed
97
98
  memset(buckets, 0, sizeof (buckets));

Deike Kleberg's avatar
Deike Kleberg committed
99
100
101
  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
102

Deike Kleberg's avatar
Deike Kleberg committed
103
      for ( j = 0; j < nWriter; j++ )
104
105
	{
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
106

107
108
109
110
111
112
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
113
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
114
115
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
116

117
118
119
120
121
122
123
124
125
  if ( ddebug )
    {
      xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
      xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                    nProblems, DATATYPE_INT );
      xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
      xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
      xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
    }
Deike Kleberg's avatar
Deike Kleberg committed
126
127
}

128
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
129
130
131
132
133

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
134

Deike Kleberg's avatar
Deike Kleberg committed
135
136
   @param sSizes array with number of var_t for all stream_t 's

137
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
138
139
140
in order of vSizes

   @param nStreams number of stream_t 's
141
142

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
143
144
145
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
146

Deike Kleberg's avatar
Deike Kleberg committed
147
148
   @return
*/
149

150
151
152
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
153
{
154

Deike Kleberg's avatar
Deike Kleberg committed
155
156
157
158
159
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
160

Deike Kleberg's avatar
Deike Kleberg committed
161
162
163
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
164
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
165

166
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
167
168
169
170
171
172

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
173
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
174
175
    }

Deike Kleberg's avatar
Deike Kleberg committed
176
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
177
178
179
180
181
182
183
184
185
186
187
188
189
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
190
	if ( * ( streamMapping + j ) == i )
191
192
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
193
194
195
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
196
197
198
199
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
200
201
202
203
204
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
205
206

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
207
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
208

209
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
210
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
211
212
213
214
215

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
216
217
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
218
	    * ( varMapping + offset ++ ) =
219
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
220
                                    summandRank );
221
222
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
223
224
225
226
227
228
229

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
230

231
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
232
    {
Deike Kleberg's avatar
Deike Kleberg committed
233
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
234
      for ( i = 0; i < nProcsColl; i++ )
235
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
236
      for ( i = 0; i < nVars; i ++ )
237
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
238
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
239
    }
240
}
Deike Kleberg's avatar
Deike Kleberg committed
241

242
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
243

244
245
/**
   @brief Assigns every variable of every stream to a collector process.
   The assignment is recorded in both the stream's vlist and its original
   vlist, and the function aborts if any collector would receive no data.

   @param nNodes    number of physical nodes hosting I/O PEs
   @param nodeSizes number of I/O PEs on each of these nodes
*/
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* zero-initialised: only collectors that get a variable are set to 1
   * below, but the idle check reads every entry */
  collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  for ( i = 0; i < nStreams; i++ )
    {
      int vlistID = streamInqVlist ( resHs[i] );
      /* plain assignment: xmalloc does not initialise varSizes, so the
       * former += read indeterminate values */
      for ( j = 0; j < streamSizes[i]; j++ )
        varSizes[k++] = vlistInqVarSize ( vlistID, j );
    }

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      {
        vlistDefVarIOrank ( streamInqVlist ( resHs[i] ), j, varMapping[k] );
        vlistDefVarIOrank ( streamInqVlistIDorig ( resHs[i] ), j,
                            varMapping[k] );
        collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
      }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  /* free(NULL) is a no-op, no guards needed */
  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

306
static
Deike Kleberg's avatar
Deike Kleberg committed
307
308
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
309
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
310

311
  xdebug("%s", "START");
312
  if (txWin != NULL)
313
314
315
316
317
318
319
320
321
322
323
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
324

325
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
326
327
328
}

/************************************************************************/
329

330
331
/* Per-collector record counters used while sizing the window buffers. */
struct collDesc
{
  /* dictionary entries for data: one data record plus one part
   * descriptor per variable routed to the collector */
  int numDataRecords;
  /* dictionary entries reserved for encoded function calls */
  int numRPCRecords;
};

335
336
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
337
{
338
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
339
  int sumWinBufferSize = 0;
340
341
342
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
  int root = commInqRootGlob ();
343
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
344

345
  xdebug("%s", "START");
346
  xassert(txWin != NULL);
347

Deike Kleberg's avatar
Deike Kleberg committed
348
  nstreams = reshCountType ( &streamOps );
349
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
350
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
351
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
352
353
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
354
355
356
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
357
358
359
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
360
361
362
363
364
365
366
367
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
368
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
369
370
371
372
373
374
375
376
377
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
378
        }
379

380
381
      // memory required for the function calls encoded
      // for remote execution
Deike Kleberg's avatar
Deike Kleberg committed
382
      // once per stream and timestep for all collprocs only on the modelproc root
383
384
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
385
          {
386
            collIndex[collID].numRPCRecords += numRPCFuncs;
387
            txWin[collID].size +=
388
              numRPCFuncs * sizeof (union winHeaderEntry)
389
390
391
              + MAXDATAFILENAME
              /* data part of streamDefTimestep */
              + (2 * CDI_MAX_NAME + sizeof (taxis_t));
392
          }
Deike Kleberg's avatar
Deike Kleberg committed
393
    }
394
  for (collID = 0; collID < nProcsColl; ++collID)
395
    {
396
397
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
398
      txWin[collID].dictSize = numRecords;
399
400
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
401
      /* account for size header */
402
      txWin[collID].size += sizeof (union winHeaderEntry);
403
404
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
405
      sumWinBufferSize += txWin[collID].size;
406
    }
407
408
409
  free(collIndex);
  free ( streamIndexList );

410
411
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
412
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
413
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
414
}
415

Deike Kleberg's avatar
Deike Kleberg committed
416
417
418
419

/************************************************************************/


420
static
Deike Kleberg's avatar
Deike Kleberg committed
421
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
422
{
Deike Kleberg's avatar
Deike Kleberg committed
423
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
424

Deike Kleberg's avatar
Deike Kleberg committed
425
426
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
427
            txWin[collID].buffer     != NULL      &&
428
429
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
430
            txWin[collID].size <= MAXWINBUFFERSIZE);
431
  memset(txWin[collID].buffer, 0, txWin[collID].size);
432
433
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
434
  txWin[collID].refuseFuncCall = 0;
435
436
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
437
438
439
}


Deike Kleberg's avatar
Deike Kleberg committed
440
441
442
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
443
444
445
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
446
  int collID, ranks[1];
447
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
448

449
  xdebug("%s", "START");
450
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
451

Deike Kleberg's avatar
Deike Kleberg committed
452
  modelWinDefBufferSizes ();
453
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
454

Deike Kleberg's avatar
Deike Kleberg committed
455
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
456
    {
457
      xassert(txWin[collID].size > 0);
458
      txWin[collID].buffer = NULL;
459
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
460
461
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
462
463
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
464
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
465
                          MPI_INFO_NULL, commInqCommsIO(collID),
466
                          &txWin[collID].win));
467
468
469
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
470
    }
471
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
472
473
474
475
}

/************************************************************************/

476
static void
477
478
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
479
{
480
481
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
482
  int targetEntry;
483
484
  if (header.dataRecord.streamID > 0)
    {
485
      targetEntry = (txWin[collID].dictDataUsed)++;
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
527
    }
528
  else
529
    {
530
531
532
533
534
535
536
537
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
538
539
540
541
542
543
544
      else if (header.funcCall.funcID == STREAMDEFTIMESTEP)
        {
          header.funcCall.funcArgs.streamNewTimestep.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
545
    }
546
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
547
548
}

549
550
551
552
553
554
/* Buffers the local part of variable varID of stream streamID for the
 * collector that the variable is assigned to.
 *
 * If that collector's window is still exposed from the previous timestep,
 * the epoch is waited for and the buffer reset first. Then two records
 * are enqueued: a data record carrying the part of data described by
 * partDesc together with nmiss, and the matching partition descriptor.
 * From then on, function calls are refused for that collector until its
 * buffer is reset. */
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int ioRank = vlistInqVarIOrank ( streamInqVlist ( streamID ), varID );
  int collID = commRankGlob2CollID ( ioRank );

  xassert ( collID >= 0 && collID < commInqNProcsColl () &&
            txWin != NULL );

  if (txWin[collID].postSet)
    {
      /* previous exposure epoch must end before the buffer is reused */
      xmpi(MPI_Win_wait(txWin[collID].win));
      txWin[collID].postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int numIndices = xt_idxlist_get_num_indices(partDesc);
  xassert(numIndices <= INT_MAX);

  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data, numIndices * sizeof (data[0]));

  union winHeaderEntry partHeader
    = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                      .uid = xt_idxlist_get_uid(partDesc),
                      .offset = 0 } };
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  txWin[collID].refuseFuncCall = 1;
}

585
586
/************************************************************************/

587
588
void pioBufferFuncCall(union winHeaderEntry header,
                       const void *data, size_t data_len)
589
590
591
592
{
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
593
  int funcID = header.funcCall.funcID;
594

595
596
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
597

Deike Kleberg's avatar
Deike Kleberg committed
598
599
  if ( rankGlob != root ) return;

600
  xassert(txWin != NULL);
601

Thomas Jahns's avatar
Thomas Jahns committed
602
603
604
605
606
607
608
609
  for (collID = 0; collID < nProcsColl; ++collID)
    {
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
610
611
      xassert(txWin[collID].dictRPCUsed + txWin[collID].dictDataUsed
              < txWin[collID].dictSize);
Thomas Jahns's avatar
Thomas Jahns committed
612
      xassert(txWin[collID].refuseFuncCall == 0);
613
      modelWinEnqueue(collID, header, data, data_len);
614
    }
615

616
  xdebug("%s", "RETURN");
617
618
}

619
620
621
622
#endif

/*****************************************************************************/

623
624
625
626
/* pioInit definition must currently compile even in non-MPI configurations */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif

/**
   @brief initializes the MPI communicators needed for the communication
   between the calculator PEs and the I/O PEs, and within the group of
   I/O PEs.

   commGlob: all PEs

   commPIO: I/O PEs, PEs with highest ranks in commGlob

   commModel: calculating PEs, no I/O PEs

   commsIO[i]: one communicator per collector i, queried with
   commInqCommsIO(i); the collector is addressed in it as rank
   commInqNProcsModel()

   Collective call. On I/O PEs this function does not return: they run
   the I/O server, then finalize yaxt and MPI and exit.

   @param commGlob MPI communicator of all calling PEs
   @param nProcsIO number of I/O PEs; must be 1 for PIO_NONE, otherwise
   in [1, size of commGlob - 1]
   @param IOMode I/O mode, in [PIO_MINIOMODE, PIO_MAXIOMODE]
   @param pioNamespace output: namespace ID used for PIO. It is set via
   namespaceNew() when commGlob has a single process; otherwise it is
   presumably set by cdiPioClientSetup (not visible here).
   @param partInflate allow for array partitions on compute PEs that are
   at most sized \f$ partInflate * \lceil arraySize / numComputePEs
   \rceil \f$; must be >= 1

   @return the communicator of the calculator PEs (commModel), or the
   global communicator if it contains only one process
*/

#ifdef USE_MPI
static int pioNamespace_ = -1;
/* non-zero if pioInit initialized yaxt and pioFinalize must finalize it */
static int xtInitByCDI = 0;
#endif

MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
        int *pioNamespace, float partInflate)
{
#ifdef USE_MPI
  int sizeGlob;

  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  if ((xtInitByCDI = (!xt_initialized() || xt_finalized())))
    {
      xt_initialize(commGlob);
    }
  commInit ();
  commDefCommGlob ( commGlob );
  sizeGlob = commInqSizeGlob ();

  if (((IOMode != PIO_NONE && (nProcsIO <= 0 || nProcsIO > sizeGlob - 1)))
      || (IOMode == PIO_NONE && nProcsIO != 1))
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  if ( commInqSizeGlob () == 1 )
    {
      pioNamespace_ = *pioNamespace = namespaceNew();
      return commInqCommGlob ();
    }

  if ( commInqIsProcIO ())
    {
      /* I/O PEs serve until finalization and never return to the caller */
      serializeSetMPI();
      namespaceSwitchSet(NSSWITCH_ABORT, NSSW_FUNC(cdiAbortC_MPI));
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(pioFileOpen));
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(pioFileClose));
      IOServer ();
      namespaceDelete(0);
      commDestroy ();
      xt_finalize();
      MPI_Finalize ();
      exit ( EXIT_SUCCESS );
    }
  else
    cdiPioClientSetup(&pioNamespace_, pioNamespace);

  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
  return commInqCommModel ();
#else
  /* PIO is unavailable without MPI; silence unused-parameter warnings */
  (void)commGlob;
  (void)nProcsIO;
  (void)IOMode;
  (void)pioNamespace;
  (void)partInflate;
  abort();
#endif
}

#ifndef MPI_VERSION
#  undef MPI_Comm
#endif

724
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
725

726
/* Ends the definition phase on a calculator PE, in four steps:
 * 1. computes the variable-to-collector mapping (varsMapNDeco);
 * 2. on each of the first commInqNProcsColl() global ranks, sends the
 *    packed resources to collector `rankGlob` (tag RESOURCES);
 * 3. creates the RMA windows;
 * 4. switches the namespace to STAGE_TIMELOOP.
 * Without USE_MPI this function does nothing. */
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  if ( rankGlob < commInqNProcsColl ())
    {
      char *packed;
      int packedSize;
      MPI_Comm ioComm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate(&packed, &packedSize, &ioComm);

      xmpi ( MPI_Send ( packed, packedSize, MPI_PACKED, commInqNProcsModel (),
                        RESOURCES, commInqCommsIO ( rankGlob )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");

      reshPackBufferDestroy ( &packed );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/* Marks the end of the time loop by switching the namespace to
 * STAGE_CLEANUP. Without USE_MPI this function does nothing. */
void  pioEndTimestepping ( void )
{
#ifdef USE_MPI
  xdebug("%s", "START");

  namespaceDefResStatus ( STAGE_CLEANUP );

  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
767

768
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
769

Deike Kleberg's avatar
Deike Kleberg committed
770

Deike Kleberg's avatar
Deike Kleberg committed
771
772
773
774
775
776
777
778
779
/**
  @brief is invoked by the calculator PEs to inform the I/O PEs that no
  more data will be written.

  It deletes the PIO namespace and sends a FINALIZE message (one int) to
  every collector. It then releases the RMA windows and the communicators,
  and finalizes yaxt if pioInit initialized it. Without USE_MPI this
  function does nothing.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  int finalizeToken = 1111;
  int collID = 0;

  xdebug("%s", "START");
  namespaceDelete(pioNamespace_);

  while ( collID < commInqNProcsColl () )
    {
      xmpi ( MPI_Send ( &finalizeToken, 1, MPI_INT, commInqNProcsModel (),
                        FINALIZE, commInqCommsIO ( collID )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
      ++collID;
    }

  modelWinCleanup ();
  commDestroy ();
  if (xtInitByCDI)
    xt_finalize();
  xdebug("%s", "RETURN");
#endif
}

800
 /************************************************************************/
801

802
/* Completes the current timestep on a calculator PE.
 * Each of the first nProcsColl global ranks notifies its collector with
 * an empty WRITETS message. Then, for every collector:
 * - a still-open exposure epoch is completed and the buffer reset;
 * - dictionary entry 0 receives the size header (numbers of used data and
 *   RPC entries);
 * - the window is exposed again with MPI_Win_post.
 * Without USE_MPI this function does nothing. */
void pioWriteTimestep()
{
#ifdef USE_MPI
  const int postAssert = 0;
  int rankGlob = commInqRankGlob ();
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();

  xdebug("%s", "START");

  xassert(txWin != NULL);

  if ( rankGlob < nProcsColl )
    {
      xmpi(MPI_Send(NULL, 0, MPI_INT, nProcsModel,
                    WRITETS, commInqCommsIO(rankGlob)));
      xdebug("%s", "SENT MESSAGE WITH TAG \"WRITETS\"");
    }

  for ( int coll = 0; coll < nProcsColl; coll++ )
    {
      union winHeaderEntry *dict;

      if (txWin[coll].postSet)
        {
          xmpi(MPI_Win_wait(txWin[coll].win));
          txWin[coll].postSet = 0;
          modelWinFlushBuffer ( coll );
        }

      dict = (union winHeaderEntry *)txWin[coll].buffer;
      dict[0] = (union winHeaderEntry){
        .headerSize = { .sizeID = HEADERSIZEMARKER,
                        .numDataEntries = txWin[coll].dictDataUsed,
                        .numRPCEntries = txWin[coll].dictRPCUsed } };

      xmpi(MPI_Win_post(txWin[coll].ioGroup, postAssert, txWin[coll].win));
      txWin[coll].postSet = 1;
    }

  xdebug("%s", "RETURN. messages sent, windows posted");
#endif
}
846
847
848

#if defined USE_MPI
void
849
streamWriteVarPart(int streamID, int varID, const void *data,