pio_server.c 30.6 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
/** @file pio_server.c
 */
3
4
5
6
7
8
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
#include "pio_server.h"


12
#include <limits.h>
Deike Kleberg's avatar
Deike Kleberg committed
13
14
#include <stdlib.h>
#include <stdio.h>
15
16
17
18
19
20
21
22

#ifdef HAVE_PARALLEL_NC4
#include <core/ppm_combinatorics.h>
#include <core/ppm_rectilinear.h>
#include <ppm/ppm_uniform_partition.h>
#include <yaxt.h>
#endif

Deike Kleberg's avatar
Deike Kleberg committed
23
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
24
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
25
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
26
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_util.h"
28
29
#include "stream_int.h"
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
30
#include "vlist_var.h"
31

32

33
extern resOps streamOps;
34
extern void arrayDestroy ( void );
Deike Kleberg's avatar
Deike Kleberg committed
35

36
37
38
/* Per-model-process receive state: one entry per model rank, holding
 * the RMA target buffer this server gathers data from. */
static struct
{
  size_t size;           /* total byte size of buffer (set by collDefBufferSizes) */
  unsigned char *buffer; /* receive buffer; starts with the winHeaderEntry dictionary */
  int dictSize;          /* number of winHeaderEntry records at the start of buffer */
} *rxWin = NULL;

/* one-sided communication window used to fetch data from the model processes */
static MPI_Win getWin = MPI_WIN_NULL;
/* group of model (compute) processes, target of MPI_Win_start epochs */
static MPI_Group groupModel = MPI_GROUP_NULL;

#ifdef HAVE_PARALLEL_NC4
/* prime factorization of number of pio collectors */
static uint32_t *pioPrimes;
static int numPioPrimes;
#endif
Deike Kleberg's avatar
Deike Kleberg committed
51

Deike Kleberg's avatar
Deike Kleberg committed
52
53
/************************************************************************/

54
static
Deike Kleberg's avatar
Deike Kleberg committed
55
56
57
void serverWinCleanup ()
{
  int i;
58
  int nProcsCalc = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
59
  
Deike Kleberg's avatar
Deike Kleberg committed
60
61
62
  if ( getWin != MPI_WIN_NULL )
    xmpi ( MPI_Win_free ( &getWin ));
  
63
64
  if (rxWin)
    {
Deike Kleberg's avatar
Deike Kleberg committed
65
      for ( i = 0; i < nProcsCalc; i++ )
66
67
        free(rxWin[i].buffer);
      free(rxWin);
Deike Kleberg's avatar
Deike Kleberg committed
68
    }
69

70
  xdebug("%s", "cleaned up mpi_win");
Deike Kleberg's avatar
Deike Kleberg committed
71
72
73
}
 
 /************************************************************************/
74

Deike Kleberg's avatar
Deike Kleberg committed
75
static 
Deike Kleberg's avatar
Deike Kleberg committed
76
  void collDefBufferSizes ()
Deike Kleberg's avatar
Deike Kleberg committed
77
{
78
  int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
Deike Kleberg's avatar
Deike Kleberg committed
79
  int modelID, decoChunk, sumGetBufferSizes = 0;
80
  int rankGlob = commInqRankGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
81
  int nProcsModel = commInqNProcsModel ();
82
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
83

84
  xassert(rxWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
85

Deike Kleberg's avatar
Deike Kleberg committed
86
  nstreams = reshCountType ( &streamOps );
87
88
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
89
90
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
91
      // space required for data
92
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
93
94
95
96
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
          iorank = vlistInqVarIOrank ( vlistID, varID );
Deike Kleberg's avatar
Deike Kleberg committed
97
          xassert ( iorank != CDI_UNDEFID );
Deike Kleberg's avatar
Deike Kleberg committed
98
99
          if ( iorank == rankGlob )
            {
Deike Kleberg's avatar
Deike Kleberg committed
100
              for ( modelID = 0; modelID < nProcsModel; modelID++ )
101
                {
102
                  decoChunk =  vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
103
                  xassert ( decoChunk > 0 );
104
                  rxWin[modelID].size += decoChunk * sizeof (double)
105
                    + sizeof (union winHeaderEntry);
106
                  ++(rxWin[modelID].dictSize);
107
                }
Deike Kleberg's avatar
Deike Kleberg committed
108
            }
109
        }
Deike Kleberg's avatar
Deike Kleberg committed
110
111
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose 
      // once per stream and timestep for all collprocs only on the modelproc root
112
113
      rxWin[root].size += 3 * sizeof (union winHeaderEntry)
        + MAXDATAFILENAME;
114
      rxWin[root].dictSize += 3;
Deike Kleberg's avatar
Deike Kleberg committed
115
    }
116
  free ( streamIndexList );
Deike Kleberg's avatar
Deike Kleberg committed
117
118

  for ( modelID = 0; modelID < nProcsModel; modelID++ )
119
    {
120
      /* account for size header */
121
      rxWin[modelID].dictSize += 1;
122
      rxWin[modelID].size += sizeof (union winHeaderEntry);
123
      sumGetBufferSizes += rxWin[modelID].size;
124
    }
Deike Kleberg's avatar
Deike Kleberg committed
125
  xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
Deike Kleberg's avatar
Deike Kleberg committed
126
}
127

Deike Kleberg's avatar
Deike Kleberg committed
128
 /************************************************************************/
129
130

static 
Deike Kleberg's avatar
Deike Kleberg committed
131
132
 void serverWinCreate ()
{ 
Deike Kleberg's avatar
Deike Kleberg committed
133
  int ranks[1], modelID;
134
  MPI_Comm commCalc = commInqCommCalc ();
Deike Kleberg's avatar
Deike Kleberg committed
135
  MPI_Group groupCalc;
136
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
137

Deike Kleberg's avatar
Deike Kleberg committed
138
  xmpi ( MPI_Win_create ( MPI_BOTTOM, 0, 1, MPI_INFO_NULL,
139
                          commCalc, &getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
140
141

  /* target group */
142
143
  ranks[0] = nProcsModel;
  xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
Deike Kleberg's avatar
Deike Kleberg committed
144
145
  xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));

146
  rxWin = xcalloc(nProcsModel, sizeof (rxWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
147
  collDefBufferSizes ();
Deike Kleberg's avatar
Deike Kleberg committed
148

Deike Kleberg's avatar
Deike Kleberg committed
149
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
Thomas Jahns's avatar
Thomas Jahns committed
150
    rxWin[modelID].buffer = xmalloc(rxWin[modelID].size);
Deike Kleberg's avatar
Deike Kleberg committed
151

152
  xdebug("%s", "created mpi_win, allocated getBuffer");
Deike Kleberg's avatar
Deike Kleberg committed
153
154
}

Deike Kleberg's avatar
Deike Kleberg committed
155
156
/************************************************************************/

157
158
/**
 * Dispatch one RPC entry read from the root model process' window.
 *
 * @param header function-call descriptor from the window dictionary;
 *        header->funcID selects the operation and the matching member
 *        of header->funcArgs carries its arguments.
 *
 * Handles STREAMCLOSE, STREAMOPEN and STREAMDEFVLIST; any other ID
 * aborts via xabort.
 */
static void
readFuncCall(struct funcCallDesc *header)
{
  int root = commInqRootGlob ();
  int funcID = header->funcID;

  xassert(funcID >= MINFUNCID && funcID <= MAXFUNCID);
  switch ( funcID )
    {
    case STREAMCLOSE:
      {
        int streamID = header->funcArgs.streamChange.streamID;
        streamClose(streamID);
        xdebug("READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
               " closed stream",
               funcMap[(-1 - funcID)], streamID);
      }
      break;
    case STREAMOPEN:
      {
        size_t filenamesz = header->funcArgs.newFile.fnamelen;
        xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
        /* the file name is stored inline in the root window buffer,
           at the offset recorded in the descriptor */
        const char *filename
          = (const char *)(rxWin[root].buffer
                           + header->funcArgs.newFile.offset);
        /* sender must have NUL-terminated the name at fnamelen */
        xassert(filename[filenamesz] == '\0');
        int filetype = header->funcArgs.newFile.filetype;
        xassert ( filetype >= MINFILETYPE && filetype <= MAXFILETYPE );
        int streamID = streamOpenWrite(filename, filetype);
        xdebug("READ FUNCTION CALL FROM WIN:  %s, filenamesz=%zu,"
               " filename=%s, filetype=%d, OPENED STREAM %d",
               funcMap[(-1 - funcID)], filenamesz, filename,
               filetype, streamID);
      }
      break;
    case STREAMDEFVLIST:
      {
        int streamID = header->funcArgs.streamChange.streamID;
        int vlistID = header->funcArgs.streamChange.vlistID;
        streamDefVlist(streamID, vlistID);
        xdebug("READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
               " vlistID=%d, called streamDefVlist ().",
               funcMap[(-1 - funcID)], streamID, vlistID);
      }
      break;
    default:
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
    }
}

/************************************************************************/

209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
static void
resizeVarGatherBuf(int vlistID, int varID, double **buf, int *bufSize)
{
  int size = vlistInqVarSize(vlistID, varID);
  if (size <= *bufSize) ; else
    *buf = xrealloc(*buf, (*bufSize = size) * sizeof (buf[0][0]));
}


/* Assemble one complete field from the per-model chunks found in the
 * receive buffers.  headerIdx selects the same dictionary slot in
 * every model's buffer; each slot's dataRecord gives the byte offset
 * of that model's chunk, whose length is the model's decomposition
 * chunk size.  Chunks are concatenated in model-rank order into
 * gatherBuf (caller guarantees capacity via resizeVarGatherBuf). */
static void
gatherArray(int root, int nProcsModel, int headerIdx,
            int vlistID,
            double *gatherBuf, int *nmiss)
{
  double *p = gatherBuf;
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)rxWin[root].buffer;
  int streamID = winDict[headerIdx].dataRecord.streamID;
  int varID = winDict[headerIdx].dataRecord.varID;
  *nmiss = 0;
  for (int modelID = 0; modelID < nProcsModel; modelID++)
    {
      /* all models must have recorded the same (streamID, varID) in
         this dictionary slot */
      struct dataRecord *dataHeader
        = (struct dataRecord *)rxWin[modelID].buffer
        + headerIdx;
      xassert(dataHeader->streamID == streamID
              && dataHeader->varID == varID);
      int chunk = vlistInqVarDecoChunk(vlistID, varID,
                                       modelID);
      memcpy(p, rxWin[modelID].buffer
             + dataHeader->offset, chunk * sizeof (p[0]));
      p += chunk;
      /* NOTE(review): overwritten each iteration, so the caller sees
         the LAST model's nmiss — confirm all ranks send the same
         value or that a sum is not intended here */
      *nmiss = dataHeader->nmiss;
    }
}

/* Extents of a 3-dimensional collector/variable grid. */
struct xyzDims
{
  int sizes[3];
};

/* Total number of cells covered by the three extents. */
static inline int
xyzGridSize(struct xyzDims dims)
{
  int cells = 1;
  for (int i = 0; i < 3; ++i)
    cells *= dims.sizes[i];
  return cells;
}

#ifdef HAVE_PARALLEL_NC4
/* Fill varShape[0..2] with the (x, y, z) extents of a variable's
 * grid and z-axis; all extents start at 0.  Only LONLAT and GAUSSIAN
 * grids are supported, every other grid type aborts. */
static void
queryVarDims(struct PPM_extent varShape[3], int vlistID, int varID)
{
  int gridID = vlistInqVarGrid(vlistID, varID);
  int zaxisID = vlistInqVarZaxis(vlistID, varID);
  int gridType = gridInqType(gridID);
  switch (gridType)
    {
    case GRID_LONLAT:
    case GRID_GAUSSIAN:
      varShape[0].first = 0;
      varShape[0].size = gridInqXsize(gridID);
      varShape[1].first = 0;
      varShape[1].size = gridInqYsize(gridID);
      break;
    case GRID_GENERIC:
    case GRID_LCC:
    case GRID_SPECTRAL:
    case GRID_GME:
    case GRID_CURVILINEAR:
    case GRID_UNSTRUCTURED:
    case GRID_REFERENCE:
      xabort("unimplemented grid type: %d", gridType);
      break;
    default:
      /* fix: previously an unlisted grid type fell through silently,
         leaving varShape[0] and varShape[1] uninitialized */
      xabort("unexpected grid type: %d", gridType);
      break;
    }
  varShape[2].first = 0;
  varShape[2].size = zaxisInqSize(zaxisID);
}

/* compute distribution of collectors such that number of collectors
 * <= number of variable grid cells in each dimension */
static struct xyzDims
varDimsCollGridMatch(const struct PPM_extent varDims[3])
{
  /* there must be at least one grid cell per collector overall */
  xassert(PPM_extents_size(3, varDims) >= commInqSizeColl());
  /* start from a 1x1x1 grid and distribute the prime factors of the
     collector count (pioPrimes) over the three dimensions */
  struct xyzDims collGrid = { { 1, 1, 1 } };
  /* because of storage order, dividing dimension 3 first is preferred */
  for (int i = 0; i < numPioPrimes; ++i)
    {
      for (int dim = 2; dim >=0; --dim)
        if (collGrid.sizes[dim] * pioPrimes[i] <= varDims[dim].size)
          {
            collGrid.sizes[dim] *= pioPrimes[i];
            goto nextPrime;
          }
      /* no position found, retrack */
      xabort("Not yet implemented back-tracking needed.");
      nextPrime:
      ;
    }
  return collGrid;
}

/* Compute this collector's chunk (myPart) of a variable with global
 * shape varShape, given the collector process grid collGrid: first
 * map the collector rank to a 3-D coordinate in the collector grid,
 * then take the corresponding uniform partition of varShape. */
static void
myVarPart(struct PPM_extent varShape[3], struct xyzDims collGrid,
          struct PPM_extent myPart[3])
{
  int32_t myCollGridCoord[3];
  {
    struct PPM_extent collGridShape[3];
    for (int i = 0; i < 3; ++i)
      {
        collGridShape[i].first = 0;
        collGridShape[i].size = collGrid.sizes[i];
      }
    PPM_lidx2rlcoord_e(3, collGridShape, commInqRankColl(), myCollGridCoord);
    xdebug("my coord: (%d, %d, %d)", myCollGridCoord[0], myCollGridCoord[1],
           myCollGridCoord[2]);
  }
  PPM_uniform_partition_nd(3, varShape, collGrid.sizes,
                           myCollGridCoord, myPart);
}
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
#elif defined (HAVE_LIBNETCDF)
/* needed for writing when some files are only written to by a single process */
/* cdiOpenFileMap(fileID) gives the writer process */
/* Return the rank of the collector process that owns (writes) the
 * file behind streamID. */
int cdiPioSerialOpenFileMap(int streamID)
{
  return stream_to_pointer(streamID)->ownerRank;
}
/* for load-balancing purposes, count number of files per process */
/* cdiOpenFileCounts[rank] gives number of open files rank has to himself */
static int *cdiSerialOpenFileCount = NULL;
/* Pick the collector rank with the fewest currently open files, so
 * the next file to be opened balances the load.  Ties resolve to the
 * lowest rank. */
int cdiPioNextOpenRank()
{
  xassert(cdiSerialOpenFileCount != NULL);
  int numCollProcs = commInqSizeColl();
  int bestRank = 0;
  int bestCount = cdiSerialOpenFileCount[0];
  for (int rank = 1; rank < numCollProcs; ++rank)
    {
      if (cdiSerialOpenFileCount[rank] < bestCount)
        {
          bestCount = cdiSerialOpenFileCount[rank];
          bestRank = rank;
        }
    }
  return bestRank;
}

/* Record that collector `rank` opened one more file (bookkeeping for
 * cdiPioNextOpenRank's load balancing). */
void cdiPioOpenFileOnRank(int rank)
{
  xassert(cdiSerialOpenFileCount != NULL
          && rank >= 0 && rank < commInqSizeColl());
  ++(cdiSerialOpenFileCount[rank]);
}


/* Record that collector `rank` closed one file; the count must have
 * been raised by a matching cdiPioOpenFileOnRank before. */
void cdiPioCloseFileOnRank(int rank)
{
  xassert(cdiSerialOpenFileCount != NULL
          && rank >= 0 && rank < commInqSizeColl());
  xassert(cdiSerialOpenFileCount[rank] > 0);
  --(cdiSerialOpenFileCount[rank]);
}

369
370
#endif

Deike Kleberg's avatar
Deike Kleberg committed
371
372
373
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
374
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
375
  int root        = commInqRootGlob ();
376
  int myCollRank = commInqRankColl();
377
#ifdef HAVE_NETCDF4
378
  MPI_Comm collComm = commInqCommColl();
379
#endif
380
  xdebug("%s", "START");
381

382
383
384
  union winHeaderEntry *winDict
    = (union winHeaderEntry *)rxWin[root].buffer;
  xassert(winDict[0].headerSize.sizeID == HEADERSIZEMARKER);
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
  {
    int dictSize = rxWin[root].dictSize,
      firstNonRPCEntry = dictSize - winDict[0].headerSize.numRPCEntries - 1,
      headerIdx,
      numFuncCalls = 0;
    for (headerIdx = dictSize - 1;
         headerIdx > firstNonRPCEntry;
         --headerIdx)
      {
        struct funcCallDesc *header
          = &(winDict[headerIdx].funcCall);
        xassert(header->funcID >= MINFUNCID
                && header->funcID <= MAXFUNCID);
        ++numFuncCalls;
        readFuncCall(header);
      }
    xassert(numFuncCalls == winDict[0].headerSize.numRPCEntries);
  }
Thomas Jahns's avatar
Thomas Jahns committed
403
  /* build list of streams, data was transferred for */
404
405
  {
    int numDataEntries = winDict[0].headerSize.numDataEntries;
Thomas Jahns's avatar
Thomas Jahns committed
406
407
    int streamIdx;
    struct {
408
      int streamID, filetype;
Thomas Jahns's avatar
Thomas Jahns committed
409
      int firstHeaderIdx, lastHeaderIdx;
410
      int numVars, *varMap;
Thomas Jahns's avatar
Thomas Jahns committed
411
412
413
414
    } *streamMap;
    int numStreamIDs = 0, sizeStreamMap = 16;
    streamMap = xmalloc(sizeStreamMap * sizeof (streamMap[0]));
    int streamIDOld = CDI_UNDEFID;
415
    int oldStreamIdx = CDI_UNDEFID;
416
    int filetype = CDI_UNDEFID;
417
    for (int headerIdx = 1; headerIdx < numDataEntries; ++headerIdx)
418
      {
Thomas Jahns's avatar
Thomas Jahns committed
419
420
421
        int streamID = winDict[headerIdx].dataRecord.streamID;
        xassert(streamID > 0);
        if (streamID != streamIDOld)
422
          {
423
            for (int i = numStreamIDs - 1; i >= 0; --i)
Thomas Jahns's avatar
Thomas Jahns committed
424
              if ((streamIDOld = streamMap[i].streamID) == streamID)
425
426
427
428
429
                {
                  filetype = streamMap[i].filetype;
                  oldStreamIdx = i;
                  goto streamIDInventorized;
                }
Thomas Jahns's avatar
Thomas Jahns committed
430
431
432
433
434
435
            if (numStreamIDs < sizeStreamMap) ; else
              streamMap = xrealloc(streamMap,
                                   (sizeStreamMap *= 2)
                                   * sizeof (streamMap[0]));
            streamMap[numStreamIDs].streamID = streamID;
            streamMap[numStreamIDs].firstHeaderIdx = headerIdx;
436
            streamMap[numStreamIDs].numVars = -1;
Thomas Jahns's avatar
Thomas Jahns committed
437
438
            oldStreamIdx = numStreamIDs;
            streamIDOld = streamID;
439
440
441
442
443
444
445
446
447
448
449
450
451
452
            filetype = streamInqFiletype(streamID);
            streamMap[numStreamIDs].filetype = filetype;
            if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
                || filetype == FILETYPE_NC4)
              {
                int vlistID = streamInqVlist(streamID);
                int nvars = vlistNvars(vlistID);
                streamMap[numStreamIDs].numVars = nvars;
                streamMap[numStreamIDs].varMap
                  = xmalloc(sizeof (streamMap[numStreamIDs].varMap[0])
                            * nvars);
                for (int i = 0; i < nvars; ++i)
                  streamMap[numStreamIDs].varMap[i] = -1;
              }
Thomas Jahns's avatar
Thomas Jahns committed
453
            ++numStreamIDs;
454
          }
Thomas Jahns's avatar
Thomas Jahns committed
455
456
        streamIDInventorized:
        streamMap[oldStreamIdx].lastHeaderIdx = headerIdx;
457
458
459
460
461
462
        if (filetype == FILETYPE_NC || filetype == FILETYPE_NC2
                || filetype == FILETYPE_NC4)
          {
            int varID = winDict[headerIdx].dataRecord.varID;
            streamMap[oldStreamIdx].varMap[varID] = headerIdx;
          }
Thomas Jahns's avatar
Thomas Jahns committed
463
      }
464
465
466
467
    double *data = NULL;
#if defined (HAVE_PARALLEL_NC4)
    double *writeBuf = NULL;
#endif
Thomas Jahns's avatar
Thomas Jahns committed
468
469
470
471
472
    int currentDataBufSize = 0;
    for (streamIdx = 0; streamIdx < numStreamIDs; ++streamIdx)
      {
        int streamID = streamMap[streamIdx].streamID;
        int vlistID = streamInqVlist(streamID);
473
        int fileType = streamMap[streamIdx].filetype;
Thomas Jahns's avatar
Thomas Jahns committed
474
475
476
477
478
        int taxisID = vlistInqTaxis(vlistID);
        taxisDefVdate(taxisID, vdate);
        taxisDefVtime(taxisID, vtime);
        streamDefTimestep(streamID, tsID);

479
480
481
482
        switch (fileType)
          {
          case FILETYPE_GRB:
          case FILETYPE_GRB2:
Thomas Jahns's avatar
Thomas Jahns committed
483
            {
484
485
486
487
488
489
490
491
              int headerIdx, lastHeaderIdx = streamMap[streamIdx].lastHeaderIdx;
              for (headerIdx = streamMap[streamIdx].firstHeaderIdx;
                   headerIdx <= lastHeaderIdx;
                   ++headerIdx)
                if (streamID == winDict[headerIdx].dataRecord.streamID)
                  {
                    int varID = winDict[headerIdx].dataRecord.varID;
                    int size = vlistInqVarSize(vlistID, varID);
492
493
494
495
496
                    int nmiss;
                    resizeVarGatherBuf(vlistID, varID, &data,
                                       &currentDataBufSize);
                    gatherArray(root, nProcsModel, headerIdx,
                                vlistID, data, &nmiss);
497
498
499
500
501
502
503
504
505
                    streamWriteVar(streamID, varID, data, nmiss);
                    if ( ddebug > 2 )
                      {
                        char text[1024];
                        sprintf(text, "streamID=%d, var[%d], size=%d",
                                streamID, varID, size);
                        xprintArray(text, data, size, DATATYPE_FLT);
                      }
                  }
Thomas Jahns's avatar
Thomas Jahns committed
506
            }
507
            break;
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
#ifdef HAVE_NETCDF4
          case FILETYPE_NC:
          case FILETYPE_NC2:
          case FILETYPE_NC4:
#ifdef HAVE_PARALLEL_NC4
            /* HAVE_PARALLE_NC4 implies having ScalES-PPM and yaxt */
            {
              int nvars = streamMap[streamIdx].numVars;
              int *varMap = streamMap[streamIdx].varMap;
              int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
              for (int varID = 0; varID < nvars; ++varID)
                varIsWritten[varID] = ((varMap[varID] != -1)
                                       ?myCollRank+1 : 0);
              xmpi(MPI_Allreduce(MPI_IN_PLACE, varIsWritten, nvars,
                                 MPI_INT, MPI_BOR, collComm));
              for (int varID = 0; varID < nvars; ++varID)
                if (varIsWritten[varID])
                  {
                    struct PPM_extent varShape[3];
                    queryVarDims(varShape, vlistID, varID);
                    struct xyzDims collGrid = varDimsCollGridMatch(varShape);
                    xdebug("writing varID %d with dimensions: "
                           "x=%d, y=%d, z=%d,\n"
                           "found distribution with dimensions:"
                           " x=%d, y=%d, z=%d.", varID,
                           varShape[0].size, varShape[1].size, varShape[2].size,
                           collGrid.sizes[0], collGrid.sizes[1],
                           collGrid.sizes[2]);
                    struct PPM_extent varChunk[3];
                    myVarPart(varShape, collGrid, varChunk);
                    int myChunk[3][2];
                    for (int i = 0; i < 3; ++i)
                      {
                        myChunk[i][0] = PPM_extent_start(varChunk[i]);
                        myChunk[i][1] = PPM_extent_end(varChunk[i]);
                      }
                    xdebug("Writing chunk { { %d, %d }, { %d, %d },"
                           " { %d, %d } }", myChunk[0][0], myChunk[0][1],
                           myChunk[1][0], myChunk[1][1], myChunk[2][0],
                           myChunk[2][1]);
                    Xt_int varSize[3];
                    for (int i = 0; i < 3; ++i)
                      varSize[2 - i] = varShape[i].size;
                    Xt_idxlist preRedistChunk, preWriteChunk;
                    /* prepare yaxt descriptor for current data
                       distribution after collect */
                    int nmiss;
                    if (varMap[varID] == -1)
                      {
                        preRedistChunk = xt_idxempty_new();
                        xdebug("%s", "I got none\n");
                      }
                    else
                      {
                        Xt_int preRedistStart[3] = { 0, 0, 0 };
                        preRedistChunk
                          = xt_idxsection_new(0, 3, varSize, varSize,
                                              preRedistStart);
                        resizeVarGatherBuf(vlistID, varID, &data,
                                           &currentDataBufSize);
                        int headerIdx = varMap[varID];
                        gatherArray(root, nProcsModel, headerIdx,
                                    vlistID, data, &nmiss);
                        xdebug("%s", "I got all\n");
                      }
                    MPI_Bcast(&nmiss, 1, MPI_INT, varIsWritten[varID] - 1,
                              collComm);
                    /* prepare yaxt descriptor for write chunk */
                    {
                      Xt_int preWriteChunkStart[3], preWriteChunkSize[3];
                      for (int i = 0; i < 3; ++i)
                        {
                          preWriteChunkStart[2 - i] = varChunk[i].first;
                          preWriteChunkSize[2 - i] = varChunk[i].size;
                        }
                      preWriteChunk = xt_idxsection_new(0, 3, varSize,
                                                        preWriteChunkSize,
                                                        preWriteChunkStart);
                    }
                    /* prepare redistribution */
                    {
                      Xt_xmap xmap = xt_xmap_all2all_new(preRedistChunk,
                                                         preWriteChunk,
                                                         collComm);
                      Xt_redist redist = xt_redist_p2p_new(xmap, MPI_DOUBLE);
                      xt_idxlist_delete(preRedistChunk);
                      xt_idxlist_delete(preWriteChunk);
                      xt_xmap_delete(xmap);
                      writeBuf = xrealloc(writeBuf,
                                          sizeof (double)
                                          * PPM_extents_size(3, varChunk));
                      xt_redist_s_exchange(redist, (void **)&data, 1,
                                           (void **)&writeBuf, 1);
                      xt_redist_delete(redist);
                    }
                    /* write chunk */
                    streamWriteVarChunk(streamID, varID,
                                        (const int (*)[2])myChunk, writeBuf,
                                        nmiss);
                  }
            }
#else
            /* determine process which has stream open (writer) and
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
             * which has data for which variable (var owner)
             * three cases need to be distinguished */
            {
              int nvars = streamMap[streamIdx].numVars;
              int *varMap = streamMap[streamIdx].varMap;
              int *varIsWritten = xmalloc(sizeof (varIsWritten[0]) * nvars);
              for (int varID = 0; varID < nvars; ++varID)
                varIsWritten[varID] = ((varMap[varID] != -1)
                                       ?myCollRank+1 : 0);
              xmpi(MPI_Allreduce(MPI_IN_PLACE, varIsWritten, nvars,
                                 MPI_INT, MPI_BOR, collComm));
              int writerRank;
              if ((writerRank = cdiPioSerialOpenFileMap(streamID))
                  == myCollRank)
                {
                  for (int varID = 0; varID < nvars; ++varID)
                    if (varIsWritten[varID])
                      {
                        int nmiss;
                        int size = vlistInqVarSize(vlistID, varID);
                        resizeVarGatherBuf(vlistID, varID, &data,
                                           &currentDataBufSize);
                        int headerIdx = varMap[varID];
                        if (varIsWritten[varID] == myCollRank + 1)
                          {
                            /* this process has the full array and will
                             * write it */
                            xdebug("gathering varID=%d for direct writing",
                                   varID);
                            gatherArray(root, nProcsModel, headerIdx,
                                        vlistID, data, &nmiss);
                          }
                        else
                          {
                            /* another process has the array and will
                             * send it over */
                            MPI_Status stat;
                            xdebug("receiving varID=%d for writing from"
                                   " process %d",
                                   varID, varIsWritten[varID] - 1);
                            xmpiStat(MPI_Recv(&nmiss, 1, MPI_INT,
                                              varIsWritten[varID] - 1,
                                              COLLBUFNMISS,
                                              collComm, &stat), &stat);
                            xmpiStat(MPI_Recv(data, size, MPI_DOUBLE,
                                              varIsWritten[varID] - 1,
                                              COLLBUFTX,
                                              collComm, &stat), &stat);
                          }
                        streamWriteVar(streamID, varID, data, nmiss);
                      }
                }
              else
                for (int varID = 0; varID < nvars; ++varID)
                  if (varIsWritten[varID] == myCollRank + 1)
                    {
                      /* this process has the full array and another
                       * will write it */
                      int nmiss;
                      int size = vlistInqVarSize(vlistID, varID);
                      resizeVarGatherBuf(vlistID, varID, &data,
                                         &currentDataBufSize);
                      int headerIdx = varMap[varID];
                      gatherArray(root, nProcsModel, headerIdx,
                                  vlistID, data, &nmiss);
                      MPI_Request req;
                      MPI_Status stat;
                      xdebug("sending varID=%d for writing to"
                             " process %d",
                             varID, writerRank);
                      xmpi(MPI_Isend(&nmiss, 1, MPI_INT,
                                     writerRank, COLLBUFNMISS,
                                     collComm, &req));
                      xmpi(MPI_Send(data, size, MPI_DOUBLE,
                                    writerRank, COLLBUFTX,
                                    collComm));
                      xmpiStat(MPI_Wait(&req, &stat), &stat);
                    }
            }
690
691
692
#endif
            break;
#endif
693
694
695
          default:
            xabort("unhandled filetype in parallel I/O.");
          }
696
      }
Thomas Jahns's avatar
Thomas Jahns committed
697
698
    free(streamMap);
    free(data);
699
  }
700
  xdebug("%s", "RETURN");
701
702
703
704
} 

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
705

Thomas Jahns's avatar
Thomas Jahns committed
706
707
static
void clearModelWinBuffer(int modelID)
Deike Kleberg's avatar
Deike Kleberg committed
708
709
710
{
  int nProcsModel = commInqNProcsModel ();

Deike Kleberg's avatar
Deike Kleberg committed
711
712
  xassert ( modelID                >= 0           &&
            modelID                 < nProcsModel &&
713
            rxWin != NULL && rxWin[modelID].buffer != NULL &&
714
715
            rxWin[modelID].size > 0 &&
            rxWin[modelID].size <= MAXWINBUFFERSIZE );
716
  memset(rxWin[modelID].buffer, 0, rxWin[modelID].size);
Deike Kleberg's avatar
Deike Kleberg committed
717
718
719
720
721
722
}


/************************************************************************/


723
static
Thomas Jahns's avatar
Thomas Jahns committed
724
void getTimeStepData ( int tsID, int vdate, int vtime )
Deike Kleberg's avatar
Deike Kleberg committed
725
{
726
  int modelID;
727
  char text[1024];
728
  int nProcsModel = commInqNProcsModel ();
Thomas Jahns's avatar
Thomas Jahns committed
729
730
  void *getWinBaseAddr;
  int attrFound;
731
          
732
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
733
734

  // todo put in correct lbs and ubs
Deike Kleberg's avatar
Deike Kleberg committed
735
  xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
736
  xmpi(MPI_Win_start(groupModel, 0, getWin));
737
738
  xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
  xassert(attrFound);
Deike Kleberg's avatar
Deike Kleberg committed
739
740
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
Thomas Jahns's avatar
Thomas Jahns committed
741
      clearModelWinBuffer(modelID);
742
      xdebug("modelID=%d, nProcsModel=%d, rxWin[%d].size=%zu,"
Thomas Jahns's avatar
Thomas Jahns committed
743
             " getWin=%p, sizeof(int)=%u",
744
             modelID, nProcsModel, modelID, rxWin[modelID].size,
Thomas Jahns's avatar
Thomas Jahns committed
745
             getWinBaseAddr, (unsigned)sizeof(int));
746
747
748
      xmpi(MPI_Get(rxWin[modelID].buffer, rxWin[modelID].size,
                   MPI_UNSIGNED_CHAR, modelID, 0,
                   rxWin[modelID].size, MPI_UNSIGNED_CHAR, getWin));
Deike Kleberg's avatar
Deike Kleberg committed
749
    }
750
  xmpi ( MPI_Win_complete ( getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
751

752
  if ( ddebug > 2 )
Deike Kleberg's avatar
Deike Kleberg committed
753
    for ( modelID = 0; modelID < nProcsModel; modelID++ )
754
      {
755
        sprintf(text, "rxWin[%d].size=%zu from PE%d rxWin[%d].buffer",
756
                modelID, rxWin[modelID].size, modelID, modelID);
757
        xprintArray(text, rxWin[modelID].buffer,
758
759
                    rxWin[modelID].size / sizeof (double),
                    DATATYPE_FLT);
760
      }
Deike Kleberg's avatar
Deike Kleberg committed
761
  readGetBuffers ( tsID, vdate, vtime );
762
          
763
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
764
}
Deike Kleberg's avatar
Deike Kleberg committed
765
766
767
768
769
770
771
772
773
774
775

/************************************************************************/

/**
  @brief is encapsulated in CDI library and run on I/O PEs.

  @param

  @return
*/

776
void IOServer ()
Deike Kleberg's avatar
Deike Kleberg committed
777
{
778
  int source, tag, size, nProcsModel=commInqNProcsModel();
779
  static int nfinished = 0;
Deike Kleberg's avatar
Deike Kleberg committed
780
  char * buffer;
781
782
  MPI_Comm commCalc;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
783

784
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
785

786
  backendInit ();
787
788
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
    backendFinalize ();
789
  commCalc = commInqCommCalc ();
790
#ifdef HAVE_PARALLEL_NC4
791
792
793
  numPioPrimes = PPM_prime_factorization_32((uint32_t)commInqSizeColl(),
                                            &pioPrimes);
  xt_initialize(commInqCommColl());
794
795
796
797
#elif defined (HAVE_LIBNETCDF)
  cdiSerialOpenFileCount = xcalloc(sizeof (cdiSerialOpenFileCount[0]),
                                   commInqSizeColl());
#endif
798

Deike Kleberg's avatar
Deike Kleberg committed
799
  for ( ;; )
800
    {
Deike Kleberg's avatar
Deike Kleberg committed
801
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commCalc, &status ));
802
      
Deike Kleberg's avatar
Deike Kleberg committed
803
804
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
805
      
Deike Kleberg's avatar
Deike Kleberg committed
806
      switch ( tag )
807
        {
808
809
810
811
812
813
814
        case FINALIZE:
          {
            int i;
            xdebugMsg(tag, source, nfinished);
            xmpi(MPI_Recv(&i, 1, MPI_INTEGER, source,
                          tag, commCalc, &status));
          }
815
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"FINALIZE\"");
816
          nfinished++;
Thomas Jahns's avatar
Thomas Jahns committed
817
818
819
          xdebug("nfinished=%d, nProcsModel=%d", nfinished, nProcsModel);
          if ( nfinished == nProcsModel )
            {
820
              {
Deike Kleberg's avatar
Deike Kleberg committed
821
                int nStreams = streamSize ();
Thomas Jahns's avatar
Thomas Jahns committed
822

Deike Kleberg's avatar
Deike Kleberg committed
823
824
825
826
                if ( nStreams > 0 )
                  {
                    int streamNo;
                    int * resHs;
Thomas Jahns's avatar
Thomas Jahns committed
827

Deike Kleberg's avatar
Deike Kleberg committed
828
                    resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
829
                    streamGetIndexList ( nStreams, resHs );
Deike Kleberg's avatar
Deike Kleberg committed
830
831
832
833
                    for ( streamNo = 0; streamNo < nStreams; streamNo++ )
                      streamClose ( resHs[streamNo] );
                    free ( resHs );
                  }
Deike Kleberg's avatar
Deike Kleberg committed
834
              }
Thomas Jahns's avatar
Thomas Jahns committed
835
836
              backendCleanup();
              serverWinCleanup();
837
              /* listDestroy(); */
838
              xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
839
840
              return;
            }
841
	  
842
843
          break;
          
Deike Kleberg's avatar
Deike Kleberg committed
844
	case RESOURCES:
845
	  xdebugMsg (  tag, source, nfinished );
846
	  xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
Thomas Jahns's avatar
Thomas Jahns committed
847
	  buffer = xmalloc(size);
848
849
	  xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
                            tag, commCalc, &status ));
850
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"RESOURCES\"");
851
	  rpcUnpackResources ( buffer, size, commCalc );
852
          xdebug("%s", "");
Deike Kleberg's avatar
Deike Kleberg committed
853
	  free ( buffer );
Deike Kleberg's avatar
Deike Kleberg committed
854
          if ( ddebug > 0 && commInqRankGlob () == nProcsModel ) 
855
            reshListPrint ( "reshListIOServer" );
Deike Kleberg's avatar
Deike Kleberg committed
856
	  serverWinCreate ();
857
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
858

859
	case WRITETS:
860
861
862
863
864
865
866
867
868
          {
            int iBuffer[timestepSize];
            xdebugMsg(tag, source, nfinished);
            xmpi(MPI_Get_count(&status, MPI_INTEGER, &size));
            xassert(size == timestepSize);
            xmpi(MPI_Recv(iBuffer, size, MPI_INTEGER, source,
                          tag, commCalc, &status));
            xdebug("RECEIVED MESSAGE WITH TAG \"WRITETS\": "
                   "tsID=%d, vdate=%d, vtime=%d, source=%d",
Deike Kleberg's avatar
Deike Kleberg committed
869
                   iBuffer[0], iBuffer[1], iBuffer[2], source );
Thomas Jahns's avatar
Thomas Jahns committed
870
            getTimeStepData(iBuffer[0], iBuffer[1], iBuffer[2]);
871
          }
Deike Kleberg's avatar
Deike Kleberg committed
872
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
873

Deike Kleberg's avatar
Deike Kleberg committed
874
	default:
Deike Kleberg's avatar
Deike Kleberg committed
875
	  xabort ( "TAG NOT DEFINED!" );
876
	}
Deike Kleberg's avatar
Deike Kleberg committed
877
878
    }
}
879

880
#endif
881
882
883
884
885
886
887
888
889
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */