pio_server.c 51.8 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
/** @file pio_server.c
 */
3
4
5
6
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
7
8
9
#include "pio_server.h"


10
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
11
12
#include <stdlib.h>
#include <stdio.h>
13
14
15
16
17
18

#ifdef HAVE_PARALLEL_NC4
#include <core/ppm_combinatorics.h>
#include <core/ppm_rectilinear.h>
#include <ppm/ppm_uniform_partition.h>
#endif
19
#include <yaxt.h>
20

Deike Kleberg's avatar
Deike Kleberg committed
21
#include "cdi.h"
22
#include "cdipio.h"
23
#include "cdi_int.h"
24
#include "dmemory.h"
25
#include "namespace.h"
26
#include "taxis.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
28
#include "pio_comm.h"
29
#include "pio_conf.h"
30
#include "pio_id_set.h"
31
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
32
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
33
#include "pio_util.h"
34
35
36
#ifndef HAVE_NETCDF_PAR_H
#define MPI_INCLUDED
#endif
37
#include "pio_cdf_int.h"
38
#include "resource_handle.h"
39
#include "resource_unpack.h"
Thomas Jahns's avatar
Thomas Jahns committed
40
#include "stream_cdf.h"
Deike Kleberg's avatar
Deike Kleberg committed
41
#include "vlist_var.h"
42

43

44
/* Descriptor of one client's region inside the contiguous receive
 * buffer a collector GETs data into (see cdiPioServerStreamWinCreate). */
struct clientBuf
{
  size_t size;          /* number of bytes in this client's region */
  unsigned char *mem;   /* start of the region; clientBuf[0].mem anchors the whole allocation */
  int dictSize;         /* number of winHeaderEntry records at the start of the region */
};
50

51
52
53
54
55
56
57
58
59
60
61
/* Records how a client laid out one variable in its buffer: the UID of
 * the partition index list and the byte offset of the data.  Tables of
 * these are compared with memcmp (see handleRedistCache) to decide
 * whether cached redists can be reused. */
struct streamMemLayout
{
  Xt_uid varPartIdxListUID;
  size_t offset;
};

/* A redistribution object kept from a previous timestep for reuse.
 * sliceSize: presumably the per-exchange element count — its use is
 * not in the visible part of this file; confirm in writeGribStream. */
struct cacheRedist {
  Xt_redist redist;
  int sliceSize;
};

62
63
64
65
/* Per-open-stream collector-side state, indexed like openStreams. */
static struct
{
  MPI_Win getWin;               /* RMA window used to GET data from clients */
  struct clientBuf *clientBuf;  /* one buffer descriptor per client */
#if defined HAVE_LIBNETCDF && ! defined HAVE_PARALLEL_NC4
  int ownerRank;                /* collector rank that has the file open (serial writer) */
#endif
  /* put data for description of last layout from RMA GET here */
  struct streamMemLayout *prevLayout;
  size_t numRetained;           /* number of entries in retained[] */
  struct cacheRedist *retained; /* redists cached from the previous timestep */
  size_t aggBufSize, aggBufUsed; /* capacity / fill level of aggBuf in bytes */
  void *aggBuf;                 /* record aggregation buffer (see aggBufAppend) */
} *rxWin;

/* IDs of the streams and files currently open on this collector; an
 * index into either set is also the index into rxWin */
static struct idList openStreams, openFiles;
Deike Kleberg's avatar
Deike Kleberg committed
78

79
80
81
/* One record (a single level of one variable) scheduled for writing. */
struct recordWrite
{
  int varID, level;
  size_t dataSize;   /* size of the record's data in bytes */
};

struct streamMapping {
  /* data entry varMap[i] contains data for variable i or -1 if no
   * data entry for i has been transferred
   * (NOTE(review): streamMappingNew below uses 0, not -1, as the
   * "not transferred" marker — header index 0 is never a data entry) */
  int *varMap;
  /* numLvls[i] number of levels written for variable i or 0 if
   * variable is not written to this timestep */
  int *numLvlsW;
  /* nMiss[i] = missing values were provided for variable i */
  int *hasMissing;
  int numWrittenRecords;   /* number of entries in writtenRecords[] */
  int numVars;
  struct streamMemLayout *layout;   /* non-NULL only when conf->cacheRedists */
  struct recordWrite writtenRecords[];   /* flexible array member, numWrittenRecords entries */
};


101
102
103
104
105
#ifdef HAVE_PARALLEL_NC4
/* prime factorization of number of pio collectors */
static uint32_t *pioPrimes;
static int numPioPrimes;
#endif
Deike Kleberg's avatar
Deike Kleberg committed
106

Deike Kleberg's avatar
Deike Kleberg committed
107
108
/************************************************************************/

109
static void
110
cdiPioServerStreamWinDestroy(size_t streamIdx)
111
{
112
  if (rxWin[streamIdx].getWin != MPI_WIN_NULL)
113
    {
114
115
      Free(rxWin[streamIdx].clientBuf[0].mem);
      xmpi(MPI_Win_free(&rxWin[streamIdx].getWin));
116
    }
Deike Kleberg's avatar
Deike Kleberg committed
117
}
118

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
static int numClients_, *clientRanks_;

static void
setupClientRanks(void)
{
  MPI_Group clientGroup = cdiPioInqRemoteGroup();
  xmpi(MPI_Group_size(clientGroup, &numClients_));
  clientRanks_ = Malloc((size_t)numClients_ * sizeof (clientRanks_[0]));
  int *ranks = Malloc((size_t)numClients_ * sizeof (ranks[0]));
  for (int i = 0; i < numClients_; ++i)
    ranks[i] = i;
  MPI_Comm collClientIntraComm = cdiPioInqCollClientIntraComm();
  MPI_Group groupCollClient;
  xmpi(MPI_Comm_group(collClientIntraComm, &groupCollClient));
  xmpi(MPI_Group_translate_ranks(clientGroup, numClients_, ranks,
                                 groupCollClient, clientRanks_));
  xmpi(MPI_Group_free(&groupCollClient));
  Free(ranks);
}
138

139
/* Create the RMA window for stream streamIdx and set up the per-client
 * buffer table: one contiguous allocation is carved into consecutive
 * regions of bufSizes[i].bufSize bytes, one per client. */
static void
cdiPioServerStreamWinCreate(size_t streamIdx, MPI_Info no_locks_info,
                            MPI_Comm collClientIntraComm,
                            struct clientBufSize *bufSizes)
{
  /* collectors expose no memory of their own; they only GET */
  xmpi(MPI_Win_create(MPI_BOTTOM, 0, 1, no_locks_info, collClientIntraComm,
                      &rxWin[streamIdx].getWin));
  size_t streamBufferSize = 0;
  for (size_t i = 0; i < (size_t)numClients_; ++i)
    {
      streamBufferSize +=
        (rxWin[streamIdx].clientBuf[i].size = bufSizes[i].bufSize);
      rxWin[streamIdx].clientBuf[i].dictSize
        = bufSizes[i].numDataRecords + bufSizes[i].numRPCRecords;
    }
  /* single allocation; region i starts right after region i-1 */
  rxWin[streamIdx].clientBuf[0].mem = Malloc(streamBufferSize);
  for (size_t i = 1; i < (size_t)numClients_; ++i)
    rxWin[streamIdx].clientBuf[i].mem
      = rxWin[streamIdx].clientBuf[i-1].mem + bufSizes[i-1].bufSize;
}


Deike Kleberg's avatar
Deike Kleberg committed
161
162
/************************************************************************/

163
/* Execute a remote procedure call described by a window header entry.
 * Currently only STREAMDEFTIMESTEP is implemented; anything else aborts. */
static void
readFuncCall(struct winHeaderEntry *header, size_t streamIdx)
{
  int funcID = header->id;
  union funcArgs *funcArgs = &(header->specific.funcArgs);

  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  switch ( funcID )
    {
    case STREAMDEFTIMESTEP:
      {
        MPI_Comm pioInterComm = cdiPioInqInterComm();
        int streamID = funcArgs->streamNewTimestep.streamID;
        /* the taxis was packed in the client's namespace; remember it
         * so taxisUnpack can translate resource handles */
        int originNamespace = namespaceResHDecode(streamID).nsp;
        streamID = namespaceAdaptKey2(streamID);
        int oldTaxisID
          = vlistInqTaxis(streamInqVlist(streamID));
        int position = header->offset;
        int changedTaxisID
          = taxisUnpack((char *)rxWin[streamIdx].clientBuf[0].mem,
                        (int)rxWin[streamIdx].clientBuf[0].size,
                        &position, originNamespace, &pioInterComm, 0);
        /* copy the unpacked taxis state over the server-side taxis,
         * then dispose of the temporary */
        taxis_t *oldTaxisPtr = taxisPtr(oldTaxisID);
        taxis_t *changedTaxisPtr = taxisPtr(changedTaxisID);
        ptaxisCopy(oldTaxisPtr, changedTaxisPtr);
        taxisDestroy(changedTaxisID);
        streamDefTimestep(streamID, funcArgs->streamNewTimestep.tsID);
      }
      break;
    default:
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
    }
}

/************************************************************************/

199
static void
200
resizeVarGatherBuf(int size, double **buf, int *bufSize)
201
202
{
  if (size <= *bufSize) ; else
203
    *buf = Realloc(*buf, (size_t)(*bufSize = size) * sizeof (buf[0][0]));
204
205
}

206
207
#define wHECast struct winHeaderEntry *)(void *

208
209
210
211
/* Build the yaxt redistribution that gathers one variable's data from
 * the per-client RMA buffers into the layout described by dstList.
 * headerIdx is the index of the variable's data entry in the window
 * dictionary; entry headerIdx+1 must be the matching partition
 * descriptor (PARTDESCMARKER). */
static Xt_redist
buildVarRedist(int headerIdx, size_t streamIdx,
               /* index list representing the data elements gathered on
                * this rank */
               Xt_idxlist dstList,
               const struct cdiPioConf *conf)
{
  const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
  const struct winHeaderEntry *winDict
    = (wHECast)clientBuf[0].mem;
  int streamID = openStreams.entries[streamIdx];
  int varID = winDict[headerIdx].specific.dataRecord.varID;
  struct Xt_offset_ext *partExts
    = Malloc((size_t)numClients_ * sizeof (partExts[0]));
  Xt_idxlist *part = Malloc((size_t)numClients_ * sizeof (part[0]));
  MPI_Comm pioInterComm = cdiPioInqInterComm(),
    collComm = commInqCommColl();
  for (size_t clientIdx = 0; clientIdx < (size_t)numClients_; ++clientIdx)
    {
      unsigned char *clientMem = clientBuf[clientIdx].mem;
      struct dataRecord *dataHeader
        = &((wHECast)clientMem)[headerIdx].specific.dataRecord;
      int position = ((wHECast)clientMem)[headerIdx + 1].offset;
      /* sanity checks: same stream/variable on all clients, partition
       * descriptor present, and its offset inside the client's region
       * past the dictionary */
      xassert(namespaceAdaptKey2(((wHECast)clientMem)[headerIdx].id) == streamID
              && dataHeader->varID == varID
              && (((wHECast)clientMem)[headerIdx + 1].id == PARTDESCMARKER)
              && position > 0
              && ((size_t)position
                  >= sizeof (struct winHeaderEntry)
                  * (size_t)clientBuf[clientIdx].dictSize)
              && ((size_t)position < clientBuf[clientIdx].size));
      part[clientIdx]
        = xt_idxlist_unpack(clientMem, (int)clientBuf[clientIdx].size,
                            &position, pioInterComm);
      unsigned partSize
        = (unsigned)xt_idxlist_get_num_indices(part[clientIdx]);
      /* byte offset of this client's data relative to the start of the
       * whole (contiguous) buffer, converted to a double index */
      size_t charOfs = (size_t)((clientMem
                                 + ((wHECast)clientMem)[headerIdx].offset)
                                - clientBuf[0].mem);
      xassert(charOfs % sizeof (double) == 0
              && charOfs / sizeof (double) + partSize <= INT_MAX);
      int elemOfs = (int)(charOfs / sizeof (double));
      partExts[clientIdx].start = elemOfs;
      partExts[clientIdx].size = (int)partSize;
      partExts[clientIdx].stride = 1;
    }
  Xt_idxlist srcList = xt_idxlist_collection_new(part, numClients_);
  for (size_t clientIdx = 0; clientIdx < (size_t)numClients_; ++clientIdx)
    xt_idxlist_delete(part[clientIdx]);
  Free(part);
  if (conf->stripify)
    {
      /* compress the source description into stripes if requested */
      Xt_idxlist srcListStriped = xt_idxstripes_from_idxlist_new(srcList);
      xt_idxlist_delete(srcList);
      srcList = srcListStriped;
    }
  Xt_xmap gatherXmap = conf->xmap_new(srcList, dstList, collComm);
  xt_idxlist_delete(srcList);
  struct Xt_offset_ext gatherExt
    = { .start = 0,
        .size = xt_idxlist_get_num_indices(dstList),
        .stride = 1 };
  Xt_redist varRedist
    = xt_redist_p2p_ext_new(gatherXmap, numClients_, partExts, 1, &gatherExt,
                            MPI_DOUBLE);
  xt_xmap_delete(gatherXmap);
  Free(partExts);
  return varRedist;
}


279
280
281
282
283
/* 3D factorization of the collector count (one factor per dimension). */
struct xyzDims
{
  int sizes[3];
};

284
285
286
287
288
289
/* Build an index list covering numLvl levels of variable varID starting
 * at level startLvl.  Negative startLvl means start at level 0; negative
 * numLvl means all levels.  Note the dimension reversal: varShape is
 * (x, y, z) while yaxt sections are slowest-varying first. */
static Xt_idxlist
buildVarSlicesIdxList(int vlistID, int varID, int startLvl, int numLvl)
{
  int varShape[3] = { 0, 0, 0 };
  cdiPioQueryVarDims(varShape, vlistID, varID);
  /* int varSize = varShape[0] * varShape[1] * varShape[2]; */
  Xt_int varShapeXt[3],
    origin[3] = { startLvl >= 0 ? (Xt_int)startLvl:0, 0, 0 };
  int sliceShape[3];
  for (unsigned i = 0; i < 3; ++i)
    varShapeXt[2 - i] = (Xt_int)varShape[i];
  sliceShape[0] = numLvl >= 0 ? numLvl : (int)varShape[2];
  sliceShape[1] = varShape[1];
  sliceShape[2] = varShape[0];
  return xt_idxsection_new(0, 3, varShapeXt, sliceShape, origin);
}

static int
countVarChunkMissingVals(int vlistID, int varID,
                         struct streamMapping *mapping,
                         int chunkLen, const double *restrict data)
{
306
  size_t nmiss = 0;
307
308
309
310
311
312
313
314
315
  if (mapping->hasMissing[varID])
    {
      double missval = vlistInqVarMissval(vlistID, varID);
      for (size_t i = 0; i < (size_t)chunkLen; ++i)
        nmiss += (data[i] == missval);
    }
  return nmiss;
}

316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
#ifdef HAVE_LIBNETCDF
/* Gather the distributed pieces of one variable from the client
 * buffers into gatherBuf, arranged according to dstList. */
static void
gatherArray(int headerIdx, size_t streamIdx,
            double *gatherBuf,
            /* index list representing the data elements gathered on
             * this rank */
            Xt_idxlist dstList,
            const struct cdiPioConf *conf)
{
  Xt_redist redist = buildVarRedist(headerIdx, streamIdx, dstList, conf);
  xt_redist_s_exchange1(redist, rxWin[streamIdx].clientBuf[0].mem, gatherBuf);
  xt_redist_delete(redist);
}
#endif

333
334
#ifdef HAVE_PARALLEL_NC4
static void
335
queryVarBounds(struct PPM_extent varShape[3], int vlistID, int varID)
336
{
337
338
  varShape[0].first = 0;
  varShape[1].first = 0;
339
  varShape[2].first = 0;
340
  int sizes[3];
341
  cdiPioQueryVarDims(sizes, vlistID, varID);
342
343
  for (unsigned i = 0; i < 3; ++i)
    varShape[i].size = sizes[i];
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
}

/* compute distribution of collectors such that number of collectors
 * <= number of variable grid cells in each dimension */
static struct xyzDims
varDimsCollGridMatch(const struct PPM_extent varDims[3])
{
  xassert(PPM_extents_size(3, varDims) >= commInqSizeColl());
  struct xyzDims collGrid = { { 1, 1, 1 } };
  /* because of storage order, dividing dimension 3 first is preferred */
  for (int i = 0; i < numPioPrimes; ++i)
    {
      /* place each prime factor of the collector count into the
       * highest dimension that can still accommodate it */
      for (int dim = 2; dim >=0; --dim)
        if (collGrid.sizes[dim] * pioPrimes[i] <= varDims[dim].size)
          {
            collGrid.sizes[dim] *= pioPrimes[i];
            goto nextPrime;
          }
      /* no position found, retrack */
      xabort("Not yet implemented back-tracking needed.");
      nextPrime:
      ;
    }
  return collGrid;
}

/* Compute this collector's chunk (myPart) of a variable of shape
 * varShape under the collector grid decomposition collGrid. */
static void
myVarPart(struct PPM_extent varShape[3], struct xyzDims collGrid,
          struct PPM_extent myPart[3])
{
  int32_t myCollGridCoord[3];
  {
    /* translate this rank's linear index into a 3D coordinate in the
     * collector grid */
    struct PPM_extent collGridShape[3];
    for (int i = 0; i < 3; ++i)
      {
        collGridShape[i].first = 0;
        collGridShape[i].size = collGrid.sizes[i];
      }
    PPM_lidx2rlcoord_e(3, collGridShape, commInqRankColl(), myCollGridCoord);
    xdebug("my coord: (%d, %d, %d)", myCollGridCoord[0], myCollGridCoord[1],
           myCollGridCoord[2]);
  }
  PPM_uniform_partition_nd(3, varShape, collGrid.sizes,
                           myCollGridCoord, myPart);
}
389
390
391
392
393

/* collective writing variant */
/* Every collector gathers its chunk of each written variable and
 * writes it with streamWriteVarChunk (parallel netCDF4 path). */
static void
writeNetCDFStream(size_t streamIdx,
                  struct streamMapping *mapping,
                  double **data_, int *currentDataBufSize,
                  const struct cdiPioConf *conf)
{
  int nvars = mapping->numVars;
  int *restrict varMap = mapping->varMap;
  int streamID = openStreams.entries[streamIdx],
    vlistID = streamInqVlist(streamID);
  for (int varID = 0; varID < nvars; ++varID)
    if (mapping->numLvlsW[varID])
      {
        struct PPM_extent varShape[3];
        queryVarBounds(varShape, vlistID, varID);
        struct xyzDims collGrid = varDimsCollGridMatch(varShape);
        xdebug("writing varID %d with dimensions: "
               "x=%d, y=%d, z=%d,\n"
               "found distribution with dimensions:"
               " x=%d, y=%d, z=%d.", varID,
               varShape[0].size, varShape[1].size, varShape[2].size,
               collGrid.sizes[0], collGrid.sizes[1],
               collGrid.sizes[2]);
        struct PPM_extent varChunk[3];
        myVarPart(varShape, collGrid, varChunk);
        int myChunk[3][2];
        for (int i = 0; i < 3; ++i)
          {
            myChunk[i][0] = PPM_extent_start(varChunk[i]);
            myChunk[i][1] = PPM_extent_end(varChunk[i]);
          }
        xdebug("Writing chunk { { %d, %d }, { %d, %d },"
               " { %d, %d } }", myChunk[0][0], myChunk[0][1],
               myChunk[1][0], myChunk[1][1], myChunk[2][0],
               myChunk[2][1]);
        Xt_int varSize[3];
        for (int i = 0; i < 3; ++i)
          varSize[2 - i] = varShape[i].size;
        Xt_idxlist preWriteChunk;
        /* prepare yaxt descriptor for write chunk */
        {
          Xt_int preWriteChunkStart[3];
          int preWriteChunkSize[3];
          for (int i = 0; i < 3; ++i)
            {
              preWriteChunkStart[2 - i] = (Xt_int)varChunk[i].first;
              preWriteChunkSize[2 - i] = (int)varChunk[i].size;
            }
          preWriteChunk = xt_idxsection_new(0, 3, varSize,
                                            preWriteChunkSize,
                                            preWriteChunkStart);
        }
        resizeVarGatherBuf(xt_idxlist_get_num_indices(preWriteChunk),
                           data_, currentDataBufSize);
        double *restrict data = *data_;
        /* transpose data into write deco */
        {
          int headerIdx = varMap[varID];
          gatherArray(headerIdx, streamIdx, data, preWriteChunk, conf);
          xt_idxlist_delete(preWriteChunk);
        }
        /* count missing values if appropriate */
        size_t nmiss
          = countVarChunkMissingVals(vlistID, varID, mapping,
                                     PPM_extents_size(3, varChunk),
                                     data);
        /* write chunk */
        streamWriteVarChunk(streamID, varID,
                            (const int (*)[2])myChunk, data,
                            nmiss);
      }
}

464
465
466
#elif defined (HAVE_LIBNETCDF)
/* needed for writing when some files are only written to by a single process */
/* cdiOpenFileMap(fileID) gives the writer process */
Thomas Jahns's avatar
Thomas Jahns committed
467
/* Map a stream to the collector rank that has its file open
 * (serial/single-writer netCDF output). */
static int cdiPioSerialOpenFileMap(int streamID)
{
  size_t streamIdx = indexOfID(&openStreams, streamID);
  xassert(streamIdx < SIZE_MAX);   /* stream must be known/open */
  return rxWin[streamIdx].ownerRank;
}
/* for load-balancing purposes, count number of files per process */
/* cdiOpenFileCounts[rank] gives number of open files rank has to himself */
475
static int *cdiSerialOpenFileCount;
Thomas Jahns's avatar
Thomas Jahns committed
476
477
478

static int
cdiPioNextOpenRank()
479
480
481
482
483
484
485
486
487
488
489
490
491
{
  xassert(cdiSerialOpenFileCount != NULL);
  int commCollSize = commInqSizeColl();
  int minRank = 0, minOpenCount = cdiSerialOpenFileCount[0];
  for (int i = 1; i < commCollSize; ++i)
    if (cdiSerialOpenFileCount[i] < minOpenCount)
      {
        minOpenCount = cdiSerialOpenFileCount[i];
        minRank = i;
      }
  return minRank;
}

492
493
static void
cdiPioOpenFileOnRank(int rank)
494
495
{
  xassert(cdiSerialOpenFileCount != NULL
496
          && (unsigned)rank < (unsigned)commInqSizeColl());
497
498
499
  ++(cdiSerialOpenFileCount[rank]);
}

500
501
static void
cdiPioCloseFileOnRank(int rank)
502
503
504
505
506
507
508
{
  xassert(cdiSerialOpenFileCount != NULL
          && rank >= 0 && rank < commInqSizeColl());
  xassert(cdiSerialOpenFileCount[rank] > 0);
  --(cdiSerialOpenFileCount[rank]);
}

509
510
511
512
513
514
515
/* Define the netCDF coordinate variables, but only on the collector
 * that actually has the file open (or unconditionally in PIO_NONE
 * mode).  Note: rank/rankOpen are deliberately assigned inside the
 * short-circuited condition. */
static void
cdiPioServerCdfDefVars(stream_t *streamptr)
{
  int rank, rankOpen;
  if (commInqIOMode() == PIO_NONE
      || ((rank = commInqRankColl())
          == (rankOpen = cdiPioSerialOpenFileMap(streamptr->self))))
    cdfDefCoordinateVars(streamptr);
}

519
520
521
/* Serial (single-writer) netCDF variant: all collectors participate in
 * gathering each written variable, but only the rank holding the file
 * receives the full field and calls streamWriteVar. */
static void
writeNetCDFStream(size_t streamIdx,
                  struct streamMapping *mapping,
                  double **data_, int *currentDataBufSize,
                  const struct cdiPioConf *conf)
{
  int nvars = mapping->numVars;
  int *restrict varMap = mapping->varMap,
    *restrict numLvlsW = mapping->numLvlsW;
  /* determine process which has stream open (writer) and
   * which has data for which variable (var owner)
   * three cases need to be distinguished */
  int streamID = openStreams.entries[streamIdx],
    vlistID = streamInqVlist(streamID);
  int writerRank = cdiPioSerialOpenFileMap(streamID);
  int collRank = commInqRankColl();
  for (int varID = 0; varID < nvars; ++varID)
    if (numLvlsW[varID])
      {
        int varSize;
        Xt_idxlist dstList;
        if (writerRank == collRank)
          {
            /* writer gathers the complete variable (all levels) */
            dstList = buildVarSlicesIdxList(vlistID, varID, -1, -1);
            varSize = xt_idxlist_get_num_indices(dstList);
            resizeVarGatherBuf(varSize, data_, currentDataBufSize);
          }
        else
          {
            /* non-writers contribute but receive nothing */
            varSize = 0;
            dstList = xt_idxempty_new();
          }
        double *restrict data = *data_;
        int headerIdx = varMap[varID];
        gatherArray(headerIdx, streamIdx, data, dstList, conf);
        if (writerRank == collRank)
          {
            size_t nmiss = countVarChunkMissingVals(vlistID, varID,
                                                 mapping, varSize, data);
            streamWriteVar(streamID, varID, data, nmiss);
          }
        xt_idxlist_delete(dstList);
      }
}
563

564
#endif
565

566
567
568
/* Address of dictionary entry number `entry` in `client`'s buffer
 * region of stream `streamIdx`. */
static inline struct winHeaderEntry *
winDictEntry(size_t streamIdx, size_t client, size_t entry)
{
  struct winHeaderEntry *dict
    = (wHECast)rxWin[streamIdx].clientBuf[client].mem;
  return dict + entry;
}

/* Build a numClients x numVars table recording, for every client and
 * variable, the UID of the partition index list and the buffer offset
 * used this timestep.  Returns a Calloc'ed array; caller must Free it.
 * (Fixed the element size passed to Calloc: sizeof (layout[0]) is the
 * size of a whole numVars-element row, which over-allocated the table
 * by a factor of numVars; one element is sizeof (layout[0][0]).) */
static struct streamMemLayout *
getLayout(size_t streamIdx)
{
  int streamID = openStreams.entries[streamIdx];
  size_t numClients = (size_t)numClients_;
  int vlistID = streamInqVlist(streamID);
  size_t numVars = (size_t)vlistNvars(vlistID);
  struct streamMemLayout (*layout)[numVars]
    = Calloc(numClients * numVars, sizeof (layout[0][0]));
  size_t numDataEntries
    = (size_t)(winDictEntry(streamIdx, 0, 0)->specific.headerSize.numDataEntries);
  for (size_t client = 0; client < numClients; ++client)
    for (size_t headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
      {
        xassert(namespaceAdaptKey2(winDictEntry(streamIdx, client,
                                                headerIdx)->id) == streamID);
        struct winHeaderEntry *varHeader
          = winDictEntry(streamIdx, client, headerIdx);
        size_t varID = (size_t)varHeader[0].specific.dataRecord.varID;
        size_t offset = (size_t)varHeader[0].offset;
        Xt_uid uid = unpackXTUID(varHeader[1].specific.partDesc.packedUID);
        layout[client][varID] = (struct streamMemLayout){
          .varPartIdxListUID = uid, .offset = offset };
      }
  return *layout;
}

600
601
/* build inventory of written variables for stream */
/* Returns a freshly Malloc'ed streamMapping whose varMap/numLvlsW/
 * hasMissing arrays live in the same allocation after writtenRecords[]. */
static struct streamMapping *
streamMappingNew(size_t streamIdx, struct winHeaderEntry *winDict,
                 const struct cdiPioConf *conf)
{
  int streamID = openStreams.entries[streamIdx];
  int numDataEntries = winDict[0].specific.headerSize.numDataEntries;
  int vlistID = streamInqVlist(streamID);
  int numVars = vlistNvars(vlistID);
  /* varMap[i] == index of header if variable i is written to,
   * numLvlsW[i] == number of levels of variable i or 0 if not written
   */
  /* single Calloc carved into 4 arrays of numVars ints each */
  int *restrict varMap = Calloc((size_t)numVars * 4, sizeof (varMap[0])),
    *restrict hasMissing = varMap + numVars,
    *restrict numLvlsW = varMap + 2 * numVars,
    *restrict hasMissing_ = varMap + 3 * numVars;
  /* data entries come in (data header, partition descriptor) pairs,
   * hence the step of 2 starting at index 1 */
  for (int headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
    {
      xassert(namespaceAdaptKey2(winDict[headerIdx].id) == streamID);
      int varID = winDict[headerIdx].specific.dataRecord.varID;
      /* ensure a variable has not been enqueued twice */
      /* FIXME: this could better be ensured on client */
      xassert(varID < numVars && varID >= 0 && varMap[varID] == 0);
      varMap[varID] = headerIdx;
      hasMissing[varID] += winDict[headerIdx].specific.dataRecord.nmiss;
    }
  /* set numLvlsW[i] to 1 if varMap[i] != 0 on any collector,
   * also sets hasMissing_[i] to global reduction of hasMissing[i]
   * (the two reductions share one Allreduce because hasMissing
   * directly follows varMap in memory) */
  xmpi(MPI_Allreduce(varMap, numLvlsW, 2 * numVars, MPI_INT,
                     MPI_LOR, commInqCommColl()));
  /* now find numbers of levels for each variable written anywhere */
  size_t numWrittenRecords = 0;
  for (int varID = 0; varID < numVars; ++varID)
    if (numLvlsW[varID])
      numWrittenRecords
        += (size_t)(numLvlsW[varID]
                    = zaxisInqSize(vlistInqVarZaxis(vlistID, varID)));
  struct streamMapping *result
    = Malloc(sizeof (*result)
              + numWrittenRecords * sizeof (result->writtenRecords[0])
              + (size_t)numVars * 3 * sizeof (result->varMap[0]));
  /* the int arrays are placed after the flexible writtenRecords array */
  result->varMap
    = (void *)((unsigned char *)result + sizeof (*result)
               + numWrittenRecords * sizeof (result->writtenRecords[0]));
  result->numLvlsW = result->varMap + numVars;
  result->hasMissing = result->varMap + 2 * numVars;
  {
    size_t j = (size_t)-1;   /* ++j wraps to 0 on first use (unsigned) */
    /* initialized to shut up gcc, loop logic ensures initialization
     * at least once */
    size_t recordDataSize = 0;
    int lastVarID = -1;
    for (int varID = 0; varID < numVars; ++varID)
      {
        size_t numLvl = (size_t)(result->numLvlsW[varID] = numLvlsW[varID]);
        if (varID != lastVarID)
          {
            /* record size = one horizontal slice in bytes */
            int varShape[3];
            cdiPioQueryVarDims(varShape, vlistID, varID);
            recordDataSize = (size_t)varShape[0] * (size_t)varShape[1]
              * sizeof (double);
            lastVarID = varID;
          }
        result->varMap[varID] = varMap[varID];
        result->hasMissing[varID] = hasMissing_[varID];
        for (size_t lvl = 0; lvl < numLvl; ++lvl)
          result->writtenRecords[++j]
            = (struct recordWrite){ .varID = varID, .level = (int)lvl,
                                    .dataSize = recordDataSize};
      }
  }
  result->numVars = numVars;
  result->numWrittenRecords = (int)numWrittenRecords;
  Free(varMap);
  result->layout = conf->cacheRedists ? getLayout(streamIdx) : NULL;
  return result;
}

678
679
static void
streamMappingDelete(struct streamMapping **mapping)
680
{
681
  Free((*mapping)->layout);
682
683
  Free(*mapping);
  *mapping = NULL;
684
685
}

686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
/* Delete all redists cached in retained[0..numRetained-1].
 * (Dropped the redundant (size_t) cast — numRetained already is size_t.) */
static inline void
destructRetained(struct cacheRedist *restrict retained, size_t numRetained)
{
  for (size_t i = 0; i < numRetained; ++i)
    xt_redist_delete(retained[i].redist);
}

/* Decide — collectively across all collectors — whether the redists
 * cached for stream streamIdx can be reused this timestep.  On first
 * use, or when the clients' memory layout changed on any collector,
 * (re)allocates the cache and adopts mapping->layout as the reference
 * layout (ownership transferred: mapping->layout is set to NULL).
 * Returns true iff the cached redists may be reused. */
static inline bool
handleRedistCache(size_t streamIdx,
                  struct streamMapping *restrict mapping,
                  size_t numPasses, int vlistID, MPI_Comm collComm)
{
  bool reuseRedists = false;
  if (!rxWin[streamIdx].retained)
    {
      /* first timestep with caching: create empty cache slots */
      rxWin[streamIdx].retained
        = Malloc(numPasses * sizeof (*rxWin[streamIdx].retained));
      rxWin[streamIdx].numRetained = numPasses;
      rxWin[streamIdx].prevLayout = mapping->layout;
      mapping->layout = NULL;
    }
  else
    {
      /* compare current layout table with the one the cache was built for */
      size_t numClients = (size_t)numClients_,
        numVars = (size_t)vlistNvars(vlistID);
      reuseRedists
        = !memcmp(mapping->layout, rxWin[streamIdx].prevLayout,
                  numClients * numVars
                  * sizeof (mapping->layout[0]));
      if (!reuseRedists)
        {
          Free(rxWin[streamIdx].prevLayout);
          rxWin[streamIdx].prevLayout = mapping->layout;
          mapping->layout = NULL;
        }
      {
        /* all collectors must agree; reuse only if the layout matched
         * everywhere (logical AND reduction) */
        int temp = reuseRedists;
        xmpi(MPI_Allreduce(MPI_IN_PLACE, &temp, 1, MPI_INT, MPI_LAND,
                           collComm));
        reuseRedists = temp;
      }
      if (!reuseRedists)
        {
          destructRetained(rxWin[streamIdx].retained,
                           rxWin[streamIdx].numRetained);
          rxWin[streamIdx].retained
            = Realloc(rxWin[streamIdx].retained,
                       numPasses * sizeof (*rxWin[streamIdx].retained));
          rxWin[streamIdx].numRetained = numPasses;
        }
    }
  return reuseRedists;
}

740
741
742
743
744
745
746
747
/* denote what will be aggregated at a single process */
struct passPlan
{
  unsigned recordAggStart, recordAggEnd;  /* inclusive range of record indices */
  int varStart, varEnd;                   /* varIDs of the first/last record in the range */
};

/**
 * @param[out] passes_ pointer to pointer to 2-dimensional array of
 * records of dimensions $number of passes \cdot number of collectors$,
 * where $(*passes_)[pass][i]$ details the records written by collector
 * rank \a i
 * @return number of passes
 */
static size_t
planPasses(size_t streamIdx, const struct streamMapping *mapping,
           const struct cdiPioConf *conf, size_t collSize,
           struct passPlan (**passes_)[collSize])
{
  (void)streamIdx;
  size_t numPasses = 0;
  size_t recordAggBufLim = conf->recordAggBufLimMB * 1024 * 1024,
    totalAggBufSpace = recordAggBufLim * collSize,
    totalWritten = 0;
  /* find total size of data written for the stream and build prefix sums */
  size_t numWrittenRecords = (size_t)mapping->numWrittenRecords;

  if (numWrittenRecords == 0)
    return 0;
  /* one allocation: numWrittenRecords+1 prefix sums followed by
   * collSize+1 separation indices */
  size_t *restrict recordDataSizePfxSums
    = Malloc((numWrittenRecords + 1 + collSize + 1)
              * sizeof (*recordDataSizePfxSums)),
    *restrict recordSeparations = recordDataSizePfxSums + numWrittenRecords + 1;
  const struct recordWrite *restrict writtenRecords
    = mapping->writtenRecords;

  recordDataSizePfxSums[0] = 0;
  for (size_t i = 0; i < numWrittenRecords; ++i)
    {
      size_t recordDataSize = writtenRecords[i].dataSize;
      recordDataSizePfxSums[i + 1]
        = recordDataSizePfxSums[i] + recordDataSize;
      totalWritten += recordDataSize;
    }
  /* move if into loop for handling last pass */
  if (totalWritten < totalAggBufSpace)
    {
      /* don't go to limit of some tasks where a single pass will be
       * sufficient to write everything, compute load-balancing
       * instead */
      numPasses = 1;
      struct passPlan *passes = Malloc(sizeof (*passes) * collSize);
      /* chains-on-chains partitioning balances the byte counts */
      cdiPioDeco1D_CCP(numWrittenRecords, recordDataSizePfxSums,
                       collSize, recordSeparations);
      for (size_t rank = 0; rank < collSize; ++rank)
        {
          size_t startRecord = recordSeparations[rank],
            lastRecord = recordSeparations[rank + 1] - 1;
          passes[rank] = (struct passPlan){
            .recordAggStart = (unsigned)startRecord,
            .recordAggEnd = (unsigned)lastRecord,
            .varStart = writtenRecords[startRecord].varID,
            .varEnd = writtenRecords[lastRecord].varID,
          };
        }
      *passes_ = (struct passPlan(*)[collSize])passes;
    }
  else
    {
      /* aggregate as many records on each task to fill up to
       * recordAggLim data bytes, but use at least one, unless none
       * remain */
      size_t firstRecordOfPass = 0, curRecord;
      struct passPlan (*passes)[collSize] = NULL;
      do
        {
          size_t taskBegin = firstRecordOfPass;
          curRecord = firstRecordOfPass - 1;
          passes = Realloc(passes, sizeof (*passes) * (numPasses + 1));
          for (size_t rank = 0; rank < collSize; ++rank)
            {
              /* greedily take records while below the per-task limit */
              size_t recordAggBufSize = 0;
              while (curRecord + 1 < numWrittenRecords
                     && ((recordAggBufSize
                          + writtenRecords[curRecord + 1].dataSize)
                         < recordAggBufLim))
                recordAggBufSize += writtenRecords[++curRecord].dataSize;
              /* force at least one record per task while any remain */
              if (curRecord == taskBegin - 1
                  && curRecord + 1 < numWrittenRecords)
                ++curRecord;
              passes[numPasses][rank] = (struct passPlan){
                .recordAggStart = (unsigned)taskBegin,
                .recordAggEnd = (unsigned)curRecord,
                .varStart = writtenRecords[taskBegin].varID,
                .varEnd = writtenRecords[curRecord].varID,
              };
              taskBegin = curRecord + 1;
            }
          ++numPasses, firstRecordOfPass = curRecord + 1;
        }
      while (curRecord + 1 < numWrittenRecords);
      *passes_ = passes;
    }
  Free(recordDataSizePfxSums);
  return numPasses;
}

/* Return the smaller of two size_t values. */
static inline size_t
szmin(size_t a, size_t b)
{
  return b < a ? b : a;
}

/* Return the larger of two size_t values. */
static inline size_t
szmax(size_t a, size_t b)
{
  return b > a ? b : a;
}

/* Append size bytes from ptr to the aggregation buffer of the stream
 * associated with fileID, growing the buffer on demand.  Matches the
 * NSSWITCH_FILE_WRITE signature so it can be installed via
 * namespaceSwitchSet; always reports size bytes as written. */
static size_t
aggBufAppend(int fileID, const void *restrict ptr, size_t size)
{
  size_t streamIdx = indexOfID(&openFiles, fileID);
  size_t used = rxWin[streamIdx].aggBufUsed;
  size_t needed = used + size;
  /* grow buffer exactly to the required size when it would overflow */
  if (needed > rxWin[streamIdx].aggBufSize)
    {
      rxWin[streamIdx].aggBufSize = needed;
      rxWin[streamIdx].aggBuf = Realloc(rxWin[streamIdx].aggBuf, needed);
    }
  memcpy((unsigned char *)rxWin[streamIdx].aggBuf + used, ptr, size);
  rxWin[streamIdx].aggBufUsed = needed;
  return size;
}

/* Write out the accumulated aggregation buffer of the stream at
 * streamIdx via the supplied low-level file-write function and mark
 * the buffer empty again. */
static void
aggBufFlush(size_t streamIdx,
            size_t (*cdiPioFileWrite)(int, const void *restrict, size_t, int))
{
  int fileID = openFiles.entries[streamIdx];
  int tsID = streamInqCurTimestepID(openStreams.entries[streamIdx]);
  cdiPioFileWrite(fileID, rxWin[streamIdx].aggBuf,
                  rxWin[streamIdx].aggBufUsed, tsID);
  rxWin[streamIdx].aggBufUsed = 0;
}
884

885
static void
886
887
writeGribStream(size_t streamIdx,
                struct streamMapping *mapping,
888
889
                double **data_, int *currentDataBufSize,
                const struct cdiPioConf *conf)
890
{
891
892
  const struct clientBuf *restrict clientBuf = rxWin[streamIdx].clientBuf;
  int streamID = openStreams.entries[streamIdx];
893
  int vlistID = streamInqVlist(streamID);
894
  int fileID = streamInqFileID(streamID);
895
  MPI_Comm collComm = commInqCommColl();
896
897
898
899
900
  size_t collSize = (size_t)commInqSizeColl();
  size_t collRank = (size_t)commInqRankColl();
  struct passPlan (*passes)[collSize] = NULL;
  size_t numPasses = planPasses(streamIdx, mapping, conf, collSize, &passes);
  Xt_redist *varRedists = NULL;
901
902
  struct recordWrite *restrict writtenRecords = mapping->writtenRecords;
  size_t (*cdiPioFileWrite)(int fileID, const void *restrict buffer,
903
904
905
                            size_t len, int tsID)
    = (size_t (*)(int, const void *restrict, size_t, int))
    namespaceSwitchGet(NSSWITCH_FILE_WRITE).func;
906
907
908
909
  bool reuseRedists = conf->cacheRedists != 0
    ? handleRedistCache(streamIdx, mapping, (size_t)numPasses, vlistID, collComm)
    : false;
  struct cacheRedist *restrict retained = rxWin[streamIdx].retained;
910
911
912
913
914
915
  struct {
    int varID;
    unsigned recordStart, recordEnd;
  } *varsInPass = NULL;
  MPI_Aint *displ = NULL;
  for (size_t pass = 0; pass < numPasses; ++pass)
916
    {
917
918
919
920
921
922
923
924
      unsigned base = passes[pass][0].recordAggStart;
      size_t numRecordsInPass = passes[pass][collSize - 1].recordAggEnd
        - base + 1;
      size_t maxVarsInPass = (size_t)(passes[pass][collSize - 1].varEnd
                                      - passes[pass][0].varStart + 1);
      varsInPass
        = Realloc(varsInPass, sizeof (*varsInPass)
                   * szmin(numRecordsInPass, maxVarsInPass));
925
      /* establish variables involved in this pass */
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
      size_t numVarsInPass = 1;
      varsInPass[0].recordStart = base;
      int lastSeenVarID =
        varsInPass[0].varID = writtenRecords[base].varID;
      for (size_t i = 1; i < numRecordsInPass; ++i)
        if (lastSeenVarID != writtenRecords[base + i].varID)
          {
            varsInPass[numVarsInPass - 1].recordEnd = (unsigned)(base + i - 1);
            varsInPass[numVarsInPass].varID
              = lastSeenVarID = writtenRecords[base + i].varID;
            varsInPass[numVarsInPass].recordStart = (unsigned)(base + i);
            ++numVarsInPass;
          }
      varsInPass[numVarsInPass - 1].recordEnd
        = (unsigned)(base + numRecordsInPass - 1);
      varRedists = Realloc(varRedists, numVarsInPass * sizeof (*varRedists));
      size_t myRecordStart = passes[pass][collRank].recordAggStart,
        myRecordEnd = passes[pass][collRank].recordAggEnd;
      size_t myAggSize = 0;
945
946
947
      /* build or fetch from cache redists for all variables involved in current write pass */
      Xt_redist compositePassRedist;
      if (reuseRedists)
948
        {
949
          compositePassRedist = retained[pass].redist;
950
          myAggSize = (size_t)retained[pass].sliceSize;
951
        }
952
      else
953
        {
954
955
956
957
          int myVarStart = passes[pass][collRank].varStart,
            myVarEnd = passes[pass][collRank].varEnd;
          displ = Realloc(displ, sizeof (*displ) * (numVarsInPass * 2 + 1));
          memset(displ, 0, sizeof (*displ) * (numVarsInPass + 1));
958
          for (unsigned varIdx = 0; varIdx < numVarsInPass; ++varIdx)
959
            {
960
              int varID = varsInPass[varIdx].varID;
961
962
              Xt_idxlist dstList;
              /* is this process writing part of this variable? */
963
964
              if (myRecordStart <= myRecordEnd
                  && myVarStart <= varID && myVarEnd >= varID)
965
                {
966
967
968
969
970
971
972
973
974
975
976
                  size_t myVarRecordStart
                    = writtenRecords[myRecordStart].varID == varID
                    ? myRecordStart : varsInPass[varIdx].recordStart;
                  size_t myLevelStart
                    = (size_t)writtenRecords[myVarRecordStart].level;
                  size_t myVarRecordEnd
                    = writtenRecords[myRecordEnd].varID == varID
                    ? myRecordEnd : (size_t)varsInPass[varIdx].recordEnd;
                  size_t myNumLevels
                    = (size_t)writtenRecords[myVarRecordEnd].level
                    - myLevelStart + 1;
977
                  dstList
978
979
980
981
982
983
984
                    = buildVarSlicesIdxList(vlistID, varID, (int)myLevelStart,
                                            (int)myNumLevels);
                  size_t sliceSize = (size_t)xt_idxlist_get_num_indices(dstList);
                  assert(sliceSize * sizeof (double)
                         == (writtenRecords[myVarRecordStart].dataSize
                             * myNumLevels));
                  myAggSize += sliceSize;
985
986
                }
              else
987
988
989
990
991
                {
                  dstList = xt_idxempty_new();
                }
              displ[numVarsInPass + varIdx + 1]
                = (MPI_Aint)(sizeof (double) * myAggSize);
992
              varRedists[varIdx] = buildVarRedist(mapping->varMap[varID],
993
                                                  streamIdx, dstList, conf);
994
995
996
997
998
999
              xt_idxlist_delete(dstList);
            }
          /* merge all redists for current pass */
          if (numVarsInPass > 1)
            {
              compositePassRedist
1000
1001
1002
1003
                = xt_redist_collection_static_new(varRedists,
                                                  (int)numVarsInPass,
                                                  displ, displ + numVarsInPass,
                                                  collComm);
1004
              /* free individual redists */
1005
              for (size_t varIdx = 0; varIdx < numVarsInPass; ++varIdx)
1006
1007
1008
1009
1010
1011
1012
                xt_redist_delete(varRedists[varIdx]);
            }
          else
            compositePassRedist = varRedists[0];
          if (conf->cacheRedists)
            {
              retained[pass].redist = compositePassRedist;
1013
              retained[pass].sliceSize = (int)myAggSize;
1014
            }
1015
1016
        }
      /* resize gather buffer if needed */
1017
      resizeVarGatherBuf((int)myAggSize, data_, currentDataBufSize);
1018
1019
1020
      /* execute composite redist */
      xt_redist_s_exchange1(compositePassRedist, clientBuf[0].mem, *data_);
      /* delete composite redist */
1021
1022
      if (!conf->cacheRedists)
        xt_redist_delete(compositePassRedist);
1023
1024
1025
1026

      /* append encoded data records from this pass to buffer written later */
      /* todo: develop better heuristic for buffer size */
      if (sizeof (double) * myAggSize > rxWin[streamIdx].aggBufSize)
1027
        {
1028
1029
1030
1031
1032
          Free(rxWin[streamIdx].aggBuf);
          size_t aggBufSize = szmax((size_t)conf->recordAggBufLimMB
                                    * (size_t)1024 * (size_t)1024,
                                    sizeof (double) * myAggSize);
          if (posix_memalign(&rxWin[streamIdx].aggBuf,
1033
                             cdiGetPageSize(conf->largePageAlign),
1034
1035
1036
                             aggBufSize) == 0) ;
          else
            rxWin[streamIdx].aggBuf = Malloc(aggBufSize);
1037
        }
1038
1039
1040
1041
1042
1043
      namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(aggBufAppend));
      /* write records to aggregation buffer */
      if (myRecordStart <= myRecordEnd)
      {
        size_t varIdx = (size_t)-1;
        int varID = -1;
1044
        size_t recordDataOfs = 0;
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
        const double *data = *data_;
        for (size_t recordIdx = myRecordStart;
             recordIdx <= myRecordEnd;
             ++recordIdx)
          {
            int level = writtenRecords[recordIdx].level;
            int prevVarID = varID;
            varID = writtenRecords[recordIdx].varID;
            varIdx += varID != prevVarID;
            size_t recordSize = writtenRecords[recordIdx].dataSize;
            size_t nvals = recordSize / sizeof (double);
1056
            size_t nmiss
1057
              = countVarChunkMissingVals(vlistID, varID, mapping, (int)nvals,
1058
1059
1060
                                         data + recordDataOfs);
            streamWriteVarSlice(streamID, varID, level, data + recordDataOfs, nmiss);
            recordDataOfs += nvals;
1061
1062
1063
          }
        aggBufFlush(streamIdx, cdiPioFileWrite);
      }
1064
1065
      else
        /* write zero bytes to trigger synchronization code in fileWrite */
1066
        cdiPioFileWrite(fileID, NULL, 0,
1067
                        streamInqCurTimestepID(streamID));
1068
      namespaceSwitchSet(NSSWITCH_FILE_WRITE, NSSW_FUNC(cdiPioFileWrite));
1069
    }
1070
1071
1072
1073
  Free(displ);
  Free(varRedists);
  Free(varsInPass);
  Free(passes);
1074
}
1075

1076
1077
static void
readGetBuffers(size_t streamIdx, const struct cdiPioConf *conf)
Deike Kleberg's avatar
Deike Kleberg committed
1078
{
1079
  int streamID = openStreams.entries[streamIdx];
1080
  xdebug("%s", "START");
1081

1082
  struct winHeaderEntry *winDict
1083
    = (wHECast)rxWin[streamIdx].clientBuf[0].mem;
1084
  xassert(winDict[0].id == HEADERSIZEMARKER);
1085
  {
1086
    int dictSize = rxWin[streamIdx].clientBuf[0].dictSize,
1087
      firstNonRPCEntry = dictSize - winDict[0].specific.headerSize.numRPCEntries - 1,
1088
1089
1090
1091
1092
1093
      headerIdx,
      numFuncCalls = 0;
    for (headerIdx = dictSize - 1;
         headerIdx > firstNonRPCEntry;
         --headerIdx)
      {
1094
1095
        xassert(winDict[headerIdx].id >= MINFUNCID
                && winDict[headerIdx].id <= MAXFUNCID);
1096
        ++numFuncCalls;
1097
        readFuncCall(winDict + headerIdx, streamIdx);
1098
      }
1099
    xassert(numFuncCalls == winDict[0].specific.headerSize.numRPCEntries);
1100
  }
Thomas Jahns's avatar
Thomas Jahns committed
1101
  /* build list of streams, data was transferred for */
1102
  {
1103
1104
1105
    struct streamMapping *map = streamMappingNew(streamIdx, winDict, conf);
    /* TODO: build list of rma buffer layout here to check if caching
     * can be done */
1106
    double *data = NULL;
Thomas Jahns's avatar
Thomas Jahns committed
1107
    int currentDataBufSize = 0;
1108
    int filetype = streamInqFiletype(streamID);
Thomas Jahns's avatar
Thomas Jahns committed
1109

1110
1111
    switch (filetype)
      {
1112
1113
      case CDI_FILETYPE_GRB:
      case CDI_FILETYPE_GRB2:
1114
        writeGribStream(streamIdx, map, &data, &currentDataBufSize, conf);
1115
        break;
1116
#ifdef HAVE_NETCDF4
1117
1118
1119
      case CDI_FILETYPE_NC:
      case CDI_FILETYPE_NC2:
      case CDI_FILETYPE_NC4:
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1120
1121
      case CDI_FILETYPE_NC4C:
      case CDI_FILETYPE_NC5:
1122
        writeNetCDFStream(streamIdx, map, &data, &currentDataBufSize, conf);
1123
        break;
1124
#endif
1125
1126
      default:
        xabort("unhandled filetype in parallel I/O.");
1127
      }
1128
1129
1130
1131
    streamMappingDelete(&map);
    Free(map);
    Free(data);
  }
1132
  xdebug("%s", "RETURN");
1133
}
1134
1135
1136

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
1137

Thomas Jahns's avatar
Thomas Jahns committed
1138
static
1139
void clearModelWinBuffer(size_t streamIdx)
Deike Kleberg's avatar
Deike Kleberg committed
1140
{
1141
1142
1143
1144
1145
1146
  xassert(streamIdx < openStreams.size &&
          rxWin != NULL && rxWin[streamIdx].clientBuf[0].mem != NULL);
  size_t bufSizeTotal = (size_t)(rxWin[streamIdx].clientBuf[numClients_ - 1].mem
                                 - rxWin[streamIdx].clientBuf[0].mem)
    + rxWin[streamIdx].clientBuf[numClients_ - 1].size;
  memset(rxWin[streamIdx].clientBuf[0].mem, 0, bufSizeTotal);
Deike Kleberg's avatar
Deike Kleberg committed
1147
1148
1149
1150
1151
1152
}


/************************************************************************/


1153
1154
static void
getTimeStepData(int *streamActivity, const struct cdiPioConf *conf)
Deike Kleberg's avatar
Deike Kleberg committed
1155
{
1156
  MPI_Group clientGroup = cdiPioInqRemoteGroup();
1157

1158
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
1159

1160
1161
  for (size_t streamIdx = 0; streamIdx < openStreams.size; ++streamIdx)
    if (streamActivity[streamIdx])
1162
      {
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
        clearModelWinBuffer(streamIdx);
        // todo put in correct lbs and ubs
        xmpi(MPI_Win_start(clientGroup, 0, rxWin[streamIdx].getWin));
        /* FIXME: this needs to use MPI_PACKED for portability */
        for (size_t i = 0; i < (size_t)numClients_; ++i)
          xmpi(MPI_Get(rxWin[streamIdx].clientBuf[i].mem,
                       (int)rxWin[streamIdx].clientBuf[i].size, MPI_UNSIGNED_CHAR,
                       clientRanks_[i], 0,
                       (int)rxWin[streamIdx].clientBuf[i].size, MPI_UNSIGNED_CHAR,
                       rxWin[streamIdx].getWin));
        xmpi(MPI_Win_complete(rxWin[streamIdx].getWin));
1174
      }
1175
1176
1177

  for (size_t streamIdx = 0; streamIdx < openStreams.size; ++streamIdx)
    if (streamActivity[streamIdx])
1178
      readGetBuffers(streamIdx, conf);
1179

1180
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
1181
}
Deike Kleberg's avatar
Deike Kleberg committed
1182
1183
1184

/************************************************************************/

1185
static int
1186
1187
1188
cdiPioServerStreamOpen(const char *filename, char filemode,
                       int filetype, stream_t *streamptr,
                       int recordBufIsToBeCreated)
1189
{
1190
  int fileID = -1;
1191
1192
1193
1194
#if defined HAVE_LIBNETCDF && ! defined HAVE_PARALLEL_NC4
  /* Only needs initialization to shut up gcc */
  int rank = -1;
#endif
1195
1196
  switch (filetype)
    {
1197
#if defined HAVE_LIBNETCDF && ! defined HAVE_PARALLEL_NC4
1198
1199
    case CDI_FILETYPE_NC4:
    case CDI_FILETYPE_NC4C:
1200
      {
Thomas Jahns's avatar
Thomas Jahns committed
1201
1202
        int ioMode = commInqIOMode();
        if (ioMode == PIO_NONE
1203
1204
1205
1206
            || commInqRankColl() == (rank = cdiPioNextOpenRank()))
          fileID = cdiStreamOpenDefaultDelegate(filename, filemode, filetype,
                                                streamptr,
                                                recordBufIsToBeCreated);
1207
1208
        else
          streamptr->filetype = filetype;
Thomas Jahns's avatar
Thomas Jahns committed
1209
        if (ioMode != PIO_NONE)
1210
          xmpi(MPI_Bcast(&fileID, 1, MPI_INT, rank, commInqCommColl()));
1211
        cdiPioOpenFileOnRank(rank);
1212
      }
1213
1214
      break;
#endif
1215
    default:
1216
1217
      fileID = cdiStreamOpenDefaultDelegate(filename, filemode, filetype,
                                            streamptr, recordBufIsToBeCreated);
1218
    }
1219
  if (fileID >= 0)
1220
    {
1221
1222
      size_t oldNumStreams = openStreams.size;
      size_t streamIdx = insertID(&openStreams, streamptr->self);
1223
1224
      size_t fileIdx = insertID(&openFiles, fileID);
      xassert(fileIdx == streamIdx);