pipe.cc 10.2 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1 2 3 4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2020 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
6 7 8 9 10 11 12 13 14 15 16 17
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

Ralf Müller's avatar
Ralf Müller committed
18
#include <cdi.h>
Oliver Heidmann's avatar
Oliver Heidmann committed
19

Uwe Schulzweida's avatar
Uwe Schulzweida committed
20
#include "pipe.h"
21
#include "cdo_output.h"
22
#include "cthread_debug.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
23

Oliver Heidmann's avatar
Oliver Heidmann committed
24
pipe_t::pipe_t() { pipe_init(); }
25

26 27
void
pipe_t::close()
28
{
29
  cthread_mutex_lock(m_mutex);
30
  EOP = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
31
  Debug(PIPE, "%s write closed", name);
32
  cthread_mutex_unlock(m_mutex);
33 34 35 36
  cthread_cond_signal(tsDef_cond);
  cthread_cond_signal(tsInq_cond);
  cthread_cond_signal(recInq_cond);
  cthread_cond_signal(isClosed_cond);
37
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
38

39
void
40
pipe_t::pipe_init()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
41
{
42
  EOP = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
43

44 45 46 47
  recIDr = -1;
  recIDw = -1;
  tsIDr = -1;
  tsIDw = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
48

49 50 51 52
  varID = -1;
  levelID = -1;

  nrecs = 0;
53
  nmiss = 0;
54 55
  data_d = nullptr;
  data_f = nullptr;
56
  hasdata = false;
57
  usedata = true;
58
  data_is_float = false;
59 60
}

61
int
Uwe Schulzweida's avatar
Uwe Schulzweida committed
62
pipe_t::pipeInqTimestep(const int p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
63
{
64
  // LOCK
65
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
66 67 68 69 70
  usedata = false;
  recIDr = -1;
  if (p_tsID != tsIDr + 1)
    {
      if (!(p_tsID == tsIDr && tsIDr == tsIDw && recIDr == -1))
71
        cdoAbort("%s unexpected tsID %d %d %d", name.c_str(), p_tsID, tsIDr + 1, tsIDw);
72
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
73

74 75
  tsIDr = p_tsID;
  while (tsIDw != p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
76
    {
77 78
      if (EOP)
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
79
          Debug(PIPE, name.c_str(), " EOP");
80 81 82 83
          break;
        }
      if (hasdata)
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
84
          Debug(PIPE, name.c_str(), " has data");
85
          hasdata = false;
86 87 88
          data_d = nullptr;
          data_f = nullptr;
          data_is_float = false;
89
          read_cond.notify_all();
90
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
91 92 93 94
      else
        {
          Debug(PIPE, "%s has no data", name);
        }
95

96
      recInq_cond.notify_all(); /* o.k. ??? */
97

98 99
      Debug(PIPE, name.c_str(), " wait of tsDef_cond");
      tsDef_cond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
100
    }
101

102
  int numrecs = EOP ? 0 : nrecs;
103

104
  locked_mutex.unlock();
105 106
  // UNLOCK

107
  tsInq_cond.notify_all();
108 109

  return numrecs;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
110 111
}

Oliver Heidmann's avatar
Oliver Heidmann committed
112
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
113
pipe_t::pipeDefVlist(int &target_vlistID, const int new_vlistID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
114
{
115
  // LOCK
116
  m_mutex.lock();
117
  target_vlistID = new_vlistID;
118
  m_mutex.unlock();
Oliver Heidmann's avatar
Oliver Heidmann committed
119
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
120

Oliver Heidmann's avatar
Oliver Heidmann committed
121
  // lets the program know that the vlist is now defined
122
  vlistDef_cond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
123 124
}

125 126
//#define TIMEOUT 1  // wait 1 seconds
constexpr std::chrono::milliseconds TIMEOUT = std::chrono::milliseconds(1000);
127
#define MAX_WAIT_CYCLES 3600
128

Oliver Heidmann's avatar
Oliver Heidmann committed
129
int
130
pipe_t::pipeInqVlist(int &p_vlistID)
Oliver Heidmann's avatar
Oliver Heidmann committed
131
{
132
  Debug(PIPE, "Inquiring vlist for vlistID: %d", p_vlistID);
Oliver Heidmann's avatar
Oliver Heidmann committed
133 134 135 136 137
  std::chrono::milliseconds time_to_wait(0);
  int nwaitcycles = 0;

  // LOCK
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
138
  while (p_vlistID == -1 && nwaitcycles < MAX_WAIT_CYCLES && !EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
139 140
    {
      time_to_wait += TIMEOUT;
141 142
      Debug(PIPE, name.c_str(), " wait of vlistDef_cond");
      vlistDef_cond.wait_for(locked_mutex, time_to_wait);
Oliver Heidmann's avatar
Oliver Heidmann committed
143 144 145 146 147 148
      nwaitcycles++;
    }
  // UNLOCK

  return p_vlistID;
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
149

Oliver Heidmann's avatar
Oliver Heidmann committed
150
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
151
pipe_t::pipeDefTimestep(const int p_vlistID, const int p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
152
{
153
  int numrecs;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
154

155
  // LOCK
156
  m_mutex.lock();
157 158
  recIDw = -1;
  tsIDw++;
159
  if (p_tsID != tsIDw) cdoAbort("unexpected p_tsID %d(%d) for %s", p_tsID, tsIDw, name.c_str());
160 161 162

  if (p_tsID == 0)
    numrecs = vlistNrecs(p_vlistID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
163 164
  else
    {
165
      const auto vlistID = p_vlistID;
166
      numrecs = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
167
      for (int i = 0; i < vlistNvars(vlistID); i++)
168
        {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
169 170
          if (vlistInqVarTimetype(vlistID, i) != TIME_CONSTANT)
            numrecs += zaxisInqSize(vlistInqVarZaxis(vlistID, i));
Oliver Heidmann's avatar
Oliver Heidmann committed
171
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
172
      Debug(PIPE, " %s numrecs= %d nvars= %d ", name.c_str(), numrecs, vlistNvars(vlistID));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
173 174
    }

175
  nrecs = numrecs;
Oliver Heidmann's avatar
Oliver Heidmann committed
176
  Debug(PIPE, "%s numrecs %d p_tsID %d %d %d", name.c_str(), numrecs, p_tsID, tsIDw, tsIDr);
177
  if (numrecs == 0) EOP = true;
178
  m_mutex.unlock();
179 180
  // UNLOCK

181
  tsDef_cond.notify_all();
Oliver Heidmann's avatar
Oliver Heidmann committed
182
  // sleep(1);
183 184

  // LOCK
185
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
186
  while (tsIDr < p_tsID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
187
    {
188
      if (EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
189
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
190
          Debug(PIPE, "EOP");
Oliver Heidmann's avatar
Oliver Heidmann committed
191 192
          break;
        }
193 194
      Debug(PIPE, name.c_str(), " wait of tsInq_cond (p_tsID ", p_tsID, " ", tsIDr, ")");
      tsInq_cond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
195
    }
196
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
197 198
}

Oliver Heidmann's avatar
Oliver Heidmann committed
199
int
200
pipe_t::pipeInqRecord(int *p_varID, int *p_levelID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
201
{
202
  bool condSignal = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
203

204
  // LOCK
205
  m_mutex.lock();
Oliver Heidmann's avatar
Oliver Heidmann committed
206
  Debug(PIPE, name.c_str(), " has no data ", recIDr, " ", recIDw);
207
  if (hasdata || usedata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
208
    {
209
      hasdata = false;
210 211
      data_d = nullptr;
      data_f = nullptr;
212
      usedata = false;
213
      condSignal = true;
Oliver Heidmann's avatar
Oliver Heidmann committed
214
    }
215
  m_mutex.unlock();
216 217
  // UNLOCK

218
  if (condSignal) read_cond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
219

220
  // LOCK
221
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
222 223
  usedata = true;
  recIDr++;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
224

Oliver Heidmann's avatar
Oliver Heidmann committed
225
  Debug(PIPE, name.c_str(), "recID", recIDr, " ", recIDw);
Oliver Heidmann's avatar
Oliver Heidmann committed
226

227
  while (recIDw != recIDr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
228
    {
229
      if (EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
230
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
231
          Debug(PIPE, "EOP");
Oliver Heidmann's avatar
Oliver Heidmann committed
232 233
          break;
        }
234 235
      Debug(PIPE, "%s wait for recDef_cond", name);
      recDef_cond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
236 237
    }

238
  if (EOP)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
239
    {
240 241
      *p_varID = -1;
      *p_levelID = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
242 243 244
    }
  else
    {
245 246
      *p_varID = varID;
      *p_levelID = levelID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
247 248
    }

249
  locked_mutex.unlock();
250
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
251

252
  recInq_cond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
253

Uwe Schulzweida's avatar
Uwe Schulzweida committed
254
  return 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
255 256
}

Oliver Heidmann's avatar
Oliver Heidmann committed
257
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
258
pipe_t::pipeDefRecord(const int p_varID, const int p_levelID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
259
{
260
  bool condSignal = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
261

262
  // LOCK
263
  m_mutex.lock();
Oliver Heidmann's avatar
Oliver Heidmann committed
264
  Debug(PIPE, name.c_str(), " has data ", recIDr, " ", recIDw);  //<- TODO: rethink positioning
265
  if (hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
266
    {
267
      hasdata = false;
268 269
      data_d = nullptr;
      data_f = nullptr;
270
      condSignal = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
271
    }
272
  m_mutex.unlock();
273 274
  // UNLOCK

275
  if (condSignal) read_cond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
276

277
  // LOCK
278
  m_mutex.lock();
279 280 281 282
  usedata = true;
  recIDw++;
  varID = p_varID;
  levelID = p_levelID;
Oliver Heidmann's avatar
Oliver Heidmann committed
283
  Debug(PIPE, name.c_str(), "recID", recIDr, " ", recIDw);
284
  m_mutex.unlock();
285 286
  // UNLOCK

287
  recDef_cond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
288

289
  // LOCK
290
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
291
  while (recIDr < recIDw)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
292
    {
293 294
      if (tsIDw != tsIDr) break;
      if (EOP) break;
295 296
      Debug(PIPE, name.c_str(), "wait of recInq_cond ", recIDr);
      recInq_cond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
297
    }
298
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
299 300
}

301
/***
Oliver Heidmann's avatar
Oliver Heidmann committed
302
 * copys data from a pipe to data
303 304
 *
 * @param data destination for the record data
Oliver Heidmann's avatar
Oliver Heidmann committed
305
 * @param pipe pipe that has the wanted data
306
 */
307
size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
308
pipe_t::pipeReadPipeRecord(double *const p_data, const int vlistID, size_t *const p_nmiss)
309
{
310
  if (!p_data) cdoAbort("No data pointer for %s", name.c_str());
311

312
  auto datasize = gridInqSize(vlistInqVarGrid(vlistID, varID));
313
  if (vlistNumber(vlistID) != CDI_REAL) datasize *= 2;
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328

  if (data_is_float)
    {
      for (size_t i = 0; i < datasize; ++i) p_data[i] = (double)data_f[i];
    }
  else
    {
      memcpy(p_data, data_d, datasize * sizeof(double));
    }

  *p_nmiss = nmiss;
  return datasize;
}

size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
329
pipe_t::pipeReadPipeRecord(float *const p_data, const int vlistID, size_t *const p_nmiss)
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
{
  if (!p_data) cdoAbort("No data pointer for %s", name.c_str());

  auto datasize = gridInqSize(vlistInqVarGrid(vlistID, varID));
  if (vlistNumber(vlistID) != CDI_REAL) datasize *= 2;

  if (data_is_float)
    {
      memcpy(p_data, data_f, datasize * sizeof(float));
    }
  else
    {
      for (size_t i = 0; i < datasize; ++i) p_data[i] = (float)data_d[i];
    }

345
  *p_nmiss = nmiss;
346
  return datasize;
347 348
}

349
size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
350
pipe_t::pipeReadRecord(const int p_vlistID, double *const p_data, size_t *const p_nmiss)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
351
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
352
  *p_nmiss = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
353
  size_t nvals = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
354

355
  // LOCK
356
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
357
  while (!hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
358
    {
359 360
      Debug(PIPE, name.c_str(), " wait of write_cond");
      write_cond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
361 362
    }

363
  if (hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
364
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
365
      nvals = pipeReadPipeRecord(p_data, p_vlistID, p_nmiss);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
366 367 368
    }
  else
    {
369
      cdoAbort("data type %d not implemented", hasdata);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
370 371
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
372
  Debug(PIPE, name.c_str(), " read record ", recIDr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
373

374
  hasdata = false;
375
  data_d = nullptr;
376
  locked_mutex.unlock();
377
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
378

379
  read_cond.notify_all();
380
  return nvals;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
381
}
382

383
size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
384
pipe_t::pipeReadRecord(const int p_vlistID, float *const p_data, size_t *const p_nmiss)
385
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
386
  *p_nmiss = 0;
387
  size_t nvals = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
388

389
  // LOCK
390 391 392
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
  while (!hasdata)
    {
393 394
      Debug(PIPE, name.c_str(), " wait of write_cond");
      write_cond.wait(locked_mutex);
395 396 397 398
    }

  if (hasdata)
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
399
      nvals = pipeReadPipeRecord(p_data, p_vlistID, p_nmiss);
400 401 402 403 404 405 406 407 408 409 410
    }
  else
    {
      cdoAbort("data type %d not implemented", hasdata);
    }

  Debug(PIPE, name.c_str(), " read record ", recIDr);

  hasdata = false;
  data_f = nullptr;
  locked_mutex.unlock();
411 412
  // UNLOCK

413
  read_cond.notify_all();
414 415 416 417
  return nvals;
}

size_t
Uwe Schulzweida's avatar
Uwe Schulzweida committed
418
pipe_t::pipeReadRecord(const int p_vlistID, Field *const p_field, size_t *const p_nmiss)
419
{
420
  return pipeReadRecord(p_vlistID, p_field->vec_d.data(), p_nmiss);
421 422 423 424 425
}

void
pipe_t::wait_for_read()
{
426
  write_cond.notify_all();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
427

Oliver Heidmann's avatar
Oliver Heidmann committed
428
  Debug(PIPE, "%s write record $d", name, recIDw);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
429

430
  // LOCK
431
  std::unique_lock<std::mutex> locked_mutex(m_mutex);
432
  while (hasdata)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
433
    {
434 435
      if (!usedata) break;
      if (recIDw != recIDr) break;
Oliver Heidmann's avatar
Oliver Heidmann committed
436

437
      if (EOP)
Oliver Heidmann's avatar
Oliver Heidmann committed
438
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
439
          Debug(PIPE, "EOP");
Oliver Heidmann's avatar
Oliver Heidmann committed
440 441
          break;
        }
442 443
      Debug(PIPE, " wait of read_cond %s", name);
      read_cond.wait(locked_mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
444
    }
445
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
446
}
447 448

void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
449
pipe_t::pipeWriteRecord(double *const p_data, const size_t p_nmiss)
450 451 452 453 454 455 456 457 458 459 460 461 462 463
{
  // LOCK
  m_mutex.lock();
  hasdata = true; // data pointer
  data_is_float = false;
  data_d = p_data;
  nmiss = p_nmiss;
  m_mutex.unlock();
  // UNLOCK

  wait_for_read();
}

void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
464
pipe_t::pipeWriteRecord(float *const p_data, const size_t p_nmiss)
465 466 467 468 469 470 471 472 473 474 475 476 477 478
{
  // LOCK
  m_mutex.lock();
  hasdata = true; // data pointer
  data_is_float = true;
  data_f = p_data;
  nmiss = p_nmiss;
  m_mutex.unlock();
  // UNLOCK

  wait_for_read();
}

void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
479
pipe_t::pipeWriteRecord(Field *const p_field, const size_t p_nmiss)
480
{
481
  pipeWriteRecord(p_field->vec_d.data(), p_nmiss);
482
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
483

484
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
485
pipe_t::pipeSetName(const int processID, const int inputIDX)
486
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
487
  name = "(pipe" + std::to_string(processID + 1) + "." + std::to_string(inputIDX + 1) + ")";
488
}