pio_interface.c 26.5 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
#include <stdlib.h>
6
#include <stdio.h>
7
#include <stdarg.h>
8
#include "cdi.h"
9
#include "limits.h"
Deike Kleberg's avatar
Deike Kleberg committed
10
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
11
#include "vlist_var.h"
12
13

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
14
#include "namespace.h"
15
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
18
#include "pio_server.h"
19
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
20
#include "stream_int.h"
Deike Kleberg's avatar
Deike Kleberg committed
21
#include "vlist.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
extern resOps streamOps;
Deike Kleberg's avatar
Deike Kleberg committed
23

24

25
static struct rdmaWin
26
27
{
  size_t size;
28
  unsigned char *buffer, *head;
29
  MPI_Win win;
30
  int postSet, refuseFuncCall;
31
  MPI_Group ioGroup;
32
33
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
34

Deike Kleberg's avatar
Deike Kleberg committed
35
/* Printable names of the buffered function calls; looked up as
   funcMap[funcID] in the debug output of pioBufferFuncCall.
   NOTE(review): presumably the order matches the values of STREAMOPEN,
   STREAMDEFVLIST and STREAMCLOSE -- confirm against their definition. */
char * funcMap[nFuncs] = {"streamOpen", "streamDefVlist", "streamClose" };
Deike Kleberg's avatar
Deike Kleberg committed
36
37


Deike Kleberg's avatar
Deike Kleberg committed
38
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
39

40
/* qsort comparator for an array of pointers to int.  Orders the
   pointed-to values from largest to smallest: the result is 1 when the
   first value is smaller, -1 when it is larger and 0 when both are equal. */
static int cmp ( const void * va, const void * vb )
{
  const int lhs = ** ( const int * const * ) va;
  const int rhs = ** ( const int * const * ) vb;

  if ( lhs < rhs )
    return 1;
  if ( lhs > rhs )
    return -1;
  return 0;
}

/****************************************************/
51

52
53
54
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
55
56
57
58
59
60
61
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
62
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
63
  int i, j;
64

Deike Kleberg's avatar
Deike Kleberg committed
65
66
67
68
69
70
71
72
73
74
75

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

  qsort ( ip, nProblems, sizeof ( int * ), cmp );

  for ( i = 0; i < nProblems; i++ )
    dummy[i] = ip[i] - problemSizes;
76
77

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
78
79
80
81
82
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
83

Deike Kleberg's avatar
Deike Kleberg committed
84
      for ( j = 0; j < nWriter; j++ )
85
86
87
	{
	  if ( !i ) buckets[j] = 0.0;
	  nextCapacity = meanBucket[j] - ( buckets[j] + ( *ip[i] ));
88

89
90
91
92
93
94
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
95
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
96
97
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
98

99
100
101
  xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
  xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                nProblems, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
102
103
  xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
  xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
104
  xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
105
106
}

107
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
108
109
110
111
112

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
113

Deike Kleberg's avatar
Deike Kleberg committed
114
115
   @param sSizes array with number of var_t for all stream_t 's

116
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
117
118
119
in order of vSizes

   @param nStreams number of stream_t 's
120
121

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
122
123
124
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
125

Deike Kleberg's avatar
Deike Kleberg committed
126
127
   @return
*/
128

129
130
131
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
132
{
133

Deike Kleberg's avatar
Deike Kleberg committed
134
135
136
137
138
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
  double * w;
139

Deike Kleberg's avatar
Deike Kleberg committed
140
141
142
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
143
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
144

145
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
146
147
148
149
150
151

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
152
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
153
154
    }

Deike Kleberg's avatar
Deike Kleberg committed
155
  w = ( double * ) xmalloc ( nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
156
157
158
159
160
161
162
163
164
165
166
167
168
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
169
	if ( * ( streamMapping + j ) == i )
170
171
	  nVarsNode += * ( sSizes + j );

Deike Kleberg's avatar
Deike Kleberg committed
172
173
174
      weightsVarsNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      varMappingNode = ( int * ) xmalloc ( nVarsNode * sizeof ( int ));
      w = ( double * ) xmalloc ( * ( nodeSizes + i ) * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
175
176
177
178
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
179
180
181
182
183
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
184
185

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
186
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
187

188
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
189
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
190
191
192
193
194

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
195
196
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
197
	    * ( varMapping + offset ++ ) =
198
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
199
                                    summandRank );
200
201
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
202
203
204
205
206
207
208

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
209

210
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
211
    {
Deike Kleberg's avatar
Deike Kleberg committed
212
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
213
      for ( i = 0; i < nProcsColl; i++ )
214
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
215
      for ( i = 0; i < nVars; i ++ )
216
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
217
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
218
    }
219
}
Deike Kleberg's avatar
Deike Kleberg committed
220

221
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
222

223
224
static void
defVarDeco(int vlistID, int varID)
Deike Kleberg's avatar
Deike Kleberg committed
225
226
{
  int varSize, cRank, lChunk, rem, lOffset;
227
228
  int nProcsModel = commInqNProcsModel ();
  deco_t deco[nProcsModel];
229

Deike Kleberg's avatar
Deike Kleberg committed
230
231
  varSize = vlistInqVarSize ( vlistID, varID );

232
  for ( cRank = 0; cRank < nProcsModel; cRank++ )
Deike Kleberg's avatar
Deike Kleberg committed
233
    {
234
      lChunk = varSize / nProcsModel;
Deike Kleberg's avatar
Deike Kleberg committed
235
      lOffset = cRank * lChunk;
236
      rem = varSize % nProcsModel;
237
      if ( cRank < rem )
Deike Kleberg's avatar
Deike Kleberg committed
238
239
240
241
242
243
        {
          lChunk++;
          lOffset += cRank;
        }
      else
        lOffset += rem;
244

Deike Kleberg's avatar
Deike Kleberg committed
245
246
247
248
      deco[cRank].rank   = cRank;
      deco[cRank].offset = lOffset;
      deco[cRank].chunk  = lChunk;
    }
249
  vlistDefVarDeco ( vlistID, varID, nProcsModel, &deco[0] );
Deike Kleberg's avatar
Deike Kleberg committed
250
251
}

252
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
253

Deike Kleberg's avatar
Deike Kleberg committed
254

255
256
/**
   @brief Maps the variables of all streams to collector processes and
   defines their decomposition over the model processes.

   For every variable the decomposition and the I/O rank are defined on
   both vlists of its stream (streamInqVlist and streamInqVlistIDorig).
   Aborts if a collector process ends up without any variable.

   @param nNodes    number of physical nodes hosting I/O PEs
   @param nodeSizes number of I/O PEs on each of these nodes
*/
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nStreams, nVars, * resHs, * streamSizes, * varSizes, * varMapping,
    * collectsData;
  int i, j, k = 0;
  int nProcsColl = commInqNProcsColl ();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  nStreams = streamSize ();

  resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
  streamSizes = xmalloc ( nStreams * sizeof ( streamSizes[0] ));
  collectsData = xmalloc ( nProcsColl * sizeof ( collectsData[0] ));
  /* no collector has been given any data yet; the flags are tested
     against 0 further down, so they must start out defined */
  for ( j = 0; j < nProcsColl; j++ )
    collectsData[j] = 0;
  streamGetIndexList ( nStreams, resHs );

  for ( i = 0; i < nStreams; i++ )
    streamSizes[i] = streamInqNvars ( * ( resHs + i ));

  nVars = xsum ( nStreams, streamSizes );
  varSizes   = xmalloc ( nVars * sizeof ( varSizes[0] ));
  varMapping = xmalloc ( nVars * sizeof ( varMapping[0] ));

  /* plain assignment: varSizes is freshly allocated and holds no value
     that could be accumulated onto */
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < * ( streamSizes + i ); j++ )
      varSizes[k++] = vlistInqVarSize ( streamInqVlist ( * ( resHs + i )), j );

  xassert ( k == nVars );

  varMapGen ( varSizes, streamSizes, varMapping,
              nStreams, nodeSizes, nNodes );

  k = 0;
  for ( i = 0; i < nStreams; i++ )
    for ( j = 0; j < * ( streamSizes + i ); j++ )
      {
        defVarDeco ( streamInqVlist ( *( resHs + i )), j );
        defVarDeco ( streamInqVlistIDorig ( * ( resHs + i )), j );
        vlistDefVarIOrank ( streamInqVlist ( * ( resHs + i )), j,
                            * ( varMapping + k ));
        vlistDefVarIOrank ( streamInqVlistIDorig ( * ( resHs + i )), j,
                            * ( varMapping + k ));
        collectsData[commRankGlob2CollID ( varMapping[k++] )] = 1;
      }

  for ( j = 0; j < nProcsColl; j++ )
    if ( collectsData[j] == 0 )
      /* pass the message as format plus argument instead of formatting it
         into a fixed buffer and using that buffer as the format */
      xabort ( "\nAT LEAST ONE COLLECTOR PROCESS IDLES, "
               "CURRENTLY NOT COVERED: "
               "PE%d collects no data",
               commCollID2RankGlob ( j ));

  free ( varMapping );
  free ( varSizes );
  free ( collectsData );
  free ( streamSizes );
  free ( resHs );

  xdebug("%s", "RETURN");
}

/************************************************************************/

324
static
Deike Kleberg's avatar
Deike Kleberg committed
325
326
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
327
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
328

329
  xdebug("%s", "START");
330
  if (txWin != NULL)
Deike Kleberg's avatar
Deike Kleberg committed
331
332
    for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
      {
333
        if (txWin[collID].postSet)
334
335
          xmpi(MPI_Win_wait(txWin[collID].win));
        xmpi(MPI_Win_free(&txWin[collID].win));
336
        xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
337
        xmpi(MPI_Group_free(&txWin[collID].ioGroup));
Deike Kleberg's avatar
Deike Kleberg committed
338
      }
Deike Kleberg's avatar
Deike Kleberg committed
339

340
  if (txWin) free(txWin);
Deike Kleberg's avatar
Deike Kleberg committed
341

342
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
343
344
345
}

/************************************************************************/
346

347
348
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
349
{
350
  int collID, nstreams, * streamIndexList, streamNo, vlistID, nvars, varID;
Deike Kleberg's avatar
Deike Kleberg committed
351
  int collIDchunk = 0, sumWinBufferSize = 0;
352
353
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
354
  int rankModel   = commInqRankModel ();
355
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
356

357
  xdebug("%s", "START");
358
  xassert(txWin != NULL);
359

Deike Kleberg's avatar
Deike Kleberg committed
360
  nstreams = reshCountType ( &streamOps );
361
362
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
363
364
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
365
      // space required for data
366
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
367
368
369
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
Deike Kleberg's avatar
Deike Kleberg committed
370
371
          collID = CDI_UNDEFID;
          collID = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
372
          collIDchunk = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
Deike Kleberg's avatar
Deike Kleberg committed
373
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
374
375
          txWin[collID].size += collIDchunk * sizeof (double) +
            winBufferOverheadChunk * sizeof (int);
Deike Kleberg's avatar
Deike Kleberg committed
376
        }
377

378
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose
Deike Kleberg's avatar
Deike Kleberg committed
379
      // once per stream and timestep for all collprocs only on the modelproc root
380
381
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
382
383
          txWin[collID].size += 3 * winBufferOverheadFuncCall * sizeof (int)
            + 5 * sizeof (int) + MAXDATAFILENAME;
Deike Kleberg's avatar
Deike Kleberg committed
384
    }
385
  free ( streamIndexList );
386
387

  for ( collID = 0; collID < nProcsColl; collID++ )
388
    {
389
390
      txWin[collID].size += winBufferOverhead * sizeof (int);
      sumWinBufferSize += txWin[collID].size;
391
    }
392
  xdebug ("sumWinBufferSize=%d, MAXWINBUFFERSIZE=%d", sumWinBufferSize, MAXWINBUFFERSIZE );
Deike Kleberg's avatar
Deike Kleberg committed
393
  xassert ( sumWinBufferSize <= MAXWINBUFFERSIZE );
394
  /* xprintArray("txWin.size", txWin, nProcsColl, DATATYPE_INT); */
395
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
396
}
397

Deike Kleberg's avatar
Deike Kleberg committed
398
399
400
401

/************************************************************************/


402
static
Deike Kleberg's avatar
Deike Kleberg committed
403
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
404
{
Deike Kleberg's avatar
Deike Kleberg committed
405
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
406

Deike Kleberg's avatar
Deike Kleberg committed
407
408
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
409
            txWin[collID].buffer     != NULL      &&
410
411
            txWin != NULL      &&
            txWin[collID].size >= 0         &&
412
            txWin[collID].size <= MAXWINBUFFERSIZE);
413
  memset(txWin[collID].buffer, 0, txWin[collID].size);
414
  txWin[collID].head = txWin[collID].buffer;
415
  txWin[collID].refuseFuncCall = 0;
Deike Kleberg's avatar
Deike Kleberg committed
416
417
418
}


Deike Kleberg's avatar
Deike Kleberg committed
419
420
421
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
422
423
424
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
425
  int collID, ranks[1];
426
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
427

428
  xdebug("%s", "START");
429
  txWin = xmalloc(nProcsColl * sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
430

Deike Kleberg's avatar
Deike Kleberg committed
431
  modelWinDefBufferSizes ();
432
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
433

Deike Kleberg's avatar
Deike Kleberg committed
434
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
435
    {
436
      xassert(txWin[collID].size > 0);
437
      txWin[collID].buffer = NULL;
438
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
439
440
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
441
      txWin[collID].head = txWin[collID].buffer;
442
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
443
                          MPI_INFO_NULL, commInqCommsIO(collID),
444
                          &txWin[collID].win));
445
446
447
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &txWin[collID].ioGroup));
      xmpi(MPI_Group_incl(txWin[collID].ioGroup, 1, ranks,
                          &txWin[collID].ioGroup ));
Deike Kleberg's avatar
Deike Kleberg committed
448
    }
449
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
450
451
452
453
}

/************************************************************************/

454
455
456
static void
modelWinBufferPutAtEnd(const char * caller,
                       int collID, const void * argBuffer, size_t size)
Deike Kleberg's avatar
Deike Kleberg committed
457
{
Deike Kleberg's avatar
Deike Kleberg committed
458
  /*
459
    xdebug ( "collID=%d, size=%d, newBufferHead=%d, oldBufferSize=%d",
460
    collID, size, txWin[collID].head - txWin[collID].buffer + size,
461
    txWin[collID].size );
Deike Kleberg's avatar
Deike Kleberg committed
462
  */
463
  if ( txWin == NULL ||
Deike Kleberg's avatar
Deike Kleberg committed
464
465
       argBuffer     == NULL ||
       size           < 0    ||
466
467
       collID         < 0    ||
       collID        >= commInqNProcsColl () ||
468
       txWin[collID].head - txWin[collID].buffer + size > txWin[collID].size)
469
    xabort("caller: %s", caller);
470

471
472
  memcpy ( txWin[collID].head, argBuffer, size );
  txWin[collID].head += size;
Deike Kleberg's avatar
Deike Kleberg committed
473
474
475
476
}

/************************************************************************/

477
void pioBufferData ( const int streamID, const int varID, const double *data, int nmiss )
Deike Kleberg's avatar
Deike Kleberg committed
478
{
479
  int chunk, vlistID, collID = CDI_UNDEFID;
Deike Kleberg's avatar
Deike Kleberg committed
480
481
  int tokenSep = SEPARATOR, tokenData = DATATOKEN;
  size_t size;
482
  int rankModel = commInqRankModel ();
Deike Kleberg's avatar
Deike Kleberg committed
483
484
485

  vlistID  = streamInqVlist ( streamID );
  collID   = commRankGlob2CollID ( vlistInqVarIOrank ( vlistID, varID ));
486
  chunk    = vlistInqVarDecoChunk ( vlistID, varID, rankModel );
487
488
489
  xassert ( collID         >= 0                    &&
            collID         <  commInqNProcsColl () &&
            chunk          >= 0                    &&
490
            txWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
491

492
  if (txWin[collID].postSet)
493
    {
494
      xmpi(MPI_Win_wait(txWin[collID].win));
495
      txWin[collID].postSet = 0;
Deike Kleberg's avatar
Deike Kleberg committed
496
      modelWinFlushBuffer ( collID );
497
    }
498

Deike Kleberg's avatar
Deike Kleberg committed
499
  size = chunk * sizeof ( double ) + winBufferOverheadChunk * sizeof ( int );
500
  xassert(txWin[collID].head - txWin[collID].buffer + size < txWin[collID].size);
Deike Kleberg's avatar
Deike Kleberg committed
501

Deike Kleberg's avatar
Deike Kleberg committed
502
503
504
505
  modelWinBufferPutAtEnd ( __func__, collID, &tokenData, sizeof ( tokenData ));
  modelWinBufferPutAtEnd ( __func__, collID, &streamID , sizeof ( streamID ));
  modelWinBufferPutAtEnd ( __func__, collID, &varID    , sizeof ( varID ));
  modelWinBufferPutAtEnd ( __func__, collID, data      , chunk * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
506
  modelWinBufferPutAtEnd ( __func__, collID, &nmiss    , sizeof ( nmiss ));
507
  modelWinBufferPutAtEnd ( __func__, collID, &tokenSep , sizeof ( tokenSep ));
508

509
  txWin[collID].refuseFuncCall = 1;
Deike Kleberg's avatar
Deike Kleberg committed
510
511
}

512
513
/************************************************************************/

514
void pioBufferFuncCall(int funcID, int argc, ... )
515
516
517
518
519
{
  va_list ap;
  int rankGlob = commInqRankGlob ();
  int root = commInqRootGlob ();
  int collID, nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
520
521
  int tokenSep = SEPARATOR, tokenFuncCall = FUNCCALL;
  size_t size = 0;
522

523
  xdebug("%s", "START");
524

Deike Kleberg's avatar
Deike Kleberg committed
525
526
  if ( rankGlob != root ) return;

527
528
529
  xassert (argc          >= 1                    &&
           argc          <= 2                    &&
           txWin != NULL);
530

Deike Kleberg's avatar
Deike Kleberg committed
531
  va_start ( ap, argc );
532
533

  switch ( funcID )
Deike Kleberg's avatar
Deike Kleberg committed
534
    {
535
    case STREAMCLOSE:
536
537
538
      {
        int streamID;

Deike Kleberg's avatar
Deike Kleberg committed
539
        xassert ( argc == 1 );
540
        streamID  = va_arg ( ap, int );
541
542

        for ( collID = 0; collID < nProcsColl; collID++ )
543
544
          {
            size = ( winBufferOverheadFuncCall + 1 ) * sizeof ( int );
545
            xassert(txWin[collID].head - txWin[collID].buffer + size <
546
                    txWin[collID].size);
547

548
            if (txWin[collID].postSet)
549
              {
550
                xmpi(MPI_Win_wait(txWin[collID].win));
551
                txWin[collID].postSet = 0;
552
553
                modelWinFlushBuffer ( collID );
              }
554

555
            xassert(txWin[collID].refuseFuncCall == 0);
556

557
            modelWinBufferPutAtEnd ( __func__, collID, &tokenFuncCall,
558
                                     sizeof ( tokenFuncCall));
559
            modelWinBufferPutAtEnd ( __func__, collID, &funcID,
560
                                     sizeof ( funcID ));
561
            modelWinBufferPutAtEnd ( __func__, collID, &streamID,
562
                                     sizeof ( streamID ));
563
            modelWinBufferPutAtEnd ( __func__, collID, &tokenSep,
564
565
                                     sizeof ( tokenSep ));
          }
566
      xdebug ( "WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d",
567
               funcMap[funcID], streamID );
568
      }
569
      break;
Deike Kleberg's avatar
Deike Kleberg committed
570
571
572
573
    case STREAMOPEN:
      {
        char * filename;
        int    filetype;
574
        size_t filenamesz;
575

Deike Kleberg's avatar
Deike Kleberg committed
576
        xassert ( argc == 2 );
Deike Kleberg's avatar
Deike Kleberg committed
577
        filename  = va_arg ( ap, char * );
578
        filenamesz = strlen ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
579
580
        xassert ( filenamesz > 0 &&
                  filenamesz < MAXDATAFILENAME );
Deike Kleberg's avatar
Deike Kleberg committed
581
        filetype  = va_arg ( ap, int );
582
583

        for ( collID = 0; collID < nProcsColl; collID++ )
Deike Kleberg's avatar
Deike Kleberg committed
584
          {
585
            size = ( winBufferOverheadFuncCall + 2 ) * sizeof ( int ) +
586
              MAXDATAFILENAME;
587
            xassert(txWin[collID].head - txWin[collID].buffer + size <
588
                    txWin[collID].size);
Deike Kleberg's avatar
Deike Kleberg committed
589

590
            if (txWin[collID].postSet)
591
              {
592
                xmpi(MPI_Win_wait(txWin[collID].win));
593
                txWin[collID].postSet = 0;
Deike Kleberg's avatar
Deike Kleberg committed
594
                modelWinFlushBuffer ( collID );
595
              }
596
            modelWinBufferPutAtEnd ( __func__, collID, &tokenFuncCall,
Deike Kleberg's avatar
Deike Kleberg committed
597
                                     sizeof ( tokenFuncCall));
598
            modelWinBufferPutAtEnd ( __func__, collID, &funcID,
Deike Kleberg's avatar
Deike Kleberg committed
599
                                     sizeof ( funcID ));
600
            modelWinBufferPutAtEnd ( __func__, collID, &filenamesz,
601
                                     sizeof ( filenamesz ));
602
            modelWinBufferPutAtEnd ( __func__, collID, filename,
603
                                     filenamesz );
604
            modelWinBufferPutAtEnd ( __func__, collID, &filetype,
605
                                     sizeof ( filetype ));
606
            modelWinBufferPutAtEnd ( __func__, collID, &tokenSep,
Deike Kleberg's avatar
Deike Kleberg committed
607
                                     sizeof ( tokenSep ));
608
          }
609

610
611
612
        xdebug("WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, filenamesz=%zu,"
               " filename=%s, filetype=%d",
               funcMap[funcID], filenamesz, filename, filetype );
Deike Kleberg's avatar
Deike Kleberg committed
613
      }
614
      break;
615
616
617
618
    case STREAMDEFVLIST:
      {
        int streamID, vlistID;

Deike Kleberg's avatar
Deike Kleberg committed
619
        xassert ( argc == 2 );
620
621
        streamID  = va_arg ( ap, int );
        vlistID   = va_arg ( ap, int );
622
623

        for ( collID = 0; collID < nProcsColl; collID++ )
624
625
          {
            size = ( winBufferOverheadFuncCall + 2 ) * sizeof ( int );
626
            xassert(txWin[collID].head - txWin[collID].buffer + size <
627
                    txWin[collID].size);
628

629
            if (txWin[collID].postSet)
630
              {
631
                xmpi(MPI_Win_wait(txWin[collID].win));
632
                txWin[collID].postSet = 0;
633
634
                modelWinFlushBuffer ( collID );
              }
635
            modelWinBufferPutAtEnd ( __func__, collID, &tokenFuncCall,
636
                                     sizeof ( tokenFuncCall));
637
            modelWinBufferPutAtEnd ( __func__, collID, &funcID,
638
                                     sizeof ( funcID ));
639
            modelWinBufferPutAtEnd ( __func__, collID, &streamID,
640
                                     sizeof ( streamID ));
641
            modelWinBufferPutAtEnd ( __func__, collID, &vlistID,
642
                                     sizeof ( streamID ));
643
            modelWinBufferPutAtEnd ( __func__, collID, &tokenSep,
644
645
                                     sizeof ( tokenSep ));
          }
646

647
        xdebug ( "WROTE FUNCTION CALL IN BUFFER OF WINS:  %s, streamID=%d,"
648
                 " vlistID=%d",
649
650
651
                 funcMap[funcID], streamID, vlistID );
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
652
653
    default:
      xabort ( "FUNCTION NOT MAPPED!" );
654
    }
655
656

  va_end ( ap );
657

658
  xdebug("%s", "RETURN");
659
660
}

661
662
663
664
#endif

/*****************************************************************************/

665
666
667
668
/* Returns vlistInqVarDecoChunk for variable varID of vlistID.  With
   USE_MPI the rank of the calling process among the model processes is
   passed on, which must be defined; without USE_MPI CDI_UNDEFID is passed
   as rank. */
int pioInqVarDecoChunk ( int vlistID, int varID )
{
   int rankModel = CDI_UNDEFID;

#ifdef USE_MPI
   rankModel = commInqRankModel ();
   xassert ( rankModel != CDI_UNDEFID );
#endif
   /* single return for both builds, no statement is left unreachable */
   return vlistInqVarDecoChunk ( vlistID, varID, rankModel );
}

/*****************************************************************************/

int pioInqVarDecoOff ( int vlistID, int varID )
{
#ifdef USE_MPI
   int rankModel = commInqRankModel ();
Deike Kleberg's avatar
Deike Kleberg committed
681
   xassert ( rankModel != CDI_UNDEFID );
682
   return vlistInqVarDecoOff ( vlistID, varID, rankModel );
Thomas Jahns's avatar
Thomas Jahns committed
683
#else
684
   return vlistInqVarDecoOff ( vlistID, varID, CDI_UNDEFID );
Thomas Jahns's avatar
Thomas Jahns committed
685
#endif
686
687
}

688
/*****************************************************************************/
689
690
691
/**
   @brief initializes the MPI communicators needed for the communication
   between the calculator PEs and the I/O PEs and within the group of
   I/O PEs.

   commGlob: all PEs

   commPIO: I/O PEs, PEs with highest ranks in commGlob

   commModel: calculating PEs, no I/O PEs

   commsIO[i]:

   Collective call

   I/O PEs do not return from this function: they run IOServer, finalize
   MPI and exit.

   @param commGlob     MPI communicator of all calling PEs
   @param nProcsIO     number of I/O PEs, has to be at least 1 and smaller
                       than the size of commGlob
   @param IOMode       I/O mode, has to lie in
                       [PIO_MINIOMODE, PIO_MAXIOMODE]
   @param nNamespaces  passed on to namespaceInit
   @param hasLocalFile passed on to namespaceInit
   @return commInqCommModel() on the calculator PEs, commInqCommGlob() if
           commGlob consists of a single task
*/

#ifdef USE_MPI
MPI_Comm pioInit_c ( MPI_Comm commGlob, int nProcsIO, int IOMode,
                     int nNamespaces, int * hasLocalFile )
{
  int sizeGlob;

  if ( ! ( IOMode >= PIO_MINIOMODE && IOMode <= PIO_MAXIOMODE ))
    xabort ( "IOMODE IS NOT VALID." );

#ifdef _SX
  if ( IOMode ==  PIO_ASYNCH )
    xabort ( "PIO_ASYNCH DOES NOT WORK ON SX." );
#endif

  commInit ();
  commDefCommGlob ( commGlob );
  sizeGlob = commInqSizeGlob ();

  if ( nProcsIO < 1 || nProcsIO >= sizeGlob )
    xabort ( "DISTRIBUTION OF TASKS ON PROCS IS NOT VALID." );

  commDefNProcsIO ( nProcsIO );
  commDefIOMode   ( IOMode, PIO_MAXIOMODE, PIO_MINIOMODEWITHSPECIALPROCS );
  commDefCommPio  ();

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  // NOTE(review): with a single task the nProcsIO check above fails for
  // every nProcsIO, so this branch looks unreachable -- confirm
  if ( commInqSizeGlob () == 1 )
    {
      namespaceInit ( nNamespaces, hasLocalFile );
      return commInqCommGlob ();
    }

  /* the I/O PEs serve until they are done and leave the program here */
  if ( commInqIsProcIO ())
    {
      IOServer ();
      commDestroy ();
      MPI_Finalize ();
      exit ( EXIT_SUCCESS );
    }

  /* only calculator PEs get this far */
  commEvalPhysNodes ();
  commDefCommsIO ();
  namespaceInit ( nNamespaces, hasLocalFile );

  xdebug ( "nProcsGlob=%d, RETURN", sizeGlob );
  return commInqCommModel ();
}
#endif

Deike Kleberg's avatar
Deike Kleberg committed
760
761
762
763
764
/*****************************************************************************/

int pioInit ( int commGlobArg, int nProcsIO, int IOMode, int nNamespaces,
              int * hasLocalFile )
{
765
#ifdef USE_MPI
766
767
768
  xdebug("START: %s, nProcsIO=%d, IOMode=%d, nNamespaces=%d",
         "cdi parallel",
         nProcsIO, IOMode, nNamespaces );
769
#else
770
771
772
  xdebug("START: %s, nProcsIO=%d, IOMode=%d, nNamespaces=%d",
         "cdi serial",
         nProcsIO, IOMode, nNamespaces );
773
774
#endif