collectData.c 6.73 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
3
4
#if defined (HAVE_CONFIG_H)
#  include "config.h"
#endif

#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
8

Deike Kleberg's avatar
Deike Kleberg committed
9
#ifdef USE_MPI
10
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
11
#include <mpi.h>
12
#include <yaxt.h>
13
14

#include "pio_util.h"
15
16
17
#else
typedef int MPI_Comm;
#define MPI_COMM_NULL 0
18
#endif
Deike Kleberg's avatar
Deike Kleberg committed
19
20

#include "cdi.h"
21
#include "error.h"
Deike Kleberg's avatar
Deike Kleberg committed
22

23
#ifdef USE_MPI
24
25
#include "cdipio.h"

26
27
28
/* Forward declaration; the definition is at the end of this file. */
static int
uniform_partition_start(int set_interval[2], int nparts, int part_idx);
#endif
29

30
/*
 * Model-side driver of the test.
 *
 * Creates one lon/lat grid, nVars pressure-level variables (one z-axis per
 * variable) and an absolute time axis, then writes ntsteps time steps of the
 * constant value 2.2 for every variable into each of ntfiles GRIB files named
 * "example_<fileindex>.grb".  All CDI objects created here are destroyed
 * before returning.
 *
 * With USE_MPI every variable is treated as a flat 1-D index range that is
 * split into contiguous chunks across the ranks of commModel; each rank
 * writes only its own chunk via streamWriteVarPart().  Without USE_MPI
 * commModel is unused and whole fields are written via streamWriteVar().
 */
static void modelRun(MPI_Comm commModel)
{
  enum {
    filetype    = FILETYPE_GRB,
    ntfiles     = 2,
    ntsteps     = 3,
    nVars       = 5,
    nlon        = 12,
    nlat        = 6,
    maxlev      = 5 };

  /* number of levels per variable; no entry exceeds maxlev */
  static int nlev[nVars]    = {1,1,5,5,2};
  static const char * name        = "example";

  int gridID, zaxisID[nVars], taxisID;
  int vlistID, varID[nVars], streamID, tsID, tfID = 0;
  int i, j, nmiss = 0;
  double lons[nlon] = {0, 30, 60, 90, 120, 150, 180, 210, 240, 270, 300, 330};
  double lats[nlat] = {-75, -45, -15, 15, 45, 75};
  double levs[maxlev] = {101300, 92500, 85000, 50000, 20000};
  /* write buffer sized for the largest possible variable */
  double var[nlon*nlat*maxlev];
  int vdate = 19850101, vtime = 120000;
  char filename[1024];
  size_t varSize[nVars];
#ifdef USE_MPI
  int rank, comm_size;
  /* per-variable description of the chunk owned by this rank */
  struct var1DDeco {
    int chunkSize, start;
    Xt_idxlist partDesc;
  } varDeco[nVars];
#endif

#ifndef USE_MPI
  (void)commModel;
#endif
  gridID = gridCreate ( GRID_LONLAT, nlon*nlat );
  gridDefXsize ( gridID, nlon );
  gridDefYsize ( gridID, nlat );
  gridDefXvals ( gridID, lons );
  gridDefYvals ( gridID, lats );

  for ( i = 0; i < nVars; i++ )
    {
      zaxisID[i] = zaxisCreate ( ZAXIS_PRESSURE, nlev[i] );
      zaxisDefLevels ( zaxisID[i], levs );
      varSize[i] = nlon * nlat * (size_t)nlev[i];
    }
  vlistID = vlistCreate ();

#ifdef USE_MPI
  xmpi ( MPI_Comm_rank ( commModel, &rank ));
  xmpi ( MPI_Comm_size ( commModel, &comm_size ));
#endif
  for ( i = 0; i < nVars; i++ )
    {
      varID[i] = vlistDefVar ( vlistID, gridID, zaxisID[i], TIME_VARIABLE);
#ifdef USE_MPI
      {
        /* this rank owns [start, start + chunkSize) of the flattened field;
         * the chunk ends where the next rank's chunk begins */
        int start = uniform_partition_start((int [2]){ 0, (int)varSize[i] - 1 },
                                             comm_size, rank),
          chunkSize = uniform_partition_start((int [2]){ 0, (int)varSize[i] - 1 },
                                              comm_size, rank + 1) - start;
        fprintf(stderr, "%d: start=%d, chunkSize = %d\n", rank,
                start, chunkSize);
        /* a single stripe of chunkSize consecutive indices from start */
        Xt_idxlist idxlist
          = xt_idxstripes_new(&(struct Xt_stripe){ .start = start,
                .nstrides = chunkSize, .stride = 1 }, 1);
        varDeco[i] = (struct var1DDeco){
          .start = start,
          .chunkSize = chunkSize,
          .partDesc = idxlist
        };
      }
#endif
    }
  taxisID = taxisCreate ( TAXIS_ABSOLUTE );
  vlistDefTaxis ( vlistID, taxisID );

  /* A first stream (file index 0) is opened and given the vlist before
   * pioEndDef() is called. */
  snprintf ( filename, sizeof filename, "%s_%d.grb", name, tfID );
  streamID = streamOpenWrite ( filename, filetype );
  xassert ( streamID >= 0 );
  streamDefVlist ( streamID, vlistID);

#ifdef USE_MPI
  pioEndDef ();
#endif
  for ( tfID = 0; tfID < ntfiles; tfID++ )
    {
      /* The previous stream is closed and a new one opened on every
       * iteration, including the first (a "tfID > 0" guard was disabled in
       * the original), so file 0 is opened twice.
       * NOTE(review): presumably deliberate to exercise close/reopen after
       * pioEndDef() -- confirm before changing. */
      {
        streamClose ( streamID );
        snprintf ( filename, sizeof filename, "%s_%d.grb", name, tfID );
        streamID = streamOpenWrite ( filename, filetype );
        xassert ( streamID >= 0 );
        streamDefVlist ( streamID, vlistID );
      }
      for ( tsID = 0; tsID < ntsteps; tsID++ )
        {
          taxisDefVdate ( taxisID, vdate );
          taxisDefVtime ( taxisID, vtime );
          streamDefTimestep ( streamID, tsID );
          for ( i = 0; i < nVars; i++ )
            {
#ifdef USE_MPI
              int chunk = varDeco[i].chunkSize;
#else
              int chunk = (int)varSize[i];
#endif
              /* only the part of the buffer that gets written is filled */
              for (j = 0; j < chunk; ++j) var[j] = 2.2;
#ifdef USE_MPI
              streamWriteVarPart(streamID, varID[i], var, nmiss,
                                 varDeco[i].partDesc);
#else
              streamWriteVar ( streamID, varID[i], var, nmiss );
#endif
            }
#ifdef USE_MPI
          pioWriteTimestep();
#endif
        }
    }
#ifdef USE_MPI
  pioEndTimestepping ();
#endif
  streamClose ( streamID );
  vlistDestroy ( vlistID );
  taxisDestroy ( taxisID );
  for ( i = 0; i < nVars; i++ )
    zaxisDestroy ( zaxisID[i] );
  gridDestroy ( gridID );
#ifdef USE_MPI
  /* release the index lists created for the decomposition */
  for ( i = 0; i < nVars; i++ )
    xt_idxlist_delete(varDeco[i].partDesc);
  MPI_Barrier(commModel);
#endif
}

167
168
#ifdef USE_MPI
static struct {
169
170
171
172
173
174
  char *text;
  int mode;
} mode_map[] = {
  { "PIO_MPI", PIO_MPI },
  { "PIO_FPGUARD", PIO_FPGUARD },
  { "PIO_ASYNCH", PIO_ASYNCH },
175
176
  { "PIO_WRITER", PIO_WRITER },
  { "PIO_FPGUARD", PIO_FPGUARD},
177
};
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192

/*
 * Looks up modestr in mode_map by exact string comparison.
 * Returns the index of the first matching entry, or -1 if none matches.
 */
static inline int
search_iomode_str(const char *modestr)
{
  const size_t num_modes = sizeof (mode_map) / sizeof (mode_map[0]);
  size_t idx = 0;
  while (idx < num_modes && strcmp(modestr, mode_map[idx].text) != 0)
    ++idx;
  return idx < num_modes ? (int)idx : -1;
}
193
#endif
194

Deike Kleberg's avatar
Deike Kleberg committed
195

196
int main (int argc, char *argv[])
Deike Kleberg's avatar
Deike Kleberg committed
197
{
198
  MPI_Comm commModel = MPI_COMM_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
199
#ifdef USE_MPI
200
201
  MPI_Comm commGlob;
  int sizeGlob, rankGlob, pioNamespace;
202
  int IOMode = PIO_WRITER;
203
  int nProcsIO = 2;
Deike Kleberg's avatar
Deike Kleberg committed
204

205
  xmpi ( MPI_Init ( &argc, &argv));
206
207
  commGlob = MPI_COMM_WORLD;
  xt_initialize(commGlob);
Deike Kleberg's avatar
Deike Kleberg committed
208
  xmpi ( MPI_Comm_set_errhandler ( commGlob, MPI_ERRORS_RETURN ));
209
  xmpi ( MPI_Comm_size ( commGlob, &sizeGlob ));
210
  xmpi ( MPI_Comm_rank ( commGlob, &rankGlob ));
Deike Kleberg's avatar
Deike Kleberg committed
211

212
213
214
215
216
217
  {
    int opt;
    while ((opt = getopt(argc, argv, "p:w:")) != -1)
      switch (opt) {
      case 'p':
        {
218
219
          int entry = search_iomode_str(optarg);
          if (entry < 0)
220
221
222
223
            {
              fprintf(stderr, "Unsupported PIO mode requested: %s\n", optarg);
              exit(EXIT_FAILURE);
            }
224
          IOMode = mode_map[entry].mode;
225
226
227
228
        }
        break;
      case 'w':
        {
229
230
231
232
233
234
235
          long temp = strtol(optarg, NULL, 0);
          if (temp < 0 || temp > INT_MAX/2)
            {
              fprintf(stderr, "Unsupported number of I/O servers: %ld\n", temp);
              exit(EXIT_FAILURE);
            }
          nProcsIO = (int)temp;
236
237
238
239
240
          break;
        }
      }
  }

241
242
  commModel = pioInit(commGlob, nProcsIO, IOMode, &pioNamespace, 1.0f,
                      cdiPioNoPostCommSetup);
243
244
  if (commModel != MPI_COMM_NULL)
    {
245
      namespaceSetActive(pioNamespace);
Thomas Jahns's avatar
Thomas Jahns committed
246
247
248
#else
      (void)argc;
      (void)argv;
Deike Kleberg's avatar
Deike Kleberg committed
249
250
#endif

251
      modelRun(commModel);
Deike Kleberg's avatar
Deike Kleberg committed
252
253

#ifdef USE_MPI
254
    }
Deike Kleberg's avatar
Deike Kleberg committed
255
  pioFinalize ();
256
  xt_finalize();
257
  MPI_Finalize ();
Deike Kleberg's avatar
Deike Kleberg committed
258
259
260
#endif
  return 0;
}
261

262
263
264
265
266
#ifdef USE_MPI
/*
 * Computes where part number part_idx begins when the inclusive index
 * interval [set_interval[0], set_interval[1]] is divided into nparts
 * contiguous parts:
 *
 *   start = set_interval[0] + (interval_length * part_idx) / nparts
 *
 * using integer division.  Passing part_idx == nparts yields one past the
 * end of the interval, so the size of part k is
 * uniform_partition_start(iv, n, k + 1) - uniform_partition_start(iv, n, k).
 *
 * The product is formed in long long so that it cannot overflow int.
 * nparts must be non-zero.
 */
static int
uniform_partition_start(int set_interval[2], int nparts, int part_idx)
{
  long long interval_length
    = (long long)set_interval[1] - (long long)set_interval[0] + 1LL;
  int part_offset
    = (int)((interval_length * (long long)part_idx) / (long long)nparts);
  int start = set_interval[0] + part_offset;
  return start;
}
#endif

274
275
276
277
278
279
280
281
282
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */