pio_server.c 33.7 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
/** @file pio_server.c
 */
3
4
5
6
7
8
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
#include "pio_server.h"


12
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
13
14
#include <stdlib.h>
#include <stdio.h>
15
16
17
18
19
20

#ifdef HAVE_PARALLEL_NC4
#include <core/ppm_combinatorics.h>
#include <core/ppm_rectilinear.h>
#include <ppm/ppm_uniform_partition.h>
#endif
21
#include <yaxt.h>
22

Deike Kleberg's avatar
Deike Kleberg committed
23
#include "cdi.h"
24
#include "namespace.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
26
#include "pio_comm.h"
27
#include "pio_interface.h"
Deike Kleberg's avatar
Deike Kleberg committed
28
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
29
#include "pio_util.h"
30
#include "cdi_int.h"
31
#include "pio_cdf_int.h"
32
#include "resource_handle.h"
33
#include "resource_unpack.h"
Deike Kleberg's avatar
Deike Kleberg committed
34
#include "vlist_var.h"
35

36

37
extern resOps streamOps;
38
extern void arrayDestroy ( void );
Deike Kleberg's avatar
Deike Kleberg committed
39

40
41
42
static struct
{
  size_t size;
Thomas Jahns's avatar
Thomas Jahns committed
43
  unsigned char *buffer;
44
  int dictSize;
45
46
} *rxWin = NULL;

Thomas Jahns's avatar
Thomas Jahns committed
47
static MPI_Win getWin = MPI_WIN_NULL;
Thomas Jahns's avatar
Thomas Jahns committed
48
static MPI_Group groupModel = MPI_GROUP_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
49

50
51
52
53
54
#ifdef HAVE_PARALLEL_NC4
/* prime factorization of number of pio collectors */
static uint32_t *pioPrimes;
static int numPioPrimes;
#endif
Deike Kleberg's avatar
Deike Kleberg committed
55

Deike Kleberg's avatar
Deike Kleberg committed
56
57
/************************************************************************/

58
static
Deike Kleberg's avatar
Deike Kleberg committed
59
60
void serverWinCleanup ()
{
61
62
  if (getWin != MPI_WIN_NULL)
    xmpi(MPI_Win_free(&getWin));
63
64
  if (rxWin)
    {
65
      free(rxWin[0].buffer);
66
      free(rxWin);
Deike Kleberg's avatar
Deike Kleberg committed
67
    }
68

69
  xdebug("%s", "cleaned up mpi_win");
Deike Kleberg's avatar
Deike Kleberg committed
70
}
71

Deike Kleberg's avatar
Deike Kleberg committed
72
 /************************************************************************/
73

74
75
static size_t
collDefBufferSizes()
Deike Kleberg's avatar
Deike Kleberg committed
76
{
77
  int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
78
79
  int modelID;
  size_t sumGetBufferSizes = 0;
80
  int rankGlob = commInqRankGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
81
  int nProcsModel = commInqNProcsModel ();
82
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
83

84
  xassert(rxWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
85

Deike Kleberg's avatar
Deike Kleberg committed
86
  nstreams = reshCountType ( &streamOps );
87
88
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
89
90
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
91
      // space required for data
92
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
93
94
95
96
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
          iorank = vlistInqVarIOrank ( vlistID, varID );
Deike Kleberg's avatar
Deike Kleberg committed
97
          xassert ( iorank != CDI_UNDEFID );
Deike Kleberg's avatar
Deike Kleberg committed
98
99
          if ( iorank == rankGlob )
            {
Deike Kleberg's avatar
Deike Kleberg committed
100
              for ( modelID = 0; modelID < nProcsModel; modelID++ )
101
                {
102
103
104
                  int decoChunk
                    = (int)(cdiPIOpartInflate_
                            * vlistInqVarDecoChunk(vlistID, varID, modelID));
Deike Kleberg's avatar
Deike Kleberg committed
105
                  xassert ( decoChunk > 0 );
106
                  rxWin[modelID].size += decoChunk * sizeof (double)
107
108
109
110
111
112
113
114
                    /* re-align chunks to multiple of double size */
                    + sizeof (double) - 1
                    /* one header for data record, one for
                     * corresponding part descriptor*/
                    + 2 * sizeof (union winHeaderEntry)
                    /* FIXME: heuristic for size of packed Xt_idxlist */
                    + sizeof (Xt_int) * decoChunk * 3;
                  rxWin[modelID].dictSize += 2;
115
                }
Deike Kleberg's avatar
Deike Kleberg committed
116
            }
117
        }
Deike Kleberg's avatar
Deike Kleberg committed
118
119
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose 
      // once per stream and timestep for all collprocs only on the modelproc root
120
121
      rxWin[root].size += 3 * sizeof (union winHeaderEntry)
        + MAXDATAFILENAME;
122
      rxWin[root].dictSize += 3;
Deike Kleberg's avatar
Deike Kleberg committed
123
    }
124
  free ( streamIndexList );
Deike Kleberg's avatar
Deike Kleberg committed
125
126

  for ( modelID = 0; modelID < nProcsModel; modelID++ )
127
    {
128
      /* account for size header */
129
      rxWin[modelID].dictSize += 1;
130
      rxWin[modelID].size += sizeof (union winHeaderEntry);
131
132
133
      rxWin[modelID].size = roundUpToMultiple(rxWin[modelID].size,
                                              PIO_WIN_ALIGN);
      sumGetBufferSizes += (size_t)rxWin[modelID].size;
134
    }
Deike Kleberg's avatar
Deike Kleberg committed
135
  xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
136
  return sumGetBufferSizes;
Deike Kleberg's avatar
Deike Kleberg committed
137
}
138

Deike Kleberg's avatar
Deike Kleberg committed
139
 /************************************************************************/
140
141

static 
Deike Kleberg's avatar
Deike Kleberg committed
142
143
 void serverWinCreate ()
{ 
Deike Kleberg's avatar
Deike Kleberg committed
144
  int ranks[1], modelID;
145
  MPI_Comm commCalc = commInqCommCalc ();
Deike Kleberg's avatar
Deike Kleberg committed
146
  MPI_Group groupCalc;
147
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
148

Deike Kleberg's avatar
Deike Kleberg committed
149
  xmpi ( MPI_Win_create ( MPI_BOTTOM, 0, 1, MPI_INFO_NULL,
150
                          commCalc, &getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
151
152

  /* target group */
153
154
  ranks[0] = nProcsModel;
  xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
Deike Kleberg's avatar
Deike Kleberg committed
155
156
  xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));

157
  rxWin = xcalloc(nProcsModel, sizeof (rxWin[0]));
158
159
160
161
162
163
164
165
  size_t totalBufferSize = collDefBufferSizes();
  rxWin[0].buffer = xmalloc(totalBufferSize);
  size_t ofs = 0;
  for ( modelID = 1; modelID < nProcsModel; modelID++ )
    {
      ofs += rxWin[modelID - 1].size;
      rxWin[modelID].buffer = rxWin[0].buffer + ofs;
    }
Deike Kleberg's avatar
Deike Kleberg committed
166

167
  xdebug("%s", "created mpi_win, allocated getBuffer");
Deike Kleberg's avatar
Deike Kleberg committed
168
169
}

Deike Kleberg's avatar
Deike Kleberg committed
170
171
/************************************************************************/

172
173
static void
readFuncCall(struct funcCallDesc *header)
Deike Kleberg's avatar
Deike Kleberg committed
174
175
{
  int root = commInqRootGlob ();
176
  int funcID = header->funcID;
Deike Kleberg's avatar
Deike Kleberg committed
177

178
  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
Deike Kleberg's avatar
Deike Kleberg committed
179
180
  switch ( funcID )
    {
181
182
    case STREAMCLOSE:
      {
183
184
        int streamID
          = namespaceAdaptKey2(header->funcArgs.streamChange.streamID);
185
186
187
188
        streamClose(streamID);
        xdebug("READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
               " closed stream",
               funcMap[(-1 - funcID)], streamID);
189
190
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
191
    case STREAMOPEN:
192
      {
193
        size_t filenamesz = header->funcArgs.newFile.fnamelen;
Deike Kleberg's avatar
Deike Kleberg committed
194
        xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
195
196
197
198
199
200
        const char *filename
          = (const char *)(rxWin[root].buffer
                           + header->funcArgs.newFile.offset);
        xassert(filename[filenamesz] == '\0');
        int filetype = header->funcArgs.newFile.filetype;
        int streamID = streamOpenWrite(filename, filetype);
201
        xassert(streamID != CDI_ELIBNAVAIL);
202
203
        xdebug("READ FUNCTION CALL FROM WIN:  %s, filenamesz=%zu,"
               " filename=%s, filetype=%d, OPENED STREAM %d",
204
               funcMap[(-1 - funcID)], filenamesz, filename,
205
               filetype, streamID);
206
      }
207
      break;
208
209
    case STREAMDEFVLIST:
      {
210
211
212
        int streamID
          = namespaceAdaptKey2(header->funcArgs.streamChange.streamID);
        int vlistID = namespaceAdaptKey2(header->funcArgs.streamChange.vlistID);
213
214
215
216
        streamDefVlist(streamID, vlistID);
        xdebug("READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
               " vlistID=%d, called streamDefVlist ().",
               funcMap[(-1 - funcID)], streamID, vlistID);
217
218
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
219
    default:
220
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
Deike Kleberg's avatar
Deike Kleberg committed
221
222
223
224
225
    }
}

/************************************************************************/

226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/**
 * Make sure the gather buffer can hold one full variable.
 *
 * The buffer only ever grows: when the size reported by vlistInqVarSize
 * exceeds *bufSize, *buf is reallocated and *bufSize updated.
 *
 * @param vlistID  vlist the variable belongs to
 * @param varID    variable to make room for
 * @param buf      in/out: buffer pointer (may point to NULL initially)
 * @param bufSize  in/out: current capacity of *buf in elements
 */
static void
resizeVarGatherBuf(int vlistID, int varID, double **buf, int *bufSize)
{
  int needed = vlistInqVarSize(vlistID, varID);
  if (needed > *bufSize)
    {
      *bufSize = needed;
      *buf = xrealloc(*buf, needed * sizeof (**buf));
    }
}

static void
gatherArray(int root, int nProcsModel, int headerIdx,
            int vlistID,
            double *gatherBuf, int *nmiss)
{
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)rxWin[root].buffer;
  int streamID = winDict[headerIdx].dataRecord.streamID;
  int varID = winDict[headerIdx].dataRecord.varID;
243
  int varShape[3] = { 0, 0, 0 };
244
  cdiPioQueryVarDims(varShape, vlistID, varID);
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
  Xt_int varShapeXt[3];
  static const Xt_int origin[3] = { 0, 0, 0 };
  for (unsigned i = 0; i < 3; ++i)
    varShapeXt[i] = varShape[i];
  int varSize = varShape[0] * varShape[1] * varShape[2];
  int *partOfs = xmalloc(2 * varSize * sizeof (partOfs[0])),
    *gatherOfs = partOfs + varSize;
  Xt_idxlist *part = xmalloc(nProcsModel * sizeof (part[0]));
  MPI_Comm commCalc = commInqCommCalc();
  {
    int nmiss_ = 0, partOfsOfs = 0;
    for (int modelID = 0; modelID < nProcsModel; modelID++)
      {
        struct dataRecord *dataHeader
          = &((union winHeaderEntry *)
              rxWin[modelID].buffer)[headerIdx].dataRecord;
        struct partDescRecord *partHeader
          = &((union winHeaderEntry *)
              rxWin[modelID].buffer)[headerIdx + 1].partDesc;
        int position = partHeader->offset;
265
        xassert(namespaceAdaptKey2(dataHeader->streamID) == streamID
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
                && dataHeader->varID == varID
                && partHeader->partDescMarker == PARTDESCMARKER
                && position > 0
                && ((size_t)position
                    >= sizeof (union winHeaderEntry) * rxWin[modelID].dictSize)
                && ((size_t)position < rxWin[modelID].size));
        part[modelID] = xt_idxlist_unpack(rxWin[modelID].buffer,
                                          (int)rxWin[modelID].size,
                                          &position, commCalc);
        Xt_int partSize = xt_idxlist_get_num_indices(part[modelID]);
        size_t charOfs = (rxWin[modelID].buffer + dataHeader->offset)
          - rxWin[0].buffer;
        xassert(charOfs % sizeof (double) == 0
                && charOfs / sizeof (double) + partSize <= INT_MAX);
        int elemOfs = charOfs / sizeof (double);
        for (int i = 0; i < (int)partSize; ++i)
          partOfs[partOfsOfs + i] = elemOfs + i;
        partOfsOfs += partSize;
        nmiss_ += dataHeader->nmiss;
      }
    *nmiss = nmiss_;
  }
  Xt_idxlist srcList = xt_idxlist_collection_new(part, nProcsModel);
289
  for (int modelID = 0; modelID < nProcsModel; modelID++)
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
    xt_idxlist_delete(part[modelID]);
  free(part);
  Xt_xmap gatherXmap;
  {
    Xt_idxlist dstList
      = xt_idxsection_new(0, 3, varShapeXt, varShapeXt, origin);
    struct Xt_com_list full = { .list = dstList, .rank = 0 };
    gatherXmap = xt_xmap_intersection_new(1, &full, 1, &full, srcList, dstList,
                                        MPI_COMM_SELF);
    xt_idxlist_delete(dstList);
  }
  xt_idxlist_delete(srcList);
  for (int i = 0; i < varSize; ++i)
    gatherOfs[i] = i;

  Xt_redist gatherRedist
    = xt_redist_p2p_off_new(gatherXmap, partOfs, gatherOfs, MPI_DOUBLE);
  xt_xmap_delete(gatherXmap);
308
  xt_redist_s_exchange1(gatherRedist, rxWin[0].buffer, gatherBuf);
309
310
  free(partOfs);
  xt_redist_delete(gatherRedist);
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
}

/* Extents of a 3D grid, one entry per dimension. */
struct xyzDims
{
  int sizes[3];
};

/* Number of cells of a 3D grid, i.e. the product of its three extents. */
static inline int
xyzGridSize(struct xyzDims dims)
{
  int cells = 1;
  for (int axis = 0; axis < 3; ++axis)
    cells *= dims.sizes[axis];
  return cells;
}

#ifdef HAVE_PARALLEL_NC4
static void
326
queryVarBounds(struct PPM_extent varShape[3], int vlistID, int varID)
327
{
328
329
  varShape[0].first = 0;
  varShape[1].first = 0;
330
  varShape[2].first = 0;
331
  int sizes[3];
332
  cdiPioQueryVarDims(sizes, vlistID, varID);
333
334
  for (unsigned i = 0; i < 3; ++i)
    varShape[i].size = sizes[i];
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
}

/* Distribute the collectors over a 3D process grid such that in each
 * dimension the number of collectors does not exceed the number of
 * variable grid cells.  Every prime factor of the collector count is
 * assigned greedily to the first dimension that can still take it;
 * if none can, the function aborts (no back-tracking implemented). */
static struct xyzDims
varDimsCollGridMatch(const struct PPM_extent varDims[3])
{
  xassert(PPM_extents_size(3, varDims) >= commInqSizeColl());
  struct xyzDims collGrid = { { 1, 1, 1 } };
  for (int i = 0; i < numPioPrimes; ++i)
    {
      /* because of storage order, dividing dimension 3 first is
       * preferred, hence the search runs from the last dimension down */
      int dim = 2;
      while (dim >= 0
             && !(collGrid.sizes[dim] * pioPrimes[i] <= varDims[dim].size))
        --dim;
      if (dim >= 0)
        collGrid.sizes[dim] *= pioPrimes[i];
      else
        /* no position found, retrack */
        xabort("Not yet implemented back-tracking needed.");
    }
  return collGrid;
}

/**
 * Compute the part of a variable this collector is responsible for.
 *
 * The collector's rank (commInqRankColl) is converted into a coordinate
 * of the collector grid, which then selects one block of the uniform
 * partition of varShape.
 *
 * @param varShape  extents of the whole variable
 * @param collGrid  number of collectors per dimension
 * @param myPart    out: extents of this collector's block
 */
static void
myVarPart(struct PPM_extent varShape[3], struct xyzDims collGrid,
          struct PPM_extent myPart[3])
{
  struct PPM_extent collGridShape[3];
  int32_t myCollGridCoord[3];

  for (int dim = 0; dim < 3; ++dim)
    {
      collGridShape[dim].first = 0;
      collGridShape[dim].size = collGrid.sizes[dim];
    }
  PPM_lidx2rlcoord_e(3, collGridShape, commInqRankColl(), myCollGridCoord);
  xdebug("my coord: (%d, %d, %d)", myCollGridCoord[0], myCollGridCoord[1],
         myCollGridCoord[2]);

  PPM_uniform_partition_nd(3, varShape, collGrid.sizes,
                           myCollGridCoord, myPart);
}
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
#elif defined (HAVE_LIBNETCDF)
/* Needed for writing when a file is only written to by a single process:
 * returns the ownerRank stored in the stream, i.e. the writer process
 * of streamID. */
int cdiPioSerialOpenFileMap(int streamID)
{
  int writer = stream_to_pointer(streamID)->ownerRank;
  return writer;
}
/* For load-balancing purposes, count the number of files per process:
 * cdiSerialOpenFileCount[rank] is the number of files rank has open
 * for itself. */
static int *cdiSerialOpenFileCount = NULL;

/* Return the collector rank that currently has the fewest open files;
 * among equally loaded ranks the lowest rank wins. */
int cdiPioNextOpenRank()
{
  xassert(cdiSerialOpenFileCount != NULL);
  int numCollectors = commInqSizeColl();
  int bestRank = 0;
  int fewestOpen = cdiSerialOpenFileCount[0];
  for (int rank = 1; rank < numCollectors; ++rank)
    {
      int openOnRank = cdiSerialOpenFileCount[rank];
      if (openOnRank < fewestOpen)
        {
          fewestOpen = openOnRank;
          bestRank = rank;
        }
    }
  return bestRank;
}

/* Record that collector rank opened one more file for itself. */
void cdiPioOpenFileOnRank(int rank)
{
  xassert(cdiSerialOpenFileCount != NULL
          && rank >= 0 && rank < commInqSizeColl());
  cdiSerialOpenFileCount[rank] += 1;
}


/* Record that collector rank closed one of its files; the counter of
 * that rank has to be positive. */
void cdiPioCloseFileOnRank(int rank)
{
  xassert(cdiSerialOpenFileCount != NULL
          && rank >= 0 && rank < commInqSizeColl());
  xassert(cdiSerialOpenFileCount[rank] > 0);
  cdiSerialOpenFileCount[rank] -= 1;
}

420
421
#endif

Deike Kleberg's avatar
Deike Kleberg committed
422
423
424
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
425
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
426
  int root        = commInqRootGlob ();
427
#ifdef HAVE_NETCDF4
428
  int myCollRank = commInqRankColl();
429
  MPI_Comm collComm = commInqCommColl();
430
#endif
431
  xdebug("%s", "START");
432

433
434
435
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)rxWin[root].buffer;
  xassert(winDict[0].headerSize.sizeID == HEADERSIZEMARKER);
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
  {
    int dictSize = rxWin[root].dictSize,
      firstNonRPCEntry = dictSize - winDict[0].headerSize.numRPCEntries - 1,
      headerIdx,
      numFuncCalls = 0;
    for (headerIdx = dictSize - 1;
         headerIdx > firstNonRPCEntry;
         --headerIdx)
      {
        struct funcCallDesc *header
          = &(winDict[headerIdx].funcCall);
        xassert(header->funcID >= MINFUNCID
                && header->funcID <= MAXFUNCID);
        ++numFuncCalls;
        readFuncCall(header);
      }
    xassert(numFuncCalls == winDict[0].headerSize.numRPCEntries);
  }
Thomas Jahns's avatar
Thomas Jahns committed
454
  /* build list of streams, data was transferred for */
455
456
  {
    int numDataEntries = winDict[0].headerSize.numDataEntries;
Thomas Jahns's avatar
Thomas Jahns committed
457
458
    int streamIdx;
    struct {
459
      int streamID, filetype;
Thomas Jahns's avatar
Thomas Jahns committed
460
      int firstHeaderIdx, lastHeaderIdx;
461
      int numVars, *varMap;
Thomas Jahns's avatar
Thomas Jahns committed
462
463
464
465
    } *streamMap;
    int numStreamIDs = 0, sizeStreamMap = 16;
    streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
    int streamIDOld = CDI_UNDEFID;
466
    int oldStreamIdx = CDI_UNDEFID;
467
    int filetype = CDI_UNDEFID;
468
    for (int headerIdx = 1; headerIdx < numDataEntries; headerIdx += 2)
469
      {
470
471
472
        int streamID
          = winDict[headerIdx].dataRecord.streamID
          = namespaceAdaptKey2(winDict[headerIdx].dataRecord.streamID);
Thomas Jahns's avatar
Thomas Jahns committed
473
474
        xassert(streamID > 0);
        if (streamID != streamIDOld)
475
          {
476
            for (int i = numStreamIDs - 1; i >= 0; --i)
Thomas Jahns's avatar
Thomas Jahns committed
477
              if ((streamIDOld = streamMap[i].streamID) == streamID)
478
479
480
481
482
                {
                  filetype = streamMap[i].filetype;
                  oldStreamIdx = i;
                  goto streamIDInventorized;
                }
Thomas Jahns's avatar
Thomas Jahns committed
483
484
485
486
487
488
            if (numStreamIDs < sizeStreamMap) ; else
              streamMap = xrealloc(streamMap,
                                   (sizeStreamMap *= 2)
                                   * sizeof (streamMap[0]));
            streamMap[numStreamIDs].streamID = streamID;
            streamMap[numStreamIDs].firstHeaderIdx = headerIdx;
489
            streamMap[numStreamIDs].numVars = -1;
Thomas Jahns's avatar
Thomas Jahns committed
490
491
            oldStreamIdx = numStreamIDs;
            streamIDOld = streamID;
492
493
494
495
496
497
498
499
500
501
502
503
504
505
            filetype = streamInqFiletype(streamID);
            streamMap[numStreamIDs].filetype = filetype;
            if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
                || filetype == FILETYPE_NC4)
              {
                int vlistID = streamInqVlist(streamID);
                int nvars = vlistNvars(vlistID);
                streamMap[numStreamIDs].numVars = nvars;
                streamMap[numStreamIDs].varMap
                  = xmalloc(sizeof (streamMap[numStreamIDs].varMap[0])
                            * nvars);
                for (int i = 0; i < nvars; ++i)
                  streamMap[numStreamIDs].varMap[i] = -1;
              }
Thomas Jahns's avatar
Thomas Jahns committed
506
            ++numStreamIDs;
507
          }
Thomas Jahns's avatar
Thomas Jahns committed
508
509
        streamIDInventorized:
        streamMap[oldStreamIdx].lastHeaderIdx = headerIdx;
510
511
512
513
514
515
        if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
                || filetype == FILETYPE_NC4)
          {
            int varID = winDict[headerIdx].dataRecord.varID;
            streamMap[oldStreamIdx].varMap[varID] = headerIdx;
          }
Thomas Jahns's avatar
Thomas Jahns committed
516
      }
517
518
519
520
    double *data = NULL;
#if defined (HAVE_PARALLEL_NC4)
    double *writeBuf = NULL;
#endif
Thomas Jahns's avatar
Thomas Jahns committed
521
522
523
524
525
    int currentDataBufSize = 0;
    for (streamIdx = 0; streamIdx < numStreamIDs; ++streamIdx)
      {
        int streamID = streamMap[streamIdx].streamID;
        int vlistID = streamInqVlist(streamID);
526
        int fileType = streamMap[streamIdx].filetype;
Thomas Jahns's avatar
Thomas Jahns committed
527
528
529
530
531
        int taxisID = vlistInqTaxis(vlistID);
        taxisDefVdate(taxisID, vdate);
        taxisDefVtime(taxisID, vtime);
        streamDefTimestep(streamID, tsID);

532
533
534
535
        switch (fileType)
          {
          case FILETYPE_GRB:
          case FILETYPE_GRB2:
Thomas Jahns's avatar
Thomas Jahns committed
536
            {
537
538
539
              int headerIdx, lastHeaderIdx = streamMap[streamIdx].lastHeaderIdx;
              for (headerIdx = streamMap[streamIdx].firstHeaderIdx;
                   headerIdx <= lastHeaderIdx;
540
                   headerIdx += 2)
541
542
543
544
                if (streamID == winDict[headerIdx].dataRecord.streamID)
                  {
                    int varID = winDict[headerIdx].dataRecord.varID;
                    int size = vlistInqVarSize(vlistID, varID);
545
546
547
548
549
                    int nmiss;
                    resizeVarGatherBuf(vlistID, varID, &data,
                                       &currentDataBufSize);
                    gatherArray(root, nProcsModel, headerIdx,
                                vlistID, data, &nmiss);
550
551
552
553
554
555
556
557
558
                    streamWriteVar(streamID, varID, data, nmiss);
                    if ( ddebug > 2 )
                      {
                        char text[1024];
                        sprintf(text, "streamID=%d, var[%d], size=%d",
                                streamID, varID, size);
                        xprintArray(text, data, size, DATATYPE_FLT);
                      }
                  }
Thomas Jahns's avatar
Thomas Jahns committed
559
            }
560
            break;
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
#ifdef HAVE_NETCDF4
          case FILETYPE_NC:
          case FILETYPE_NC2:
          case FILETYPE_NC4:
#ifdef HAVE_PARALLEL_NC4
            /* HAVE_PARALLE_NC4 implies having ScalES-PPM and yaxt */
            {
              int nvars = streamMap[streamIdx].numVars;
              int *varMap = streamMap[streamIdx].varMap;
              int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
              for (int varID = 0; varID < nvars; ++varID)
                varIsWritten[varID] = ((varMap[varID] != -1)
                                       ?myCollRank+1 : 0);
              xmpi(MPI_Allreduce(MPI_IN_PLACE, varIsWritten, nvars,
                                 MPI_INT, MPI_BOR, collComm));
              for (int varID = 0; varID < nvars; ++varID)
                if (varIsWritten[varID])
                  {
                    struct PPM_extent varShape[3];
580
                    queryVarBounds(varShape, vlistID, varID);
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
                    struct xyzDims collGrid = varDimsCollGridMatch(varShape);
                    xdebug("writing varID %d with dimensions: "
                           "x=%d, y=%d, z=%d,\n"
                           "found distribution with dimensions:"
                           " x=%d, y=%d, z=%d.", varID,
                           varShape[0].size, varShape[1].size, varShape[2].size,
                           collGrid.sizes[0], collGrid.sizes[1],
                           collGrid.sizes[2]);
                    struct PPM_extent varChunk[3];
                    myVarPart(varShape, collGrid, varChunk);
                    int myChunk[3][2];
                    for (int i = 0; i < 3; ++i)
                      {
                        myChunk[i][0] = PPM_extent_start(varChunk[i]);
                        myChunk[i][1] = PPM_extent_end(varChunk[i]);
                      }
                    xdebug("Writing chunk { { %d, %d }, { %d, %d },"
                           " { %d, %d } }", myChunk[0][0], myChunk[0][1],
                           myChunk[1][0], myChunk[1][1], myChunk[2][0],
                           myChunk[2][1]);
                    Xt_int varSize[3];
                    for (int i = 0; i < 3; ++i)
                      varSize[2 - i] = varShape[i].size;
                    Xt_idxlist preRedistChunk, preWriteChunk;
                    /* prepare yaxt descriptor for current data
                       distribution after collect */
                    int nmiss;
                    if (varMap[varID] == -1)
                      {
                        preRedistChunk = xt_idxempty_new();
                        xdebug("%s", "I got none\n");
                      }
                    else
                      {
                        Xt_int preRedistStart[3] = { 0, 0, 0 };
                        preRedistChunk
                          = xt_idxsection_new(0, 3, varSize, varSize,
                                              preRedistStart);
                        resizeVarGatherBuf(vlistID, varID, &data,
                                           &currentDataBufSize);
                        int headerIdx = varMap[varID];
                        gatherArray(root, nProcsModel, headerIdx,
                                    vlistID, data, &nmiss);
                        xdebug("%s", "I got all\n");
                      }
                    MPI_Bcast(&nmiss, 1, MPI_INT, varIsWritten[varID] - 1,
                              collComm);
                    /* prepare yaxt descriptor for write chunk */
                    {
                      Xt_int preWriteChunkStart[3], preWriteChunkSize[3];
                      for (int i = 0; i < 3; ++i)
                        {
                          preWriteChunkStart[2 - i] = varChunk[i].first;
                          preWriteChunkSize[2 - i] = varChunk[i].size;
                        }
                      preWriteChunk = xt_idxsection_new(0, 3, varSize,
                                                        preWriteChunkSize,
                                                        preWriteChunkStart);
                    }
                    /* prepare redistribution */
                    {
                      Xt_xmap xmap = xt_xmap_all2all_new(preRedistChunk,
                                                         preWriteChunk,
                                                         collComm);
                      Xt_redist redist = xt_redist_p2p_new(xmap, MPI_DOUBLE);
                      xt_idxlist_delete(preRedistChunk);
                      xt_idxlist_delete(preWriteChunk);
                      xt_xmap_delete(xmap);
                      writeBuf = xrealloc(writeBuf,
                                          sizeof (double)
                                          * PPM_extents_size(3, varChunk));
652
                      xt_redist_s_exchange1(redist, data, writeBuf);
653
654
655
656
657
658
659
660
661
662
                      xt_redist_delete(redist);
                    }
                    /* write chunk */
                    streamWriteVarChunk(streamID, varID,
                                        (const int (*)[2])myChunk, writeBuf,
                                        nmiss);
                  }
            }
#else
            /* determine process which has stream open (writer) and
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
             * which has data for which variable (var owner)
             * three cases need to be distinguished */
            {
              int nvars = streamMap[streamIdx].numVars;
              int *varMap = streamMap[streamIdx].varMap;
              int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
              for (int varID = 0; varID < nvars; ++varID)
                varIsWritten[varID] = ((varMap[varID] != -1)
                                       ?myCollRank+1 : 0);
              xmpi(MPI_Allreduce(MPI_IN_PLACE, varIsWritten, nvars,
                                 MPI_INT, MPI_BOR, collComm));
              int writerRank;
              if ((writerRank = cdiPioSerialOpenFileMap(streamID))
                  == myCollRank)
                {
                  for (int varID = 0; varID < nvars; ++varID)
                    if (varIsWritten[varID])
                      {
                        int nmiss;
                        int size = vlistInqVarSize(vlistID, varID);
                        resizeVarGatherBuf(vlistID, varID, &data,
                                           &currentDataBufSize);
                        int headerIdx = varMap[varID];
                        if (varIsWritten[varID] == myCollRank + 1)
                          {
                            /* this process has the full array and will
                             * write it */
                            xdebug("gathering varID=%d for direct writing",
                                   varID);
                            gatherArray(root, nProcsModel, headerIdx,
                                        vlistID, data, &nmiss);
                          }
                        else
                          {
                            /* another process has the array and will
                             * send it over */
                            MPI_Status stat;
                            xdebug("receiving varID=%d for writing from"
                                   " process %d",
                                   varID, varIsWritten[varID] - 1);
                            xmpiStat(MPI_Recv(&nmiss, 1, MPI_INT,
                                              varIsWritten[varID] - 1,
                                              COLLBUFNMISS,
                                              collComm, &stat), &stat);
                            xmpiStat(MPI_Recv(data, size, MPI_DOUBLE,
                                              varIsWritten[varID] - 1,
                                              COLLBUFTX,
                                              collComm, &stat), &stat);
                          }
                        streamWriteVar(streamID, varID, data, nmiss);
                      }
                }
              else
                for (int varID = 0; varID < nvars; ++varID)
                  if (varIsWritten[varID] == myCollRank + 1)
                    {
                      /* this process has the full array and another
                       * will write it */
                      int nmiss;
                      int size = vlistInqVarSize(vlistID, varID);
                      resizeVarGatherBuf(vlistID, varID, &data,
                                         &currentDataBufSize);
                      int headerIdx = varMap[varID];
                      gatherArray(root, nProcsModel, headerIdx,
                                  vlistID, data, &nmiss);
                      MPI_Request req;
                      MPI_Status stat;
                      xdebug("sending varID=%d for writing to"
                             " process %d",
                             varID, writerRank);
                      xmpi(MPI_Isend(&nmiss, 1, MPI_INT,
                                     writerRank, COLLBUFNMISS,
                                     collComm, &req));
                      xmpi(MPI_Send(data, size, MPI_DOUBLE,
                                    writerRank, COLLBUFTX,
                                    collComm));
                      xmpiStat(MPI_Wait(&req, &stat), &stat);
                    }
            }
742
743
744
#endif
            break;
#endif
745
746
747
          default:
            xabort("unhandled filetype in parallel I/O.");
          }
748
      }
Thomas Jahns's avatar
Thomas Jahns committed
749
750
    free(streamMap);
    free(data);
751
  }
752
  xdebug("%s", "RETURN");
753
754
755
756
} 

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
757

Thomas Jahns's avatar
Thomas Jahns committed
758
759
static
void clearModelWinBuffer(int modelID)
Deike Kleberg's avatar
Deike Kleberg committed
760
761
762
{
  int nProcsModel = commInqNProcsModel ();

Deike Kleberg's avatar
Deike Kleberg committed
763
764
  xassert ( modelID                >= 0           &&
            modelID                 < nProcsModel &&
765
            rxWin != NULL && rxWin[modelID].buffer != NULL &&
766
767
            rxWin[modelID].size > 0 &&
            rxWin[modelID].size <= MAXWINBUFFERSIZE );
768
  memset(rxWin[modelID].buffer, 0, rxWin[modelID].size);
Deike Kleberg's avatar
Deike Kleberg committed
769
770
771
772
773
774
}


/************************************************************************/


775
static
Thomas Jahns's avatar
Thomas Jahns committed
776
void getTimeStepData ( int tsID, int vdate, int vtime )
Deike Kleberg's avatar
Deike Kleberg committed
777
{
778
  int modelID;
779
  char text[1024];
780
  int nProcsModel = commInqNProcsModel ();
Thomas Jahns's avatar
Thomas Jahns committed
781
782
  void *getWinBaseAddr;
  int attrFound;
783
          
784
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
785
786

  // todo put in correct lbs and ubs
Deike Kleberg's avatar
Deike Kleberg committed
787
  xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
788
  xmpi(MPI_Win_start(groupModel, 0, getWin));
789
790
  xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
  xassert(attrFound);
Deike Kleberg's avatar
Deike Kleberg committed
791
792
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
Thomas Jahns's avatar
Thomas Jahns committed
793
      clearModelWinBuffer(modelID);
794
      xdebug("modelID=%d, nProcsModel=%d, rxWin[%d].size=%zu,"
Thomas Jahns's avatar
Thomas Jahns committed
795
             " getWin=%p, sizeof(int)=%u",
796
             modelID, nProcsModel, modelID, rxWin[modelID].size,
Thomas Jahns's avatar
Thomas Jahns committed
797
             getWinBaseAddr, (unsigned)sizeof(int));
798
      /* FIXME: this needs to use MPI_PACK for portability */
799
800
801
      xmpi(MPI_Get(rxWin[modelID].buffer, rxWin[modelID].size,
                   MPI_UNSIGNED_CHAR, modelID, 0,
                   rxWin[modelID].size, MPI_UNSIGNED_CHAR, getWin));
Deike Kleberg's avatar
Deike Kleberg committed
802
    }
803
  xmpi ( MPI_Win_complete ( getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
804

805
  if ( ddebug > 2 )
Deike Kleberg's avatar
Deike Kleberg committed
806
    for ( modelID = 0; modelID < nProcsModel; modelID++ )
807
      {
808
        sprintf(text, "rxWin[%d].size=%zu from PE%d rxWin[%d].buffer",
809
                modelID, rxWin[modelID].size, modelID, modelID);
810
        xprintArray(text, rxWin[modelID].buffer,
811
812
                    rxWin[modelID].size / sizeof (double),
                    DATATYPE_FLT);
813
      }
Deike Kleberg's avatar
Deike Kleberg committed
814
  readGetBuffers ( tsID, vdate, vtime );
815
          
816
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
817
}
Deike Kleberg's avatar
Deike Kleberg committed
818
819
820
821
822
823
824
825
826
827
828

/************************************************************************/

/**
  @brief is encapsulated in CDI library and run on I/O PEs.

  @param

  @return
*/

829
void IOServer ()
Deike Kleberg's avatar
Deike Kleberg committed
830
{
831
  int source, tag, size, nProcsModel=commInqNProcsModel();
832
  static int nfinished = 0;
Deike Kleberg's avatar
Deike Kleberg committed
833
  char * buffer;
834
835
  MPI_Comm commCalc;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
836

837
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
838

839
  backendInit ();
840
841
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
    backendFinalize ();
842
  commCalc = commInqCommCalc ();
843
#ifdef HAVE_PARALLEL_NC4
844
  cdiPioEnableNetCDFParAccess();
845
846
  numPioPrimes = PPM_prime_factorization_32((uint32_t)commInqSizeColl(),
                                            &pioPrimes);
847
848
849
850
#elif defined (HAVE_LIBNETCDF)
  cdiSerialOpenFileCount = xcalloc(sizeof (cdiSerialOpenFileCount[0]),
                                   commInqSizeColl());
#endif
851

Deike Kleberg's avatar
Deike Kleberg committed
852
  for ( ;; )
853
    {
Deike Kleberg's avatar
Deike Kleberg committed
854
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commCalc, &status ));
855
      
Deike Kleberg's avatar
Deike Kleberg committed
856
857
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
858
      
Deike Kleberg's avatar
Deike Kleberg committed
859
      switch ( tag )
860
        {
861
862
863
864
865
866
867
        case FINALIZE:
          {
            int i;
            xdebugMsg(tag, source, nfinished);
            xmpi(MPI_Recv(&i, 1, MPI_INTEGER, source,
                          tag, commCalc, &status));
          }
868
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"FINALIZE\"");
869
          nfinished++;
Thomas Jahns's avatar
Thomas Jahns committed
870
871
872
          xdebug("nfinished=%d, nProcsModel=%d", nfinished, nProcsModel);
          if ( nfinished == nProcsModel )
            {
873
              {
Deike Kleberg's avatar
Deike Kleberg committed
874
                int nStreams = streamSize ();
Thomas Jahns's avatar
Thomas Jahns committed
875

Deike Kleberg's avatar
Deike Kleberg committed
876
877
878
879
                if ( nStreams > 0 )
                  {
                    int streamNo;
                    int * resHs;
Thomas Jahns's avatar
Thomas Jahns committed
880

Deike Kleberg's avatar
Deike Kleberg committed
881
                    resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
882
                    streamGetIndexList ( nStreams, resHs );
Deike Kleberg's avatar
Deike Kleberg committed
883
884
885
886
                    for ( streamNo = 0; streamNo < nStreams; streamNo++ )
                      streamClose ( resHs[streamNo] );
                    free ( resHs );
                  }
Deike Kleberg's avatar
Deike Kleberg committed
887
              }
Thomas Jahns's avatar
Thomas Jahns committed
888
889
              backendCleanup();
              serverWinCleanup();
890
              /* listDestroy(); */
891
              xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
892
893
              return;
            }
894
	  
895
896
          break;
          
Deike Kleberg's avatar
Deike Kleberg committed
897
	case RESOURCES:
898
	  xdebugMsg (  tag, source, nfinished );
899
	  xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
Thomas Jahns's avatar
Thomas Jahns committed
900
	  buffer = xmalloc(size);
901
902
	  xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
                            tag, commCalc, &status ));
903
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"RESOURCES\"");
904
	  reshUnpackResources(buffer, size, &commCalc);
905
          xdebug("%s", "");
Deike Kleberg's avatar
Deike Kleberg committed
906
	  free ( buffer );
Thomas Jahns's avatar
Thomas Jahns committed
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
          {
            int rankGlob = commInqRankGlob();
            if ( ddebug > 0 && rankGlob == nProcsModel)
              {
                static const char baseName[] = "reshListIOServer.",
                  suffix[] = ".txt";
                /* 9 digits for rank at most */
                char buf[sizeof (baseName) + 9 + sizeof (suffix) + 1];
                snprintf(buf, sizeof (buf), "%s%d%s", baseName, rankGlob,
                         suffix);
                FILE *fp = fopen(buf, "w");
                xassert(fp);
                reshListPrint(fp);
                fclose(fp);
              }
          }
          serverWinCreate ();
924
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
925

926
	case WRITETS:
927
928
929
930
931
932
933
934
935
          {
            int iBuffer[timestepSize];
            xdebugMsg(tag, source, nfinished);
            xmpi(MPI_Get_count(&status, MPI_INTEGER, &size));
            xassert(size == timestepSize);
            xmpi(MPI_Recv(iBuffer, size, MPI_INTEGER, source,
                          tag, commCalc, &status));
            xdebug("RECEIVED MESSAGE WITH TAG \"WRITETS\": "
                   "tsID=%d, vdate=%d, vtime=%d, source=%d",
Deike Kleberg's avatar
Deike Kleberg committed
936
                   iBuffer[0], iBuffer[1], iBuffer[2], source );
Thomas Jahns's avatar
Thomas Jahns committed
937
            getTimeStepData(iBuffer[0], iBuffer[1], iBuffer[2]);
938
          }
Deike Kleberg's avatar
Deike Kleberg committed
939
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
940

Deike Kleberg's avatar
Deike Kleberg committed
941
	default:
Deike Kleberg's avatar
Deike Kleberg committed
942
	  xabort ( "TAG NOT DEFINED!" );
943
	}
Deike Kleberg's avatar
Deike Kleberg committed
944
945
    }
}
946

947
#endif
948
949
950
951
952
953
954
955
956
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */