pio_interface.c 26.5 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
26
#include "pio_client.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_server.h"
28
#include "resource_handle.h"
29
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist.h"
31

Deike Kleberg's avatar
Deike Kleberg committed
32
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
33

34

35
static struct rdmaWin
36
37
{
  size_t size;
38
  unsigned char *buffer, *head;
39
  MPI_Win win;
40
  int postSet, refuseFuncCall;
41
  MPI_Group ioGroup;
42
  int dictSize, dictDataUsed, dictRPCUsed, dict;
43
44
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
45

46
47
48
49
50
const char * const funcMap[numRPCFuncs] = {
  "streamOpen",
  "streamDefVlist",
  "streamClose"
};
Deike Kleberg's avatar
Deike Kleberg committed
51

52
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
53

Deike Kleberg's avatar
Deike Kleberg committed
54
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
55

56
/*
 * qsort comparator for an array of int pointers: orders the pointees
 * by decreasing value (returns -1, 0 or 1).
 */
static int cmp ( const void * va, const void * vb )
{
  int lhs = **(int * const *)va;
  int rhs = **(int * const *)vb;

  if (lhs > rhs)
    return -1;
  return lhs < rhs;
}

/****************************************************/
67

68
69
70
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
71
72
73
74
75
76
77
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
78
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
79
  int i, j;
80

Deike Kleberg's avatar
Deike Kleberg committed
81
82
83
84
85
86
87
88
89
90
91

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
92
93

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
94
95
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

Thomas Jahns's avatar
Thomas Jahns committed
96
97
  memset(buckets, 0, sizeof (buckets));

Deike Kleberg's avatar
Deike Kleberg committed
98
99
100
  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
101

Deike Kleberg's avatar
Deike Kleberg committed
102
      for ( j = 0; j < nWriter; j++ )
103
104
	{
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
105

106
107
108
109
110
111
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
112
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
113
114
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
115

116
117
118
119
120
121
122
123
124
  if ( ddebug )
    {
      xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
      xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                    nProblems, DATATYPE_INT );
      xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
      xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
      xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
    }
Deike Kleberg's avatar
Deike Kleberg committed
125
126
}

127
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
128
129
130
131
132

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
133

Deike Kleberg's avatar
Deike Kleberg committed
134
135
   @param sSizes array with number of var_t for all stream_t 's

136
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
137
138
139
in order of vSizes

   @param nStreams number of stream_t 's
140
141

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
142
143
144
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
145

Deike Kleberg's avatar
Deike Kleberg committed
146
147
   @return
*/
148

149
150
151
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
152
{
153

Deike Kleberg's avatar
Deike Kleberg committed
154
155
156
157
158
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
159

Deike Kleberg's avatar
Deike Kleberg committed
160
161
162
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
163
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
164

165
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
166
167
168
169
170
171

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
172
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
173
174
    }

Deike Kleberg's avatar
Deike Kleberg committed
175
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
176
177
178
179
180
181
182
183
184
185
186
187
188
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
189
	if ( * ( streamMapping + j ) == i )
190
191
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
192
193
194
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
195
196
197
198
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
199
200
201
202
203
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
204
205

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
206
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
207

208
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
209
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
210
211
212
213
214

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
215
216
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
217
	    * ( varMapping + offset ++ ) =
218
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
219
                                    summandRank );
220
221
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
222
223
224
225
226
227
228

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
229

230
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
231
    {
Deike Kleberg's avatar
Deike Kleberg committed
232
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
233
      for ( i = 0; i < nProcsColl; i++ )
234
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
235
      for ( i = 0; i < nVars; i ++ )
236
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
237
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
238
    }
239
}
Deike Kleberg's avatar
Deike Kleberg committed
240

241
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
242

243
244
/*
 * Assigns every variable of every stream to a collector (I/O) PE,
 * records the chosen global rank in both the stream's vlist and its
 * original vlist, and aborts if some collector would get no variable.
 *
 * nNodes    number of physical nodes hosting I/O PEs
 * nodeSizes number of I/O PEs on each of these nodes
 */
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  int nStreams = streamSize ();
  int *resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  int *streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  int *collectsData = xmalloc ( nProcsColl * sizeof ( collectsData[0] ));

  /* entries that stay 0 identify idle collectors below, so the array
   * must start out cleared (xmalloc'ed memory is uninitialized) */
  for (int j = 0; j < nProcsColl; j++)
    collectsData[j] = 0;

  streamGetIndexList ( nStreams, resHs );

  for (int i = 0; i < nStreams; i++)
    streamSizes[i] = streamInqNvars ( resHs[i] );

  int nVars = xsum ( nStreams, streamSizes );
  int *varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  int *varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  /* assign, do not accumulate: varSizes is uninitialized */
  int k = 0;
  for (int i = 0; i < nStreams; i++)
    {
      int vlistID = streamInqVlist ( resHs[i] );
      for (int j = 0; j < streamSizes[i]; j++)
        varSizes[k++] = vlistInqVarSize ( vlistID, j );
    }

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for (int i = 0; i < nStreams; i++)
    {
      int vlistID = streamInqVlist ( resHs[i] );
      int vlistIDorig = streamInqVlistIDorig ( resHs[i] );
      for (int j = 0; j < streamSizes[i]; j++)
        {
          vlistDefVarIOrank ( vlistID, j, varMapping[k] );
          vlistDefVarIOrank ( vlistIDorig, j, varMapping[k] );
          collectsData[commRankGlob2CollID ( varMapping[k] )] = 1;
          k++;
        }
    }

  for (int j = 0; j < nProcsColl; j++)
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  /* free(NULL) is a no-op, no guards needed */
  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

305
static
Deike Kleberg's avatar
Deike Kleberg committed
306
307
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
308
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
309

310
  xdebug("%s", "START");
311
  if (txWin != NULL)
312
313
314
315
316
317
318
319
320
321
322
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
323

324
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
325
326
327
}

/************************************************************************/
328

329
330
/* Number of header dictionary records one collector needs. */
struct collDesc
{
  /* data and partition descriptor records (two per variable) */
  int numDataRecords;
  /* function call records */
  int numRPCRecords;
};

334
335
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
336
{
337
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
338
  int sumWinBufferSize = 0;
339
340
341
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
  int root = commInqRootGlob ();
342
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
343

344
  xdebug("%s", "START");
345
  xassert(txWin != NULL);
346

Deike Kleberg's avatar
Deike Kleberg committed
347
  nstreams = reshCountType ( &streamOps );
348
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
349
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
350
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
351
352
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
353
354
355
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
356
357
358
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
359
360
361
362
363
364
365
366
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
367
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
368
369
370
371
372
373
374
375
376
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
377
        }
378

379
380
      // memory required for the function calls encoded
      // for remote execution
Deike Kleberg's avatar
Deike Kleberg committed
381
      // once per stream and timestep for all collprocs only on the modelproc root
382
383
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
384
          {
385
            collIndex[collID].numRPCRecords += numRPCFuncs;
386
            txWin[collID].size +=
387
              numRPCFuncs * sizeof (union winHeaderEntry)
388
389
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
390
    }
391
  for (collID = 0; collID < nProcsColl; ++collID)
392
    {
393
394
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
395
      txWin[collID].dictSize = numRecords;
396
397
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
398
      /* account for size header */
399
      txWin[collID].size += sizeof (union winHeaderEntry);
400
401
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
402
      sumWinBufferSize += txWin[collID].size;
403
    }
404
405
406
  free(collIndex);
  free ( streamIndexList );

407
408
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
409
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
410
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
411
}
412

Deike Kleberg's avatar
Deike Kleberg committed
413
414
415
416

/************************************************************************/


417
static
Deike Kleberg's avatar
Deike Kleberg committed
418
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
419
{
Deike Kleberg's avatar
Deike Kleberg committed
420
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
421

Deike Kleberg's avatar
Deike Kleberg committed
422
423
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
424
            txWin[collID].buffer     != NULL      &&
425
426
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
427
            txWin[collID].size <= MAXWINBUFFERSIZE);
428
  memset(txWin[collID].buffer, 0, txWin[collID].size);
429
430
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
431
  txWin[collID].refuseFuncCall = 0;
432
433
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
434
435
436
}


Deike Kleberg's avatar
Deike Kleberg committed
437
438
439
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
440
441
442
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
443
  int collID, ranks[1];
444
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
445

446
  xdebug("%s", "START");
447
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
448

Deike Kleberg's avatar
Deike Kleberg committed
449
  modelWinDefBufferSizes ();
450
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
451

Deike Kleberg's avatar
Deike Kleberg committed
452
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
453
    {
454
      xassert(txWin[collID].size > 0);
455
      txWin[collID].buffer = NULL;
456
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
457
458
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
459
460
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
461
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
462
                          MPI_INFO_NULL, commInqCommsIO(collID),
463
                          &txWin[collID].win));
464
465
466
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
467
    }
468
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
469
470
471
472
}

/************************************************************************/

473
static void
474
475
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
476
{
477
478
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
479
  int targetEntry;
480
481
  if (header.dataRecord.streamID > 0)
    {
482
      targetEntry = (txWin[collID].dictDataUsed)++;
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
524
    }
525
  else
526
    {
527
528
529
530
531
532
533
534
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
535
    }
536
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
537
538
}

539
540
541
542
543
544
/*
 * Buffers the local part of variable varID of stream streamID for the
 * collector that the variable is mapped to: a data record holding the
 * values, followed by the partition descriptor partDesc.  Once data is
 * buffered, function calls are refused until the buffer is flushed.
 */
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int vlistID = streamInqVlist ( streamID );
  int collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));

  xassert ( collID         >= 0                    &&
            collID         <  commInqNProcsColl () &&
            txWin != NULL);

  /* an epoch marked as posted must complete before the buffer is reused */
  if (txWin[collID].postSet)
    {
      xmpi(MPI_Win_wait(txWin[collID].win));
      txWin[collID].postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int numIndices = xt_idxlist_get_num_indices(partDesc);
  xassert(numIndices <= INT_MAX);

  /* first the values ... */
  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data, numIndices * sizeof (data[0]));

  /* ... then the partition descriptor describing them */
  union winHeaderEntry partHeader
    = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                      .uid = xt_idxlist_get_uid(partDesc),
                      .offset = 0 } };
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  txWin[collID].refuseFuncCall = 1;
}

575
576
/************************************************************************/

577
void pioBufferFuncCall(int funcID, int argc, ... )
578
579
580
581
582
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
583

584
585
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
586

Deike Kleberg's avatar
Deike Kleberg committed
587
588
  if ( rankGlob != root ) return;

589
590
591
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
592

Deike Kleberg's avatar
Deike Kleberg committed
593
  va_start ( ap, argc );
594

Thomas Jahns's avatar
Thomas Jahns committed
595
596
597
598
599
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

600
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
601
    {
602
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
603
604
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
605
      {
Thomas Jahns's avatar
Thomas Jahns committed
606
607
608
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
609
          {
Thomas Jahns's avatar
Thomas Jahns committed
610
611
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
612
          }
Thomas Jahns's avatar
Thomas Jahns committed
613
614
615
616
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
617
      }
618
      break;
Deike Kleberg's avatar
Deike Kleberg committed
619
620
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
621
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
622
623
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
624
625
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
626
627
628
629
630
631
632
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
633
      }
634
      break;
Thomas Jahns's avatar
Thomas Jahns committed
635
636
637
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
638

Thomas Jahns's avatar
Thomas Jahns committed
639
640
  for (collID = 0; collID < nProcsColl; ++collID)
    {
641
642
      xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
              > txWin[collID].dictDataUsed);
Thomas Jahns's avatar
Thomas Jahns committed
643
644
645
646
647
648
649
650
651
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
652

Thomas Jahns's avatar
Thomas Jahns committed
653
654
655
656
657
658
659
660
661
662
663
664
665
666
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
667
      break;
668
    }
669

Thomas Jahns's avatar
Thomas Jahns committed
670
  va_end(ap);
671
  xdebug("%s", "RETURN");
672
673
}

674
675
676
677
#endif

/*****************************************************************************/

678
679
680
681
/* pioInit definition must currently compile even in non-MPI configurations */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif
682
683
684
/**
   @brief initializes the MPI_Communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
685
686
687
688
689
690
691
692
693
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]:
694
695

  Collective call
696
697
698

  @param comm MPI_Communicator of all calling PEs
  @param nIOP number of I/O PEs
699
700
701
  @param partInflate allow for array partitions on comute
  PE that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$
702
703
704
  @return int indicating wether the calling PE is a calcutator (1) or not (0)
*/

Thomas Jahns's avatar
Thomas Jahns committed
705
706
#ifdef USE_MPI
static int pioNamespace_ = -1;
707
static int xtInitByCDI = 0;
Thomas Jahns's avatar
Thomas Jahns committed
708
709
#endif

710
711
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
Thomas Jahns's avatar
Thomas Jahns committed
712
        int *pioNamespace, float partInflate)
713
{
714
#ifdef USE_MPI
Thomas Jahns's avatar
Thomas Jahns committed
715
  int sizeGlob;
Deike Kleberg's avatar
Deike Kleberg committed
716

Deike Kleberg's avatar
Deike Kleberg committed
717
  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
Deike Kleberg's avatar
Deike Kleberg committed
718
    xabort ( "IOMODE IS NOT VALID." );
719

720
#ifdef _SX
Deike Kleberg's avatar
Deike Kleberg committed
721
722
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
723
724
#endif

725
726
727
728
  if ((xtInitByCDI = (!xt_initialized() || xt_finalized())))
    {
      xt_initialize(commGlob);
    }
729
730
  commInit ();
  commDefCommGlob ( commGlob );
731
  sizeGlob = commInqSizeGlob ();
732

733
734
  if (((IOMode != PIO_NONE && (nProcsIO <= 0 || nProcsIO > sizeGlob - 1)))
      || (IOMode == PIO_NONE && nProcsIO != 1))
735
736
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);
Deike Kleberg's avatar
Deike Kleberg committed
737
738

  commDefNProcsIO ( nProcsIO );
Deike Kleberg's avatar
Deike Kleberg committed
739
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
Deike Kleberg's avatar
Deike Kleberg committed
740
  commDefCommPio  ();
741

742
743
744
  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

Deike Kleberg's avatar
Deike Kleberg committed
745
  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
746
  if ( commInqSizeGlob () == 1 )
747
    {
748
      pioNamespace_ = *pioNamespace = namespaceNew();
Deike Kleberg's avatar
Deike Kleberg committed
749
750
      return commInqCommGlob ();
    }
751

752
753
  if ( commInqIsProcIO ())
    {
754
      serializeSetMPI();
755
756
757
      namespaceSwitchSet(NSSWITCH_ABORT, NSSW_FUNC(cdiAbortC_MPI));
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(pioFileOpen));
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(pioFileClose));
758
      IOServer ();
Thomas Jahns's avatar
Thomas Jahns committed
759
      namespaceDelete(0);
760
      commDestroy ();
761
      xt_finalize();
762
      MPI_Finalize ();
763
      exit ( EXIT_SUCCESS );
764
765
    }
  else
766
    cdiPioClientSetup(&pioNamespace_, pioNamespace);
767

768
  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
769
  return commInqCommModel ();
770
#else
771
  abort();
772
#endif
773
}
774

775
776
#ifndef MPI_VERSION
#  undef MPI_Comm
Deike Kleberg's avatar
Deike Kleberg committed
777
778
#endif

779
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
780

781
/**
  @brief Ends the definition phase on a model (client) process.

  Maps the variables onto the node decomposition
  (varsMapNDeco with commInqNNodes()/commInqNodeSizes()).

  Every process whose global rank is below commInqNProcsColl() then
  serializes the resource table into an MPI_PACKED buffer. It sends that
  buffer with tag RESOURCES to rank commInqNProcsModel() of its own
  collector communicator commInqCommsIO(rankGlob).

  Finally modelWinCreate() is called and the namespace resource status is
  switched to STAGE_TIMELOOP.

  Compiles to an empty function unless USE_MPI is defined.
*/
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  if ( rankGlob < commInqNProcsColl ())
    {
      char *buffer;
      int bufferSize;
      /* Look the communicator up once: the buffer must be sent on the same
         communicator it was packed for (its address is handed to the
         packer as context). */
      MPI_Comm comm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate ( &buffer, &bufferSize, &comm );

      xmpi ( MPI_Send ( buffer, bufferSize, MPI_PACKED, commInqNProcsModel (),
                        RESOURCES, comm ));

      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");

      reshPackBufferDestroy ( &buffer );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/**
  @brief Leaves the time loop: switches the namespace resource status to
  STAGE_CLEANUP, with debug trace output before and after.

  Compiles to an empty function unless USE_MPI is defined.
*/
void pioEndTimestepping(void)
{
#ifdef USE_MPI
  xdebug("%s", "START");

  /* enter the cleanup stage for all resources of the current namespace */
  namespaceDefResStatus(STAGE_CLEANUP);

  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
822

823
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
824

Deike Kleberg's avatar
Deike Kleberg committed
825

Deike Kleberg's avatar
Deike Kleberg committed
826
827
828
829
830
831
832
833
834
/**
  @brief Invoked by the calculator (model) processes to inform the I/O
  processes that no more data will be written, and to release the
  client-side PIO state.

  Steps, in order:
  - delete the namespace stored in pioNamespace_ (set up in pioInit);
  - on each of the commInqNProcsColl() collector communicators, send one
    int (value 1111) with tag FINALIZE to rank commInqNProcsModel();
  - call modelWinCleanup() and commDestroy();
  - call xt_finalize() only if xtInitByCDI is set, i.e. yaxt was
    initialized by pioInit rather than by the caller.

  Compiles to an empty function unless USE_MPI is defined.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  /* NOTE(review): the payload value looks arbitrary; presumably only the
     FINALIZE tag is evaluated by the receiver -- confirm. */
  int finalizeToken = 1111;
  int numColl, ioRank, coll;

  xdebug("%s", "START");
  namespaceDelete(pioNamespace_);

  numColl = commInqNProcsColl ();
  ioRank  = commInqNProcsModel ();
  for ( coll = 0; coll < numColl; ++coll )
    {
      xmpi ( MPI_Send ( &finalizeToken, 1, MPI_INT, ioRank,
                        FINALIZE, commInqCommsIO ( coll )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
    }

  modelWinCleanup ();
  commDestroy ();

  /* only undo the yaxt initialization that CDI itself performed */
  if ( xtInitByCDI )
    xt_finalize ();

  xdebug("%s", "RETURN");
#endif
}

855
 /************************************************************************/
856

Deike Kleberg's avatar
Deike Kleberg committed
857
void pioWriteTimestep ( int tsID, int vdate, int vtime )
858
859
{
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
860
  int collID, buffer[timestepSize], iAssert = 0;
861
  /* int tokenEnd = END; */
862
  int rankGlob = commInqRankGlob ();
863
864
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();
865

866
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
867

Deike Kleberg's avatar
Deike Kleberg committed