pio_interface.c 26.4 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
26
#include "pio_client.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_server.h"
28
#include "resource_handle.h"
29
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist.h"
31

Deike Kleberg's avatar
Deike Kleberg committed
32
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
33

34

35
static struct rdmaWin
36
37
{
  size_t size;
38
  unsigned char *buffer, *head;
39
  MPI_Win win;
40
  int postSet, refuseFuncCall;
41
  MPI_Group ioGroup;
42
  int dictSize, dictDataUsed, dictRPCUsed, dict;
43
44
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
45

Deike Kleberg's avatar
Deike Kleberg committed
46
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
Deike Kleberg's avatar
Deike Kleberg committed
47

48
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
49

Deike Kleberg's avatar
Deike Kleberg committed
50
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
51

52
/*
 * qsort() comparator for an array of int pointers: orders the elements so
 * that the pointed-to values decrease (largest first).
 */
static int cmp ( const void * va, const void * vb )
{
  int lhs = **(int * const *) va;
  int rhs = **(int * const *) vb;

  if ( lhs > rhs )
    return -1;
  if ( lhs < rhs )
    return 1;
  return 0;
}

/****************************************************/
63

64
65
66
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
67
68
69
70
71
72
73
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
74
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
75
  int i, j;
76

Deike Kleberg's avatar
Deike Kleberg committed
77
78
79
80
81
82
83
84
85
86
87

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
88
89

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
90
91
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

Thomas Jahns's avatar
Thomas Jahns committed
92
93
  memset(buckets, 0, sizeof (buckets));

Deike Kleberg's avatar
Deike Kleberg committed
94
95
96
  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
97

Deike Kleberg's avatar
Deike Kleberg committed
98
      for ( j = 0; j < nWriter; j++ )
99
100
	{
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
101

102
103
104
105
106
107
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
108
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
109
110
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
111

112
113
114
115
116
117
118
119
120
  if ( ddebug )
    {
      xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
      xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                    nProblems, DATATYPE_INT );
      xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
      xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
      xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
    }
Deike Kleberg's avatar
Deike Kleberg committed
121
122
}

123
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
124
125
126
127
128

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
129

Deike Kleberg's avatar
Deike Kleberg committed
130
131
   @param sSizes array with number of var_t for all stream_t 's

132
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
133
134
135
in order of vSizes

   @param nStreams number of stream_t 's
136
137

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
138
139
140
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
141

Deike Kleberg's avatar
Deike Kleberg committed
142
143
   @return
*/
144

145
146
147
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
148
{
149

Deike Kleberg's avatar
Deike Kleberg committed
150
151
152
153
154
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
155

Deike Kleberg's avatar
Deike Kleberg committed
156
157
158
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
159
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
160

161
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
162
163
164
165
166
167

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
168
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
169
170
    }

Deike Kleberg's avatar
Deike Kleberg committed
171
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
172
173
174
175
176
177
178
179
180
181
182
183
184
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
185
	if ( * ( streamMapping + j ) == i )
186
187
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
188
189
190
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
191
192
193
194
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
195
196
197
198
199
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
200
201

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
202
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
203

204
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
205
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
206
207
208
209
210

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
211
212
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
213
	    * ( varMapping + offset ++ ) =
214
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
215
                                    summandRank );
216
217
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
218
219
220
221
222
223
224

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
225

226
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
227
    {
Deike Kleberg's avatar
Deike Kleberg committed
228
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
229
      for ( i = 0; i < nProcsColl; i++ )
230
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
231
      for ( i = 0; i < nVars; i ++ )
232
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
233
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
234
    }
235
}
Deike Kleberg's avatar
Deike Kleberg committed
236

237
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
238

239
240
/**
 * @brief Maps every variable of every stream to a collector and records
 * the mapping in the streams' vlists.
 *
 * Collects the number of variables per stream and each variable's size,
 * lets varMapGen() choose a collector rank per variable, and stores that
 * rank with vlistDefVarIOrank() on both the stream's vlist and its original
 * vlist. Aborts if some collector receives no variable at all.
 *
 * @param nNodes    number of physical nodes hosting I/O PEs
 * @param nodeSizes number of I/O PEs on each node
 */
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* must start zeroed: entries never set to 1 below mark idle collectors */
  collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    {
      int vlistID = streamInqVlist ( resHs[i] );
      for ( j = 0; j < streamSizes[i]; j++ )
        /* plain assignment: varSizes is not initialised (was "+=") */
        varSizes[k++] = vlistInqVarSize ( vlistID, j );
    }

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    {
      int vlistID     = streamInqVlist ( resHs[i] );
      int vlistIDorig = streamInqVlistIDorig ( resHs[i] );
      for ( j = 0; j < streamSizes[i]; j++, k++ )
        {
          int collID = commRankGlob2CollID ( varMapping[k] );

          vlistDefVarIOrank ( vlistID, j, varMapping[k] );
          vlistDefVarIOrank ( vlistIDorig, j, varMapping[k] );
          xassert ( collID >= 0 && collID < nProcsColl );
          collectsData[collID] = 1;
        }
    }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  /* free(NULL) is a no-op, no guards needed */
  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

301
static
Deike Kleberg's avatar
Deike Kleberg committed
302
303
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
304
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
305

306
  xdebug("%s", "START");
307
  if (txWin != NULL)
308
309
310
311
312
313
314
315
316
317
318
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
319

320
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
321
322
323
}

/************************************************************************/
324

325
326
/* number of dictionary records one collector's transmit window must hold */
struct collDesc
{
  int numDataRecords;  /* data plus part-descriptor records */
  int numRPCRecords;   /* buffered function-call records */
};

330
331
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
332
{
333
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
334
  int sumWinBufferSize = 0;
335
336
337
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
  int root = commInqRootGlob ();
338
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
339

340
  xdebug("%s", "START");
341
  xassert(txWin != NULL);
342

Deike Kleberg's avatar
Deike Kleberg committed
343
  nstreams = reshCountType ( &streamOps );
344
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
345
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
346
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
347
348
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
349
350
351
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
352
353
354
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
355
356
357
358
359
360
361
362
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
363
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
364
365
366
367
368
369
370
371
372
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
373
        }
374

375
      // memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
376
      // once per stream and timestep for all collprocs only on the modelproc root
377
378
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
379
          {
380
            collIndex[collID].numRPCRecords += 3;
381
382
383
384
            txWin[collID].size +=
              3 * sizeof (union winHeaderEntry)
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
385
    }
386
  for (collID = 0; collID < nProcsColl; ++collID)
387
    {
388
389
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
390
      txWin[collID].dictSize = numRecords;
391
392
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
393
      /* account for size header */
394
      txWin[collID].size += sizeof (union winHeaderEntry);
395
396
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
397
      sumWinBufferSize += txWin[collID].size;
398
    }
399
400
401
  free(collIndex);
  free ( streamIndexList );

402
403
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
404
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
405
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
406
}
407

Deike Kleberg's avatar
Deike Kleberg committed
408
409
410
411

/************************************************************************/


412
static
Deike Kleberg's avatar
Deike Kleberg committed
413
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
414
{
Deike Kleberg's avatar
Deike Kleberg committed
415
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
416

Deike Kleberg's avatar
Deike Kleberg committed
417
418
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
419
            txWin[collID].buffer     != NULL      &&
420
421
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
422
            txWin[collID].size <= MAXWINBUFFERSIZE);
423
  memset(txWin[collID].buffer, 0, txWin[collID].size);
424
425
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
426
  txWin[collID].refuseFuncCall = 0;
427
428
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
429
430
431
}


Deike Kleberg's avatar
Deike Kleberg committed
432
433
434
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
435
436
437
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
438
  int collID, ranks[1];
439
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
440

441
  xdebug("%s", "START");
442
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
443

Deike Kleberg's avatar
Deike Kleberg committed
444
  modelWinDefBufferSizes ();
445
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
446

Deike Kleberg's avatar
Deike Kleberg committed
447
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
448
    {
449
      xassert(txWin[collID].size > 0);
450
      txWin[collID].buffer = NULL;
451
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
452
453
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
454
455
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
456
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
457
                          MPI_INFO_NULL, commInqCommsIO(collID),
458
                          &txWin[collID].win));
459
460
461
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
462
    }
463
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
464
465
466
467
}

/************************************************************************/

468
static void
469
470
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
471
{
472
473
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
474
  int targetEntry;
475
476
  if (header.dataRecord.streamID > 0)
    {
477
      targetEntry = (txWin[collID].dictDataUsed)++;
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
519
    }
520
  else
521
    {
522
523
524
525
526
527
528
529
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
530
    }
531
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
532
533
}

534
535
536
537
538
539
/**
 * @brief Buffers this rank's part of one variable for its collector.
 *
 * The collector is the one the variable's I/O rank maps to. If that
 * collector's window has a pending MPI_Win_wait (postSet), it is completed
 * and the buffer flushed first. Then a data record holding the values and
 * a part-descriptor record holding partDesc are enqueued, and buffered
 * function calls are refused from then on.
 *
 * @param streamID stream the variable belongs to
 * @param varID    variable within the stream's vlist
 * @param data     local values, one per index of partDesc
 * @param nmiss    number of missing values, stored in the data header
 * @param partDesc index list describing which part data represents
 */
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int collID
    = commRankGlob2CollID ( vlistInqVarIOrank ( streamInqVlist ( streamID ),
                                                varID ));

  xassert ( collID >= 0                    &&
            collID < commInqNProcsColl ()  &&
            txWin != NULL );

  struct rdmaWin *win = &txWin[collID];

  if ( win->postSet )
    {
      xmpi(MPI_Win_wait(win->win));
      win->postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int chunk = xt_idxlist_get_num_indices(partDesc);
  xassert(chunk <= INT_MAX);

  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0]));

  union winHeaderEntry partHeader
    = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                      .uid = xt_idxlist_get_uid(partDesc),
                      .offset = 0 } };
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  win->refuseFuncCall = 1;
}

570
571
/************************************************************************/

572
void pioBufferFuncCall(int funcID, int argc, ... )
573
574
575
576
577
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
578

579
580
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
581

Deike Kleberg's avatar
Deike Kleberg committed
582
583
  if ( rankGlob != root ) return;

584
585
586
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
587

Deike Kleberg's avatar
Deike Kleberg committed
588
  va_start ( ap, argc );
589

Thomas Jahns's avatar
Thomas Jahns committed
590
591
592
593
594
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

595
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
596
    {
597
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
598
599
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
600
      {
Thomas Jahns's avatar
Thomas Jahns committed
601
602
603
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
604
          {
Thomas Jahns's avatar
Thomas Jahns committed
605
606
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
607
          }
Thomas Jahns's avatar
Thomas Jahns committed
608
609
610
611
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
612
      }
613
      break;
Deike Kleberg's avatar
Deike Kleberg committed
614
615
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
616
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
617
618
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
619
620
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
621
622
623
624
625
626
627
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
628
      }
629
      break;
Thomas Jahns's avatar
Thomas Jahns committed
630
631
632
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
633

Thomas Jahns's avatar
Thomas Jahns committed
634
635
  for (collID = 0; collID < nProcsColl; ++collID)
    {
636
637
      xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
              > txWin[collID].dictDataUsed);
Thomas Jahns's avatar
Thomas Jahns committed
638
639
640
641
642
643
644
645
646
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
647

Thomas Jahns's avatar
Thomas Jahns committed
648
649
650
651
652
653
654
655
656
657
658
659
660
661
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
662
      break;
663
    }
664

Thomas Jahns's avatar
Thomas Jahns committed
665
  va_end(ap);
666
  xdebug("%s", "RETURN");
667
668
}

669
670
671
672
#endif

/*****************************************************************************/

673
674
675
676
/* pioInit definition must currently compile even in non-MPI configurations */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif
677
678
679
/**
   @brief initializes the MPI_Communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
680
681
682
683
684
685
686
687
688
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]:
689
690

  Collective call
691
692
693

  @param comm MPI_Communicator of all calling PEs
  @param nIOP number of I/O PEs
694
695
696
  @param partInflate allow for array partitions on comute
  PE that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$
697
698
699
  @return int indicating wether the calling PE is a calcutator (1) or not (0)
*/

Thomas Jahns's avatar
Thomas Jahns committed
700
701
#ifdef USE_MPI
static int pioNamespace_ = -1;
702
static int xtInitByCDI = 0;
Thomas Jahns's avatar
Thomas Jahns committed
703
704
#endif

705
706
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
Thomas Jahns's avatar
Thomas Jahns committed
707
        int *pioNamespace, float partInflate)
708
{
709
#ifdef USE_MPI
Thomas Jahns's avatar
Thomas Jahns committed
710
  int sizeGlob;
Deike Kleberg's avatar
Deike Kleberg committed
711

Deike Kleberg's avatar
Deike Kleberg committed
712
  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
Deike Kleberg's avatar
Deike Kleberg committed
713
    xabort ( "IOMODE IS NOT VALID." );
714

715
#ifdef _SX
Deike Kleberg's avatar
Deike Kleberg committed
716
717
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
718
719
#endif

720
721
722
723
  if ((xtInitByCDI = (!xt_initialized() || xt_finalized())))
    {
      xt_initialize(commGlob);
    }
724
725
  commInit ();
  commDefCommGlob ( commGlob );
726
  sizeGlob = commInqSizeGlob ();
727

728
729
  if (((IOMode != PIO_NONE && (nProcsIO <= 0 || nProcsIO > sizeGlob - 1)))
      || (IOMode == PIO_NONE && nProcsIO != 1))
730
731
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);
Deike Kleberg's avatar
Deike Kleberg committed
732
733

  commDefNProcsIO ( nProcsIO );
Deike Kleberg's avatar
Deike Kleberg committed
734
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
Deike Kleberg's avatar
Deike Kleberg committed
735
  commDefCommPio  ();
736

737
738
739
  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

Deike Kleberg's avatar
Deike Kleberg committed
740
  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
741
  if ( commInqSizeGlob () == 1 )
742
    {
743
      pioNamespace_ = *pioNamespace = namespaceNew();
Deike Kleberg's avatar
Deike Kleberg committed
744
745
      return commInqCommGlob ();
    }
746

747
748
  if ( commInqIsProcIO ())
    {
749
      serializeSetMPI();
750
751
752
      namespaceSwitchSet(NSSWITCH_ABORT, NSSW_FUNC(cdiAbortC_MPI));
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(pioFileOpen));
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(pioFileClose));
753
      IOServer ();
Thomas Jahns's avatar
Thomas Jahns committed
754
      namespaceDelete(0);
755
      commDestroy ();
756
      xt_finalize();
757
      MPI_Finalize ();
758
      exit ( EXIT_SUCCESS );
759
760
    }
  else
761
    cdiPioClientSetup(&pioNamespace_, pioNamespace);
762

763
  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
764
  return commInqCommModel ();
765
#else
766
  abort();
767
#endif
768
}
769

770
771
#ifndef MPI_VERSION
/* NOTE(review): in builds without MPI, MPI_Comm is presumably #defined
   earlier in this file as a placeholder (e.g. for the pioInit signature);
   drop that definition again here -- confirm against the definition site. */
#  undef MPI_Comm
#endif

774
/*****************************************************************************/

/**
  @brief Ends the definition phase on the model side.

  Calls varsMapNDeco() with the node layout. On the first
  commInqNProcsColl() global ranks, it packs the resource descriptions with
  reshPackBufferCreate() and sends them with tag RESOURCES over that rank's
  commInqCommsIO() communicator. Finally it calls modelWinCreate() and
  switches the namespace resource status to STAGE_TIMELOOP.

  Does nothing when USE_MPI is not defined.
*/
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  /* Only ranks below commInqNProcsColl() ship the resource definitions,
     each over its own I/O communicator. */
  if ( rankGlob < commInqNProcsColl ())
    {
      char *buffer;
      int bufferSize;
      MPI_Comm comm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate ( &buffer, &bufferSize, &comm );

      /* The destination rank within comm is commInqNProcsModel(). This is
         presumably the collector/I-O process of that communicator; confirm
         against the commInqCommsIO() setup. */
      xmpi ( MPI_Send ( buffer, bufferSize, MPI_PACKED, commInqNProcsModel (),
                        RESOURCES, comm ));

      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");

      reshPackBufferDestroy ( &buffer );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/**
  @brief Marks the end of the time-stepping phase.

  Switches the namespace resource status to STAGE_CLEANUP. Does nothing
  when USE_MPI is not defined.
*/
void  pioEndTimestepping ( void )
{
#ifdef USE_MPI
  xdebug("%s", "START");
  namespaceDefResStatus ( STAGE_CLEANUP );
  xdebug("%s", "RETURN");
#endif
}

/****************************************************/

/**
  @brief Is invoked by the calculator PEs to inform the I/O PEs that no
  more data will be written.

  The function performs these steps in order:
  - deletes namespace pioNamespace_;
  - sends one FINALIZE message to rank commInqNProcsModel() on each of the
    commInqNProcsColl() communicators returned by commInqCommsIO();
  - calls modelWinCleanup() and commDestroy();
  - finalizes yaxt, but only if pioInit initialized it (xtInitByCDI).

  Does nothing when USE_MPI is not defined.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  /* NOTE(review): the payload value appears to be a placeholder and only
     the FINALIZE tag seems significant -- confirm against the receiver. */
  int collID, ibuffer = 1111;

  xdebug("%s", "START");
  namespaceDelete(pioNamespace_);
  for ( collID = 0; collID < commInqNProcsColl (); collID++ )
    {
      xmpi ( MPI_Send ( &ibuffer, 1, MPI_INT, commInqNProcsModel (),
                        FINALIZE, commInqCommsIO ( collID )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
    }
  modelWinCleanup ();
  commDestroy ();
  /* Leave yaxt alone if the application initialized it itself. */
  if (xtInitByCDI)
    xt_finalize();
  xdebug("%s", "RETURN");
#endif
}

850
 /************************************************************************/
851

Deike Kleberg's avatar
Deike Kleberg committed
852
void pioWriteTimestep ( int tsID, int vdate, int vtime )
853
854
{
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
855
  int collID, buffer[timestepSize], iAssert = 0;
856
  /* int tokenEnd = END; */
857
  int rankGlob = commInqRankGlob ();
858
859
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();
860

861
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
862

Deike Kleberg's avatar
Deike Kleberg committed
863
864
865
  xassert ( tsID       >= 0     &&
            vdate      >= 0     &&
            vtime      >= 0     &&
866
            txWin != NULL);