pio_interface.c 26.3 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
26
#include "pio_client.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_server.h"
28
#include "resource_handle.h"
29
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist.h"
31

Deike Kleberg's avatar
Deike Kleberg committed
32
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
33

34

35
static struct rdmaWin
36
37
{
  size_t size;
38
  unsigned char *buffer, *head;
39
  MPI_Win win;
40
  int postSet, refuseFuncCall;
41
  MPI_Group ioGroup;
42
  int dictSize, dictDataUsed, dictRPCUsed, dict;
43
44
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
45

Deike Kleberg's avatar
Deike Kleberg committed
46
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
Deike Kleberg's avatar
Deike Kleberg committed
47

48
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
49

Deike Kleberg's avatar
Deike Kleberg committed
50
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
51

52
/* qsort comparator for an array of (int *): orders the pointed-to values
 * descending (larger values first), equal values compare as 0. */
static int cmp ( const void * va, const void * vb )
{
  int lhs = **(const int * const *)va;
  int rhs = **(const int * const *)vb;

  return (lhs < rhs) - (lhs > rhs);
}

/****************************************************/
63

64
65
66
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
67
68
69
70
71
72
73
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
74
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
75
  int i, j;
76

Deike Kleberg's avatar
Deike Kleberg committed
77
78
79
80
81
82
83
84
85
86
87

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
88
89

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
90
91
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

Thomas Jahns's avatar
Thomas Jahns committed
92
93
  memset(buckets, 0, sizeof (buckets));

Deike Kleberg's avatar
Deike Kleberg committed
94
95
96
  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
97

Deike Kleberg's avatar
Deike Kleberg committed
98
      for ( j = 0; j < nWriter; j++ )
99
100
	{
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
101

102
103
104
105
106
107
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
108
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
109
110
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
111

112
113
114
  xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
  xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                nProblems, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
115
116
  xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
  xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
117
  xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
118
119
}

120
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
121
122
123
124
125

/**
   @brief Assigns every variable (var_t) of every stream to an I/O PE.

   Works in two levels. First, the streams are distributed over the
   physical nodes hosting I/O PEs, and each node is weighted by its share of
   all I/O PEs. Second, the variables of the streams placed on one node are
   distributed evenly over the I/O PEs of that node.

   @param vSizes array with the size (weight) of each var_t, in order of streams

   @param sSizes array with number of var_t for all stream_t 's

   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
   in order of vSizes

   @param nStreams number of stream_t 's

   @param nodeSizes array of number of I/O PEs on each physical node, increasing
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs

   @return nothing, the result is written to varMapping
*/
141

142
143
144
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
145
{
146

Deike Kleberg's avatar
Deike Kleberg committed
147
148
149
150
151
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
152

Deike Kleberg's avatar
Deike Kleberg committed
153
154
155
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
156
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
157

158
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
159
160
161
162
163
164

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
165
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
166
167
    }

Deike Kleberg's avatar
Deike Kleberg committed
168
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
169
170
171
172
173
174
175
176
177
178
179
180
181
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
182
	if ( * ( streamMapping + j ) == i )
183
184
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
185
186
187
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
188
189
190
191
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
192
193
194
195
196
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
197
198

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
199
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
200

201
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
202
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
203
204
205
206
207

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
208
209
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
210
	    * ( varMapping + offset ++ ) =
211
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
212
                                    summandRank );
213
214
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
215
216
217
218
219
220
221

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
222

223
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
224
    {
Deike Kleberg's avatar
Deike Kleberg committed
225
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
226
      for ( i = 0; i < nProcsColl; i++ )
227
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
228
      for ( i = 0; i < nVars; i ++ )
229
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
230
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
231
    }
232
}
Deike Kleberg's avatar
Deike Kleberg committed
233

234
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
235

236
237
/**
 * Decides which collector handles which variable of every stream and
 * records that decision in the vlists.
 *
 * The sizes of all variables of all streams are gathered, and varMapGen
 * computes the variable -> I/O rank mapping. The result is stored with
 * vlistDefVarIOrank in both the current and the original vlist of each
 * stream. The function aborts if any collector receives no variable.
 *
 * @param nNodes    number of physical nodes hosting I/O PEs
 * @param nodeSizes number of I/O PEs on each of those nodes
 */
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  int nStreams = streamSize ();

  int *resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  int *streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* Must start zeroed: entries are only ever set to 1 below, and the idle
   * check afterwards looks for entries still equal to 0. */
  int *collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for (int i = 0; i < nStreams; i++)
    streamSizes[i] = streamInqNvars ( resHs[i] );

  int nVars = xsum ( nStreams, streamSizes );

  int *varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  int *varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  /* assign, do not accumulate: xmalloc'd memory is not initialized */
  int k = 0;
  for (int i = 0; i < nStreams; i++)
    for (int j = 0; j < streamSizes[i]; j++)
      varSizes[k++] = vlistInqVarSize ( streamInqVlist ( resHs[i] ), j );

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for (int i = 0; i < nStreams; i++)
    for (int j = 0; j < streamSizes[i]; j++)
      {
        vlistDefVarIOrank ( streamInqVlist ( resHs[i] ), j, varMapping[k] );
        vlistDefVarIOrank ( streamInqVlistIDorig ( resHs[i] ), j,
                            varMapping[k] );
        collectsData[commRankGlob2CollID ( varMapping[k] )] = 1;
        ++k;
      }

  for (int j = 0; j < nProcsColl; j++)
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  /* free(NULL) is a no-op, no guards needed */
  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

298
static
Deike Kleberg's avatar
Deike Kleberg committed
299
300
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
301
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
302

303
  xdebug("%s", "START");
304
  if (txWin != NULL)
305
306
307
308
309
310
311
312
313
314
315
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
316

317
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
318
319
320
}

/************************************************************************/
321

322
323
/* per-collector record counts gathered while sizing the transfer windows */
struct collDesc
{
  int numDataRecords;   /* data records plus their partition descriptors */
  int numRPCRecords;    /* function-call records */
};

327
328
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
329
{
330
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
331
  int sumWinBufferSize = 0;
332
333
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
334
  int rankModel   = commInqRankModel ();
335
  int root = commInqRootGlob ();
336
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
337

338
  xdebug("%s", "START");
339
  xassert(txWin != NULL);
340

Deike Kleberg's avatar
Deike Kleberg committed
341
  nstreams = reshCountType ( &streamOps );
342
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
343
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
344
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
345
346
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
347
348
349
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
350
351
352
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
353
354
355
356
357
358
359
360
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
361
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
362
363
364
365
366
367
368
369
370
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
            + 2 * sizeof (union winHeaderEntry)
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
371
        }
372

373
      // memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
374
      // once per stream and timestep for all collprocs only on the modelproc root
375
376
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
377
          {
378
            collIndex[collID].numRPCRecords += 3;
379
380
381
382
            txWin[collID].size +=
              3 * sizeof (union winHeaderEntry)
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
383
    }
384
  for (collID = 0; collID < nProcsColl; ++collID)
385
    {
386
387
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
388
      txWin[collID].dictSize = numRecords;
389
390
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
391
      /* account for size header */
392
      txWin[collID].size += sizeof (union winHeaderEntry);
393
394
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
395
      sumWinBufferSize += txWin[collID].size;
396
    }
397
398
399
  free(collIndex);
  free ( streamIndexList );

400
401
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
402
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
403
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
404
}
405

Deike Kleberg's avatar
Deike Kleberg committed
406
407
408
409

/************************************************************************/


410
static
Deike Kleberg's avatar
Deike Kleberg committed
411
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
412
{
Deike Kleberg's avatar
Deike Kleberg committed
413
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
414

Deike Kleberg's avatar
Deike Kleberg committed
415
416
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
417
            txWin[collID].buffer     != NULL      &&
418
419
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
420
            txWin[collID].size <= MAXWINBUFFERSIZE);
421
  memset(txWin[collID].buffer, 0, txWin[collID].size);
422
423
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
424
  txWin[collID].refuseFuncCall = 0;
425
426
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
427
428
429
}


Deike Kleberg's avatar
Deike Kleberg committed
430
431
432
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
433
434
435
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
436
  int collID, ranks[1];
437
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
438

439
  xdebug("%s", "START");
440
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
441

Deike Kleberg's avatar
Deike Kleberg committed
442
  modelWinDefBufferSizes ();
443
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
444

Deike Kleberg's avatar
Deike Kleberg committed
445
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
446
    {
447
      xassert(txWin[collID].size > 0);
448
      txWin[collID].buffer = NULL;
449
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
450
451
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
452
453
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
454
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
455
                          MPI_INFO_NULL, commInqCommsIO(collID),
456
                          &txWin[collID].win));
457
458
459
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
460
    }
461
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
462
463
464
465
}

/************************************************************************/

466
static void
467
468
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
469
{
470
471
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
472
  int targetEntry;
473
474
  if (header.dataRecord.streamID > 0)
    {
475
      targetEntry = (txWin[collID].dictDataUsed)++;
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
      int offset = header.dataRecord.offset
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
  else if (header.partDesc.partDescMarker == PARTDESCMARKER)
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
      Xt_uid uid = header.partDesc.uid;
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
          xassert(winDict[entry].partDesc.partDescMarker
                  == PARTDESCMARKER);
          if (winDict[entry].partDesc.uid == uid)
            {
              offset = winDict[entry].partDesc.offset;
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
          header.partDesc.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
        header.partDesc.offset = offset;
517
    }
518
  else
519
    {
520
521
522
523
524
525
526
527
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
      if (header.funcCall.funcID == STREAMOPEN)
        {
          header.funcCall.funcArgs.newFile.offset
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
528
    }
529
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
530
531
}

532
533
534
535
536
537
/* Buffers one partition of a variable's data together with its index-list
 * descriptor in the transfer window of the collector responsible for the
 * variable.  A still-posted window is waited for and flushed first. */
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int vlistID = streamInqVlist(streamID);
  int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));

  xassert(collID >= 0 && collID < commInqNProcsColl() && txWin != NULL);

  struct rdmaWin *win = txWin + collID;

  if (win->postSet)
    {
      /* the buffer may only be rewritten after the exposure epoch ended */
      xmpi(MPI_Win_wait(win->win));
      win->postSet = 0;
      modelWinFlushBuffer(collID);
    }

  Xt_int numIndices = xt_idxlist_get_num_indices(partDesc);
  xassert(numIndices <= INT_MAX);

  union winHeaderEntry dataHeader
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, dataHeader, data, numIndices * sizeof (data[0]));

  union winHeaderEntry partHeader
    = { .partDesc = { .partDescMarker = PARTDESCMARKER,
                      .uid = xt_idxlist_get_uid(partDesc),
                      .offset = 0 } };
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  /* pioBufferFuncCall asserts this is 0, i.e. no function calls may be
   * queued into this window until it has been flushed */
  win->refuseFuncCall = 1;
}

568
569
/************************************************************************/

570
void pioBufferFuncCall(int funcID, int argc, ... )
571
572
573
574
575
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
576

577
578
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
579

Deike Kleberg's avatar
Deike Kleberg committed
580
581
  if ( rankGlob != root ) return;

582
583
584
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
585

Deike Kleberg's avatar
Deike Kleberg committed
586
  va_start ( ap, argc );
587

Thomas Jahns's avatar
Thomas Jahns committed
588
589
590
591
592
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

593
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
594
    {
595
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
596
597
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
598
      {
Thomas Jahns's avatar
Thomas Jahns committed
599
600
601
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
602
          {
Thomas Jahns's avatar
Thomas Jahns committed
603
604
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
605
          }
Thomas Jahns's avatar
Thomas Jahns committed
606
607
608
609
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
610
      }
611
      break;
Deike Kleberg's avatar
Deike Kleberg committed
612
613
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
614
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
615
616
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
617
618
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
619
620
621
622
623
624
625
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
626
      }
627
      break;
Thomas Jahns's avatar
Thomas Jahns committed
628
629
630
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
631

Thomas Jahns's avatar
Thomas Jahns committed
632
633
  for (collID = 0; collID < nProcsColl; ++collID)
    {
634
635
      xassert((txWin[collID].dictSize - txWin[collID].dictRPCUsed)
              > txWin[collID].dictDataUsed);
Thomas Jahns's avatar
Thomas Jahns committed
636
637
638
639
640
641
642
643
644
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
645

Thomas Jahns's avatar
Thomas Jahns committed
646
647
648
649
650
651
652
653
654
655
656
657
658
659
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
660
      break;
661
    }
662

Thomas Jahns's avatar
Thomas Jahns committed
663
  va_end(ap);
664
  xdebug("%s", "RETURN");
665
666
}

667
668
669
670
#endif

/*****************************************************************************/

671
672
673
674
/* Without an MPI header MPI_Comm is unknown; stand in with int so that the
 * pioInit definition below still compiles in non-MPI configurations. */
#if !defined(MPI_VERSION)
#  define MPI_Comm int
#endif
675
676
677
/**
   @brief initializes the MPI_Communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]: communicator used for the transfer windows and messages
  between the calculator PEs and collector PE i

  Collective call

  @param commGlob MPI_Communicator of all calling PEs
  @param nProcsIO number of I/O PEs
  @param IOMode I/O mode, must lie in [PIO_MINIOMODE, PIO_MAXIOMODE]
  @param pioNamespace receives the CDI namespace set up for the calculator PEs
  @param partInflate allow for array partitions on compute
  PE that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$; must be at least 1.0

  @return on calculator PEs the communicator of the calculator PEs
  (commModel). If only a single MPI task exists, commGlob is returned.
  I/O PEs run the I/O server and terminate with exit() instead of returning.
*/

Thomas Jahns's avatar
Thomas Jahns committed
698
699
#if defined(USE_MPI)
/* namespace set up for the calculator PEs by pioInit, deleted in pioFinalize */
static int pioNamespace_ = -1;
/* nonzero when pioInit itself initialized yaxt, so pioFinalize finalizes it */
static int xtInitByCDI = 0;
#endif

703
704
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
        int *pioNamespace, float partInflate)
{
#ifdef USE_MPI
  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  /* initialize yaxt only when nobody else did; remember that for
   * pioFinalize */
  xtInitByCDI = !xt_initialized() || xt_finalized();
  if (xtInitByCDI)
    xt_initialize(commGlob);

  commInit ();
  commDefCommGlob ( commGlob );
  int sizeGlob = commInqSizeGlob ();

  /* PIO_NONE requires exactly one I/O PE; every other mode needs at least
   * one I/O PE and must leave at least one calculator PE */
  int validDistribution
    = IOMode == PIO_NONE
    ? nProcsIO == 1
    : nProcsIO > 0 && nProcsIO <= sizeGlob - 1;
  if (!validDistribution)
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  if ( commInqSizeGlob () == 1 )
    {
      pioNamespace_ = *pioNamespace = namespaceNew();
      return commInqCommGlob ();
    }

  if ( !commInqIsProcIO ())
    {
      /* calculator PE: set up the client side and hand back commModel */
      cdiPioClientSetup(&pioNamespace_, pioNamespace);
      xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
      return commInqCommModel ();
    }

  /* I/O PE: run the server, tear everything down, never return */
  serializeSetMPI();
  namespaceSwitchSet(NSSWITCH_ABORT, cdiAbortC_MPI);
  namespaceSwitchSet(NSSWITCH_FILE_OPEN, pioFileOpen);
  namespaceSwitchSet(NSSWITCH_FILE_CLOSE, pioFileClose);
  IOServer ();
  namespaceDelete(0);
  commDestroy ();
  xt_finalize();
  MPI_Finalize ();
  exit ( EXIT_SUCCESS );
#else
  abort();
#endif
}
767

768
769
/* drop the non-MPI stand-in for MPI_Comm again */
#if !defined(MPI_VERSION)
#  undef MPI_Comm
#endif

772
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
773

774
/* Ends the definition phase on the calculator PEs. It maps variables to
 * collectors, sends the packed resources to the collectors, creates the
 * transfer windows and switches the namespace to the time-loop stage. */
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  /* ranks 0 .. nProcsColl-1 each send the resources over one commsIO */
  if ( rankGlob < commInqNProcsColl ())
    {
      char *packBuf;
      int packBufSize;
      MPI_Comm comm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate(&packBuf, &packBufSize, &comm);
      xmpi ( MPI_Send ( packBuf, packBufSize, MPI_PACKED, commInqNProcsModel (),
                        RESOURCES, commInqCommsIO ( rankGlob )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");
      reshPackBufferDestroy ( &packBuf );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/* Marks the end of the time loop by moving the resource namespace to the
 * clean-up stage; does nothing without MPI support. */
void pioEndTimestepping(void)
{
#ifdef USE_MPI
  xdebug("%s", "START");
  namespaceDefResStatus(STAGE_CLEANUP);
  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
815

816
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
817

Deike Kleberg's avatar
Deike Kleberg committed
818

Deike Kleberg's avatar
Deike Kleberg committed
819
820
821
822
823
824
825
826
827
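/**
  @brief is invoked by the calculator PEs, to inform
  the I/O PEs that no more data will be written.

  Deletes the calculator namespace and sends a FINALIZE message to every
  collector. It then releases the transfer windows and communicators, and
  finalizes yaxt if pioInit initialized it.
*/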

Deike Kleberg's avatar
Deike Kleberg committed
828
void pioFinalize ( void )
{
#ifdef USE_MPI
  int finalizeMsg = 1111;
  int nProcsColl;

  xdebug("%s", "START");
  namespaceDelete(pioNamespace_);

  /* tell every collector that no more data will follow */
  nProcsColl = commInqNProcsColl ();
  for (int collID = 0; collID < nProcsColl; ++collID)
    {
      xmpi ( MPI_Send ( &finalizeMsg, 1, MPI_INT, commInqNProcsModel (),
                        FINALIZE, commInqCommsIO ( collID )));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
    }

  modelWinCleanup ();
  commDestroy ();

  /* only undo what pioInit did itself */
  if (xtInitByCDI)
    xt_finalize();

  xdebug("%s", "RETURN");
#endif
}

848
 /************************************************************************/
849

Deike Kleberg's avatar
Deike Kleberg committed
850
void pioWriteTimestep ( int tsID, int vdate, int vtime )
851
852
{
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
853
  int collID, buffer[timestepSize], iAssert = 0;
854
  /* int tokenEnd = END; */
855
  int rankGlob = commInqRankGlob ();
856
857
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();
858

859
  xdebug("%s", "START");