pipe.cc 16.4 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1
2
3
4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2017 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
6
7
8
9
10
11
12
13
14
15
16
17
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

Uwe Schulzweida's avatar
Uwe Schulzweida committed
18
#if defined(HAVE_CONFIG_H)
19
#include "config.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
20
21
#endif

22
23
24
25
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600 /* struct timespec */
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
26
27
#include <stdio.h>
#include <string.h>
Oliver Heidmann's avatar
Oliver Heidmann committed
28
#include <time.h>  // time()
Ralf Müller's avatar
Ralf Müller committed
29
#include <cdi.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
30
#include "cdo.h"
31
#include "cdo_int.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
32
33
#include "error.h"
#include "dmemory.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
34
35
#include "pipe.h"
#include "pstream_int.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
36

Uwe Schulzweida's avatar
Uwe Schulzweida committed
37
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
38
39
40

/* Debug switch for this file: when nonzero the pipe routines emit trace messages; set through pipeDebug(). */
static int PipeDebug = 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
41
42
static void
pipe_init(pipe_t *pipe)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
43
44
45
46
47
48
49
{
  pthread_mutexattr_t m_attr;
  pthread_condattr_t c_attr;

  pthread_mutexattr_init(&m_attr);
  pthread_condattr_init(&c_attr);
  /*
Uwe Schulzweida's avatar
Uwe Schulzweida committed
50
#if defined(_POSIX_THREAD_PROCESS_SHARED)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
51
52
  if ( PipeDebug )
    {
53
54
      Message("setpshared mutexattr to PTHREAD_PROCESS_SHARED");
      Message("setpshared condattr to PTHREAD_PROCESS_SHARED");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
55
56
57
58
59
60
61
62
63
64
    }

  pthread_mutexattr_setpshared(&m_attr, PTHREAD_PROCESS_SHARED);
  pthread_condattr_setpshared(&c_attr, PTHREAD_PROCESS_SHARED);

  if ( PipeDebug )
    {
      int pshared;
      pthread_mutexattr_getpshared(&m_attr, &pshared);
      if ( pshared == PTHREAD_PROCESS_SHARED )
Oliver Heidmann's avatar
Oliver Heidmann committed
65
        Message("getpshared mutexattr is PTHREAD_PROCESS_SHARED");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
66
      else if ( pshared == PTHREAD_PROCESS_PRIVATE )
Oliver Heidmann's avatar
Oliver Heidmann committed
67
        Message("getpshared mutexattr is PTHREAD_PROCESS_PRIVATE");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
68
69
70

      pthread_condattr_getpshared(&c_attr, &pshared);
      if ( pshared == PTHREAD_PROCESS_SHARED )
Oliver Heidmann's avatar
Oliver Heidmann committed
71
        Message("getpshared condattr is PTHREAD_PROCESS_SHARED");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
72
      else if ( pshared == PTHREAD_PROCESS_PRIVATE )
Oliver Heidmann's avatar
Oliver Heidmann committed
73
        Message("getpshared condattr is PTHREAD_PROCESS_PRIVATE");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
74
75
76
    }
#else
  if ( PipeDebug )
77
    Message("_POSIX_THREAD_PROCESS_SHARED undefined");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
78
79
#endif
  */
Oliver Heidmann's avatar
Oliver Heidmann committed
80
  pipe->EOP = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
81

Oliver Heidmann's avatar
Oliver Heidmann committed
82
83
84
85
  pipe->recIDr = -1;
  pipe->recIDw = -1;
  pipe->tsIDr = -1;
  pipe->tsIDw = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
86

Oliver Heidmann's avatar
Oliver Heidmann committed
87
88
89
  pipe->nvals = 0;
  pipe->nmiss = 0;
  pipe->data = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
90
  pipe->hasdata = 0;
91
  pipe->usedata = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
92
93
  pipe->pstreamptr_in = 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
94
  pipe->mutex = (pthread_mutex_t *) Malloc(sizeof(pthread_mutex_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
95
96
  pthread_mutex_init(pipe->mutex, &m_attr);

Oliver Heidmann's avatar
Oliver Heidmann committed
97
  pipe->tsDef = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
98
  pthread_cond_init(pipe->tsDef, &c_attr);
Oliver Heidmann's avatar
Oliver Heidmann committed
99
  pipe->tsInq = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
100
101
  pthread_cond_init(pipe->tsInq, &c_attr);

Oliver Heidmann's avatar
Oliver Heidmann committed
102
  pipe->recDef = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
103
  pthread_cond_init(pipe->recDef, &c_attr);
Oliver Heidmann's avatar
Oliver Heidmann committed
104
  pipe->recInq = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
105
  pthread_cond_init(pipe->recInq, &c_attr);
Oliver Heidmann's avatar
Oliver Heidmann committed
106
107

  pipe->vlistDef = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
108
  pthread_cond_init(pipe->vlistDef, &c_attr);
Oliver Heidmann's avatar
Oliver Heidmann committed
109
  pipe->isclosed = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
110
111
  pthread_cond_init(pipe->isclosed, &c_attr);

Oliver Heidmann's avatar
Oliver Heidmann committed
112
  pipe->writeCond = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
113
114
  pthread_cond_init(pipe->writeCond, &c_attr);

Oliver Heidmann's avatar
Oliver Heidmann committed
115
  pipe->readCond = (pthread_cond_t *) Malloc(sizeof(pthread_cond_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
116
117
118
119
120
121
  pthread_cond_init(pipe->readCond, &c_attr);

  pthread_mutexattr_destroy(&m_attr);
  pthread_condattr_destroy(&c_attr);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
122
123
/*
 * Creates a new pipe.
 *
 * @return a heap-allocated pipe_t that has been run through pipe_init();
 *         hand it to pipeDelete() to release it.
 */
pipe_t *
pipeNew()
{
  pipe_t *newPipe = (pipe_t *) Malloc(sizeof(pipe_t));
  pipe_init(newPipe);
  return newPipe;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
132
133
void
pipeDelete(pipe_t *pipe)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
134
{
Oliver Heidmann's avatar
Oliver Heidmann committed
135
  if (pipe)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
136
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
      if (pipe->mutex)
        Free(pipe->mutex);
      if (pipe->tsDef)
        Free(pipe->tsDef);
      if (pipe->tsInq)
        Free(pipe->tsInq);
      if (pipe->recDef)
        Free(pipe->recDef);
      if (pipe->recInq)
        Free(pipe->recInq);
      if (pipe->vlistDef)
        Free(pipe->vlistDef);
      if (pipe->isclosed)
        Free(pipe->isclosed);
      if (pipe->writeCond)
        Free(pipe->writeCond);
      if (pipe->readCond)
        Free(pipe->readCond);
155
      Free(pipe);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
156
157
158
    }
}

Oliver Heidmann's avatar
Oliver Heidmann committed
159
160
/*
 * Publishes the variable list of a pipe stream.
 *
 * Stores vlistID in the stream while holding the pipe mutex, then signals
 * the vlistDef condition so that a thread blocked in pipeInqVlist() can
 * pick it up.
 *
 * @param pstreamptr stream whose vlistID is defined
 * @param vlistID    variable list ID to store
 */
void
pipeDefVlist(pstream_t *pstreamptr, int vlistID)
{
  pipe_t *const pipe = pstreamptr->pipe;

  if (PipeDebug) Message("%s pstreamID %d", pstreamptr->name, pstreamptr->self);

  pthread_mutex_lock(pipe->mutex);
  pstreamptr->vlistID = vlistID;
  pthread_mutex_unlock(pipe->mutex);

  // signal after the mutex has been released
  pthread_cond_signal(pipe->vlistDef);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
177
178
#define TIMEOUT 1  // wait 1 seconds
#define MIN_WAIT_CYCLES 10
179
#define MAX_WAIT_CYCLES 3600
180
int processNumsActive(void);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
181

Oliver Heidmann's avatar
Oliver Heidmann committed
182
183
int
pipeInqVlist(pstream_t *pstreamptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
184
185
{
  char *pname = pstreamptr->name;
186
  pipe_t *pipe = pstreamptr->pipe;
187
  int vlistID = -1;
188
  struct timespec time_to_wait;
189
  int retcode = 0;
190
  int nwaitcycles = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
191

Oliver Heidmann's avatar
Oliver Heidmann committed
192
  time_to_wait.tv_sec = 0;
193
194
  time_to_wait.tv_nsec = 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
195
196
  if (PipeDebug)
    Message("%s pstreamID %d", pname, pstreamptr->self);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
197

198
  // LOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
199
  pthread_mutex_lock(pipe->mutex);
200
  time_to_wait.tv_sec = time(NULL);
Oliver Heidmann's avatar
Oliver Heidmann committed
201
  while (pstreamptr->vlistID == -1 && retcode == 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
202
    {
203
204
      time_to_wait.tv_sec += TIMEOUT;
      // fprintf(stderr, "tvsec %g\n", (double) time_to_wait.tv_sec);
Oliver Heidmann's avatar
Oliver Heidmann committed
205
206
      if (PipeDebug)
        Message("%s wait of vlistDef", pname);
207
      // pthread_cond_wait(pipe->vlistDef, pipe->mutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
208
      retcode = pthread_cond_timedwait(pipe->vlistDef, pipe->mutex, &time_to_wait);
Oliver Heidmann's avatar
Oliver Heidmann committed
209
210
211
212
213
214
215
      // fprintf(stderr, "self %d retcode %d %d %d\n", pstreamptr->self, retcode, processNumsActive(),
      // pstreamptr->vlistID);
      if (retcode != 0 && nwaitcycles++ < MAX_WAIT_CYCLES)
        {
          if (processNumsActive() > 1 || (processNumsActive() == 1 && nwaitcycles < MIN_WAIT_CYCLES))
            retcode = 0;
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
216
    }
217

Oliver Heidmann's avatar
Oliver Heidmann committed
218
  if (retcode == 0)
219
    vlistID = pstreamptr->vlistID;
Oliver Heidmann's avatar
Oliver Heidmann committed
220
  else if (PipeDebug)
221
222
    Message("%s timeout!", pname);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
223
  pthread_mutex_unlock(pipe->mutex);
224
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
225

226
  return vlistID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
227
228
}

Oliver Heidmann's avatar
Oliver Heidmann committed
229
230
/*
 * Reader side: requests timestep tsID from the pipe and waits until the
 * writer has defined it or the pipe has reached its end.
 *
 * tsID is expected to be the successor of the last timestep read; the same
 * timestep is accepted again only if reader and writer are both on it.
 * Anything else is reported through Error().
 *
 * While waiting, data still attached to the pipe is dropped (signalling
 * readCond), and recInq is signalled on every pass before blocking on tsDef.
 *
 * @param pstreamptr stream that reads from the pipe
 * @param tsID       timestep index requested
 * @return number of records of the timestep, 0 at end of pipe
 */
int
pipeInqTimestep(pstream_t *pstreamptr, int tsID)
{
  pipe_t *const pipe = pstreamptr->pipe;
  char *const pname = pstreamptr->name;

  if (PipeDebug) Message("%s pstreamID %d", pname, pstreamptr->self);

  pthread_mutex_lock(pipe->mutex);

  pipe->usedata = false;
  pipe->recIDr = -1;

  if (tsID != pipe->tsIDr + 1 && (tsID != pipe->tsIDr || pipe->tsIDr != pipe->tsIDw || pipe->recIDr != -1))
    Error("%s unexpected tsID %d %d %d", pname, tsID, pipe->tsIDr + 1, pipe->tsIDw);

  pipe->tsIDr = tsID;

  while (pipe->tsIDw != tsID)
    {
      if (pipe->EOP)
        {
          if (PipeDebug) Message("%s EOP", pname);
          break;
        }

      if (pipe->hasdata)
        {
          // discard the record that is still attached and tell the writer
          if (PipeDebug) Message("%s has data", pname);
          pipe->hasdata = 0;
          pipe->data = NULL;
          pthread_cond_signal(pipe->readCond);
        }
      else if (PipeDebug)
        {
          Message("%s has no data", pname);
        }

      pthread_cond_signal(pipe->recInq); /* o.k. ??? */

      if (PipeDebug) Message("%s wait of tsDef", pname);
      pthread_cond_wait(pipe->tsDef, pipe->mutex);
    }

  const int nrecs = pipe->EOP ? 0 : pipe->nrecs;

  pthread_mutex_unlock(pipe->mutex);

  pthread_cond_signal(pipe->tsInq);

  return nrecs;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
289
290
/*
 * Writer side: defines the next timestep on the pipe and waits until the
 * reader has asked for it.
 *
 * The write timestep counter is advanced and must then equal tsID, otherwise
 * Error() is called. The record count is vlistNrecs() for timestep 0; for
 * later timesteps it is the sum of the z-axis sizes of all variables whose
 * timestep type is not TSTEP_CONSTANT. A record count of 0 marks the end of
 * the pipe.
 *
 * @param pstreamptr stream that writes to the pipe
 * @param tsID       timestep index being defined
 */
void
pipeDefTimestep(pstream_t *pstreamptr, int tsID)
{
  pipe_t *const pipe = pstreamptr->pipe;
  char *const pname = pstreamptr->name;

  if (PipeDebug) Message("%s pstreamID %d", pname, pstreamptr->self);

  pthread_mutex_lock(pipe->mutex);

  pipe->recIDw = -1;
  pipe->tsIDw++;
  if (tsID != pipe->tsIDw) Error("unexpected tsID %d(%d) for %s", tsID, pipe->tsIDw, pname);

  int nrecs = 0;
  if (tsID == 0)
    {
      nrecs = vlistNrecs(pstreamptr->vlistID);
    }
  else
    {
      const int vlistID = pstreamptr->vlistID;
      for (int varID = 0; varID < vlistNvars(vlistID); varID++)
        {
          if (vlistInqVarTsteptype(vlistID, varID) == TSTEP_CONSTANT) continue;
          nrecs += zaxisInqSize(vlistInqVarZaxis(vlistID, varID));
        }
    }
  pipe->nrecs = nrecs;

  if (PipeDebug) Message("%s nrecs %d tsID %d %d %d", pname, nrecs, tsID, pipe->tsIDw, pipe->tsIDr);

  // nothing left to write: mark end of pipe
  if (nrecs == 0) pipe->EOP = true;

  pthread_mutex_unlock(pipe->mutex);

  pthread_cond_signal(pipe->tsDef);

  // block until the reader has caught up with this timestep (or the pipe ended)
  pthread_mutex_lock(pipe->mutex);
  while (pipe->tsIDr < tsID)
    {
      if (pipe->EOP)
        {
          if (PipeDebug) Message("EOP");
          break;
        }
      if (PipeDebug) Message("%s wait of tsInq (tsID %d %d)", pname, tsID, pipe->tsIDr);
      pthread_cond_wait(pipe->tsInq, pipe->mutex);
    }
  pthread_mutex_unlock(pipe->mutex);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
348
349
/*
 * Reader side: advances to the next record and waits until the writer has
 * defined it or the pipe has reached its end.
 *
 * Data that is still attached to the pipe (or marked as in use) is dropped
 * first and readCond is signalled. After the wait, recInq is signalled.
 *
 * @param pstreamptr stream that reads from the pipe
 * @param varID      receives the variable ID of the record, -1 at end of pipe
 * @param levelID    receives the level ID of the record, -1 at end of pipe
 * @return always 0
 */
int
pipeInqRecord(pstream_t *pstreamptr, int *varID, int *levelID)
{
  pipe_t *const pipe = pstreamptr->pipe;
  char *const pname = pstreamptr->name;

  if (PipeDebug) Message("%s pstreamID %d", pname, pstreamptr->self);

  // step 1: release whatever the previous record left behind
  pthread_mutex_lock(pipe->mutex);
  if (PipeDebug) Message("%s has no data %d %d", pname, pipe->recIDr, pipe->recIDw);
  const bool released = pipe->hasdata || pipe->usedata;
  if (released)
    {
      pipe->hasdata = 0;
      pipe->data = NULL;
      pipe->usedata = false;
    }
  pthread_mutex_unlock(pipe->mutex);

  if (released) pthread_cond_signal(pipe->readCond);

  // step 2: move on to the next record and wait for its definition
  pthread_mutex_lock(pipe->mutex);
  pipe->usedata = true;
  pipe->recIDr++;

  if (PipeDebug) Message("%s recID %d %d", pname, pipe->recIDr, pipe->recIDw);

  while (pipe->recIDw != pipe->recIDr)
    {
      if (pipe->EOP)
        {
          if (PipeDebug) Message("EOP");
          break;
        }
      if (PipeDebug) Message("%s wait of recDef", pname);
      pthread_cond_wait(pipe->recDef, pipe->mutex);
    }

  const bool atEnd = pipe->EOP;
  *varID = atEnd ? -1 : pipe->varID;
  *levelID = atEnd ? -1 : pipe->levelID;

  pthread_mutex_unlock(pipe->mutex);

  pthread_cond_signal(pipe->recInq);

  return 0;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
415
416
/*
 * Writer side: defines the next record (variable and level) on the pipe and
 * waits until the reader has asked for it.
 *
 * Data still attached to the pipe is dropped first (signalling readCond).
 * The wait on recInq ends when the reader's record counter has reached the
 * writer's, when reader and writer are on different timesteps, or at end of
 * pipe.
 *
 * @param pstreamptr stream that writes to the pipe
 * @param varID      variable ID of the record
 * @param levelID    level ID of the record
 */
void
pipeDefRecord(pstream_t *pstreamptr, int varID, int levelID)
{
  pipe_t *const pipe = pstreamptr->pipe;
  char *const pname = pstreamptr->name;

  if (PipeDebug) Message("%s pstreamID %d", pname, pstreamptr->self);

  // step 1: drop data of the previous record
  pthread_mutex_lock(pipe->mutex);
  if (PipeDebug) Message("%s has data %d %d", pname, pipe->recIDr, pipe->recIDw);
  const bool dropped = (pipe->hasdata != 0);
  if (dropped)
    {
      pipe->hasdata = 0;
      pipe->data = NULL;
    }
  pthread_mutex_unlock(pipe->mutex);

  if (dropped) pthread_cond_signal(pipe->readCond);

  // step 2: publish the new record
  pthread_mutex_lock(pipe->mutex);
  pipe->usedata = true;
  pipe->recIDw++;
  pipe->varID = varID;
  pipe->levelID = levelID;
  if (PipeDebug) Message("%s recID %d %d", pname, pipe->recIDr, pipe->recIDw);
  pthread_mutex_unlock(pipe->mutex);

  pthread_cond_signal(pipe->recDef);

  // step 3: wait for the reader
  pthread_mutex_lock(pipe->mutex);
  while (pipe->recIDr < pipe->recIDw)
    {
      if (pipe->tsIDw != pipe->tsIDr || pipe->EOP) break;
      if (PipeDebug) Message("%s wait of recInq %d", pname, pipe->recIDr);
      pthread_cond_wait(pipe->recInq, pipe->mutex);
    }
  pthread_mutex_unlock(pipe->mutex);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
470
471
/*
 * Writer side: offers a record for copying by reference to an input stream
 * instead of attaching a data buffer.
 *
 * The output pipe is marked with hasdata == 2 and remembers pstreamptr_in;
 * writeCond is signalled, then the call waits on readCond until the record
 * has been taken. The wait is abandoned when the data is not used, when the
 * record counters of reader and writer differ, or at end of pipe.
 *
 * @param pstreamptr_out stream whose pipe receives the record
 * @param pstreamptr_in  stream the record is to be read from
 */
void
pipeCopyRecord(pstream_t *pstreamptr_out, pstream_t *pstreamptr_in)
{
  pipe_t *const pipe = pstreamptr_out->pipe;
  char *const inName = pstreamptr_in->name;
  char *const outName = pstreamptr_out->name;

  if (PipeDebug)
    {
      Message("%s pstreamIDin %d", inName, pstreamptr_in->self);
      Message("%s pstreamIDout %d", outName, pstreamptr_out->self);
    }

  pthread_mutex_lock(pipe->mutex);
  pipe->hasdata = 2; /* pipe */
  pipe->pstreamptr_in = pstreamptr_in;
  pthread_mutex_unlock(pipe->mutex);

  pthread_cond_signal(pipe->writeCond);

  pthread_mutex_lock(pipe->mutex);
  while (pipe->hasdata)
    {
      if (!pipe->usedata || pipe->recIDw != pipe->recIDr) break;

      if (pipe->EOP)
        {
          if (PipeDebug) Message("EOP");
          break;
        }

      if (PipeDebug) Message("%s wait of readCond", outName);
      pthread_cond_wait(pipe->readCond, pipe->mutex);
    }
  pthread_mutex_unlock(pipe->mutex);
}

515
/***
Oliver Heidmann's avatar
Oliver Heidmann committed
516
 * copys data from a pipe to data
517
518
 *
 * @param data destination for the record data
Oliver Heidmann's avatar
Oliver Heidmann committed
519
 * @param pipe pipe that has the wanted data
520
 */
Oliver Heidmann's avatar
Oliver Heidmann committed
521
522
void
pipeReadPipeRecord(pipe_t *pipe, double *data, char *pname, int vlistID, int *nmiss)
523
524
525
526
527
528
529
530
531
532
533
534
535
536
{
  int datasize;

  if (!pipe->data)
    Error("No data pointer for %s", pname);

  datasize = gridInqSize(vlistInqVarGrid(vlistID, pipe->varID));
  pipe->nvals += datasize;
  if (vlistNumber(vlistID) != CDI_REAL)
    datasize *= 2;
  memcpy(data, pipe->data, datasize * sizeof(double));
  *nmiss = pipe->nmiss;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
537
538
/*
 * Follows the chain of upstream streams starting at pstreamptr's pipe until
 * a stream that is not a pipe, or the end of the chain, is reached.
 *
 * NOTE: both parameters are passed by value. The pointers computed here are
 * therefore local to this function and are NOT visible to the caller; apart
 * from the debug output the call has no observable result.
 *
 * The walk stops as soon as an upstream pointer is null, including the very
 * first one (pipe_init() sets pstreamptr_in to 0).
 *
 * @param pstreamptr    stream whose pipe is the start of the chain
 * @param pstreamptr_in overwritten locally; the incoming value is ignored
 */
void
pipeGetReadTarget(pstream_t *pstreamptr, pstream_t *pstreamptr_in)
{
  pstreamptr_in = pstreamptr->pipe->pstreamptr_in;
  pstreamptr = pstreamptr_in;
  while (pstreamptr_in && pstreamptr_in->ispipe)
    {
      if (PipeDebug)
        fprintf(stderr, "%s: istream %d is pipe\n", __func__, pstreamptr_in->self);
      pstreamptr = pstreamptr_in;
      pstreamptr_in = pstreamptr_in->pipe->pstreamptr_in;
    }
}
553
554
555
/***
 * Placeholder for reading record data from a file.
 * The body is empty: calling it has no effect.
 */
void
pipeReadFileRecord(void)
{
}
Oliver Heidmann's avatar
Oliver Heidmann committed
560
561
void
pipeReadRecord(pstream_t *pstreamptr, double *data, int *nmiss)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
562
563
{
  char *pname = pstreamptr->name;
564
  pipe_t *pipe = pstreamptr->pipe;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
565
566

  *nmiss = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
567
568
  if (PipeDebug)
    Message("%s pstreamID %d", pname, pstreamptr->self);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
569

570
  // LOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
571
  pthread_mutex_lock(pipe->mutex);
Oliver Heidmann's avatar
Oliver Heidmann committed
572
  while (pipe->hasdata == 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
573
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
574
575
      if (PipeDebug)
        Message("%s wait of writeCond", pname);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
576
577
578
      pthread_cond_wait(pipe->writeCond, pipe->mutex);
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
579
  if (pipe->hasdata == 2)
Oliver Heidmann's avatar
Oliver Heidmann committed
580
    //===============================
Uwe Schulzweida's avatar
Uwe Schulzweida committed
581
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
582
      pstream_t *pstreamptr_in;
583
      pipeGetReadTarget(pstreamptr, pstreamptr_in);
Oliver Heidmann's avatar
Oliver Heidmann committed
584
585
586
587
      if (pstreamptr_in == 0)
        {
          if (PipeDebug)
            fprintf(stderr, "pstreamID = %d\n", pstreamptr->self);
588
          if (pipe->hasdata == 1)
Oliver Heidmann's avatar
Oliver Heidmann committed
589
            {
590
              if (!pipe->data)
Oliver Heidmann's avatar
Oliver Heidmann committed
591
592
593
594
                {
                  Error("No data pointer for %s", pname);
                }
              pipeReadPipeRecord(pstreamptr->pipe, data, pname, pstreamptr->vlistID, nmiss);
Oliver Heidmann's avatar
Oliver Heidmann committed
595
596
597
598
            }
          else
            Error("Internal problem! istream undefined");
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
599
      else
Oliver Heidmann's avatar
Oliver Heidmann committed
600
601
602
603
604
        {
          if (PipeDebug)
            fprintf(stderr, "%s: istream %d is file\n", __func__, pstreamptr_in->self);
          streamReadRecord(pstreamptr_in->fileID, data, nmiss);
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
605
    }
Oliver Heidmann's avatar
Oliver Heidmann committed
606
  //===============================
Oliver Heidmann's avatar
Oliver Heidmann committed
607
  else if (pipe->hasdata == 1)
Oliver Heidmann's avatar
Oliver Heidmann committed
608
    //===============================
Uwe Schulzweida's avatar
Uwe Schulzweida committed
609
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
610
      pipeReadPipeRecord(pipe, data, pname, pstreamptr->vlistID, nmiss);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
611
    }
Oliver Heidmann's avatar
Oliver Heidmann committed
612
  //===============================
Uwe Schulzweida's avatar
Uwe Schulzweida committed
613
  else
Oliver Heidmann's avatar
Oliver Heidmann committed
614
    //===============================
Uwe Schulzweida's avatar
Uwe Schulzweida committed
615
    {
616
      Error("data type %d not implemented", pipe->hasdata);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
617
618
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
619
620
  if (PipeDebug)
    Message("%s read record %d", pname, pipe->recIDr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
621
622
623
624

  pipe->hasdata = 0;
  pipe->data = NULL;
  pthread_mutex_unlock(pipe->mutex);
625
  // UNLOCK
Uwe Schulzweida's avatar
Uwe Schulzweida committed
626
627
628
629

  pthread_cond_signal(pipe->readCond);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
630
631
/*
 * Writer side: attaches a data buffer to the pipe and waits until the
 * reader has taken it.
 *
 * The pipe keeps only the pointer (hasdata == 1), no copy is made here.
 * writeCond is signalled, then the call waits on readCond. The wait is
 * abandoned when the data is not used, when the record counters of reader
 * and writer differ, or at end of pipe.
 *
 * @param pstreamptr stream that writes to the pipe
 * @param data       record values offered to the reader
 * @param nmiss      number of missing values in data
 */
void
pipeWriteRecord(pstream_t *pstreamptr, double *data, int nmiss)
{
  pipe_t *const pipe = pstreamptr->pipe;
  char *const pname = pstreamptr->name;

  if (PipeDebug) Message("%s pstreamID %d", pname, pstreamptr->self);

  pthread_mutex_lock(pipe->mutex);
  pipe->hasdata = 1; /* data pointer */
  pipe->data = data;
  pipe->nmiss = nmiss;
  pthread_mutex_unlock(pipe->mutex);

  pthread_cond_signal(pipe->writeCond);

  if (PipeDebug) Message("%s write record %d", pname, pipe->recIDw);

  pthread_mutex_lock(pipe->mutex);
  while (pipe->hasdata)
    {
      if (!pipe->usedata || pipe->recIDw != pipe->recIDr) break;

      if (pipe->EOP)
        {
          if (PipeDebug) Message("EOP");
          break;
        }

      if (PipeDebug) Message("%s wait of readCond", pname);
      pthread_cond_wait(pipe->readCond, pipe->mutex);
    }
  pthread_mutex_unlock(pipe->mutex);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
682
683
void
pipeDebug(int debug)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
684
685
686
687
688
{
  PipeDebug = debug;
}

#endif