pio_interface.c 24.4 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10
11

#ifdef USE_MPI
#include <mpi.h>
12
#include <yaxt.h>
13
14
#endif

15
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "vlist_var.h"
18
19

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "namespace.h"
21
#include "pio.h"
22
#include "pio_serialize.h"
23
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_rpc.h"
26
#include "pio_client.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_server.h"
28
#include "resource_handle.h"
29
#include "cdi_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist.h"
31

Deike Kleberg's avatar
Deike Kleberg committed
32
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
33

34

35
static struct rdmaWin
36
37
{
  size_t size;
38
  unsigned char *buffer, *head;
39
  MPI_Win win;
40
  int postSet, refuseFuncCall;
41
  MPI_Group ioGroup;
42
  int dictSize, dictDataUsed, dictRPCUsed, dict;
43
44
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
45

46
47
48
/* Names of the functions that can be buffered for remote execution,
 * used for debug output; pioBufferFuncCall indexes this table with
 * (-1 - funcID). */
const char *const funcMap[numRPCFuncs] = {
  "streamOpen", "streamDefVlist", "streamClose", "streamDefTimestep",
};
Deike Kleberg's avatar
Deike Kleberg committed
52

53
/* Inflation factor for partition sizes: assigned from the partInflate
 * argument of pioInit and used by modelWinDefBufferSizes when the RMA
 * buffer sizes are computed. */
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
54

Deike Kleberg's avatar
Deike Kleberg committed
55
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
56

57
/* qsort comparison function for arrays of pointers to int: orders the
 * pointers such that the values pointed to are in descending order. */
static int cmp ( const void * va, const void * vb )
{
  const int lhs = ** ( const int * const * ) va;
  const int rhs = ** ( const int * const * ) vb;

  if ( lhs < rhs )
    return 1;
  if ( lhs > rhs )
    return -1;
  return 0;
}

/****************************************************/
68

69
70
71
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
72
73
74
75
76
77
78
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
79
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
80
  int i, j;
81

Deike Kleberg's avatar
Deike Kleberg committed
82
83
84
85
86
87
88
89
90
91
92

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
93
94

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
95
96
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

Thomas Jahns's avatar
Thomas Jahns committed
97
98
  memset(buckets, 0, sizeof (buckets));

Deike Kleberg's avatar
Deike Kleberg committed
99
100
101
  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
102

Deike Kleberg's avatar
Deike Kleberg committed
103
      for ( j = 0; j < nWriter; j++ )
104
105
	{
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
106

107
108
109
110
111
112
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
113
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
114
115
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
116

117
118
119
120
121
122
123
124
125
  if ( ddebug )
    {
      xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
      xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                    nProblems, DATATYPE_INT );
      xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
      xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
      xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
    }
Deike Kleberg's avatar
Deike Kleberg committed
126
127
}

128
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
129
130
131
132
133

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
134

Deike Kleberg's avatar
Deike Kleberg committed
135
136
   @param sSizes array with number of var_t for all stream_t 's

137
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
138
139
140
in order of vSizes

   @param nStreams number of stream_t 's
141
142

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
143
144
145
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
146

Deike Kleberg's avatar
Deike Kleberg committed
147
148
   @return
*/
149

150
151
152
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
153
{
154

Deike Kleberg's avatar
Deike Kleberg committed
155
156
157
158
159
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
160

Deike Kleberg's avatar
Deike Kleberg committed
161
162
163
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
164
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
165

166
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
167
168
169
170
171
172

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
173
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
174
175
    }

Deike Kleberg's avatar
Deike Kleberg committed
176
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
177
178
179
180
181
182
183
184
185
186
187
188
189
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
190
	if ( * ( streamMapping + j ) == i )
191
192
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
193
194
195
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
196
197
198
199
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
200
201
202
203
204
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
205
206

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
207
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
208

209
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
210
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
211
212
213
214
215

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
216
217
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
218
	    * ( varMapping + offset ++ ) =
219
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
220
                                    summandRank );
221
222
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
223
224
225
226
227
228
229

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
230

231
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
232
    {
Deike Kleberg's avatar
Deike Kleberg committed
233
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
234
      for ( i = 0; i < nProcsColl; i++ )
235
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
236
      for ( i = 0; i < nVars; i ++ )
237
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
238
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
239
    }
240
}
Deike Kleberg's avatar
Deike Kleberg committed
241

242
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
243

244
245
/**
   @brief computes the mapping of all variables of all streams to I/O
   PEs and stores it in the vlists of the streams.

   The rank computed by varMapGen for a variable is set with
   vlistDefVarIOrank both in the vlist of the stream and in the vlist
   returned by streamInqVlistIDorig.  Aborts if there is a collector
   process that no variable has been mapped to.

   @param nNodes number of physical nodes hosting I/O PEs

   @param nodeSizes array of number of I/O PEs on each physical node
*/
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* zero-initialized: an entry only gets set for collectors that
   * receive data and is compared to 0 below */
  collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  /* plain assignment, varSizes has not been initialized */
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      varSizes[k++] = vlistInqVarSize ( streamInqVlist ( resHs[i] ), j );

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
	      nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < streamSizes[i]; j++ )
      {
        vlistDefVarIOrank ( streamInqVlist ( resHs[i] ), j,
                            varMapping[k] );
        vlistDefVarIOrank ( streamInqVlistIDorig ( resHs[i] ), j,
                            varMapping[k] );
        collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
      }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

306
static
Deike Kleberg's avatar
Deike Kleberg committed
307
308
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
309
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
310

311
  xdebug("%s", "START");
312
  if (txWin != NULL)
313
314
315
316
317
318
319
320
321
322
323
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
324

325
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
326
327
328
}

/************************************************************************/
329

330
331
/* Number of dictionary records a collector's RMA buffer has to
 * provide, counted separately for data and for function calls. */
struct collDesc
{
  int numDataRecords;
  int numRPCRecords;
};

335
336
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
337
{
338
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
339
  int sumWinBufferSize = 0;
340
341
342
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
  int root = commInqRootGlob ();
343
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
344

345
  xdebug("%s", "START");
346
  xassert(txWin != NULL);
347

Deike Kleberg's avatar
Deike Kleberg committed
348
  nstreams = reshCountType ( &streamOps );
349
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
350
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
351
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
352
353
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
354
355
356
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
357
358
359
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
360
361
362
363
364
365
366
367
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
          int collIDchunk;
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
            collIDchunk = (int)ceilf(cdiPIOpartInflate_
                                     * (varSize + nProcsModel - 1)/nProcsModel);
          }
Deike Kleberg's avatar
Deike Kleberg committed
368
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
369
370
371
372
373
374
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
375
            + 2 * sizeof (struct winHeaderEntry)
376
377
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
378
        }
379

380
381
      // memory required for the function calls encoded
      // for remote execution
Deike Kleberg's avatar
Deike Kleberg committed
382
      // once per stream and timestep for all collprocs only on the modelproc root
383
384
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
385
          {
386
            collIndex[collID].numRPCRecords += numRPCFuncs;
387
            txWin[collID].size +=
388
              numRPCFuncs * sizeof (struct winHeaderEntry)
389
390
391
              + MAXDATAFILENAME
              /* data part of streamDefTimestep */
              + (2 * CDI_MAX_NAME + sizeof (taxis_t));
392
          }
Deike Kleberg's avatar
Deike Kleberg committed
393
    }
394
  for (collID = 0; collID < nProcsColl; ++collID)
395
    {
396
397
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
398
      txWin[collID].dictSize = numRecords;
399
400
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
401
      /* account for size header */
402
      txWin[collID].size += sizeof (struct winHeaderEntry);
403
404
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
405
      sumWinBufferSize += txWin[collID].size;
406
    }
407
408
409
  free(collIndex);
  free ( streamIndexList );

410
411
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
412
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
413
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
414
}
415

Deike Kleberg's avatar
Deike Kleberg committed
416
417
418
419

/************************************************************************/


420
static
Deike Kleberg's avatar
Deike Kleberg committed
421
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
422
{
Deike Kleberg's avatar
Deike Kleberg committed
423
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
424

Deike Kleberg's avatar
Deike Kleberg committed
425
426
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
427
            txWin[collID].buffer     != NULL      &&
428
429
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
430
            txWin[collID].size <= MAXWINBUFFERSIZE);
431
  memset(txWin[collID].buffer, 0, txWin[collID].size);
432
  txWin[collID].head = txWin[collID].buffer
433
    + txWin[collID].dictSize * sizeof (struct winHeaderEntry);
434
  txWin[collID].refuseFuncCall = 0;
435
436
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
437
438
439
}


Deike Kleberg's avatar
Deike Kleberg committed
440
441
442
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
443
444
445
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
446
  int collID, ranks[1];
447
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
448

449
  xdebug("%s", "START");
450
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
451

Deike Kleberg's avatar
Deike Kleberg committed
452
  modelWinDefBufferSizes ();
453
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
454

Deike Kleberg's avatar
Deike Kleberg committed
455
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
456
    {
457
      xassert(txWin[collID].size > 0);
458
      txWin[collID].buffer = NULL;
459
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
460
461
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
462
      txWin[collID].head = txWin[collID].buffer
463
        + txWin[collID].dictSize * sizeof (struct winHeaderEntry);
464
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
465
                          MPI_INFO_NULL, commInqCommsIO(collID),
466
                          &txWin[collID].win));
467
468
469
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
470
    }
471
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
472
473
474
475
}

/************************************************************************/

476
static void
477
modelWinEnqueue(int collID,
478
                struct winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
479
{
480
481
  struct winHeaderEntry *winDict
    = (struct winHeaderEntry *)txWin[collID].buffer;
482
  int targetEntry;
483
  if (header.id > 0)
484
    {
485
      targetEntry = (txWin[collID].dictDataUsed)++;
486
      int offset = header.offset
487
488
489
490
491
        = (int)roundUpToMultiple(txWin[collID].head - txWin[collID].buffer,
                                 sizeof (double));
      memcpy(txWin[collID].buffer + offset, data, size);
      txWin[collID].head = txWin[collID].buffer + offset + size;
    }
492
  else if (header.id == PARTDESCMARKER)
493
494
    {
      targetEntry = (txWin[collID].dictDataUsed)++;
495
      Xt_uid uid = header.specific.partDesc.uid;
496
497
498
499
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
500
          xassert(winDict[entry].id == PARTDESCMARKER);
501
          if (winDict[entry].specific.partDesc.uid == uid)
502
            {
503
              offset = winDict[entry].offset;
504
505
506
507
508
509
510
511
512
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
          int position = 0;
          MPI_Comm comm = commInqCommsIO(collID);
513
          header.offset
514
515
516
517
518
519
520
521
522
523
524
            = (int)(txWin[collID].head - txWin[collID].buffer);
          size_t size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
          size_t remaining_size = txWin[collID].size
            - (txWin[collID].head - txWin[collID].buffer);
          xassert(size <= remaining_size);
          xt_idxlist_pack((Xt_idxlist)data, txWin[collID].head,
                          (int)remaining_size, &position, comm);
          txWin[collID].head += position;
        }
      else
        /* duplicate entries are copied only once per timestep */
525
        header.offset = offset;
526
    }
527
  else
528
    {
529
      targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
530
      if (header.id == STREAMOPEN)
531
        {
532
          header.offset
533
534
535
536
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
537
      else if (header.id == STREAMDEFTIMESTEP)
538
        {
539
          header.offset
540
541
542
543
            = (int)(txWin[collID].head - txWin[collID].buffer);
          memcpy(txWin[collID].head, data, size);
          txWin[collID].head += size;
        }
544
    }
545
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
546
547
}

548
549
550
551
552
553
/**
   @brief buffers the part of a variable held by the calling process
   for the collector the variable is mapped to.

   A data record followed by the record of the corresponding
   partition descriptor is enqueued.  If the flag postSet is set for
   the buffer, MPI_Win_wait is called and the buffer is flushed first.
   Afterwards function calls are refused for this buffer.

   @param streamID stream the variable belongs to
   @param varID ID of the variable
   @param data values of the part, as many as partDesc has indices
   @param nmiss number of missing values, stored in the record header
   @param partDesc index list describing the part
*/
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  const int vlistID = streamInqVlist ( streamID );
  const int collID
    = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));

  xassert ( collID         >= 0                    &&
            collID         <  commInqNProcsColl () &&
            txWin != NULL);

  struct rdmaWin *win = txWin + collID;
  if (win->postSet)
    {
      xmpi(MPI_Win_wait(win->win));
      win->postSet = 0;
      modelWinFlushBuffer ( collID );
    }

  Xt_int chunk = xt_idxlist_get_num_indices(partDesc);
  xassert(chunk <= INT_MAX);

  /* the offsets given here are placeholders, modelWinEnqueue sets
   * the actual ones */
  struct winHeaderEntry dataHeader
    = { .id = streamID, .specific.dataRecord = { varID, nmiss }, .offset = -1 };
  struct winHeaderEntry partHeader
    = { .id = PARTDESCMARKER,
        .specific.partDesc = { .uid = xt_idxlist_get_uid(partDesc) },
        .offset = 0 };

  modelWinEnqueue(collID, dataHeader, data, chunk * sizeof (data[0]));
  modelWinEnqueue(collID, partHeader, partDesc, 0);

  win->refuseFuncCall = 1;
}

584
585
/************************************************************************/

586
void pioBufferFuncCall(struct winHeaderEntry header,
587
                       const void *data, size_t data_len)
588
589
590
591
{
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
592
  int funcID = header.id;
593

594
595
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
596

Deike Kleberg's avatar
Deike Kleberg committed
597
598
  if ( rankGlob != root ) return;

599
  xassert(txWin != NULL);
600

Thomas Jahns's avatar
Thomas Jahns committed
601
602
603
604
605
606
607
608
  for (collID = 0; collID < nProcsColl; ++collID)
    {
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
609
610
      xassert(txWin[collID].dictRPCUsed + txWin[collID].dictDataUsed
              < txWin[collID].dictSize);
Thomas Jahns's avatar
Thomas Jahns committed
611
      xassert(txWin[collID].refuseFuncCall == 0);
612
      modelWinEnqueue(collID, header, data, data_len);
613
    }
614

615
  xdebug("%s", "RETURN");
616
617
}

618
619
620
621
#endif

/*****************************************************************************/

622
623
624
625
/* pioInit definition must currently compile even in non-MPI configurations */
#ifndef MPI_VERSION
#  define MPI_Comm int
#endif
626
627
628
/**
   @brief initializes the MPI communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]:

  Collective call

  @param commGlob MPI communicator of all calling PEs
  @param nProcsIO number of I/O PEs
  @param IOMode I/O mode, must be in the range PIO_MINIOMODE to
  PIO_MAXIOMODE
  @param pioNamespace output argument for the namespace set up for
  the calling PE
  @param partInflate allow for array partitions on compute
  PEs that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$, must be at least 1.0
  @return the communicator of the calculator PEs (commModel), or
  commGlob if there is only one MPI task; I/O PEs do not return from
  this call but run the I/O server and exit afterwards
*/

Thomas Jahns's avatar
Thomas Jahns committed
649
650
#ifdef USE_MPI
static int pioNamespace_ = -1;
651
static int xtInitByCDI = 0;
Thomas Jahns's avatar
Thomas Jahns committed
652
653
#endif

654
655
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
Thomas Jahns's avatar
Thomas Jahns committed
656
        int *pioNamespace, float partInflate)
657
{
658
#ifdef USE_MPI
Thomas Jahns's avatar
Thomas Jahns committed
659
  int sizeGlob;
Deike Kleberg's avatar
Deike Kleberg committed
660

Deike Kleberg's avatar
Deike Kleberg committed
661
  if ( IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE )
Deike Kleberg's avatar
Deike Kleberg committed
662
    xabort ( "IOMODE IS NOT VALID." );
663

664
#ifdef _SX
Deike Kleberg's avatar
Deike Kleberg committed
665
666
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
667
668
#endif

669
670
671
672
  if ((xtInitByCDI = (!xt_initialized() || xt_finalized())))
    {
      xt_initialize(commGlob);
    }
673
674
  commInit ();
  commDefCommGlob ( commGlob );
675
  sizeGlob = commInqSizeGlob ();
676

677
678
  if (((IOMode != PIO_NONE && (nProcsIO <= 0 || nProcsIO > sizeGlob - 1)))
      || (IOMode == PIO_NONE && nProcsIO != 1))
679
680
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);
Deike Kleberg's avatar
Deike Kleberg committed
681
682

  commDefNProcsIO ( nProcsIO );
Deike Kleberg's avatar
Deike Kleberg committed
683
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
Deike Kleberg's avatar
Deike Kleberg committed
684
  commDefCommPio  ();
685

686
687
688
  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

Deike Kleberg's avatar
Deike Kleberg committed
689
  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
690
  if ( commInqSizeGlob () == 1 )
691
    {
692
      pioNamespace_ = *pioNamespace = namespaceNew();
Deike Kleberg's avatar
Deike Kleberg committed
693
694
      return commInqCommGlob ();
    }
695

696
697
  if ( commInqIsProcIO ())
    {
698
      serializeSetMPI();
699
700
701
      namespaceSwitchSet(NSSWITCH_ABORT, NSSW_FUNC(cdiAbortC_MPI));
      namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(pioFileOpen));
      namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(pioFileClose));
702
      IOServer ();
Thomas Jahns's avatar
Thomas Jahns committed
703
      namespaceDelete(0);
704
      commDestroy ();
705
      xt_finalize();
706
      MPI_Finalize ();
707
      exit ( EXIT_SUCCESS );
708
709
    }
  else
710
    cdiPioClientSetup(&pioNamespace_, pioNamespace);
711

712
  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
713
  return commInqCommModel ();
714
#else
715
  abort();
716
#endif
717
}
718

719
720
#ifndef MPI_VERSION
#  undef MPI_Comm
Deike Kleberg's avatar
Deike Kleberg committed
721
722
#endif

723
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
724

725
/**
   @brief ends the definition stage: maps the variables to I/O PEs,
   sends the packed resources, creates the RMA windows and sets the
   resource status of the namespace to STAGE_TIMELOOP.

   Only processes with a global rank less than the number of
   collectors send the resources, each one to rank
   commInqNProcsModel () of the communicator commInqCommsIO returns
   for its own rank.  Does nothing without USE_MPI.
*/
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  if ( rankGlob < commInqNProcsColl ())
    {
      char *packBuffer;
      int packBufferSize;
      MPI_Comm comm = commInqCommsIO ( rankGlob );

      reshPackBufferCreate(&packBuffer, &packBufferSize, &comm);

      xmpi ( MPI_Send ( packBuffer, packBufferSize, MPI_PACKED,
                        commInqNProcsModel (), RESOURCES,
                        commInqCommsIO ( rankGlob )));

      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");

      reshPackBufferDestroy ( &packBuffer );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/**
   @brief ends the time stepping stage by setting the resource status
   of the namespace to STAGE_CLEANUP.

   Does nothing without USE_MPI.
*/
void  pioEndTimestepping ( void )
{
#ifdef USE_MPI
  xdebug("%s", "START");

  namespaceDefResStatus ( STAGE_CLEANUP );

  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
766

767
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
768

Deike Kleberg's avatar
Deike Kleberg committed
769

Deike Kleberg's avatar
Deike Kleberg committed
770
771
772
773
774
775
776
777
778
/**
  @brief is invoked by the calculator PEs, to inform
  the I/O PEs that no more data will be written.

  Deletes the namespace set up by pioInit, sends a message with tag
  FINALIZE for every collector, frees the RMA windows and the
  communicators and finalizes yaxt if it has been initialized by
  pioInit.  Does nothing without USE_MPI.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  /* content of the message, the value itself is arbitrary */
  int ibuffer = 1111;

  xdebug("%s", "START");
  namespaceDelete(pioNamespace_);

  {
    const int nProcsColl = commInqNProcsColl ();
    int collID = 0;

    while ( collID < nProcsColl )
      {
        xmpi ( MPI_Send ( &ibuffer, 1, MPI_INT, commInqNProcsModel (),
                          FINALIZE, commInqCommsIO ( collID )));
        xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
        ++collID;
      }
  }

  modelWinCleanup ();
  commDestroy ();
  if (xtInitByCDI)
    xt_finalize();
  xdebug("%s", "RETURN");
#endif
}

799
 /************************************************************************/
800

801
void pioWriteTimestep()
802
803
{
#ifdef USE_MPI
804
  int collID, iAssert = 0;
805
  /* int tokenEnd = END; */
806
  int rankGlob = commInqRankGlob ();
807
808
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();
809

810
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
811

812
  xassert(txWin != NULL);
813
814

  if ( rankGlob < nProcsColl )
Deike Kleberg's avatar
Deike Kleberg committed
815