pio_interface.c 29.1 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
#include <stdlib.h>
7
#include <stdio.h>
8
#include <stdarg.h>
9
10

#include <mpi.h>
11
#include <yaxt.h>
12

13
#include "cdi.h"
14
#include "cdipio.h"
Thomas Jahns's avatar
Thomas Jahns committed
15
#include "cdi_int.h"
16
#include "dmemory.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "namespace.h"
18
#include "pio.h"
Thomas Jahns's avatar
Thomas Jahns committed
19
#include "pio_client.h"
20
#include "pio_serialize.h"
21
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
23
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio_server.h"
Thomas Jahns's avatar
Thomas Jahns committed
25
#include "pio_util.h"
26
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "vlist.h"
Thomas Jahns's avatar
Thomas Jahns committed
28
#include "vlist_var.h"
29

30

31
static struct rdmaWin
32
33
{
  size_t size;
34
  unsigned char *buffer, *head;
35
  MPI_Win win;
36
  int postSet, refuseFuncCall;
37
  MPI_Group ioGroup;
Thomas Jahns's avatar
Thomas Jahns committed
38
  int dictSize, dictDataUsed, dictRPCUsed;
39
40
} *txWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
41

42
43
44
/* Printable names of the remotely executed functions; pioBufferFuncCall
 * looks an entry up with index -1 - funcID. */
const char * const funcMap[numRPCFuncs] = {
  "streamOpen", "streamDefVlist", "streamClose", "streamDefTimestep",
};

/* Partition inflation factor passed to pioInit (asserted >= 1.0 there);
 * modelWinDefBufferSizes uses it to size each variable chunk. */
float cdiPIOpartInflate_;
Deike Kleberg's avatar
Deike Kleberg committed
50

Deike Kleberg's avatar
Deike Kleberg committed
51
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
52

53
/* qsort comparator for an array of int pointers: compares the pointed-to
 * values so that larger values sort first (descending order). */
static int cmp(const void *va, const void *vb)
{
  int lhs = **(const int *const *)va;
  int rhs = **(const int *const *)vb;
  return (lhs < rhs) - (lhs > rhs);
}

63
64
65
66
void memcpyPackFunc(void *dataDesc, void *buf, int size, int *pos,
                    void *context)
{
  struct memCpyDataDesc *p = dataDesc;
67
  (void)context;
68
69
70
71
72
  xassert(size >= *pos && (size_t)(size - *pos) >= p->obj_size);
  memcpy((unsigned char *)buf + *pos, p->obj, p->obj_size);
  *pos += (int)p->obj_size;
}

Deike Kleberg's avatar
Deike Kleberg committed
73
/****************************************************/
74

75
76
77
static void
mapProblems(int problemSizes[], int * problemMapping, int nProblems,
            int nWriter, double * w)
Deike Kleberg's avatar
Deike Kleberg committed
78
79
80
81
82
83
84
{
  int *ip[nProblems];
  int dummy[nProblems];
  int buckets[nWriter];
  int currCapacity, nextCapacity;
  double meanBucket[nWriter];
  int sum = 0;
Thomas Jahns's avatar
Thomas Jahns committed
85
  int writerIdx = 0;
Deike Kleberg's avatar
Deike Kleberg committed
86
  int i, j;
87

Deike Kleberg's avatar
Deike Kleberg committed
88
89
90
91
92
93
94

  for ( i = 0; i < nProblems; i++ )
    {
      ip[i] = &problemSizes[i];
      sum += problemSizes[i];
    }

95
  qsort(ip, (size_t)nProblems, sizeof ( int * ), cmp );
Deike Kleberg's avatar
Deike Kleberg committed
96
97

  for ( i = 0; i < nProblems; i++ )
98
    dummy[i] = (int)(ip[i] - problemSizes);
99
100

  for ( j = 0; j < nWriter; j++ )
Deike Kleberg's avatar
Deike Kleberg committed
101
102
    meanBucket[j] = ( double ) sum * ( * ( w + j ));

Thomas Jahns's avatar
Thomas Jahns committed
103
104
  memset(buckets, 0, sizeof (buckets));

Deike Kleberg's avatar
Deike Kleberg committed
105
106
107
  for ( i = 0; i < nProblems; i++ )
    {
      currCapacity = INT_MIN;
108

Deike Kleberg's avatar
Deike Kleberg committed
109
      for ( j = 0; j < nWriter; j++ )
110
	{
111
	  nextCapacity = (int)meanBucket[j] - ( buckets[j] + ( *ip[i] ));
112

113
114
115
116
117
118
	  if ( nextCapacity > currCapacity )
	    {
	      currCapacity = nextCapacity;
	      writerIdx = j;
	    }
	}
Thomas Jahns's avatar
Thomas Jahns committed
119
      problemMapping[ dummy[i] ] = writerIdx;
Deike Kleberg's avatar
Deike Kleberg committed
120
121
      buckets[writerIdx] +=  *ip[i];
    }
Deike Kleberg's avatar
Deike Kleberg committed
122

123
124
125
126
127
128
129
130
131
  if ( ddebug )
    {
      xprintArray3 (  "problemSizes = ", problemSizes, nProblems, DATATYPE_INT );
      xprintArray3 ( "vector of indices, qsort of problemSizes", dummy,
                    nProblems, DATATYPE_INT );
      xprintArray3 ( "problemMapping", problemMapping, nProblems, DATATYPE_INT );
      xprintArray3 ( "meanBucket", meanBucket, nWriter, DATATYPE_FLT );
      xprintArray3 ( "actual buckets", buckets, nWriter, DATATYPE_INT );
    }
Deike Kleberg's avatar
Deike Kleberg committed
132
133
}

134
/****************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
135
136
137
138
139

/**
   @brief is encapsulated in CDI library.

   @param vSizes array with number of levels for all var_t 's in order of streams
140

Deike Kleberg's avatar
Deike Kleberg committed
141
142
   @param sSizes array with number of var_t for all stream_t 's

143
   @param varMapping return value, array with ranks of I/O PEs assigned to var_t 's
Deike Kleberg's avatar
Deike Kleberg committed
144
145
146
in order of vSizes

   @param nStreams number of stream_t 's
147
148

   @param nodeSizes array of number of I/O PEs on each physical nodes, increasing
Deike Kleberg's avatar
Deike Kleberg committed
149
150
151
   order of ranks assumed

   @param nNodes number of physical nodes hosting I/O PEs
152

Deike Kleberg's avatar
Deike Kleberg committed
153
154
   @return
*/
155

156
157
158
static void
varMapGen(int *vSizes, int *sSizes, int *varMapping,
          int nStreams, int *nodeSizes, int nNodes)
Deike Kleberg's avatar
Deike Kleberg committed
159
{
160

Deike Kleberg's avatar
Deike Kleberg committed
161
162
163
164
  int weightsStreams[nStreams];
  int streamMapping[nStreams];
  int nPEs = 0, nVars = 0;
  int i, j, k, offset = 0, offsetN = 0;
165

Deike Kleberg's avatar
Deike Kleberg committed
166
167
168
  int * weightsVarsNode;
  int * varMappingNode;
  int nVarsNode, summandRank = 0;
169
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
170

171
  int buckets[nProcsColl];
Deike Kleberg's avatar
Deike Kleberg committed
172
173
174
175
176
177

  for ( i = 0; i < nStreams; i++ )
    {
      nVars += * ( sSizes + i );
      weightsStreams[i] = 0;
      for ( j = 0; j < * ( sSizes + i ); j++ )
178
	weightsStreams[i] += * ( vSizes + offset++ );
Deike Kleberg's avatar
Deike Kleberg committed
179
180
    }

181
  double *w = (double *)xmalloc((size_t)nNodes * sizeof ( double ));
Deike Kleberg's avatar
Deike Kleberg committed
182
183
184
185
186
187
188
189
190
191
192
193
194
  for ( j = 0; j < nNodes; j++ )
    nPEs += * ( nodeSizes + j );

  for ( j = 0; j < nNodes; j++ )
    w[j] = ( double ) * ( nodeSizes + j ) / ( double ) nPEs;

  mapProblems ( weightsStreams, streamMapping, nStreams, nNodes, w );
  free ( w );

  for ( i = 0; i < nNodes; i++ )
    {
      nVarsNode = 0;
      for ( j = 0; j < nStreams; j++ )
195
	if ( * ( streamMapping + j ) == i )
196
197
	  nVarsNode += * ( sSizes + j );

198
199
200
      weightsVarsNode = xmalloc((size_t)nVarsNode * sizeof (int));
      varMappingNode = xmalloc((size_t)nVarsNode * sizeof ( int ));
      w = xmalloc((size_t)nodeSizes[i] * sizeof (double));
Deike Kleberg's avatar
Deike Kleberg committed
201
202
203
204
      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
205
206
207
208
209
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
	    * ( weightsVarsNode + offsetN++ ) = * ( vSizes + offset++ );
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
210
211

      for ( j = 0; j < * ( nodeSizes + i ); j ++ )
212
	w[j] = 1.0 / ( double ) * ( nodeSizes + i );
Deike Kleberg's avatar
Deike Kleberg committed
213

214
      mapProblems ( weightsVarsNode, varMappingNode, nVarsNode,
215
		    * ( nodeSizes + i ),  w );
Deike Kleberg's avatar
Deike Kleberg committed
216
217
218
219
220

      offset = 0;
      offsetN = 0;

      for ( j = 0; j < nStreams; j++ )
221
222
	if ( * ( streamMapping + j ) == i )
	  for ( k = 0; k < * ( sSizes + j ); k ++ )
223
	    * ( varMapping + offset ++ ) =
224
              commCollID2RankGlob ( * ( varMappingNode + offsetN ++ ) +
225
                                    summandRank );
226
227
	else
	  offset += * ( sSizes + j );
Deike Kleberg's avatar
Deike Kleberg committed
228
229
230
231
232
233
234

      summandRank += * ( nodeSizes + i );

      free ( w );
      free ( varMappingNode );
      free ( weightsVarsNode );
    }
235

236
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
237
    {
Deike Kleberg's avatar
Deike Kleberg committed
238
      xprintArray ( "varMapping", varMapping, nVars, DATATYPE_INT  );
239
      for ( i = 0; i < nProcsColl; i++ )
240
	buckets[i] = 0;
Deike Kleberg's avatar
Deike Kleberg committed
241
      for ( i = 0; i < nVars; i ++ )
242
	buckets[commRankGlob2CollID ( *(varMapping + i ))] += * ( vSizes + i );
243
      xprintArray ( "buckets", buckets, nProcsColl, DATATYPE_INT );
Deike Kleberg's avatar
Deike Kleberg committed
244
    }
245
}
Deike Kleberg's avatar
Deike Kleberg committed
246

247
/************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
248

249
250
/**
 * Chooses the I/O rank for every variable of every stream and records it in
 * both the working vlist and the original vlist of each stream.  Aborts if
 * some collector process would receive no data at all.
 *
 * @param nNodes    number of physical nodes hosting I/O PEs
 * @param nodeSizes number of I/O PEs on each of those nodes
 */
static void
varsMapNDeco(int nNodes, int *nodeSizes)
{
  int nProcsColl = commInqNProcsColl();

  xdebug ( "START, nProcsColl=%d", nProcsColl );

  int nStreams = streamSize();

  int *resHs = xmalloc((size_t)nStreams * sizeof (resHs[0]));
  int *streamSizes = xmalloc((size_t)nStreams * sizeof (streamSizes[0]));
  /* Must start zeroed: only collectors that receive a variable are set to
   * 1 below, and the idle check afterwards relies on all others being 0.
   * (Previously xmalloc'ed, so the check read indeterminate values.) */
  int *collectsData = xcalloc((size_t)nProcsColl, sizeof (collectsData[0]));
  streamGetIndexList(nStreams, resHs);

  for (int i = 0; i < nStreams; i++)
    streamSizes[i] = streamInqNvars(resHs[i]);

  int nVars = sum_int((size_t)nStreams, streamSizes);
  int *varSizes = xcalloc((size_t)nVars, sizeof (varSizes[0]));
  int *varMapping = xmalloc((size_t)nVars * sizeof (varMapping[0]));

  int k = 0;
  for (int i = 0; i < nStreams; i++)
    {
      int vlistID = streamInqVlist(resHs[i]);
      for (int j = 0; j < streamSizes[i]; j++)
        varSizes[k++] += vlistInqVarSize(vlistID, j);
    }

  xassert ( k == nVars );

  varMapGen(varSizes, streamSizes, varMapping, nStreams, nodeSizes, nNodes);

  /* store the chosen I/O rank in both the working and the original vlist
   * and mark the receiving collector as busy */
  k = 0;
  for (int i = 0; i < nStreams; i++)
    for (int j = 0; j < streamSizes[i]; j++)
      {
        vlistDefVarIOrank(streamInqVlist(resHs[i]), j, varMapping[k]);
        vlistDefVarIOrank(streamInqVlistIDorig(resHs[i]), j, varMapping[k]);
        collectsData[commRankGlob2CollID(varMapping[k++])] = 1;
      }

  for (int j = 0; j < nProcsColl; j++)
    if (collectsData[j] == 0)
      xabort("AT LEAST ONE COLLECTOR PROCESS IDLES, "
             "CURRENTLY NOT COVERED: "
             "PE%d collects no data",
             commCollID2RankGlob(j));

  /* free(NULL) is a no-op, no guards needed */
  free(varMapping);
  free(varSizes);
  free(collectsData);
  free(streamSizes);
  free(resHs);

  xdebug("%s", "RETURN");
}

/************************************************************************/

314
static
Deike Kleberg's avatar
Deike Kleberg committed
315
316
void modelWinCleanup ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
317
  int collID;
Deike Kleberg's avatar
Deike Kleberg committed
318

319
  xdebug("%s", "START");
320
  if (txWin != NULL)
321
322
323
324
325
326
327
328
329
330
331
    {
      for ( collID = 0; collID < commInqNProcsColl (); collID ++ )
        {
          if (txWin[collID].postSet)
            xmpi(MPI_Win_wait(txWin[collID].win));
          xmpi(MPI_Win_free(&txWin[collID].win));
          xmpi ( MPI_Free_mem ( txWin[collID].buffer ));
          xmpi(MPI_Group_free(&txWin[collID].ioGroup));
        }
      free(txWin);
    }
Deike Kleberg's avatar
Deike Kleberg committed
332

333
  xdebug("%s", "RETURN. CLEANED UP MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
334
335
336
}

/************************************************************************/
337

338
339
/* Dictionary entries a collector needs per fill of its transfer window. */
struct collDesc
{
  int numDataRecords; /* data records plus their partition descriptors */
  int numRPCRecords;  /* encoded function-call records */
};

343
344
static void
modelWinDefBufferSizes(void)
Deike Kleberg's avatar
Deike Kleberg committed
345
{
346
  int collID, nstreams, * streamIndexList, streamNo, nvars, varID;
347
  size_t sumWinBufferSize = 0;
348
349
350
  int nProcsColl  = commInqNProcsColl ();
  int rankGlob    = commInqRankGlob ();
  int root = commInqRootGlob ();
351
  struct collDesc *collIndex;
Deike Kleberg's avatar
Deike Kleberg committed
352

353
  xdebug("%s", "START");
354
  xassert(txWin != NULL);
355

Deike Kleberg's avatar
Deike Kleberg committed
356
  nstreams = reshCountType ( &streamOps );
357
358
  streamIndexList = xmalloc((size_t)nstreams * sizeof (streamIndexList[0]));
  collIndex = xcalloc((size_t)nProcsColl, sizeof (collIndex[0]));
359
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
360
361
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
362
363
364
      // memory required for data
      int streamID = streamIndexList[streamNo];
      int vlistID = streamInqVlist(streamID);
Deike Kleberg's avatar
Deike Kleberg committed
365
366
367
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
368
          int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
369
          size_t collIDchunk;
370
371
372
          {
            int varSize = vlistInqVarSize(vlistID, varID);
            int nProcsModel = commInqNProcsModel();
373
374
375
            collIDchunk = (size_t)ceilf(cdiPIOpartInflate_
                                        * (float)(varSize + nProcsModel - 1)
                                        / (float)nProcsModel);
376
          }
Deike Kleberg's avatar
Deike Kleberg committed
377
          xassert ( collID != CDI_UNDEFID && collIDchunk > 0 );
378
379
380
381
382
383
          collIndex[collID].numDataRecords += 2;
          txWin[collID].size += (size_t)collIDchunk * sizeof (double)
            /* re-align chunks to multiple of double size */
            + sizeof (double) - 1
            /* one header for data record, one for corresponding part
             * descriptor*/
384
            + 2 * sizeof (struct winHeaderEntry)
385
386
            /* FIXME: heuristic for size of packed Xt_idxlist */
            + sizeof (Xt_int) * collIDchunk * 3;
Deike Kleberg's avatar
Deike Kleberg committed
387
        }
388

389
390
      // memory required for the function calls encoded
      // for remote execution
Deike Kleberg's avatar
Deike Kleberg committed
391
      // once per stream and timestep for all collprocs only on the modelproc root
392
393
      if ( rankGlob == root )
        for ( collID = 0; collID < nProcsColl; collID++ )
394
          {
395
            collIndex[collID].numRPCRecords += numRPCFuncs;
396
            txWin[collID].size +=
397
              numRPCFuncs * sizeof (struct winHeaderEntry)
398
399
400
              + MAXDATAFILENAME
              /* data part of streamDefTimestep */
              + (2 * CDI_MAX_NAME + sizeof (taxis_t));
401
          }
Deike Kleberg's avatar
Deike Kleberg committed
402
    }
403
  for (collID = 0; collID < nProcsColl; ++collID)
404
    {
405
406
      int numRecords = 1 + collIndex[collID].numDataRecords
        + collIndex[collID].numRPCRecords;
407
      txWin[collID].dictSize = numRecords;
408
409
      txWin[collID].dictDataUsed = 1;
      txWin[collID].dictRPCUsed = 0;
410
      /* account for size header */
411
      txWin[collID].size += sizeof (struct winHeaderEntry);
412
413
      txWin[collID].size = roundUpToMultiple(txWin[collID].size,
                                             PIO_WIN_ALIGN);
414
      sumWinBufferSize += (size_t)txWin[collID].size;
415
    }
416
417
418
  free(collIndex);
  free ( streamIndexList );

419
  xdebug("sumWinBufferSize=%zu, MAXWINBUFFERSIZE=%zu", sumWinBufferSize,
420
         (size_t)MAXWINBUFFERSIZE);
421
  xassert ( sumWinBufferSize <= (size_t)MAXWINBUFFERSIZE );
422
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
423
}
424

Deike Kleberg's avatar
Deike Kleberg committed
425
426
427
428

/************************************************************************/


429
static
Deike Kleberg's avatar
Deike Kleberg committed
430
  void modelWinFlushBuffer ( int collID )
Deike Kleberg's avatar
Deike Kleberg committed
431
{
Deike Kleberg's avatar
Deike Kleberg committed
432
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
433

Deike Kleberg's avatar
Deike Kleberg committed
434
435
  xassert ( collID                >= 0         &&
            collID                < nProcsColl &&
436
            txWin != NULL      &&
Thomas Jahns's avatar
Thomas Jahns committed
437
            txWin[collID].buffer     != NULL      &&
438
            txWin[collID].size <= MAXWINBUFFERSIZE);
439
  memset(txWin[collID].buffer, 0, txWin[collID].size);
440
  txWin[collID].head = txWin[collID].buffer
441
    + (size_t)txWin[collID].dictSize * sizeof (struct winHeaderEntry);
442
  txWin[collID].refuseFuncCall = 0;
443
444
  txWin[collID].dictDataUsed = 1;
  txWin[collID].dictRPCUsed = 0;
Deike Kleberg's avatar
Deike Kleberg committed
445
446
447
}


Deike Kleberg's avatar
Deike Kleberg committed
448
449
450
/************************************************************************/


Deike Kleberg's avatar
Deike Kleberg committed
451
452
453
static
void modelWinCreate ( void )
{
Deike Kleberg's avatar
Deike Kleberg committed
454
  int collID, ranks[1];
455
  int nProcsColl = commInqNProcsColl ();
Deike Kleberg's avatar
Deike Kleberg committed
456

457
  xdebug("%s", "START");
458
  txWin = xcalloc((size_t)nProcsColl, sizeof (txWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
459

Deike Kleberg's avatar
Deike Kleberg committed
460
  modelWinDefBufferSizes ();
461
  ranks[0] = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
462

463
464
465
  MPI_Info no_locks_info;
  xmpi(MPI_Info_create(&no_locks_info));
  xmpi(MPI_Info_set(no_locks_info, "no_locks", "true"));
Deike Kleberg's avatar
Deike Kleberg committed
466
  for ( collID = 0; collID < nProcsColl; collID ++ )
Deike Kleberg's avatar
Deike Kleberg committed
467
    {
468
      xassert(txWin[collID].size > 0);
469
      txWin[collID].buffer = NULL;
470
      xmpi(MPI_Alloc_mem((MPI_Aint)txWin[collID].size, MPI_INFO_NULL,
471
472
                         &txWin[collID].buffer));
      xassert ( txWin[collID].buffer != NULL );
473
      txWin[collID].head = txWin[collID].buffer
474
        + (size_t)txWin[collID].dictSize * sizeof (struct winHeaderEntry);
475
      xmpi(MPI_Win_create(txWin[collID].buffer, (MPI_Aint)txWin[collID].size, 1,
476
                          no_locks_info, commInqCommsIO(collID),
477
                          &txWin[collID].win));
Thomas Jahns's avatar
Thomas Jahns committed
478
479
480
481
      MPI_Group commGroup;
      xmpi(MPI_Comm_group(commInqCommsIO(collID), &commGroup));
      xmpi(MPI_Group_incl(commGroup, 1, ranks, &txWin[collID].ioGroup));
      xmpi(MPI_Group_free(&commGroup));
Deike Kleberg's avatar
Deike Kleberg committed
482
    }
483
484
485

  xmpi(MPI_Info_free(&no_locks_info));

486
  xdebug("%s", "RETURN, CREATED MPI_WIN'S");
Deike Kleberg's avatar
Deike Kleberg committed
487
488
489
490
}

/************************************************************************/

491
static void
492
modelWinEnqueue(int collID,
493
                struct winHeaderEntry header, const void *data,
494
                valPackFunc packFunc)
Deike Kleberg's avatar
Deike Kleberg committed
495
{
496
497
  struct winHeaderEntry *winDict
    = (struct winHeaderEntry *)txWin[collID].buffer;
498
  int targetEntry;
499
500
501
502
  if (header.id > 0 || header.id == PARTDESCMARKER)
    targetEntry = (txWin[collID].dictDataUsed)++;
  else
    targetEntry = txWin[collID].dictSize - ++(txWin[collID].dictRPCUsed);
503
  if (header.id > 0)
504
    {
505
      int offset = header.offset
506
507
        = (int)roundUpToMultiple((size_t)(txWin[collID].head
                                          - txWin[collID].buffer),
508
                                 sizeof (double));
509
510
511
512
      MPI_Comm comm = commInqCommsIO(collID);
      packFunc((void *)data, txWin[collID].buffer, (int)txWin[collID].size,
               &offset, &comm);
      txWin[collID].head = txWin[collID].buffer + offset;
513
    }
514
  else if (header.id == PARTDESCMARKER)
515
    {
516
      Xt_uid uid = header.specific.partDesc.uid;
517
518
519
520
      int offset = -1;
      /* search if same uid entry has already been enqueued */
      for (int entry = 2; entry < targetEntry; entry += 2)
        {
521
          xassert(winDict[entry].id == PARTDESCMARKER);
522
          if (winDict[entry].specific.partDesc.uid == uid)
523
            {
524
              offset = winDict[entry].offset;
525
526
527
528
529
530
531
              break;
            }
        }
      if (offset == -1)
        {
          /* not yet used partition descriptor, serialize at
           * current position */
532
          int position = header.offset
533
            = (int)(txWin[collID].head - txWin[collID].buffer);
534
535
536
537
          MPI_Comm comm = commInqCommsIO(collID);
          packFunc((void *)data, txWin[collID].buffer, (int)txWin[collID].size,
                   &position, &comm);
          txWin[collID].head = txWin[collID].buffer + position;
538
539
540
        }
      else
        /* duplicate entries are copied only once per timestep */
541
        header.offset = offset;
542
    }
543
  else
544
    {
545
546
547
548
549
550
      int position = header.offset
        = (int)(txWin[collID].head - txWin[collID].buffer);
      MPI_Comm comm = commInqCommsIO(collID);
      packFunc((void *)data, txWin[collID].buffer, (int)txWin[collID].size,
               &position, &comm);
      txWin[collID].head = txWin[collID].buffer + position;
551
    }
552
  winDict[targetEntry] = header;
Deike Kleberg's avatar
Deike Kleberg committed
553
554
}

555
556
557
558
559
static void
cdiPio_xt_idxlist_pack_wrap(void *data, void *buf, int size, int *pos,
                            void *context)
{
  MPI_Comm comm = *(MPI_Comm *)context;
560
561
  size_t pack_size = xt_idxlist_get_pack_size((Xt_idxlist)data, comm);
  xassert(size >= *pos && pack_size <= (size_t)(size - *pos));
562
563
564
565
  xt_idxlist_pack((Xt_idxlist)data, (unsigned char *)buf,
                  size, pos, comm);
}

566
567
568
569
570
571
572
573
574
575
576
static inline void
collWait(int collID)
{
  if (txWin[collID].postSet)
    {
      xmpi(MPI_Win_wait(txWin[collID].win));
      txWin[collID].postSet = 0;
      modelWinFlushBuffer(collID);
    }
}

577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
static inline void
collProbe(int collID)
{
  if (txWin[collID].postSet)
    {
      int flag;
      xmpi(MPI_Win_test(txWin[collID].win, &flag));
      if (flag)
        {
          txWin[collID].postSet = 0;
          modelWinFlushBuffer(collID);
        }
    }
}

/* Polls the transfer windows of all collectors without blocking.  Every
 * window whose outstanding epoch has completed is reset for refilling.
 * Defined with a (void) prototype instead of the obsolescent empty
 * parameter list, which declares no prototype before C23. */
void
cdiPioRDMAProgress(void)
{
  int nProcsColl = commInqNProcsColl();
  for (int collID = 0; collID < nProcsColl; collID++)
    collProbe(collID);
}


601
602
603
604
/* Queues one part of variable varID of stream streamID for the collector
 * responsible for that variable.
 *
 * It first waits until the collector's buffer may be reused.  It then
 * enqueues the data record (payload packed by packDataFunc from packData)
 * followed by its partition descriptor partDesc.  Finally it sets
 * refuseFuncCall, which pioBufferFuncCall asserts to be 0. */
static void
pioBufferPartData_(int streamID, int varID,
                   const void *packData, valPackFunc packDataFunc,
                   int nmiss, Xt_idxlist partDesc)
{
  int vlistID = streamInqVlist(streamID);
  int collID = commRankGlob2CollID(vlistInqVarIOrank(vlistID, varID));
  xassert(collID >= 0 && collID < commInqNProcsColl() && txWin != NULL);

  collWait(collID);

  struct winHeaderEntry dataHeader = {
    .id = streamID,
    .specific.dataRecord = { varID, nmiss },
    .offset = -1,
  };
  modelWinEnqueue(collID, dataHeader, packData, packDataFunc);

  struct winHeaderEntry partHeader = {
    .id = PARTDESCMARKER,
    .specific.partDesc = { .uid = xt_idxlist_get_uid(partDesc) },
    .offset = 0,
  };
  modelWinEnqueue(collID, partHeader, partDesc, cdiPio_xt_idxlist_pack_wrap);

  txWin[collID].refuseFuncCall = 1;
}

631
632
633
634
635
636
637
/**
 * Queues this process's part of variable varID of stream streamID.
 *
 * @param data     contiguous values, one per index of partDesc
 * @param nmiss    number of missing values, stored in the data record header
 * @param partDesc index list describing which elements data holds
 */
void
pioBufferPartData(int streamID, int varID, const double *data,
                  int nmiss, Xt_idxlist partDesc)
{
  int chunk = xt_idxlist_get_num_indices(partDesc);
  /* chunk is an int, so the former "chunk <= INT_MAX" could never fail.
   * The real hazard is a negative count, which would turn into a huge byte
   * size in the (size_t) conversion below. */
  xassert(chunk >= 0);
  pioBufferPartData_(streamID, varID,
                     &(struct memCpyDataDesc){data, (size_t)chunk * sizeof (data[0])},
                     memcpyPackFunc,
                     nmiss, partDesc);
}

643
644
645
646
647
648
/* Data given as numBlocks runs.  Run j has blocklengths[j] elements of
 * elemSize bytes and starts displacements[j] elements into data.  numElems
 * is the number of indices in the matching partition descriptor. */
struct scatterGatherDesc
{
  void *data;
  const int *blocklengths;
  const int *displacements;
  size_t elemSize;
  unsigned numBlocks;
  unsigned numElems;
};

static void
scatterGatherPackFunc(void *dataDesc, void *buf, int size, int *pos,
                      void *context)
{
656
  (void)context;
657
658
659
660
661
662
  const struct scatterGatherDesc *p = dataDesc;
  unsigned numBlocks = p->numBlocks;
  const int *bls = p->blocklengths, *disps = p->displacements;
  int pos_ = *pos;
  unsigned char *dstBuf = buf + pos_, *bufEnd = (unsigned char *)buf + size;
  size_t elemSize = p->elemSize;
663
  xassert(elemSize <= SSIZE_MAX);
664
  const unsigned char *data = p->data;
665
666
  unsigned copyCount = 0, numElems = p->numElems;
  for (unsigned j = 0; j < numBlocks && copyCount < numElems; ++j)
667
668
669
670
    {
      int bl = bls[j];
      if (bl > 0)
        {
671
672
673
674
675
676
          if ((unsigned)bl + copyCount > numElems)
            {
              bl = (int)(numElems - copyCount);
              Warning("%s: %s", "streamWriteScatteredVarPart",
                      "blocks longer than number of elements in index list!");
            }
677
678
          size_t bsize = (size_t)bl * elemSize;
          xassert(dstBuf + bsize <= bufEnd);
679
          memcpy(dstBuf, data + (ssize_t)elemSize * (ssize_t)disps[j], bsize);
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
          dstBuf += bsize;
        }
    }
  *pos = (int)(dstBuf - (unsigned char *)buf);
}


/* Like pioBufferPartData, but the local values are not contiguous.  They
 * are gathered from numBlocks blocks of blocklengths[j] doubles, each
 * starting displacements[j] elements into data. */
void
cdiPioBufferPartDataGather(int streamID, int varID, const double *data,
                           int numBlocks, const int blocklengths[],
                           const int displacements[],
                           int nmiss, Xt_idxlist partDesc)
{
  xassert(numBlocks >= 0);
  struct scatterGatherDesc gatherDesc = {
    .data = (void *)data,
    .blocklengths = blocklengths,
    .displacements = displacements,
    .elemSize = sizeof (data[0]),
    .numBlocks = (unsigned)numBlocks,
    .numElems = (unsigned)xt_idxlist_get_num_indices(partDesc),
  };
  pioBufferPartData_(streamID, varID, &gatherDesc, scatterGatherPackFunc,
                     nmiss, partDesc);
}


707
708
/************************************************************************/

709
/* Enqueues the function-call record header (payload packed by dataPackFunc
 * from data) into the transfer buffer of every collector.  Only the process
 * whose global rank equals commInqRootGlob() does so; all others return
 * right after the debug output. */
void pioBufferFuncCall(struct winHeaderEntry header,
                       const void *data, valPackFunc dataPackFunc)
{
  int rankGlob = commInqRankGlob();
  int root = commInqRootGlob();
  int nProcsColl = commInqNProcsColl();
  int funcID = header.id;

  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  xdebug("%s, func: %s", "START", funcMap[(-1 - funcID)]);

  if (rankGlob == root)
    {
      xassert(txWin != NULL);

      for (int collID = 0; collID < nProcsColl; ++collID)
        {
          collWait(collID);
          /* a free dictionary entry must remain */
          xassert(txWin[collID].dictRPCUsed + txWin[collID].dictDataUsed
                  < txWin[collID].dictSize);
          xassert(txWin[collID].refuseFuncCall == 0);
          modelWinEnqueue(collID, header, data, dataPackFunc);
        }

      xdebug("%s", "RETURN");
    }
}

736
737
738
739
740
741

/* No-op hook for callers that must supply a post-communicator-setup
 * callback but have nothing to do. */
void
cdiPioNoPostCommSetup(void)
{
  /* intentionally empty */
}

742
743
/*****************************************************************************/

744
/* pioInit definition must currently compile even in non-MPI configurations */
745
746
747
/**
  @brief Initializes the MPI communicators needed for the
  communication between the calculator PEs and the I/O PEs and within the
  group of I/O PEs.

  commGlob: all PEs

  commPIO: I/O PEs, PEs with highest ranks in commGlob

  commModel: calculating PEs, no I/O PEs

  commsIO[i]: communicator joining the calculator PEs with I/O PE i
  (the I/O PE has rank nProcsModel in it)

  Collective call

  @param commGlob MPI communicator of all calling PEs
  @param nProcsIO number of I/O PEs (must be 1 for PIO_NONE)
  @param IOMode I/O mode, must lie in [PIO_MINIOMODE, PIO_MAXIOMODE]
  @param pioNamespace output: namespace ID for the calling calculator PE
  @param partInflate allow for array partitions on compute
  PEs that are at most sized \f$ partInflate * \lceil arraySize /
  numComputePEs \rceil \f$; must be >= 1.0
  @param postCommSetupActions function which is called by all I/O servers
  after communicator split

  @return on calculator PEs the communicator of the calculator PEs
  (commModel; with a single PE, commGlob itself), on I/O PEs MPI_COMM_NULL
*/

Thomas Jahns's avatar
Thomas Jahns committed
770
/* namespace ID chosen in pioInit for the calculator PE; -1 before that */
static int pioNamespace_ = -1;
/* nonzero if pioInit itself called xt_initialize; the I/O-server path of
 * pioInit then calls xt_finalize before returning */
static int xtInitByCDI = 0;
Thomas Jahns's avatar
Thomas Jahns committed
772

773
774
MPI_Comm
pioInit(MPI_Comm commGlob, int nProcsIO, int IOMode,
        int *pioNamespace, float partInflate,
        void (*postCommSetupActions)(void))
{
  namespaceSwitchSet(NSSWITCH_WARNING, NSSW_FUNC(cdiPioWarning));

  if (IOMode < PIO_MINIOMODE || IOMode > PIO_MAXIOMODE)
    xabort("IOMODE IS NOT VALID.");

#ifdef _SX
  if (IOMode == PIO_ASYNCH)
    xabort("PIO_ASYNCH DOES NOT WORK ON SX.");
#endif

  /* set up yaxt unless the caller already has a live instance */
  xtInitByCDI = !xt_initialized() || xt_finalized();
  if (xtInitByCDI)
    xt_initialize(commGlob);

  commInit();
  commDefCommGlob(commGlob);
  int sizeGlob = commInqSizeGlob();

  /* PIO_NONE needs exactly one I/O PE, every other mode between one and
   * all but one PE */
  int validDistribution = (IOMode == PIO_NONE)
    ? nProcsIO == 1
    : nProcsIO > 0 && nProcsIO <= sizeGlob - 1;
  if (!validDistribution)
    xabort("DISTRIBUTION OF TASKS ON PROCS IS NOT VALID.\n"
           "nProcsIO=%d, sizeGlob=%d\n", nProcsIO, sizeGlob);

  commDefNProcsIO(nProcsIO);
  commDefIOMode(IOMode);
  commDefCommPio();

  xassert(partInflate >= 1.0);
  cdiPIOpartInflate_ = partInflate;

  // JUST FOR TEST CASES WITH ONLY ONE MPI TASK
  if (commInqSizeGlob() == 1)
    {
      pioNamespace_ = *pioNamespace = namespaceNew();
      return commInqCommGlob();
    }

  if (!commInqIsProcIO())
    {
      /* calculator PE */
      cdiPioClientSetup(&pioNamespace_, pioNamespace);
      xdebug("nProcsGlob=%d, RETURN", sizeGlob);
      return commInqCommModel();
    }

  /* I/O PE: hand control to the server, then tear down */
  cdiPioSerializeSetMPI();
  namespaceSwitchSet(NSSWITCH_ABORT, NSSW_FUNC(cdiAbortC_MPI));
  namespaceSwitchSet(NSSWITCH_FILE_OPEN, NSSW_FUNC(pioFileOpen));
  namespaceSwitchSet(NSSWITCH_FILE_CLOSE, NSSW_FUNC(pioFileClose));
  cdiPioServer(postCommSetupActions);
  namespaceNew();
  commDestroy();
  if (xtInitByCDI)
    xt_finalize();
  return MPI_COMM_NULL;
}
834

835
/*****************************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
836

837
void  pioEndDef ( void )
838
{
Deike Kleberg's avatar
Deike Kleberg committed
839
  char   * buffer;
840
  int bufferSize;
841
  int rankGlob = commInqRankGlob ();
842

843
  xdebug("%s", "START");
844

Deike Kleberg's avatar
Deike Kleberg committed
845
  varsMapNDeco ( commInqNNodes (), commInqNodeSizes ());
Deike Kleberg's avatar
Deike Kleberg committed
846

847
  if ( rankGlob < commInqNProcsColl ())
Deike Kleberg's avatar
Deike Kleberg committed
848
    {
849
850
      MPI_Comm comm = commInqCommsIO ( rankGlob );
      reshPackBufferCreate(&buffer, &bufferSize, &comm);
851
852

      xmpi ( MPI_Send ( buffer, bufferSize, MPI_PACKED, commInqNProcsModel (),
853
                        RESOURCES, commInqCommsIO ( rankGlob )));
854

855
      xdebug("%s", "SENT MESSAGE WITH TAG \"RESOURCES\"");
856

Deike Kleberg's avatar
Deike Kleberg committed
857
      reshPackBufferDestroy ( &buffer );
Deike Kleberg's avatar