pio_interface.c 27.3 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
26
#include "pio_server.h"
27
#include "resource_handle.h"
28
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
29
#include "vlist.h"
30

Deike Kleberg's avatar
Deike Kleberg committed
31
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
32

33

34
static struct rdmaWin
35
36
{
  size_t size;
37
  unsigned char *buffer, *head;
38
  MPI_Win win;
39
  int postSet, refuseFuncCall;
40
  MPI_Group ioGroup;
41
  int dictSize, dictDataUsed, dictRPCUsed, dict;
42
43
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
44

Deike Kleberg's avatar
Deike Kleberg committed
45
/* Printable names of the buffered function calls; debug output looks
 * them up as funcMap[-1 - funcID]. */
char *funcMap[nFuncs] = {
  "streamOpen",
  "streamDefVlist",
  "streamClose"
};

/* Inflation factor applied to the per-process partition size estimate;
 * set by pioInit and read when the window buffers are sized. */
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
48

Deike Kleberg's avatar
Deike Kleberg committed
49
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
50

51
/* qsort comparator for an array of pointers to int: orders the
 * pointed-to values from largest to smallest. */
static int cmp(const void *va, const void *vb)
{
  const int lhs = **(const int *const *)va;
  const int rhs = **(const int *const *)vb;

  if (lhs > rhs)
    return -1;
  if (lhs < rhs)
    return 1;
  return 0;
}

/****************************************************/
62

63
64
65
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
66
67
68
69
70
71
72
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
73
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
74
  int i, j;
75

Deike Kleberg's avatar
Deike Kleberg committed
76
77
78
79
80
81
82
83
84
85
86

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
87
88

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
89
90
91
92
93
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
94

Deike Kleberg's avatar
Deike Kleberg committed
95
      for ( j = 0; j < nWriter; j++ )
96
97
98
	{
	  if ( !i ) buckets[j] = 0.0;
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
99

100
101
102
103
104
105
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
106
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
107
108
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
109

110
111
112
  xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
  xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                nProblems, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
113
114
  xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
  xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
115
  xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
116
117
}

118
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
119
120
121
122
123

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
124

Deike Kleberg's avatar
Deike Kleberg committed
125
126
   @param sSizes array with number of var_t for all stream_t 's

127
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
128
129
130
in order of vSizes

   @param nStreams number of stream_t 's
131
132

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
133
134
135
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
136

Deike Kleberg's avatar
Deike Kleberg committed
137
138
   @return
*/
139

140
141
142
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
143
{
144

Deike Kleberg's avatar
Deike Kleberg committed
145
146
147
148
149
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
150

Deike Kleberg's avatar
Deike Kleberg committed
151
152
153
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
154
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
155

156
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
157
158
159
160
161
162

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
163
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
164
165
    }

Deike Kleberg's avatar
Deike Kleberg committed
166
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
167
168
169
170
171
172
173
174
175
176
177
178
179
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
180
	if ( * ( streamMapping + j ) == i )
181
182
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
183
184
185
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
186
187
188
189
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
190
191
192
193
194
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
195
196

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
197
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
198

199
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
200
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
201
202
203
204
205

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
206
207
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
208
	    * ( varMapping + offset ++ ) =
209
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
210
                                    summandRank );
211
212
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
213
214
215
216
217
218
219

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
220

221
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
222
    {
Deike Kleberg's avatar
Deike Kleberg committed
223
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
224
      for ( i = 0; i < nProcsColl; i++ )
225
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
226
      for ( i = 0; i < nVars; i ++ )
227
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
228
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
229
    }
230
}
Deike Kleberg's avatar
Deike Kleberg committed
231

232
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
233

234
235
/**
   @brief Computes the variable to I/O PE mapping for all streams and
   stores it in the variable lists.

   Collects the variable sizes of all streams, lets varMapGen assign an
   I/O rank to each variable, records that rank via vlistDefVarIOrank on
   both the stream's vlist and its original vlist, and aborts if any
   collector ends up without a variable.

   @param nNodes    number of physical nodes hosting I/O PEs
   @param nodeSizes number of I/O PEs on each of those nodes
*/
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* must start out as all zero: entries are only ever set to 1 below and
   * the idle check compares against 0 */
  collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  /* plain assignment: varSizes holds no defined value yet */
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      varSizes[k++] = vlistInqVarSize ( streamInqVlist ( resHs[i] ), j );

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      {
        vlistDefVarIOrank ( streamInqVlist ( resHs[i] ), j,
                            varMapping[k] );
        vlistDefVarIOrank ( streamInqVlistIDorig ( resHs[i] ), j,
                            varMapping[k] );
        collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
      }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

296
static
Deike Kleberg's avatar
Deike Kleberg committed
297
298
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
299
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
300

301
  xdebug("%s", "START");
302
  if (txWin != NULL)
303
304
305
306
307
308
309
310
311
312
313
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
314

315
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
316
317
318
}

/************************************************************************/
319

320
321
/* Number of window dictionary records needed for one collector. */
struct collDesc
{
  /* records for data and their partition descriptors */
  int numDataRecords;
  /* records for buffered function calls */
  int numRPCRecords;
};

325
326
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
327
{
328
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
329
  int sumWinBufferSize = 0;
330
331
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
332
  int rankModel   = commInqRankModel ();
333
  int root = commInqRootGlob ();
334
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
335

336
  xdebug("%s", "START");
337
  xassert(txWin != NULL);
338

Deike Kleberg's avatar
Deike Kleberg committed
339
  nstreams = reshCountType ( &streamOps );
340
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
341
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
342
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
343
344
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
345
346
347
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
348
349
350
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
351
352
353
354
355
356
357
358
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
359
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
360
361
362
363
364
365
366
367
368
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
369
        }
370

371
      // memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
372
      // once per stream and timestep for all collprocs only on the modelproc root
373
374
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
375
          {
376
            collIndex[collID].numRPCRecords += 3;
377
378
379
380
            txWin[collID].size +=
              3 * sizeof (union winHeaderEntry)
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
381
    }
382
  for (collID = 0; collID < nProcsColl; ++collID)
383
    {
384
385
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
386
      txWin[collID].dictSize = numRecords;
387
388
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
389
      /* account for size header */
390
      txWin[collID].size += sizeof (union winHeaderEntry);
391
392
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
393
      sumWinBufferSize += txWin[collID].size;
394
    }
395
396
397
  free(collIndex);
  free ( streamIndexList );

398
399
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
400
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
401
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
402
}
403

Deike Kleberg's avatar
Deike Kleberg committed
404
405
406
407

/************************************************************************/


408
static
Deike Kleberg's avatar
Deike Kleberg committed
409
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
410
{
Deike Kleberg's avatar
Deike Kleberg committed
411
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
412

Deike Kleberg's avatar
Deike Kleberg committed
413
414
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
415
            txWin[collID].buffer     != NULL      &&
416
417
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
418
            txWin[collID].size <= MAXWINBUFFERSIZE);
419
  memset(txWin[collID].buffer, 0, txWin[collID].size);
420
421
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
422
  txWin[collID].refuseFuncCall = 0;
423
424
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
425
426
427
}


Deike Kleberg's avatar
Deike Kleberg committed
428
429
430
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
431
432
433
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
434
  int collID, ranks[1];
435
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
436

437
  xdebug("%s", "START");
438
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
439

Deike Kleberg's avatar
Deike Kleberg committed
440
  modelWinDefBufferSizes ();
441
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
442

Deike Kleberg's avatar
Deike Kleberg committed
443
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
444
    {
445
      xassert(txWin[collID].size > 0);
446
      txWin[collID].buffer = NULL;
447
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
448
449
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
450
451
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
452
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
453
                          MPI_INFO_NULL, commInqCommsIO(collID),
454
                          &txWin[collID].win));
455
456
457
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
458
    }
459
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
460
461
462
463
}

/************************************************************************/

464
static void
465
466
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
467
{
468
469
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
470
  int targetEntry;
471
472
  if (header.dataRecord.streamID > 0)
    {
473
      targetEntry = (txWin[collID].dictDataUsed)++;
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
515
    }
516
  else
517
    {
518
519
520
521
522
523
524
525
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
526
    }
527
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
528
529
}

530
531
532
533
534
535
/**
   @brief Buffers the local part of a variable for its collector.

   Looks up the collector responsible for the variable, completes and
   flushes a pending exposure of that collector's window, then enqueues
   the data followed by the descriptor of the partition.  Afterwards
   function calls are refused for this window until it is flushed.

   @param streamID stream the variable belongs to
   @param varID    variable within the stream's vlist
   @param data     local part of the variable
   @param nmiss    number of missing values, stored in the data record
   @param partDesc index list describing the local part
*/
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int vlistID = streamInqVlist ( streamID );
  int collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));

  xassert ( collID >= 0 && collID < commInqNProcsColl () && txWin != NULL);

  struct rdmaWin *collWin = txWin + collID;

  /* wait until the collector is done with the previous contents */
  if (collWin->postSet)
    {
      xmpi(MPI_Win_wait(collWin->win));
      collWin->postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int chunk = xt_idxlist_get_num_indices(partDesc);
  xassert(chunk <= INT_MAX);

  /* data record; the offset (-1 here) is filled in when enqueued */
  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0]));

  /* matching partition descriptor record */
  union winHeaderEntry partHeader
    = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                      .uid = xt_idxlist_get_uid(partDesc),
                      .offset = 0 } };
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  collWin->refuseFuncCall = 1;
}

566
567
/************************************************************************/

568
void pioBufferFuncCall(int funcID, int argc, ... )
569
570
571
572
573
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
574

575
576
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
577

Deike Kleberg's avatar
Deike Kleberg committed
578
579
  if ( rankGlob != root ) return;

580
581
582
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
583

Deike Kleberg's avatar
Deike Kleberg committed
584
  va_start ( ap, argc );
585

Thomas Jahns's avatar
Thomas Jahns committed
586
587
588
589
590
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

591
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
592
    {
593
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
594
595
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
596
      {
Thomas Jahns's avatar
Thomas Jahns committed
597
598
599
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
600
          {
Thomas Jahns's avatar
Thomas Jahns committed
601
602
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
603
          }
Thomas Jahns's avatar
Thomas Jahns committed
604
605
606
607
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
608
      }
609
      break;
Deike Kleberg's avatar
Deike Kleberg committed
610
611
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
612
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
613
614
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
615
616
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
617
618
619
620
621
622
623
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
624
      }
625
      break;
Thomas Jahns's avatar
Thomas Jahns committed
626
627
628
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
629

Thomas Jahns's avatar
Thomas Jahns committed
630
631
  for (collID = 0; collID < nProcsColl; ++collID)
    {
632
633
      xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
              > txWin[collID].dictDataUsed);
Thomas Jahns's avatar
Thomas Jahns committed
634
635
636
637
638
639
640
641
642
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
643

Thomas Jahns's avatar
Thomas Jahns committed
644
645
646
647
648
649
650
651
652
653
654
655
656
657
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
658
      break;
659
    }
660

Thomas Jahns's avatar
Thomas Jahns committed
661
  va_end(ap);
662
  xdebug("%s", "RETURN");
663
664
}

665
666
667
668
#endif

/*****************************************************************************/

669
670
671
672
/* pioInit definition must currently compile even in non-MPI configurations */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif
673
674
675
/**
   @brief initializes the MPI_Communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
676
677
678
679
680
681
682
683
684
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]:
685
686

  Collective call
687
688
689

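  @param commGlob MPI_Communicator of all calling PEs
  @param nProcsIO number of I/O PEs
  @param IOMode I/O mode, must lie within PIO_MINIOMODE and PIO_MAXIOMODE
  @param pioNamespace receives the CDI namespace created for parallel I/O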
690
691
692
  @param partInflate allow for array partitions on compute
  PE that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$
693
694
695
  @return communicator of the calculating PEs (commModel); if there is
  only a single MPI task, commGlob is returned instead. I/O PEs do not
  return from this call.
*/

Thomas Jahns's avatar
Thomas Jahns committed
696
697
#ifdef USE_MPI
/* CDI namespace created by pioInit, -1 until then */
static int pioNamespace_ = -1;
/* set by pioInit to whether it found yaxt not (or no longer)
 * initialized and therefore called xt_initialize itself */
static int xtInitByCDI = 0;
#endif

Thomas Jahns's avatar
Thomas Jahns committed
701
702
703
704
705
706
707
#if defined USE_MPI && defined HAVE_LIBNETCDF
/* Deliberately empty; pioInit installs it for NSSWITCH_CDF_DEF_TIMESTEP
 * in the namespace of the calculating PEs. */
static void
cdiPioCdfDefTimestepNOP(stream_t *streamptr, int tsID)
{
  (void)streamptr;
  (void)tsID;
}
#endif

708
709
/* Sets up the communicators for parallel I/O.  I/O PEs run the I/O
 * server inside this call and terminate the program afterwards; all
 * other PEs get a new CDI namespace with the parallel I/O stream
 * backend installed and receive the communicator to continue with.
 * Without MPI support the function aborts. */
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
        int *pioNamespace, float partInflate)
{
#ifdef USE_MPI
  int sizeGlob;
  int invalidDistribution;

  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  /* initialize yaxt unless the caller already has a live instance */
  xtInitByCDI = (!xt_initialized() || xt_finalized());
  if (xtInitByCDI)
    xt_initialize(commGlob);

  commInit ();
  commDefCommGlob ( commGlob );
  sizeGlob = commInqSizeGlob ();

  /* without I/O mode exactly one I/O process is expected, otherwise
   * at least one and at least one process must remain for the model */
  if (IOMode == PIO_NONE)
    invalidDistribution = (nProcsIO != 1);
  else
    invalidDistribution = (nProcsIO <= 0 || nProcsIO > sizeGlob - 1);
  if (invalidDistribution)
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  if ( commInqSizeGlob () == 1 )
    {
      int ns = namespaceNew(0);
      *pioNamespace = ns;
      pioNamespace_ = ns;
      return commInqCommGlob ();
    }

  if ( commInqIsProcIO ())
    {
      /* I/O PE: serve until done, then shut down and leave the program */
      serializeSetMPI();
      namespaceSwitchSet(NSSWITCH_ABORT, cdiAbortC_MPI);
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, pioFileOpen);
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, pioFileClose);
      IOServer ();
      namespaceDelete(0);
      commDestroy ();
      xt_finalize();
      MPI_Finalize ();
      exit ( EXIT_SUCCESS );
    }
  else
    {
      /* calculating PE: install the parallel I/O backend in a namespace
       * of its own and restore the caller's namespace afterwards */
      commEvalPhysNodes ();
      commDefCommsIO ();

      int ns = namespaceNew(0);
      *pioNamespace = ns;
      pioNamespace_ = ns;

      int callerCDINamespace = namespaceGetActive();
      pioNamespaceSetActive(pioNamespace_);
      serializeSetMPI();
      namespaceSwitchSet(NSSWITCH_STREAM_OPEN_BACKEND, cdiPioStreamOpen);
      namespaceSwitchSet(NSSWITCH_STREAM_DEF_VLIST_, cdiPioStreamDefVlist_);
      namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_, cdiPioStreamWriteVar_);
      namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
                         cdiPioStreamWriteVarChunk_);
      namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND, cdiPioStreamClose);
#ifdef HAVE_LIBNETCDF
      namespaceSwitchSet(NSSWITCH_CDF_DEF_TIMESTEP, cdiPioCdfDefTimestepNOP);
#endif
      pioNamespaceSetActive(callerCDINamespace);
    }

  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
  return commInqCommModel ();
#else
  abort();
#endif
}
789

790
791
#ifndef MPI_VERSION
#  undef MPI_Comm
Deike Kleberg's avatar
Deike Kleberg committed
792
793
#endif

794
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
795

796
/**
  @brief Closes the definition stage on the calling process.

  Calls varsMapNDeco with the values of commInqNNodes () and
  commInqNodeSizes ().  If the global rank of the caller is smaller than
  commInqNProcsColl (), a packed buffer is obtained from
  reshPackBufferCreate and sent as MPI_PACKED data with tag RESOURCES to
  rank commInqNProcsModel () of the communicator returned by
  commInqCommsIO for this rank; the buffer is released again afterwards.
  Finally modelWinCreate is called and the namespace resource status is
  set to STAGE_TIMELOOP.

  When compiled without USE_MPI the function body is empty.
*/
void pioEndDef ( void )
{
#ifdef USE_MPI
  const int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  if ( rankGlob < commInqNProcsColl ())
    {
      /* the packed buffer only lives inside this branch */
      char *packed;
      int packedSize;
      MPI_Comm ioComm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate ( &packed, &packedSize, &ioComm );

      xmpi ( MPI_Send ( packed, packedSize, MPI_PACKED,
                        commInqNProcsModel (), RESOURCES,
                        commInqCommsIO ( rankGlob )));

      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");

      reshPackBufferDestroy ( &packed );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/**
  @brief Marks the end of the time loop.

  Sets the namespace resource status to STAGE_CLEANUP, bracketed by
  START/RETURN debug messages.  When compiled without USE_MPI the
  function body is empty.
*/
void pioEndTimestepping ( void )
{
#ifdef USE_MPI
  xdebug("%s", "START");

  namespaceDefResStatus ( STAGE_CLEANUP );

  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
837

838
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
839

Deike Kleberg's avatar
Deike Kleberg committed
840

Deike Kleberg's avatar
Deike Kleberg committed
841
842
843
844
845
846
847
848
849
/**
  @brief Invoked by the calculator PEs to inform the I/O PEs that no
  more data will be written.

  Deletes the namespace stored in pioNamespace_, then sends one MPI_INT
  with tag FINALIZE to rank commInqNProcsModel () on each communicator
  commInqCommsIO ( collID ) for collID in [0, commInqNProcsColl ()).
  Afterwards modelWinCleanup and commDestroy are called, and
  xt_finalize is called if xtInitByCDI is set.

  Takes no arguments and returns nothing.  When compiled without
  USE_MPI the function body is empty.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  /* value of the single int sent with each FINALIZE message */
  int finalizeMsg = 1111;

  xdebug("%s", "START");

  namespaceDelete ( pioNamespace_ );

  for ( int collID = 0; collID < commInqNProcsColl (); ++collID )
    {
      xmpi ( MPI_Send ( &finalizeMsg, 1, MPI_INT, commInqNProcsModel (),
                        FINALIZE, commInqCommsIO ( collID )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
    }

  modelWinCleanup ();
  commDestroy ();

  if ( xtInitByCDI )
    {
      xt_finalize ();
    }

  xdebug("%s", "RETURN");
#endif
}

870
 /************************************************************************/
Deike Kleberg's avatar