pio_interface.c 24.6 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
#include <stdlib.h>
6
#include <stdio.h>
7
#include <stdarg.h>
8
#include "cdi.h"
9
#include "limits.h"
Deike Kleberg's avatar
Deike Kleberg committed
10
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
11
#include "vlist_var.h"
12
13

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
14
#include "namespace.h"
15
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
18
#include "pio_server.h"
19
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "stream_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
21
#include "vlist.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
23

24

25
static struct rdmaWin
26
27
{
  size_t size;
28
  unsigned char *buffer, *head;
29
  MPI_Win win;
30
  int postSet, refuseFuncCall;
31
  MPI_Group ioGroup;
32
  int dictSize, dictUsed;
33
34
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
35

Deike Kleberg's avatar
Deike Kleberg committed
36
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
Deike Kleberg's avatar
Deike Kleberg committed
37
38


Deike Kleberg's avatar
Deike Kleberg committed
39
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
40

41
/* qsort comparator for an array of int pointers: orders the pointed-to
 * values in DESCENDING order (larger values first). Returns -1, 0 or 1. */
static int cmp ( const void * va, const void * vb )
{
  int lhs = **( const int * const * ) va;
  int rhs = **( const int * const * ) vb;

  if ( lhs > rhs ) return -1;
  if ( lhs < rhs ) return 1;
  return 0;
}

/****************************************************/
52

53
54
55
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
56
57
58
59
60
61
62
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
63
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
64
  int i, j;
65

Deike Kleberg's avatar
Deike Kleberg committed
66
67
68
69
70
71
72
73
74
75
76

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
77
78

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
79
80
81
82
83
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
84

Deike Kleberg's avatar
Deike Kleberg committed
85
      for ( j = 0; j < nWriter; j++ )
86
87
88
	{
	  if ( !i ) buckets[j] = 0.0;
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
89

90
91
92
93
94
95
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
96
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
97
98
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
99

100
101
102
  xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
  xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                nProblems, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
103
104
  xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
  xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
105
  xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
106
107
}

108
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
109
110
111
112
113

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
114

Deike Kleberg's avatar
Deike Kleberg committed
115
116
   @param sSizes array with number of var_t for all stream_t 's

117
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
118
119
120
in order of vSizes

   @param nStreams number of stream_t 's
121
122

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
123
124
125
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
126

Deike Kleberg's avatar
Deike Kleberg committed
127
128
   @return
*/
129

130
131
132
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
133
{
134

Deike Kleberg's avatar
Deike Kleberg committed
135
136
137
138
139
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
140

Deike Kleberg's avatar
Deike Kleberg committed
141
142
143
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
144
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
145

146
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
147
148
149
150
151
152

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
153
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
154
155
    }

Deike Kleberg's avatar
Deike Kleberg committed
156
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
157
158
159
160
161
162
163
164
165
166
167
168
169
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
170
	if ( * ( streamMapping + j ) == i )
171
172
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
173
174
175
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
176
177
178
179
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
180
181
182
183
184
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
185
186

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
187
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
188

189
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
190
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
191
192
193
194
195

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
196
197
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
198
	    * ( varMapping + offset ++ ) =
199
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
200
                                    summandRank );
201
202
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
203
204
205
206
207
208
209

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
210

211
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
212
    {
Deike Kleberg's avatar
Deike Kleberg committed
213
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
214
      for ( i = 0; i < nProcsColl; i++ )
215
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
216
      for ( i = 0; i < nVars; i ++ )
217
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
218
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
219
    }
220
}
Deike Kleberg's avatar
Deike Kleberg committed
221

222
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
223

224
225
static void
defVarDeco(int vlistID, int varID)
Deike Kleberg's avatar
Deike Kleberg committed
226
227
{
  int varSize, cRank, lChunk, rem, lOffset;
228
229
  int nProcsModel = commInqNProcsModel ();
  deco_t deco[nProcsModel];
230

Deike Kleberg's avatar
Deike Kleberg committed
231
232
  varSize = vlistInqVarSize ( vlistID, varID );

233
  for ( cRank = 0; cRank < nProcsModel; cRank++ )
Deike Kleberg's avatar
Deike Kleberg committed
234
    {
235
      lChunk = varSize / nProcsModel;
Deike Kleberg's avatar
Deike Kleberg committed
236
      lOffset = cRank * lChunk;
237
      rem = varSize % nProcsModel;
238
      if ( cRank < rem )
Deike Kleberg's avatar
Deike Kleberg committed
239
240
241
242
243
244
        {
          lChunk++;
          lOffset += cRank;
        }
      else
        lOffset += rem;
245

Deike Kleberg's avatar
Deike Kleberg committed
246
247
248
249
      deco[cRank].rank   = cRank;
      deco[cRank].offset = lOffset;
      deco[cRank].chunk  = lChunk;
    }
250
  vlistDefVarDeco ( vlistID, varID, nProcsModel, &deco[0] );
Deike Kleberg's avatar
Deike Kleberg committed
251
252
}

253
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
254

Deike Kleberg's avatar
Deike Kleberg committed
255

256
257
/* Map every variable of every stream to a collector (via varMapGen), define
 * each variable's model-side decomposition in both the working and the
 * original vlist, record the assigned I/O rank, and abort if any collector
 * ends up without data. */
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs        = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes  = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  /* zero-filled: entries still 0 afterwards mark idle collectors */
  collectsData = xcalloc ( nProcsColl, sizeof ( collectsData[0] ));
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( resHs[i] );

  nVars = xsum ( nStreams, streamSizes );

  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  for ( i = 0; i < nStreams; i++ )
    {
      int vlistID = streamInqVlist ( resHs[i] );
      /* plain assignment: varSizes comes from xmalloc, it is not zeroed */
      for ( j = 0; j < streamSizes[i]; j++ )
        varSizes[k++] = vlistInqVarSize ( vlistID, j );
    }

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    {
      int vlistID     = streamInqVlist ( resHs[i] );
      int vlistIDorig = streamInqVlistIDorig ( resHs[i] );

      for ( j = 0; j < streamSizes[i]; j++ )
        {
          int collID = commRankGlob2CollID ( varMapping[k] );

          defVarDeco ( vlistID, j );
          defVarDeco ( vlistIDorig, j );
          vlistDefVarIOrank ( vlistID, j, varMapping[k] );
          vlistDefVarIOrank ( vlistIDorig, j, varMapping[k] );

          xassert ( collID >= 0 && collID < nProcsColl );
          collectsData[collID] = 1;
          k++;
        }
    }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      xabort ( "\nAT LEAST ONE COLLECTOR PROCESS IDLES, "
               "CURRENTLY NOT COVERED: "
               "PE%d collects no data",
               commCollID2RankGlob ( j ));

  /* free(NULL) is a no-op, no guards needed */
  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

325
static
Deike Kleberg's avatar
Deike Kleberg committed
326
327
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
328
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
329

330
  xdebug("%s", "START");
331
  if (txWin != NULL)
332
333
334
335
336
337
338
339
340
341
342
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
343

344
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
345
346
347
}

/************************************************************************/
348

349
350
351
352
353
struct collDesc
{
  int numRecords;
};

354
355
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
356
{
357
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
Deike Kleberg's avatar
Deike Kleberg committed
358
  int collIDchunk = 0, sumWinBufferSize = 0;
359
360
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
361
  int rankModel   = commInqRankModel ();
362
  int root = commInqRootGlob ();
363
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
364

365
  xdebug("%s", "START");
366
  xassert(txWin != NULL);
367

Deike Kleberg's avatar
Deike Kleberg committed
368
  nstreams = reshCountType ( &streamOps );
369
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
370
  collIndex = xcalloc(nProcsColl, sizeof (collIndex[0]));
371
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
372
373
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
374
375
376
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
377
378
379
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
Deike Kleberg's avatar
Deike Kleberg committed
380
381
          collID = CDI_UNDEFID;
          collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
382
          collIDchunk = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
Deike Kleberg's avatar
Deike Kleberg committed
383
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
384
          ++(collIndex[collID].numRecords);
385
          txWin[collID].size += collIDchunk * sizeof (double) +
386
            sizeof (union winHeaderEntry);
Deike Kleberg's avatar
Deike Kleberg committed
387
        }
388

389
      // memory required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
390
      // once per stream and timestep for all collprocs only on the modelproc root
391
392
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
393
394
395
396
397
398
          {
            collIndex[collID].numRecords += 3;
            txWin[collID].size +=
              3 * sizeof (union winHeaderEntry)
              + MAXDATAFILENAME;
          }
Deike Kleberg's avatar
Deike Kleberg committed
399
    }
400
  for (collID = 0; collID < nProcsColl; ++collID)
401
    {
402
403
404
405
      int numRecords = ++(collIndex[collID].numRecords);
      txWin[collID].dictSize = numRecords;
      txWin[collID].dictUsed = 1;
      txWin[collID].size += sizeof (union winHeaderEntry);
406
      sumWinBufferSize += txWin[collID].size;
407
    }
408
409
410
  free(collIndex);
  free ( streamIndexList );

411
412
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", (size_t)sumWinBufferSize,
         (size_t)MAXWINBUFFERSIZE);
Deike Kleberg's avatar
Deike Kleberg committed
413
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
414
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
415
}
416

Deike Kleberg's avatar
Deike Kleberg committed
417
418
419
420

/************************************************************************/


421
static
Deike Kleberg's avatar
Deike Kleberg committed
422
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
423
{
Deike Kleberg's avatar
Deike Kleberg committed
424
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
425

Deike Kleberg's avatar
Deike Kleberg committed
426
427
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
428
            txWin[collID].buffer     != NULL      &&
429
430
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
431
            txWin[collID].size <= MAXWINBUFFERSIZE);
432
  memset(txWin[collID].buffer, 0, txWin[collID].size);
433
434
  txWin[collID].head = txWin[collID].buffer
    + txWin[collID].dictSize * sizeof (union winHeaderEntry);
435
  txWin[collID].refuseFuncCall = 0;
436
  txWin[collID].dictUsed = 1;
Deike Kleberg's avatar
Deike Kleberg committed
437
438
439
}


Deike Kleberg's avatar
Deike Kleberg committed
440
441
442
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
443
444
445
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
446
  int collID, ranks[1];
447
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
448

449
  xdebug("%s", "START");
450
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
451

Deike Kleberg's avatar
Deike Kleberg committed
452
  modelWinDefBufferSizes ();
453
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
454

Deike Kleberg's avatar
Deike Kleberg committed
455
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
456
    {
457
      xassert(txWin[collID].size > 0);
458
      txWin[collID].buffer = NULL;
459
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
460
461
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
462
463
      txWin[collID].head = txWin[collID].buffer
        + txWin[collID].dictSize * sizeof (union winHeaderEntry);
464
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
465
                          MPI_INFO_NULL, commInqCommsIO(collID),
466
                          &txWin[collID].win));
467
468
469
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
470
    }
471
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
472
473
474
475
}

/************************************************************************/

476
static void
477
478
modelWinEnqueue(int collID,
                union winHeaderEntry header, const void *data, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
479
{
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)txWin[collID].buffer;
  int nextEntry = txWin[collID].dictUsed;
  if (header.dataRecord.streamID > 0)
    {
      header.dataRecord.offset
        = (int)(txWin[collID].head - txWin[collID].buffer);
      memcpy(txWin[collID].head, data, size);
      txWin[collID].head += size;
    }
  else if (header.funcCall.funcID == STREAMOPEN)
    {
      header.funcCall.funcArgs.newFile.offset
        = (int)(txWin[collID].head - txWin[collID].buffer);
      memcpy(txWin[collID].head, data, size);
      txWin[collID].head += size;
    }
  winDict[nextEntry] = header;
  ++(txWin[collID].dictUsed);
Deike Kleberg's avatar
Deike Kleberg committed
499
500
}

501
void pioBufferData ( const int streamID, const int varID, const double *data, int nmiss )
Deike Kleberg's avatar
Deike Kleberg committed
502
{
503
  int chunk, vlistID, collID = CDI_UNDEFID;
504
  int rankModel = commInqRankModel ();
Deike Kleberg's avatar
Deike Kleberg committed
505
506
507

  vlistID  = streamInqVlist ( streamID );
  collID   = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
508
  chunk    = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
509
510
511
  xassert ( collID         >= 0                    &&
            collID         <  commInqNProcsColl () &&
            chunk          >= 0                    &&
512
            txWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
513

514
  if (txWin[collID].postSet)
515
    {
516
      xmpi(MPI_Win_wait(txWin[collID].win));
517
      txWin[collID].postSet = 0;
Deike Kleberg's avatar
Deike Kleberg committed
518
      modelWinFlushBuffer ( collID );
519
    }
520

521
522
523
  union winHeaderEntry header
    = { .dataRecord = { streamID, varID, -1, nmiss } };
  modelWinEnqueue(collID, header, data, chunk * sizeof (data[0]));
524

525
  txWin[collID].refuseFuncCall = 1;
Deike Kleberg's avatar
Deike Kleberg committed
526
527
}

528
529
/************************************************************************/

530
void pioBufferFuncCall(int funcID, int argc, ... )
531
532
533
534
535
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
536

537
538
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);
539

Deike Kleberg's avatar
Deike Kleberg committed
540
541
  if ( rankGlob != root ) return;

542
543
544
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
545

Deike Kleberg's avatar
Deike Kleberg committed
546
  va_start ( ap, argc );
547

Thomas Jahns's avatar
Thomas Jahns committed
548
549
550
551
552
  union winHeaderEntry header;
  char * filename = NULL;
  size_t filenamesz = 0;
  int streamID = 0, vlistID = 0, filetype = 0;

553
  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
554
    {
555
    case STREAMCLOSE:
Thomas Jahns's avatar
Thomas Jahns committed
556
557
      xassert ( argc == 1 );
    case STREAMDEFVLIST:
558
      {
Thomas Jahns's avatar
Thomas Jahns committed
559
560
561
        streamID = va_arg(ap, int);
        vlistID = CDI_UNDEFID;
        if (funcID == STREAMDEFVLIST)
562
          {
Thomas Jahns's avatar
Thomas Jahns committed
563
564
            xassert ( argc == 2 );
            vlistID = va_arg(ap, int);
565
          }
Thomas Jahns's avatar
Thomas Jahns committed
566
567
568
569
        struct funcCallDesc f = { .funcID = funcID,
                                  .funcArgs.streamChange = { streamID,
                                                             vlistID } };
        header.funcCall = f;
570
      }
571
      break;
Deike Kleberg's avatar
Deike Kleberg committed
572
573
    case STREAMOPEN:
      {
Deike Kleberg's avatar
Deike Kleberg committed
574
        xassert ( argc == 2 );
Thomas Jahns's avatar
Thomas Jahns committed
575
576
        filename = va_arg(ap, char *);
        filenamesz = strlen(filename);
Deike Kleberg's avatar
Deike Kleberg committed
577
578
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Thomas Jahns's avatar
Thomas Jahns committed
579
580
581
582
583
584
585
        filetype = va_arg(ap, int);
        struct funcCallDesc f = { .funcID = STREAMOPEN,
                                  .funcArgs.newFile
                                  = { .fnamelen = filenamesz,
                                      .filetype = filetype } };
        header.funcCall = f;
        ++filenamesz;
Deike Kleberg's avatar
Deike Kleberg committed
586
      }
587
      break;
Thomas Jahns's avatar
Thomas Jahns committed
588
589
590
    default:
      xabort("FUNCTION NOT MAPPED!");
    }
591

Thomas Jahns's avatar
Thomas Jahns committed
592
593
594
595
596
597
598
599
600
601
602
603
  for (collID = 0; collID < nProcsColl; ++collID)
    {
      xassert(txWin[collID].dictSize > txWin[collID].dictUsed);
      if (txWin[collID].postSet)
        {
          xmpi(MPI_Win_wait(txWin[collID].win));
          txWin[collID].postSet = 0;
          modelWinFlushBuffer ( collID );
        }
      xassert(txWin[collID].refuseFuncCall == 0);
      modelWinEnqueue(collID, header, filename, filenamesz);
    }
604

Thomas Jahns's avatar
Thomas Jahns committed
605
606
607
608
609
610
611
612
613
614
615
616
617
618
  switch ( funcID )
    {
    case STREAMCLOSE:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
             funcMap[(-1 - funcID)], streamID);
      break;
    case STREAMDEFVLIST:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
             " vlistID=%d", funcMap[(-1 - funcID)], streamID, vlistID);
      break;
    case STREAMOPEN:
      xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
             " filename=%s, filetype=%d",
             funcMap[(-1 - funcID)], filenamesz, filename, filetype);
619
      break;
620
    }
621

Thomas Jahns's avatar
Thomas Jahns committed
622
  va_end(ap);
623
  xdebug("%s", "RETURN");
624
625
}

626
627
628
629
#endif

/*****************************************************************************/

630
631
632
633
/* Return this PE's decomposition chunk of variable varID in vlistID.
 * With USE_MPI the model rank is queried and must be defined; otherwise
 * CDI_UNDEFID is passed as the rank. */
int pioInqVarDecoChunk ( int vlistID, int varID )
{
#ifdef USE_MPI
   int rankModel = commInqRankModel ();
   xassert ( rankModel != CDI_UNDEFID );
   return vlistInqVarDecoChunk ( vlistID, varID, rankModel );
#else
   return vlistInqVarDecoChunk ( vlistID, varID, CDI_UNDEFID );
#endif
}

/*****************************************************************************/

int pioInqVarDecoOff ( int vlistID, int varID )
{
#ifdef USE_MPI
   int rankModel = commInqRankModel ();
Deike Kleberg's avatar
Deike Kleberg committed
646
   xassert ( rankModel != CDI_UNDEFID );
647
   return vlistInqVarDecoOff ( vlistID, varID, rankModel );
Thomas Jahns's avatar
Thomas Jahns committed
648
#else
649
   return vlistInqVarDecoOff ( vlistID, varID, CDI_UNDEFID );
Thomas Jahns's avatar
Thomas Jahns committed
650
#endif
651
652
}

653
/*****************************************************************************/
654
655
656
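/**
   @brief initializes the MPI communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]: communicator connecting the calculator PEs with I/O
  collector i (presumably all calculator PEs plus that collector)

  Collective call

  @param commGlob MPI communicator of all calling PEs
  @param nProcsIO number of I/O PEs
  @param IOMode I/O mode, must lie in [PIO_MINIOMODE, PIO_MAXIOMODE]
  @param nNamespaces number of namespaces, passed to namespaceInit
  @param hasLocalFile passed to namespaceInit
  @return on calculator PEs the communicator of the calculator PEs
  (commModel), or the global communicator if it contains a single PE;
  I/O PEs run the I/O server and exit the program instead of returning
*/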

#ifdef USE_MPI
MPI_Comm pioInit_c ( MPI_Comm commGlob, int nProcsIO, int IOMode,
                     int nNamespaces, int * hasLocalFile )
{
  int sizeGlob;
  int modeIsValid = IOMode >= PIO_MINIOMODE && IOMode <= PIO_MAXIOMODE;

  if ( !modeIsValid )
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  commInit ();
  commDefCommGlob ( commGlob );
  sizeGlob = commInqSizeGlob ();

  /* at least one calculator PE must remain besides the I/O PEs */
  if ( nProcsIO <= 0 || nProcsIO > sizeGlob - 1 )
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  /* NOTE(review): the nProcsIO check above already aborts when
     sizeGlob == 1, so this branch appears unreachable - confirm intent. */
  if ( commInqSizeGlob () == 1 )
    {
      namespaceInit ( nNamespaces, hasLocalFile );
      return commInqCommGlob ();
    }

  if ( !commInqIsProcIO ())
    {
      /* calculator PE: set up the links to the collectors and return */
      commEvalPhysNodes ();
      commDefCommsIO ();
      namespaceInit ( nNamespaces, hasLocalFile );
      xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
      return commInqCommModel ();
    }

  /* I/O PE: serve, tear down and leave the program */
  IOServer ();
  commDestroy ();
  MPI_Finalize ();
  exit ( EXIT_SUCCESS );
}
#endif

Deike Kleberg's avatar
Deike Kleberg committed
726
727
728
729
730
/*****************************************************************************/

/* Integer-handle entry point. With USE_MPI, commGlobArg is converted from a
 * Fortran communicator handle, pioInit_c is called, and its result is
 * returned as a Fortran handle. Without USE_MPI it only logs and returns 0. */
int pioInit ( int commGlobArg, int nProcsIO, int IOMode, int nNamespaces,
              int * hasLocalFile )
{
#ifdef USE_MPI
  xdebug("START: %s, nProcsIO=%d, IOMode=%d, nNamespaces=%d",
         "cdi parallel",
         nProcsIO, IOMode, nNamespaces );

  MPI_Comm commGlob = MPI_Comm_f2c (( MPI_Fint ) commGlobArg );
  xassert ( commGlob != MPI_COMM_NULL );

  return MPI_Comm_c2f ( pioInit_c ( commGlob, nProcsIO, IOMode, nNamespaces,
                                    hasLocalFile ));
#else
  xdebug("START: %s, nProcsIO=%d, IOMode=%d, nNamespaces=%d",
         "cdi serial",
         nProcsIO, IOMode, nNamespaces );
  /* unused in serial builds */
  (void) commGlobArg;
  (void) hasLocalFile;
  xdebug("%s", "RETURN" );
  return 0;
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
755
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
756

757
/* End of the definition phase on a calculator PE:
 * - map variables to collectors and define their decompositions;
 * - model ranks below nProcsColl send the packed resource list to collector
 *   rankGlob over commInqCommsIO(rankGlob);
 * - create the RMA windows;
 * - switch the namespace status to STAGE_TIMELOOP.
 * No-op without USE_MPI. */
void  pioEndDef ( void )
{
#ifdef USE_MPI
  int rankGlob = commInqRankGlob ();
  int sendsResources;

  xdebug("%s", "START");

  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());

  reshListPrint ( "reshListModel" );

  sendsResources = rankGlob < commInqNProcsColl ();
  if ( sendsResources )
    {
      MPI_Comm commIO = commInqCommsIO ( rankGlob );
      char *packed;
      int packedSize;

      reshPackBufferCreate ( &packed, &packedSize, commIO );
      xmpi ( MPI_Send ( packed, packedSize, MPI_PACKED, commInqNProcsModel (),
                        RESOURCES, commIO ));
      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");
      reshPackBufferDestroy ( &packed );
    }

  modelWinCreate ();
  namespaceDefResStatus ( STAGE_TIMELOOP );
  xdebug("%s", "RETURN");
#endif
}

/************************************************************************/

/* Leave the time loop by moving the namespace resource status to
 * STAGE_CLEANUP. Does nothing without USE_MPI. */
void  pioEndTimestepping ( void )
{
#ifdef USE_MPI
  xdebug("%s", "START");

  namespaceDefResStatus ( STAGE_CLEANUP );

  xdebug("%s", "RETURN");
#endif
}

Deike Kleberg's avatar
Deike Kleberg committed
799

800
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
801

Deike Kleberg's avatar
Deike Kleberg committed
802

Deike Kleberg's avatar
Deike Kleberg committed
803
804
805
806
807
808
809
810
811
/**
  @brief Invoked by the calculator PEs to inform the I/O PEs that no more
  data will be written. In order, it:
  - cleans up the namespaces;
  - sends a message tagged FINALIZE to every collector;
  - releases the RMA windows;
  - destroys the communicators.
  Does nothing without USE_MPI.
*/
void pioFinalize ( void )
{
#ifdef USE_MPI
  int collID, nProcsColl;
  /* payload value looks arbitrary; presumably only the FINALIZE tag
     matters to the receiver - TODO confirm */
  int ibuffer = 1111;

  xdebug("%s", "START");
  namespaceCleanup ();

  nProcsColl = commInqNProcsColl ();
  for ( collID = 0; collID < nProcsColl; collID++ )
    {
      MPI_Comm commIO = commInqCommsIO ( collID );

      xmpi ( MPI_Send ( &ibuffer, 1, MPI_INT, commInqNProcsModel (),
                        FINALIZE, commIO ));
      xdebug("%s", "SENT MESSAGE WITH TAG \"FINALIZE\"");
    }

  modelWinCleanup ();
  commDestroy ();
  xdebug("%s", "RETURN");
#endif
}

830
 /************************************************************************/
831

Deike Kleberg's avatar
Deike Kleberg committed
832
void pioWriteTimestep ( int tsID, int vdate, int vtime )
833
834
{
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
835
  int collID, buffer[timestepSize], iAssert = 0;
836
  /* int tokenEnd = END; */
837
  int rankGlob = commInqRankGlob ();
838
839
  int nProcsColl = commInqNProcsColl ();
  int nProcsModel = commInqNProcsModel ();
840

841
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
842

Deike Kleberg's avatar
Deike Kleberg committed
843
844
845
  xassert ( tsID       >= 0     &&
            vdate      >= 0     &&
            vtime      >= 0     &&
846
            txWin != NULL);
847

Deike Kleberg's avatar
Deike Kleberg committed
848
  buffer[0] = tsID;