pio_interface.c 26.5 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
26
#include "pio_client.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_server.h"
28
#include "resource_handle.h"
29
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist.h"
31

Deike Kleberg's avatar
Deike Kleberg committed
32
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
33

34

35
static struct rdmaWin
36
37
{
  size_t size;
38
  unsigned char *buffer, *head;
39
  MPI_Win win;
40
  int postSet, refuseFuncCall;
41
  MPI_Group ioGroup;
42
  int dictSize, dictDataUsed, dictRPCUsed, dict;
43
44
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
45

Deike Kleberg's avatar
Deike Kleberg committed
46
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
Deike Kleberg's avatar
Deike Kleberg committed
47

48
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
49

Deike Kleberg's avatar
Deike Kleberg committed
50
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
51

52
/* qsort comparator for an array of int pointers: orders by the pointed-to
 * values in descending order (largest first). */
static int cmp ( const void * va, const void * vb )
{
  int lhs = **(int *const *)va;
  int rhs = **(int *const *)vb;

  if ( lhs < rhs )
    return 1;
  if ( lhs > rhs )
    return -1;
  return 0;
}

/****************************************************/
63

64
65
66
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
67
68
69
70
71
72
73
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
74
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
75
  int i, j;
76

Deike Kleberg's avatar
Deike Kleberg committed
77
78
79
80
81
82
83
84
85
86
87

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
88
89

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
90
91
92
93
94
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
95

Deike Kleberg's avatar
Deike Kleberg committed
96
      for ( j = 0; j < nWriter; j++ )
97
98
99
	{
	  if ( !i ) buckets[j] = 0.0;
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
100

101
102
103
104
105
106
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
107
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
108
109
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
110

111
112
113
  xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
  xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                nProblems, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
114
115
  xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
  xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
116
  xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
117
118
}

119
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
120
121
122
123
124

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
125

Deike Kleberg's avatar
Deike Kleberg committed
126
127
   @param sSizes array with number of var_t for all stream_t 's

128
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
129
130
131
in order of vSizes

   @param nStreams number of stream_t 's
132
133

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
134
135
136
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
137

Deike Kleberg's avatar
Deike Kleberg committed
138
139
   @return
*/
140

141
142
143
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
144
{
145

Deike Kleberg's avatar
Deike Kleberg committed
146
147
148
149
150
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
151

Deike Kleberg's avatar
Deike Kleberg committed
152
153
154
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
155
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
156

157
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
158
159
160
161
162
163

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
164
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
165
166
    }

Deike Kleberg's avatar
Deike Kleberg committed
167
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
168
169
170
171
172
173
174
175
176
177
178
179
180
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
181
	if ( * ( streamMapping + j ) == i )
182
183
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
184
185
186
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
187
188
189
190
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
191
192
193
194
195
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
196
197

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
198
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
199

200
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
201
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
202
203
204
205
206

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
207
208
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
209
	    * ( varMapping + offset ++ ) =
210
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
211
                                    summandRank );
212
213
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
214
215
216
217
218
219
220

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
221

222
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
223
    {
Deike Kleberg's avatar
Deike Kleberg committed
224
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
225
      for ( i = 0; i < nProcsColl; i++ )
226
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
227
      for ( i = 0; i < nVars; i ++ )
228
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
229
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
230
    }
231
}
Deike Kleberg's avatar
Deike Kleberg committed
232

233
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
234

235
236
/* Assign an I/O rank to every variable of every stream and store it in
   both the current and the original vlist of each stream.

   Aborts if any collector process would receive no data.

   @param nNodes number of physical nodes hosting I/O PEs
   @param nodeSizes number of I/O PEs on each node
*/
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  collectsData = xmalloc ( nProcsColl * sizeof ( collectsData[0] ));
  /* entries are only ever set to 1 below and then tested for 0, so they
   * must start out cleared */
  for ( j = 0; j < nProcsColl; j++ )
    collectsData[j] = 0;
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  /* plain assignment: varSizes is freshly allocated, accumulating into it
   * would read uninitialized memory */
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      varSizes[k++] = vlistInqVarSize ( streamInqVlist ( resHs[i] ), j );

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      {
        vlistDefVarIOrank ( streamInqVlist ( resHs[i] ), j,
                            varMapping[k] );
        vlistDefVarIOrank ( streamInqVlistIDorig ( resHs[i] ), j,
                            varMapping[k] );
        collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
      }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  /* free(NULL) is a no-op, no guards needed */
  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

297
static
Deike Kleberg's avatar
Deike Kleberg committed
298
299
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
300
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
301

302
  xdebug("%s", "START");
303
  if (txWin != NULL)
304
305
306
307
308
309
310
311
312
313
314
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
315

316
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
317
318
319
}

/************************************************************************/
320

321
322
/* Per-collector count of dictionary header entries, accumulated in
   modelWinDefBufferSizes(). */
struct collDesc
{
  int numDataRecords; /* entries for data records and partition descriptors */
  int numRPCRecords;  /* entries for buffered function calls */
};

326
327
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
328
{
329
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
330
  int sumWinBufferSize = 0;
331
332
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
333
  int rankModel   = commInqRankModel ();
334
  int root = commInqRootGlob ();
335
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
336

337
  xdebug("%s", "START");
338
  xassert(txWin != NULL);
339

Deike Kleberg's avatar
Deike Kleberg committed
340
  nstreams = reshCountType ( &streamOps );
341
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
342
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
343
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
344
345
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
346
347
348
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
349
350
351
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
352
353
354
355
356
357
358
359
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
360
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
361
362
363
364
365
366
367
368
369
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
370
        }
371

372
      // memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
373
      // once per stream and timestep for all collprocs only on the modelproc root
374
375
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
376
          {
377
            collIndex[collID].numRPCRecords += 3;
378
379
380
381
            txWin[collID].size +=
              3 * sizeof (union winHeaderEntry)
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
382
    }
383
  for (collID = 0; collID < nProcsColl; ++collID)
384
    {
385
386
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
387
      txWin[collID].dictSize = numRecords;
388
389
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
390
      /* account for size header */
391
      txWin[collID].size += sizeof (union winHeaderEntry);
392
393
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
394
      sumWinBufferSize += txWin[collID].size;
395
    }
396
397
398
  free(collIndex);
  free ( streamIndexList );

399
400
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
401
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
402
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
403
}
404

Deike Kleberg's avatar
Deike Kleberg committed
405
406
407
408

/************************************************************************/


409
static
Deike Kleberg's avatar
Deike Kleberg committed
410
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
411
{
Deike Kleberg's avatar
Deike Kleberg committed
412
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
413

Deike Kleberg's avatar
Deike Kleberg committed
414
415
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
416
            txWin[collID].buffer     != NULL      &&
417
418
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
419
            txWin[collID].size <= MAXWINBUFFERSIZE);
420
  memset(txWin[collID].buffer, 0, txWin[collID].size);
421
422
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
423
  txWin[collID].refuseFuncCall = 0;
424
425
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
426
427
428
}


Deike Kleberg's avatar
Deike Kleberg committed
429
430
431
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
432
433
434
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
435
  int collID, ranks[1];
436
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
437

438
  xdebug("%s", "START");
439
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
440

Deike Kleberg's avatar
Deike Kleberg committed
441
  modelWinDefBufferSizes ();
442
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
443

Deike Kleberg's avatar
Deike Kleberg committed
444
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
445
    {
446
      xassert(txWin[collID].size > 0);
447
      txWin[collID].buffer = NULL;
448
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
449
450
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
451
452
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
453
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
454
                          MPI_INFO_NULL, commInqCommsIO(collID),
455
                          &txWin[collID].win));
456
457
458
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
459
    }
460
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
461
462
463
464
}

/************************************************************************/

465
static void
466
467
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
468
{
469
470
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
471
  int targetEntry;
472
473
  if (header.dataRecord.streamID > 0)
    {
474
      targetEntry = (txWin[collID].dictDataUsed)++;
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
516
    }
517
  else
518
    {
519
520
521
522
523
524
525
526
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
527
    }
528
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
529
530
}

531
532
533
534
535
536
/* Buffer one partition of variable varID of stream streamID for the
   collector responsible for that variable. Two records are written: the
   data followed by the partition descriptor partDesc.

   If an exposure epoch is still pending on that collector's window, wait
   for it and flush the buffer first.  Afterwards function calls are
   refused for this collector until its buffer is flushed again.

   @param streamID stream the data belongs to
   @param varID variable within the stream's vlist
   @param data values of the partition, one per index of partDesc
   @param nmiss number of missing values, stored in the data header
   @param partDesc index list describing the partition
*/
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int vlistID = streamInqVlist ( streamID );
  int ioRank = vlistInqVarIOrank ( vlistID, varID );
  int collID = commRankGlob2CollID ( ioRank );

  xassert ( collID >= 0 && collID < commInqNProcsColl () && txWin != NULL );

  struct rdmaWin *collWin = txWin + collID;

  if (collWin->postSet)
    {
      xmpi(MPI_Win_wait(collWin->win));
      collWin->postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int nIndices = xt_idxlist_get_num_indices(partDesc);
  xassert(nIndices <= INT_MAX);

  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data, nIndices * sizeof (data[0]));

  union winHeaderEntry partHeader
    = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                      .uid = xt_idxlist_get_uid(partDesc),
                      .offset = 0 } };
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  collWin->refuseFuncCall = 1;
}

567
568
/************************************************************************/

569
void pioBufferFuncCall(int funcID, int argc, ... )
570
571
572
573
574
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
575

576
577
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
578

Deike Kleberg's avatar
Deike Kleberg committed
579
580
  if ( rankGlob != root ) return;

581
582
583
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
584

Deike Kleberg's avatar
Deike Kleberg committed
585
  va_start ( ap, argc );
586

Thomas Jahns's avatar
Thomas Jahns committed
587
588
589
590
591
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

592
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
593
    {
594
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
595
596
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
597
      {
Thomas Jahns's avatar
Thomas Jahns committed
598
599
600
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
601
          {
Thomas Jahns's avatar
Thomas Jahns committed
602
603
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
604
          }
Thomas Jahns's avatar
Thomas Jahns committed
605
606
607
608
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
609
      }
610
      break;
Deike Kleberg's avatar
Deike Kleberg committed
611
612
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
613
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
614
615
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
616
617
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
618
619
620
621
622
623
624
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
625
      }
626
      break;
Thomas Jahns's avatar
Thomas Jahns committed
627
628
629
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
630

Thomas Jahns's avatar
Thomas Jahns committed
631
632
  for (collID = 0; collID < nProcsColl; ++collID)
    {
633
634
      xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
              > txWin[collID].dictDataUsed);
Thomas Jahns's avatar
Thomas Jahns committed
635
636
637
638
639
640
641
642
643
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
644

Thomas Jahns's avatar
Thomas Jahns committed
645
646
647
648
649
650
651
652
653
654
655
656
657
658
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
659
      break;
660
    }
661

Thomas Jahns's avatar
Thomas Jahns committed
662
  va_end(ap);
663
  xdebug("%s", "RETURN");
664
665
}

666
667
668
669
#endif

/*****************************************************************************/

670
671
672
673
/* pioInit definition must currently compile even in non-MPI configurations */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif

/**
   @brief initializes the MPI communicators needed for the communication
   between the calculator PEs and the I/O PEs and within the group of I/O
   PEs.

   commGlob: all PEs

   commPIO: I/O PEs, PEs with highest ranks in commGlob

   commModel: calculating PEs, no I/O PEs

   commsIO[i]: communicator used between the calculator PEs and collector
   i (presumably the calculator PEs plus that collector, which is addressed
   as rank commInqNProcsModel() in it)

   Collective call.  I/O PEs do not return: they run IOServer(), then
   finalize yaxt and MPI and exit the process.

   @param commGlob MPI communicator of all calling PEs
   @param nProcsIO number of I/O PEs; must be 1 for PIO_NONE, otherwise
   between 1 and (size of commGlob) - 1
   @param IOMode I/O mode, between PIO_MINIOMODE and PIO_MAXIOMODE
   @param pioNamespace output, namespace to be used by the calling
   calculator PE (from namespaceNew(0) when commGlob has a single PE,
   otherwise set by cdiPioClientSetup)
   @param partInflate allow for array partitions on compute PEs that are
   at most sized \f$ partInflate * \lceil arraySize / numComputePEs \rceil \f$,
   must be >= 1.0

   @return communicator of the calculator PEs, or the global communicator
   when it contains only one PE.  Without MPI support this function aborts.
*/

#ifdef USE_MPI
/* namespace handed out to the calculator PEs, deleted in pioFinalize */
static int pioNamespace_ = -1;
/* nonzero if pioInit initialized yaxt and pioFinalize must finalize it */
static int xtInitByCDI = 0;
#endif

MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
        int *pioNamespace, float partInflate)
{
#ifdef USE_MPI
  int sizeGlob;

  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  /* initialize yaxt only if the application has not done so already */
  if ((xtInitByCDI = (!xt_initialized() || xt_finalized())))
    {
      xt_initialize(commGlob);
    }
  commInit ();
  commDefCommGlob ( commGlob );
  sizeGlob = commInqSizeGlob ();

  if (((IOMode != PIO_NONE && (nProcsIO <= 0 || nProcsIO > sizeGlob - 1)))
      || (IOMode == PIO_NONE && nProcsIO != 1))
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  if ( commInqSizeGlob () == 1 )
    {
      pioNamespace_ = *pioNamespace = namespaceNew(0);
      return commInqCommGlob ();
    }

  if ( commInqIsProcIO ())
    {
      /* I/O PEs run the server and terminate the process afterwards */
      serializeSetMPI();
      namespaceSwitchSet(NSSWITCH_ABORT, cdiAbortC_MPI);
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, pioFileOpen);
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, pioFileClose);
      IOServer ();
      namespaceDelete(0);
      commDestroy ();
      xt_finalize();
      MPI_Finalize ();
      exit ( EXIT_SUCCESS );
    }
  else
    cdiPioClientSetup(&pioNamespace_, pioNamespace);

  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
  return commInqCommModel ();
#else
  abort();
#endif
}

#ifndef MPI_VERSION
#  undef MPI_Comm
#endif

771
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
772

773
/* End of the definition phase.  Assigns I/O ranks to all variables, and
   each model rank below commInqNProcsColl() sends the packed resources on
   commInqCommsIO(rank).  Then it creates the transmit windows and moves
   the namespace to the time-loop stage.  Does nothing without MPI. */
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  if ( rankGlob < commInqNProcsColl ())
    {
      char *packed;
      int packedSize;
      MPI_Comm comm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate(&packed, &packedSize, &comm);

      xmpi ( MPI_Send ( packed, packedSize, MPI_PACKED, commInqNProcsModel (),
                        RESOURCES, commInqCommsIO ( rankGlob )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");

      reshPackBufferDestroy ( &packed );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/* Leave the time loop: switch the namespace resources to the cleanup
   stage.  Does nothing without MPI. */
void  pioEndTimestepping ( void )
{
#ifdef USE_MPI
  xdebug("%s", "START");

  namespaceDefResStatus ( STAGE_CLEANUP );

  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
814

815
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
816

Deike Kleberg's avatar
Deike Kleberg committed
817

Deike Kleberg's avatar
Deike Kleberg committed
818
819
820
821
822
823
824
825
826
/**
  @brief Called by the calculator PEs to inform the I/O PEs that no more
  data will be written.

  Deletes the PIO namespace, sends a FINALIZE message to every collector,
  releases the transmit windows and the communicators, and finalizes yaxt
  if pioInit initialized it.  Does nothing without MPI.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  int coll;
  int finalizeToken = 1111;

  xdebug("%s", "START");
  namespaceDelete(pioNamespace_);

  for ( coll = 0; coll < commInqNProcsColl (); coll++ )
    {
      xmpi ( MPI_Send ( &finalizeToken, 1, MPI_INT, commInqNProcsModel (),
                        FINALIZE, commInqCommsIO ( coll )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
    }

  modelWinCleanup ();
  commDestroy ();
  /* leave yaxt alone if the application initialized it */
  if (xtInitByCDI)
    xt_finalize();
  xdebug("%s", "RETURN");
#endif
}

847
 /************************************************************************/
848

Deike Kleberg's avatar
Deike Kleberg committed
849
void pioWriteTimestep ( int tsID, int vdate, int vtime )
850
851
{
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
852
  int collID, buffer[timestepSize], iAssert = 0;
853
  /* int tokenEnd = END; */
854
  int rankGlob = commInqRankGlob ();
855
856
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();
857

858
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
859