pipe.cc 9.93 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1
2
3
4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2020 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
6
7
8
9
10
11
12
13
14
15
16
17
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

18
#ifdef HAVE_CONFIG_H
19
#include "config.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
20
21
#endif

Ralf Mueller's avatar
Ralf Mueller committed
22
#include <cdi.h>
Oliver Heidmann's avatar
Oliver Heidmann committed
23

Uwe Schulzweida's avatar
Uwe Schulzweida committed
24
#include "pipe.h"
25
#include "cdo_output.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
26

27
#ifdef HAVE_LIBPTHREAD
Uwe Schulzweida's avatar
Uwe Schulzweida committed
28

Oliver Heidmann's avatar
Oliver Heidmann committed
29
pipe_t::pipe_t() { pipe_init(); }
30

31
32
void
pipe_t::close()
33
34
35
{
  pthread_mutex_lock(m_mutex);
  EOP = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
36
  Debug(PIPE, "%s write closed", name);
37
38
39
  pthread_mutex_unlock(m_mutex);
  pthread_cond_signal(tsDef);
  pthread_cond_signal(tsInq);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
40
41
  pthread_cond_signal(recInq);
  pthread_cond_signal(isclosed);
42
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
43

44
void
45
pipe_t::pipe_init()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
46
{
47
  EOP = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
48

49
50
51
52
  recIDr = -1;
  recIDw = -1;
  tsIDr = -1;
  tsIDw = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
53

54
  nmiss = 0;
55
56
  data_d = nullptr;
  data_f = nullptr;
57
  hasdata = false;
58
  usedata = true;
59
  data_is_float = false;
60
61
}

62
63
int
pipe_t::pipeInqTimestep(int p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
64
{
65
  // LOCK
66
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
67
68
69
70
71
  usedata = false;
  recIDr = -1;
  if (p_tsID != tsIDr + 1)
    {
      if (!(p_tsID == tsIDr && tsIDr == tsIDw && recIDr == -1))
72
        cdoAbort("%s unexpected tsID %d %d %d", name.c_str(), p_tsID, tsIDr + 1, tsIDw);
73
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
74

75
76
  tsIDr = p_tsID;
  while (tsIDw != p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
77
    {
78
79
      if (EOP)
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
80
          Debug(PIPE, name.c_str(), " EOP");
81
82
83
84
          break;
        }
      if (hasdata)
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
85
          Debug(PIPE, name.c_str(), " has data");
86
          hasdata = false;
87
88
89
          data_d = nullptr;
          data_f = nullptr;
          data_is_float = false;
90
          readCond.notify_all();
91
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
92
93
94
95
      else
        {
          Debug(PIPE, "%s has no data", name);
        }
96

97
      recInq.notify_all(); /* o.k. ??? */
98

Oliver Heidmann's avatar
Oliver Heidmann committed
99
      Debug(PIPE, name.c_str(), " wait of tsDef");
100
      tsDef.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
101
    }
102

103
  int numrecs = EOP ? 0 : nrecs;
104

105
  locked_mutex.unlock();
106
107
  // UNLOCK

108
  tsInq.notify_all();
109
110

  return numrecs;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
111
112
}

Oliver Heidmann's avatar
Oliver Heidmann committed
113
void
114
pipe_t::pipeDefVlist(int &target_vlistID, int new_vlistID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
115
{
116
  // LOCK
117
  m_mutex.lock();
118
  target_vlistID = new_vlistID;
119
  m_mutex.unlock();
Oliver Heidmann's avatar
Oliver Heidmann committed
120
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
121

Oliver Heidmann's avatar
Oliver Heidmann committed
122
  // lets the program know that the vlist is now defined
123
  vlistDef.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
124
125
}

126
127
//#define TIMEOUT 1  // wait 1 seconds
constexpr std::chrono::milliseconds TIMEOUT = std::chrono::milliseconds(1000);
128
#define MAX_WAIT_CYCLES 3600
129

130
int processNumsActive(void);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
131

Oliver Heidmann's avatar
Oliver Heidmann committed
132
133
134
int
pipe_t::pipeInqVlist(int &p_vlistID)
{
135
  Debug(PIPE, "Inquiring vlist for vlistID: %d", p_vlistID);
Oliver Heidmann's avatar
Oliver Heidmann committed
136
137
138
139
140
  std::chrono::milliseconds time_to_wait(0);
  int nwaitcycles = 0;

  // LOCK
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
141
  while (p_vlistID == -1 && nwaitcycles < MAX_WAIT_CYCLES && !EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
142
143
144
145
146
147
148
149
150
151
    {
      time_to_wait += TIMEOUT;
      Debug(PIPE, name.c_str(), " wait of vlistDef");
      vlistDef.wait_for(locked_mutex, time_to_wait);
      nwaitcycles++;
    }
  // UNLOCK

  return p_vlistID;
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
152

Oliver Heidmann's avatar
Oliver Heidmann committed
153
void
154
pipe_t::pipeDefTimestep(int p_vlistID, int p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
155
{
156
  int numrecs;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
157

158
  // LOCK
159
  m_mutex.lock();
160
161
  recIDw = -1;
  tsIDw++;
162
  if (p_tsID != tsIDw) cdoAbort("unexpected p_tsID %d(%d) for %s", p_tsID, tsIDw, name.c_str());
163
164
165

  if (p_tsID == 0)
    numrecs = vlistNrecs(p_vlistID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
166
167
  else
    {
168
      auto vlistID = p_vlistID;
169
      numrecs = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
170
      for (int i = 0; i < vlistNvars(vlistID); i++)
171
        {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
172
173
          if (vlistInqVarTimetype(vlistID, i) != TIME_CONSTANT)
            numrecs += zaxisInqSize(vlistInqVarZaxis(vlistID, i));
Oliver Heidmann's avatar
Oliver Heidmann committed
174
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
175
      Debug(PIPE, " %s numrecs= %d nvars= %d ", name.c_str(), numrecs, vlistNvars(vlistID));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
176
177
    }

178
  nrecs = numrecs;
Oliver Heidmann's avatar
Oliver Heidmann committed
179
  Debug(PIPE, "%s numrecs %d p_tsID %d %d %d", name.c_str(), numrecs, p_tsID, tsIDw, tsIDr);
180
  if (numrecs == 0) EOP = true;
181
  m_mutex.unlock();
182
183
  // UNLOCK

184
  tsDef.notify_all();
Oliver Heidmann's avatar
Oliver Heidmann committed
185
  // sleep(1);
186
187

  // LOCK
188
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
189
  while (tsIDr < p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
190
    {
191
      if (EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
192
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
193
          Debug(PIPE, "EOP");
Oliver Heidmann's avatar
Oliver Heidmann committed
194
195
          break;
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
196
      Debug(PIPE, name.c_str(), " wait of tsInq (p_tsID ", p_tsID, " ", tsIDr, ")");
197
      tsInq.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
198
    }
199
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
200
201
}

Oliver Heidmann's avatar
Oliver Heidmann committed
202
int
203
pipe_t::pipeInqRecord(int *p_varID, int *p_levelID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
204
{
205
  bool condSignal = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
206

207
  // LOCK
208
  m_mutex.lock();
Oliver Heidmann's avatar
Oliver Heidmann committed
209
  Debug(PIPE, name.c_str(), " has no data ", recIDr, " ", recIDw);
210
  if (hasdata || usedata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
211
    {
212
      hasdata = false;
213
214
      data_d = nullptr;
      data_f = nullptr;
215
      usedata = false;
216
      condSignal = true;
Oliver Heidmann's avatar
Oliver Heidmann committed
217
    }
218
  m_mutex.unlock();
219
220
  // UNLOCK

221
  if (condSignal) readCond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
222

223
  // LOCK
224
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
225
226
  usedata = true;
  recIDr++;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
227

Oliver Heidmann's avatar
Oliver Heidmann committed
228
  Debug(PIPE, name.c_str(), "recID", recIDr, " ", recIDw);
Oliver Heidmann's avatar
Oliver Heidmann committed
229

230
  while (recIDw != recIDr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
231
    {
232
      if (EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
233
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
234
          Debug(PIPE, "EOP");
Oliver Heidmann's avatar
Oliver Heidmann committed
235
236
          break;
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
237
      Debug(PIPE, "%s wait for recDef", name);
238
      recDef.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
239
240
    }

241
  if (EOP)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
242
    {
243
244
      *p_varID = -1;
      *p_levelID = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
245
246
247
    }
  else
    {
248
249
      *p_varID = varID;
      *p_levelID = levelID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
250
251
    }

252
  locked_mutex.unlock();
253
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
254

255
  recInq.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
256

Uwe Schulzweida's avatar
Uwe Schulzweida committed
257
  return 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
258
259
}

Oliver Heidmann's avatar
Oliver Heidmann committed
260
void
261
pipe_t::pipeDefRecord(int p_varID, int p_levelID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
262
{
263
  bool condSignal = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
264

265
  // LOCK
266
  m_mutex.lock();
Oliver Heidmann's avatar
Oliver Heidmann committed
267
  Debug(PIPE, name.c_str(), " has data ", recIDr, " ", recIDw);  //<- TODO: rethink positioning
268
  if (hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
269
    {
270
      hasdata = false;
271
272
      data_d = nullptr;
      data_f = nullptr;
273
      condSignal = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
274
    }
275
  m_mutex.unlock();
276
277
  // UNLOCK

278
  if (condSignal) readCond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
279

280
  // LOCK
281
  m_mutex.lock();
282
283
284
285
  usedata = true;
  recIDw++;
  varID = p_varID;
  levelID = p_levelID;
Oliver Heidmann's avatar
Oliver Heidmann committed
286
  Debug(PIPE, name.c_str(), "recID", recIDr, " ", recIDw);
287
  m_mutex.unlock();
288
289
  // UNLOCK

290
  recDef.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
291

292
  // LOCK
293
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
294
  while (recIDr < recIDw)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
295
    {
296
297
      if (tsIDw != tsIDr) break;
      if (EOP) break;
Oliver Heidmann's avatar
Oliver Heidmann committed
298
      Debug(PIPE, name.c_str(), "wait of recInq ", recIDr);
299
      recInq.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
300
    }
301
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
302
303
}

304
/***
Oliver Heidmann's avatar
Oliver Heidmann committed
305
 * copys data from a pipe to data
306
307
 *
 * @param data destination for the record data
Oliver Heidmann's avatar
Oliver Heidmann committed
308
 * @param pipe pipe that has the wanted data
309
 */
310
size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
311
pipe_t::pipeReadPipeRecord(double *p_data, int vlistID, size_t *p_nmiss)
312
{
313
  if (!p_data) cdoAbort("No data pointer for %s", name.c_str());
314

315
  auto datasize = gridInqSize(vlistInqVarGrid(vlistID, varID));
316
  if (vlistNumber(vlistID) != CDI_REAL) datasize *= 2;
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347

  if (data_is_float)
    {
      for (size_t i = 0; i < datasize; ++i) p_data[i] = (double)data_f[i];
    }
  else
    {
      memcpy(p_data, data_d, datasize * sizeof(double));
    }

  *p_nmiss = nmiss;
  return datasize;
}

size_t
pipe_t::pipeReadPipeRecord(float *p_data, int vlistID, size_t *p_nmiss)
{
  if (!p_data) cdoAbort("No data pointer for %s", name.c_str());

  auto datasize = gridInqSize(vlistInqVarGrid(vlistID, varID));
  if (vlistNumber(vlistID) != CDI_REAL) datasize *= 2;

  if (data_is_float)
    {
      memcpy(p_data, data_f, datasize * sizeof(float));
    }
  else
    {
      for (size_t i = 0; i < datasize; ++i) p_data[i] = (float)data_d[i];
    }

348
  *p_nmiss = nmiss;
349
  return datasize;
350
351
}

352
size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
353
pipe_t::pipeReadRecord(int p_vlistID, double *p_data, size_t *p_nmiss)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
354
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
355
  *p_nmiss = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
356
  size_t nvals = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
357

358
  // LOCK
359
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
360
  while (!hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
361
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
362
      Debug(PIPE, name.c_str(), " wait of writeCond");
363
      writeCond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
364
365
    }

366
  if (hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
367
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
368
      nvals = pipeReadPipeRecord(p_data, p_vlistID, p_nmiss);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
369
370
371
    }
  else
    {
372
      cdoAbort("data type %d not implemented", hasdata);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
373
374
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
375
  Debug(PIPE, name.c_str(), " read record ", recIDr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
376

377
  hasdata = false;
378
  data_d = nullptr;
379
  locked_mutex.unlock();
380
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
381

382
  readCond.notify_all();
383
  return nvals;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
384
}
385

386
size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
387
pipe_t::pipeReadRecord(int p_vlistID, float *p_data, size_t *p_nmiss)
388
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
389
  *p_nmiss = 0;
390
  size_t nvals = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
391

392
  // LOCK
393
394
395
396
397
398
399
400
401
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
  while (!hasdata)
    {
      Debug(PIPE, name.c_str(), " wait of writeCond");
      writeCond.wait(locked_mutex);
    }

  if (hasdata)
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
402
      nvals = pipeReadPipeRecord(p_data, p_vlistID, p_nmiss);
403
404
405
406
407
408
409
410
411
412
413
    }
  else
    {
      cdoAbort("data type %d not implemented", hasdata);
    }

  Debug(PIPE, name.c_str(), " read record ", recIDr);

  hasdata = false;
  data_f = nullptr;
  locked_mutex.unlock();
414
415
  // UNLOCK

416
417
418
419
420
421
422
  readCond.notify_all();
  return nvals;
}

size_t
pipe_t::pipeReadRecord(int p_vlistID, Field *p_field, size_t *p_nmiss)
{
423
  return pipeReadRecord(p_vlistID, p_field->vec_d.data(), p_nmiss);
424
425
426
427
428
}

void
pipe_t::wait_for_read()
{
429
  writeCond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
430

Oliver Heidmann's avatar
Oliver Heidmann committed
431
  Debug(PIPE, "%s write record $d", name, recIDw);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
432

433
  // LOCK
434
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
435
  while (hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
436
    {
437
438
      if (!usedata) break;
      if (recIDw != recIDr) break;
Oliver Heidmann's avatar
Oliver Heidmann committed
439

440
      if (EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
441
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
442
          Debug(PIPE, "EOP");
Oliver Heidmann's avatar
Oliver Heidmann committed
443
444
          break;
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
445
      Debug(PIPE, " wait of readCond %s", name);
446
      readCond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
447
    }
448
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
449
}
450
451

void
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
pipe_t::pipeWriteRecord(double *p_data, size_t p_nmiss)
{
  // LOCK
  m_mutex.lock();
  hasdata = true; // data pointer
  data_is_float = false;
  data_d = p_data;
  nmiss = p_nmiss;
  m_mutex.unlock();
  // UNLOCK

  wait_for_read();
}

void
pipe_t::pipeWriteRecord(float *p_data, size_t p_nmiss)
{
  // LOCK
  m_mutex.lock();
  hasdata = true; // data pointer
  data_is_float = true;
  data_f = p_data;
  nmiss = p_nmiss;
  m_mutex.unlock();
  // UNLOCK

  wait_for_read();
}

void
pipe_t::pipeWriteRecord(Field *p_field, size_t p_nmiss)
483
{
484
  pipeWriteRecord(p_field->vec_d.data(), p_nmiss);
485
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
486

487
488
void
pipe_t::pipeSetName(int processID, int inputIDX)
489
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
490
  name = "(pipe" + std::to_string(processID + 1) + "." + std::to_string(inputIDX + 1) + ")";
491
}
492

Uwe Schulzweida's avatar
Uwe Schulzweida committed
493
#endif