pstream.cc 50.9 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1
2
3
4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2017 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
6
7
8
9
10
11
12
13
14
15
16
17
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

Uwe Schulzweida's avatar
Uwe Schulzweida committed
18
#if defined(HAVE_CONFIG_H)
19
#include "config.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
20
21
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
22
#if defined(_OPENMP)
23
#include <omp.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
24
25
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
26
27
28
29
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <errno.h>
30
#include <sys/stat.h> /* stat */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
31
32
33
34

FILE *popen(const char *command, const char *type);
int pclose(FILE *stream);

Ralf Mueller's avatar
Ralf Mueller committed
35
#include <cdi.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
36
37
38
#include "cdo.h"
#include "cdo_int.h"
#include "modules.h"
39
#include "pstream.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
40
41
42
43
44
45
46
47
48
#include "pstream_int.h"
#include "cdo_int.h"
#include "util.h"
#include "pipe.h"
#include "error.h"
#include "dmemory.h"

static int PSTREAM_Debug = 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
49
#define MAX_PSTREAMS 4096
Uwe Schulzweida's avatar
Uwe Schulzweida committed
50
51
52
53
54

static int _pstream_max = MAX_PSTREAMS;

static void pstream_initialize(void);

55
static bool _pstream_init = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
56

Uwe Schulzweida's avatar
Uwe Schulzweida committed
57
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
58
59
60
#include <pthread.h>
#include "pthread_debug.h"

61
// TODO: make threadsafe
Uwe Schulzweida's avatar
Uwe Schulzweida committed
62
63
static int pthreadScope = 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
64
static pthread_mutex_t streamOpenReadMutex = PTHREAD_MUTEX_INITIALIZER;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
65
static pthread_mutex_t streamOpenWriteMutex = PTHREAD_MUTEX_INITIALIZER;
Oliver Heidmann's avatar
Oliver Heidmann committed
66
static pthread_mutex_t streamMutex = PTHREAD_MUTEX_INITIALIZER;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
67
68
69
70

static pthread_once_t _pstream_init_thread = PTHREAD_ONCE_INIT;
static pthread_mutex_t _pstream_mutex;

Oliver Heidmann's avatar
Oliver Heidmann committed
71
72
73
74
75
#define PSTREAM_LOCK() pthread_mutex_lock(&_pstream_mutex)
#define PSTREAM_UNLOCK() pthread_mutex_unlock(&_pstream_mutex)
#define PSTREAM_INIT() \
  if (!_pstream_init)  \
  pthread_once(&_pstream_init_thread, pstream_initialize)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
76
77
78

#else

Oliver Heidmann's avatar
Oliver Heidmann committed
79
80
81
82
83
#define PSTREAM_LOCK()
#define PSTREAM_UNLOCK()
#define PSTREAM_INIT() \
  if (!_pstream_init)  \
  pstream_initialize()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
84
85
86

#endif

Oliver Heidmann's avatar
Oliver Heidmann committed
87
88
typedef struct _pstreamPtrToIdx
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
89
  int idx;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
90
  pstream_t *ptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
91
92
93
  struct _pstreamPtrToIdx *next;
} pstreamPtrToIdx;

Oliver Heidmann's avatar
Oliver Heidmann committed
94
static pstreamPtrToIdx *_pstreamList = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
95
static pstreamPtrToIdx *_pstreamAvail = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
96

Oliver Heidmann's avatar
Oliver Heidmann committed
97
98
static void
pstream_list_new(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
99
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
100
101
  assert(_pstreamList == NULL);

Oliver Heidmann's avatar
Oliver Heidmann committed
102
  _pstreamList = (pstreamPtrToIdx *) Malloc(_pstream_max * sizeof(pstreamPtrToIdx));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
103
104
}

Oliver Heidmann's avatar
Oliver Heidmann committed
105
106
static void
pstream_list_delete(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
107
{
Oliver Heidmann's avatar
Oliver Heidmann committed
108
109
  if (_pstreamList)
    Free(_pstreamList);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
110
111
}

Oliver Heidmann's avatar
Oliver Heidmann committed
112
113
114
115
static void
pstream_init_pointer(void)
{
  for (int i = 0; i < _pstream_max; ++i)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
116
117
    {
      _pstreamList[i].next = _pstreamList + i + 1;
Oliver Heidmann's avatar
Oliver Heidmann committed
118
119
      _pstreamList[i].idx = i;
      _pstreamList[i].ptr = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
120
121
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
122
  _pstreamList[_pstream_max - 1].next = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
123
124
125
126

  _pstreamAvail = _pstreamList;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
127
128
static pstream_t *
pstream_to_pointer(int idx)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
129
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
130
  pstream_t *pstreamptr = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
131

132
  PSTREAM_INIT();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
133

Oliver Heidmann's avatar
Oliver Heidmann committed
134
  if (idx >= 0 && idx < _pstream_max)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
135
    {
136
      PSTREAM_LOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
137
138
139

      pstreamptr = _pstreamList[idx].ptr;

140
      PSTREAM_UNLOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
141
142
    }
  else
143
    Error("pstream index %d undefined!", idx);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
144

145
  return pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
146
147
148
}

/* Create an index from a pointer */
Oliver Heidmann's avatar
Oliver Heidmann committed
149
150
static int
pstream_from_pointer(pstream_t *ptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
151
{
152
  int idx = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
153

Oliver Heidmann's avatar
Oliver Heidmann committed
154
  if (ptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
155
    {
156
      PSTREAM_LOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
157

Oliver Heidmann's avatar
Oliver Heidmann committed
158
159
160
161
162
163
164
165
166
167
168
      if (_pstreamAvail)
        {
          pstreamPtrToIdx *newptr = _pstreamAvail;
          _pstreamAvail = _pstreamAvail->next;
          newptr->next = 0;
          idx = newptr->idx;
          newptr->ptr = ptr;

          if (PSTREAM_Debug)
            Message("Pointer %p has idx %d from pstream list", ptr, idx);
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
169
      else
Oliver Heidmann's avatar
Oliver Heidmann committed
170
        Error("Too many open pstreams (limit is %d)!", _pstream_max);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
171

172
      PSTREAM_UNLOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
173
174
    }
  else
175
    Error("Internal problem (pointer %p undefined)", ptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
176

177
  return idx;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
178
179
}

Oliver Heidmann's avatar
Oliver Heidmann committed
180
181
static void
pstream_init_entry(pstream_t *pstreamptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
182
{
Oliver Heidmann's avatar
Oliver Heidmann committed
183
184
185
186
187
  pstreamptr->self = pstream_from_pointer(pstreamptr);

  pstreamptr->isopen = true;
  pstreamptr->ispipe = false;
  pstreamptr->fileID = -1;
188
  pstreamptr->m_vlistID = -1;
Oliver Heidmann's avatar
Oliver Heidmann committed
189
  pstreamptr->tsID = -1;
190
  pstreamptr->m_filetype = -1;
Oliver Heidmann's avatar
Oliver Heidmann committed
191
192
193
194
195
196
197
  pstreamptr->name = NULL;
  pstreamptr->tsID0 = 0;
  pstreamptr->mfiles = 0;
  pstreamptr->nfiles = 0;
  pstreamptr->varID = -1;
  pstreamptr->name = NULL;
  pstreamptr->mfnames = NULL;
198
  pstreamptr->m_varlist = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
199
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
200
201
202
203
  pstreamptr->argument = NULL;
  pstreamptr->pipe = NULL;
//  pstreamptr->rthreadID  = 0;
//  pstreamptr->wthreadID  = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
204
205
#endif
}
206
pstream_t::pstream_t() { pstream_init_entry(this); }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
207

Oliver Heidmann's avatar
Oliver Heidmann committed
208
209
static void
pstream_delete_entry(pstream_t *pstreamptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
210
{
211
  int idx = pstreamptr->self;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
212

213
  PSTREAM_LOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
214

215
  delete (pstreamptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
216
217

  _pstreamList[idx].next = _pstreamAvail;
Oliver Heidmann's avatar
Oliver Heidmann committed
218
219
  _pstreamList[idx].ptr = 0;
  _pstreamAvail = &_pstreamList[idx];
Uwe Schulzweida's avatar
Uwe Schulzweida committed
220

221
  PSTREAM_UNLOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
222

Oliver Heidmann's avatar
Oliver Heidmann committed
223
  if (PSTREAM_Debug)
224
    Message("Removed idx %d from pstream list", idx);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
225
226
}

Oliver Heidmann's avatar
Oliver Heidmann committed
227
228
static void
pstream_initialize(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
229
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
230
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
231
232
233
234
  /* initialize global API mutex lock */
  pthread_mutex_init(&_pstream_mutex, NULL);
#endif

235
  char *env = getenv("PSTREAM_DEBUG");
Oliver Heidmann's avatar
Oliver Heidmann committed
236
237
  if (env)
    PSTREAM_Debug = atoi(env);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
238
239

  env = getenv("PSTREAM_MAX");
Oliver Heidmann's avatar
Oliver Heidmann committed
240
241
  if (env)
    _pstream_max = atoi(env);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
242

Oliver Heidmann's avatar
Oliver Heidmann committed
243
  if (PSTREAM_Debug)
244
    Message("PSTREAM_MAX = %d", _pstream_max);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
245
246
247
248
249
250

  pstream_list_new();
  atexit(pstream_list_delete);

  pstream_init_pointer();

251
  _pstream_init = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
252
253
}

Oliver Heidmann's avatar
Oliver Heidmann committed
254
255
static int
pstreamFindID(const char *name)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
256
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
257
  pstream_t *pstreamptr;
258
  int pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
259

Oliver Heidmann's avatar
Oliver Heidmann committed
260
  for (pstreamID = 0; pstreamID < _pstream_max; ++pstreamID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
261
262
263
    {
      pstreamptr = pstream_to_pointer(pstreamID);

Oliver Heidmann's avatar
Oliver Heidmann committed
264
265
266
267
      if (pstreamptr)
        if (pstreamptr->name)
          if (strcmp(pstreamptr->name, name) == 0)
            break;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
268
269
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
270
271
  if (pstreamID == _pstream_max)
    pstreamID = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
272

273
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
274
}
275
276
bool
pstream_t::isPipe()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
277
{
278
  return ispipe;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
279
280
}

Oliver Heidmann's avatar
Oliver Heidmann committed
281
282
283
static void
createPipeName(char *pipename, int pnlen)
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
284

285
  snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1);
286
}
287

Oliver Heidmann's avatar
Oliver Heidmann committed
288
289
290
pthread_t
pCreateReadThread(argument_t *argument)
{
291
292
  pthread_attr_t attr;
  int status = pthread_attr_init(&attr);
Oliver Heidmann's avatar
Oliver Heidmann committed
293
294
  if (status)
    SysError("pthread_attr_init failed for '%s'", argument->operatorName.c_str());
295
  status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
Oliver Heidmann's avatar
Oliver Heidmann committed
296
297
  if (status)
    SysError("pthread_attr_setdetachstate failed for '%s'", argument->operatorName.c_str());
298
299
300
301
302
303
304
  /*
    param.sched_priority = 0;
    status = pthread_attr_setschedparam(&attr, &param);
    if ( status ) SysError("pthread_attr_setschedparam failed for '%s'", newarg+1);
  */
  /* status = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); */
  /* if ( status ) SysError("pthread_attr_setinheritsched failed for '%s'", newarg+1); */
305

306
  pthread_attr_getscope(&attr, &pthreadScope);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
307

308
309
310
311
312
313
314
  /* status = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); */
  /* if ( status ) SysError("pthread_attr_setscope failed for '%s'", newarg+1); */
  /* If system scheduling scope is specified, then the thread is scheduled against all threads in the system */
  /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */

  size_t stacksize = 0;
  status = pthread_attr_getstacksize(&attr, &stacksize);
Oliver Heidmann's avatar
Oliver Heidmann committed
315
  if (stacksize < 2097152)
316
317
318
    {
      stacksize = 2097152;
      pthread_attr_setstacksize(&attr, stacksize);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
319
    }
320
321

  pthread_t thrID;
322
  int rval = pthread_create(&thrID, &attr, operatorModule(argument->operatorName.c_str()), argument);
Oliver Heidmann's avatar
Oliver Heidmann committed
323
  if (rval != 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
324
    {
325
      errno = rval;
326
      SysError("pthread_create failed for '%s'", argument->operatorName.c_str());
327
    }
328
329
330
  return thrID;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
331
332
static void
pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
333
334
335
{
#if defined(HAVE_LIBPTHREAD)
  int pstreamID = pstreamptr->self;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
336

337
  size_t pnlen = 16;
Oliver Heidmann's avatar
Oliver Heidmann committed
338
  char *pipename = (char *) Malloc(pnlen);
339
  // struct sched_param param;
Oliver Heidmann's avatar
Oliver Heidmann committed
340

341
  argument_t *newargument = new argument_t();
342
  newargument->argc = argument->argc + 1;
Oliver Heidmann's avatar
Oliver Heidmann committed
343
  newargument->argv = (char **) Malloc(newargument->argc * sizeof(char *));
344
  newargument->operatorName = "";
Oliver Heidmann's avatar
Oliver Heidmann committed
345
  memcpy(newargument->argv, argument->argv, argument->argc * sizeof(char *));
346

Oliver Heidmann's avatar
Oliver Heidmann committed
347
  char *operatorArg = argument->argv[0];
348
349
350
  const char *operatorName = getOperatorName(operatorArg);

  size_t len = strlen(argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
351
  char *newarg = (char *) Malloc(len + pnlen);
352
353
354
  strcpy(newarg, argument->args);
  createPipeName(pipename, pnlen);
  newarg[len] = ' ';
Oliver Heidmann's avatar
Oliver Heidmann committed
355
  strcpy(&newarg[len + 1], pipename);
356
357
358

  newargument->argv[argument->argc] = pipename;
  newargument->args = newarg;
Oliver Heidmann's avatar
Oliver Heidmann committed
359
  newargument->operatorName = std::string(operatorName, strlen(operatorName));
360
361
362
363
364
  /*
    printf("pstreamOpenRead: new args >%s<\n", newargument->args);
    for ( int i = 0; i < newargument->argc; ++i )
    printf("pstreamOpenRead: new arg %d >%s<\n", i, newargument->argv[i]);
  */
Oliver Heidmann's avatar
Oliver Heidmann committed
365
366
  pstreamptr->ispipe = true;
  pstreamptr->name = pipename;
367
  pstreamptr->rthreadID = pthread_self();
Oliver Heidmann's avatar
Oliver Heidmann committed
368
  pstreamptr->pipe = new pipe_t();
369
  pstreamptr->pipe->name = std::string(pipename);
Oliver Heidmann's avatar
Oliver Heidmann committed
370
371
372
373
374
375
  pstreamptr->argument = (void *) newargument;

  if (!cdoSilentMode)
    {
      cdoPrint("Started child process \"%s\".", newarg + 1);
    }
376
  pCreateReadThread(newargument);
377
  /* Free(operatorName); */
378
  processAddInputStream(pstreamptr);
379
  /*      pipeInqInfo(pstreamID); */
Oliver Heidmann's avatar
Oliver Heidmann committed
380
381
  if (PSTREAM_Debug)
    Message("pipe %s", pipename);
382
383
384
385
386
#else
  cdoAbort("Cannot use pipes, pthread support not compiled in!");
#endif
}

Oliver Heidmann's avatar
Oliver Heidmann committed
387
388
static void
pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr)
389
390
391
{
  size_t i;
  size_t len = strlen(argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
392

Oliver Heidmann's avatar
Oliver Heidmann committed
393
394
395
  for (i = 0; i < len; i++)
    if (argument->args[i] == ':')
      break;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
396

Oliver Heidmann's avatar
Oliver Heidmann committed
397
  if (i < len)
398
399
400
    {
      int nfiles = 1, j;

Oliver Heidmann's avatar
Oliver Heidmann committed
401
402
403
      const char *pch = &argument->args[i + 1];
      len -= (i + 1);
      if (len && (strncmp(argument->args, "filelist:", 9) == 0 || strncmp(argument->args, "flist:", 6) == 0))
404
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
405
406
407
          for (i = 0; i < len; i++)
            if (pch[i] == ',')
              nfiles++;
408

Oliver Heidmann's avatar
Oliver Heidmann committed
409
          if (nfiles == 1)
410
411
412
413
            {
              char line[4096];
              FILE *fp, *fp2;
              fp = fopen(pch, "r");
Oliver Heidmann's avatar
Oliver Heidmann committed
414
415
              if (fp == NULL)
                {
416
                  cdoAbort("Open failed on %s", pch);
Oliver Heidmann's avatar
Oliver Heidmann committed
417
418
419
420
421
                }
              if (cdoVerbose)
                {
                  cdoPrint("Reading file names from %s", pch);
                }
422
423
              /* find number of files */
              nfiles = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
424
              while (readline(fp, line, 4096))
425
                {
Oliver Heidmann's avatar
Oliver Heidmann committed
426
427
                  if (line[0] == '#' || line[0] == '\0' || line[0] == ' ')
                    continue;
428

Oliver Heidmann's avatar
Oliver Heidmann committed
429
430
431
                  fp2 = fopen(line, "r");
                  if (fp2 == NULL)
                    cdoAbort("Open failed on %s", line);
432
433
                  fclose(fp2);
                  nfiles++;
Oliver Heidmann's avatar
Oliver Heidmann committed
434
                  if (cdoVerbose)
435
436
437
                    cdoPrint("File number %d is %s", nfiles, line);
                }

Oliver Heidmann's avatar
Oliver Heidmann committed
438
439
              if (nfiles == 0)
                cdoAbort("No imput file found in %s", pch);
440
441

              pstreamptr->mfiles = nfiles;
Oliver Heidmann's avatar
Oliver Heidmann committed
442
              pstreamptr->mfnames = (char **) Malloc(nfiles * sizeof(char *));
443

444
445
446
              rewind(fp);

              nfiles = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
447
              while (readline(fp, line, 4096))
448
                {
Oliver Heidmann's avatar
Oliver Heidmann committed
449
450
                  if (line[0] == '#' || line[0] == '\0' || line[0] == ' ')
                    continue;
451
452
453
454
455
456
457
458
459
460
461
462

                  pstreamptr->mfnames[nfiles] = strdupx(line);
                  nfiles++;
                }

              fclose(fp);
            }
          else
            {
              char line[65536];

              pstreamptr->mfiles = nfiles;
Oliver Heidmann's avatar
Oliver Heidmann committed
463
              pstreamptr->mfnames = (char **) Malloc(nfiles * sizeof(char *));
464

465
              strcpy(line, pch);
Oliver Heidmann's avatar
Oliver Heidmann committed
466
467
468
469
              for (i = 0; i < len; i++)
                {
                  if (line[i] == ',')
                    {
470
                      line[i] = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
471
472
                    }
                }
473
              i = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
474
              for (j = 0; j < nfiles; j++)
475
476
477
478
479
480
                {
                  pstreamptr->mfnames[j] = strdupx(&line[i]);
                  i += strlen(&line[i]) + 1;
                }
            }
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
481
      else if (len && strncmp(argument->args, "ls:", 3) == 0)
482
483
484
485
486
        {
          char line[4096];
          char command[4096];
          char *fnames[16384];
          FILE *pfp;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
487

488
489
490
491
          strcpy(command, "ls ");
          strcat(command, pch);

          pfp = popen(command, "r");
Oliver Heidmann's avatar
Oliver Heidmann committed
492
          if (pfp == 0)
493
494
495
            SysError("popen %s failed", command);

          nfiles = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
496
          while (readline(pfp, line, 4096))
497
            {
Oliver Heidmann's avatar
Oliver Heidmann committed
498
499
              if (nfiles >= 16384)
                cdoAbort("Too many input files (limit: 16384)");
500
501
502
503
504
505
              fnames[nfiles++] = strdupx(line);
            }

          pclose(pfp);

          pstreamptr->mfiles = nfiles;
Oliver Heidmann's avatar
Oliver Heidmann committed
506
          pstreamptr->mfnames = (char **) Malloc(nfiles * sizeof(char *));
507

Oliver Heidmann's avatar
Oliver Heidmann committed
508
          for (j = 0; j < nfiles; j++)
509
510
511
512
513
            pstreamptr->mfnames[j] = fnames[j];
        }
    }
}

Oliver Heidmann's avatar
Oliver Heidmann committed
514
515
516
static void
pstreamOpenReadFile(const argument_t *argument, pstream_t *pstreamptr)
{
517
518
519
  pstreamCreateFilelist(argument, pstreamptr);

  char *filename = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
520

Oliver Heidmann's avatar
Oliver Heidmann committed
521
  if (pstreamptr->mfiles)
522
    {
523
      size_t len = strlen(pstreamptr->mfnames[0]);
Oliver Heidmann's avatar
Oliver Heidmann committed
524
      filename = (char *) Malloc(len + 1);
525
526
527
528
529
      strcpy(filename, pstreamptr->mfnames[0]);
      pstreamptr->nfiles = 1;
    }
  else
    {
530
      size_t len = strlen(argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
531
      filename = (char *) Malloc(len + 1);
532
533
534
      strcpy(filename, argument->args);
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
535
536
  if (PSTREAM_Debug)
    Message("file %s", filename);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
537

Uwe Schulzweida's avatar
Uwe Schulzweida committed
538
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
539
  if (cdoLockIO)
540
541
542
    pthread_mutex_lock(&streamMutex);
  else
    pthread_mutex_lock(&streamOpenReadMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
543
#endif
544
  int fileID = streamOpenRead(filename);
Oliver Heidmann's avatar
Oliver Heidmann committed
545
  if (fileID < 0)
546
    {
547
      pstreamptr->isopen = false;
548
549
      cdiOpenError(fileID, "Open failed on >%s<", filename);
    }
Oliver Heidmann's avatar
Oliver Heidmann committed
550
551

  if (cdoDefaultFileType == CDI_UNDEFID)
552
553
554
555
556
557
    cdoDefaultFileType = streamInqFiletype(fileID);
  /*
    if ( cdoDefaultInstID == CDI_UNDEFID )
    cdoDefaultInstID = streamInqInstID(fileID);
  */
  cdoInqHistory(fileID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
558
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
559
  if (cdoLockIO)
560
561
562
    pthread_mutex_unlock(&streamMutex);
  else
    pthread_mutex_unlock(&streamOpenReadMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
563
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
564

Oliver Heidmann's avatar
Oliver Heidmann committed
565
566
  pstreamptr->mode = 'r';
  pstreamptr->name = filename;
567
568
569
  pstreamptr->fileID = fileID;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
570
571
int
pstreamOpenRead(const argument_t *argument)
572
573
574
{
  PSTREAM_INIT();

575
  pstream_t *pstreamptr = new pstream_t();
Oliver Heidmann's avatar
Oliver Heidmann committed
576
577
  if (!pstreamptr)
    Error("No memory");
578
579
580
581
582
583
584
585
586

  int pstreamID = pstreamptr->self;

  int ispipe = argument->args[0] == '-';
  /*
  printf("pstreamOpenRead: args >%s<\n", argument->args);
  for ( int i = 0; i < argument->argc; ++i )
    printf("pstreamOpenRead: arg %d >%s<\n", i, argument->argv[i]);
  */
Oliver Heidmann's avatar
Oliver Heidmann committed
587
  if (ispipe)
588
589
590
591
592
593
    {
      pstreamOpenReadPipe(argument, pstreamptr);
    }
  else
    {
      pstreamOpenReadFile(argument, pstreamptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
594
595
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
596
597
598
  if (pstreamID < 0)
    cdiOpenError(pstreamID, "Open failed on >%s<", argument->args);

599
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
600
601
}

Oliver Heidmann's avatar
Oliver Heidmann committed
602
603
static void
query_user_exit(const char *argument)
604
{
Oliver Heidmann's avatar
Oliver Heidmann committed
605
/* modified code from NCO */
606
607
608
609
#define USR_RPL_MAX_LNG 10 /* Maximum length for user reply */
#define USR_RPL_MAX_NBR 10 /* Maximum number of chances for user to reply */
  char usr_rpl[USR_RPL_MAX_LNG];
  int usr_rpl_int;
Oliver Heidmann's avatar
Oliver Heidmann committed
610
  short nbr_itr = 0;
611
612
613
  size_t usr_rpl_lng = 0;

  /* Initialize user reply string */
Oliver Heidmann's avatar
Oliver Heidmann committed
614
615
  usr_rpl[0] = 'z';
  usr_rpl[1] = '\0';
616

Oliver Heidmann's avatar
Oliver Heidmann committed
617
  while (!(usr_rpl_lng == 1 && (*usr_rpl == 'o' || *usr_rpl == 'O' || *usr_rpl == 'e' || *usr_rpl == 'E')))
618
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
      if (nbr_itr++ > USR_RPL_MAX_NBR)
        {
          (void) fprintf(stdout,
                         "\n%s: ERROR %d failed attempts to obtain valid interactive input.\n",
                         processInqPrompt(),
                         nbr_itr - 1);
          exit(EXIT_FAILURE);
        }

      if (nbr_itr > 1)
        (void) fprintf(stdout, "%s: ERROR Invalid response.\n", processInqPrompt());
      (void) fprintf(stdout,
                     "%s: %s exists ---`e'xit, or `o'verwrite (delete existing file) (e/o)? ",
                     processInqPrompt(),
                     argument);
      (void) fflush(stdout);
      if (fgets(usr_rpl, USR_RPL_MAX_LNG, stdin) == NULL)
        continue;
637
638
639

      /* Ensure last character in input string is \n and replace that with \0 */
      usr_rpl_lng = strlen(usr_rpl);
Oliver Heidmann's avatar
Oliver Heidmann committed
640
641
642
643
644
645
      if (usr_rpl_lng >= 1)
        if (usr_rpl[usr_rpl_lng - 1] == '\n')
          {
            usr_rpl[usr_rpl_lng - 1] = '\0';
            usr_rpl_lng--;
          }
646
647
648
    }

  /* Ensure one case statement for each exit condition in preceding while loop */
Oliver Heidmann's avatar
Oliver Heidmann committed
649
650
  usr_rpl_int = (int) usr_rpl[0];
  switch (usr_rpl_int)
651
652
    {
    case 'E':
Oliver Heidmann's avatar
Oliver Heidmann committed
653
    case 'e': exit(EXIT_SUCCESS); break;
654
    case 'O':
Oliver Heidmann's avatar
Oliver Heidmann committed
655
656
    case 'o': break;
    default: exit(EXIT_FAILURE); break;
657
658
659
    } /* end switch */
}

Oliver Heidmann's avatar
Oliver Heidmann committed
660
661
static int
pstreamOpenWritePipe(const argument_t *argument, int filetype)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
662
{
663
  int pstreamID = -1;
Oliver Heidmann's avatar
Oliver Heidmann committed
664

Uwe Schulzweida's avatar
Uwe Schulzweida committed
665
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
666
  if (PSTREAM_Debug)
667
    {
668
      Message("pipe %s", argument->args);
669
    }
670
  pstreamID = pstreamFindID(argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
671
  if (pstreamID == -1)
672
673
674
    {
      Error("%s is not open!", argument->args);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
675

676
  pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
677

678
  pstreamptr->wthreadID = pthread_self();
679
  pstreamptr->m_filetype = filetype;
680
  processAddOutputStream(pstreamptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
681
682
#endif

683
684
  return pstreamID;
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
685

Oliver Heidmann's avatar
Oliver Heidmann committed
686
687
static void
set_comp(int fileID, int filetype)
688
{
Oliver Heidmann's avatar
Oliver Heidmann committed
689
  if (cdoCompress)
690
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
691
      if (filetype == CDI_FILETYPE_GRB)
692
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
693
          cdoCompType = CDI_COMPRESS_SZIP;
694
695
          cdoCompLevel = 0;
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
696
      else if (filetype == CDI_FILETYPE_NC4 || filetype == CDI_FILETYPE_NC4C)
697
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
698
          cdoCompType = CDI_COMPRESS_ZIP;
699
700
701
702
          cdoCompLevel = 1;
        }
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
703
  if (cdoCompType != CDI_COMPRESS_NONE)
704
705
706
707
    {
      streamDefCompType(fileID, cdoCompType);
      streamDefCompLevel(fileID, cdoCompLevel);

Oliver Heidmann's avatar
Oliver Heidmann committed
708
709
710
      if (cdoCompType == CDI_COMPRESS_SZIP
          && (filetype != CDI_FILETYPE_GRB && filetype != CDI_FILETYPE_GRB2 && filetype != CDI_FILETYPE_NC4
              && filetype != CDI_FILETYPE_NC4C))
711
712
        cdoWarning("SZIP compression not available for non GRIB/NetCDF4 data!");

Oliver Heidmann's avatar
Oliver Heidmann committed
713
      if (cdoCompType == CDI_COMPRESS_JPEG && filetype != CDI_FILETYPE_GRB2)
714
715
        cdoWarning("JPEG compression not available for non GRIB2 data!");

Oliver Heidmann's avatar
Oliver Heidmann committed
716
      if (cdoCompType == CDI_COMPRESS_ZIP && (filetype != CDI_FILETYPE_NC4 && filetype != CDI_FILETYPE_NC4C))
717
718
719
720
        cdoWarning("Deflate compression not available for non NetCDF4 data!");
    }
}

Oliver Heidmann's avatar
Oliver Heidmann committed
721
722
static int
pstreamOpenWriteFile(const argument_t *argument, int filetype)
723
{
Oliver Heidmann's avatar
Oliver Heidmann committed
724
  char *filename = (char *) Malloc(strlen(argument->args) + 1);
725

726
  pstream_t *pstreamptr = new pstream_t();
Oliver Heidmann's avatar
Oliver Heidmann committed
727
728
  if (!pstreamptr)
    Error("No memory");
729
730

  int pstreamID = pstreamptr->self;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
731

Oliver Heidmann's avatar
Oliver Heidmann committed
732
733
734
735
736
  if (PSTREAM_Debug)
    Message("file %s", argument->args);

  if (filetype == CDI_UNDEFID)
    filetype = CDI_FILETYPE_GRB;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
737

Oliver Heidmann's avatar
Oliver Heidmann committed
738
  if (cdoInteractive)
739
740
    {
      struct stat stbuf;
741

742
743
      int rstatus = stat(argument->args, &stbuf);
      /* If permanent file already exists, query user whether to overwrite or exit */
Oliver Heidmann's avatar
Oliver Heidmann committed
744
745
      if (rstatus != -1)
        query_user_exit(argument->args);
746
747
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
748
749
  if (processNums() == 1 && ompNumThreads == 1)
    timer_start(timer_write);
750

Uwe Schulzweida's avatar
Uwe Schulzweida committed
751
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
752
  if (cdoLockIO)
753
754
755
    pthread_mutex_lock(&streamMutex);
  else
    pthread_mutex_lock(&streamOpenWriteMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
756
#endif
757
758

  int fileID = streamOpenWrite(argument->args, filetype);
Oliver Heidmann's avatar
Oliver Heidmann committed
759

Uwe Schulzweida's avatar
Uwe Schulzweida committed
760
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
761
  if (cdoLockIO)
762
763
764
    pthread_mutex_unlock(&streamMutex);
  else
    pthread_mutex_unlock(&streamOpenWriteMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
765
#endif
Oliver Heidmann's avatar
Oliver Heidmann committed
766
767
768
769
770

  if (processNums() == 1 && ompNumThreads == 1)
    timer_stop(timer_write);
  if (fileID < 0)
    cdiOpenError(fileID, "Open failed on >%s<", argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
771

772
  cdoDefHistory(fileID, commandLine());
Uwe Schulzweida's avatar
Uwe Schulzweida committed
773

Oliver Heidmann's avatar
Oliver Heidmann committed
774
  if (cdoDefaultByteorder != CDI_UNDEFID)
775
    streamDefByteorder(fileID, cdoDefaultByteorder);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
776

777
  set_comp(fileID, filetype);
778
779
780
781
782
  /*
    if ( cdoDefaultInstID != CDI_UNDEFID )
    streamDefInstID(fileID, cdoDefaultInstID);
  */
  strcpy(filename, argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
783

Oliver Heidmann's avatar
Oliver Heidmann committed
784
785
786
  pstreamptr->mode = 'w';
  pstreamptr->name = filename;
  pstreamptr->fileID = fileID;
787
  pstreamptr->m_filetype = filetype;
788
789
790
791

  return pstreamID;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
792
793
int
pstreamOpenWrite(const argument_t *argument, int filetype)
794
795
796
797
798
799
800
{
  int pstreamID = -1;

  PSTREAM_INIT();

  int ispipe = strncmp(argument->args, "(pipe", 5) == 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
801
  if (ispipe)
802
803
804
805
806
807
808
    {
      pstreamID = pstreamOpenWritePipe(argument, filetype);
    }
  else
    {
      pstreamID = pstreamOpenWriteFile(argument, filetype);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
809

810
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
811
812
}

Oliver Heidmann's avatar
Oliver Heidmann committed
813
814
int
pstreamOpenAppend(const argument_t *argument)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
815
816
817
{
  int pstreamID = -1;

818
  int ispipe = strncmp(argument->args, "(pipe", 5) == 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
819

Oliver Heidmann's avatar
Oliver Heidmann committed
820
  if (ispipe)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
821
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
822
      if (PSTREAM_Debug)
823
824
825
826
        {
          Message("pipe %s", argument->args);
          cdoAbort("this operator doesn't work with pipes!");
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
827
828
829
    }
  else
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
830
      char *filename = (char *) Malloc(strlen(argument->args) + 1);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
831

832
      pstream_t *pstreamptr = new pstream_t();
Oliver Heidmann's avatar
Oliver Heidmann committed
833
834
      if (!pstreamptr)
        Error("No memory");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
835
836
837

      pstreamID = pstreamptr->self;

Oliver Heidmann's avatar
Oliver Heidmann committed
838
839
840
841
842
      if (PSTREAM_Debug)
        Message("file %s", argument->args);

      if (processNums() == 1 && ompNumThreads == 1)
        timer_start(timer_write);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
843
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
844
845
      if (cdoLockIO)
        pthread_mutex_lock(&streamMutex);
846
      else
Oliver Heidmann's avatar
Oliver Heidmann committed
847
        pthread_mutex_lock(&streamOpenReadMutex);
848
#endif
Oliver Heidmann's avatar
Oliver Heidmann committed
849

850
      int fileID = streamOpenAppend(argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
851

Uwe Schulzweida's avatar
Uwe Schulzweida committed
852
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
853
854
      if (cdoLockIO)
        pthread_mutex_unlock(&streamMutex);
855
      else
Oliver Heidmann's avatar
Oliver Heidmann committed
856
        pthread_mutex_unlock(&streamOpenReadMutex);
857
#endif
Oliver Heidmann's avatar
Oliver Heidmann committed
858
859
860
861
862

      if (processNums() == 1 && ompNumThreads == 1)
        timer_stop(timer_write);
      if (fileID < 0)
        cdiOpenError(fileID, "Open failed on >%s<", argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
863
864
865
866
      /*
      cdoInqHistory(fileID);
      cdoDefHistory(fileID, commandLine());
      */
867
868
      int filetype = streamInqFiletype(fileID);
      set_comp(fileID, filetype);
Oliver Heidmann's avatar
Oliver Heidmann committed
869

870
      strcpy(filename, argument->args);
871

Oliver Heidmann's avatar
Oliver Heidmann committed
872
873
      pstreamptr->mode = 'a';
      pstreamptr->name = filename;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
874
875
876
      pstreamptr->fileID = fileID;
    }

877
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
878
879
}

Oliver Heidmann's avatar
Oliver Heidmann committed
880
881
void
pstreamClose(int pstreamID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
882
{
883
  pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
884

Oliver Heidmann's avatar
Oliver Heidmann committed
885
  if (pstreamptr == NULL)
886
    Error("Internal problem, stream %d not open!", pstreamID);
887

Oliver Heidmann's avatar
Oliver Heidmann committed
888
  if (pstreamptr->ispipe)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
889
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
890
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
891
      pipe_t *pipe;
892
      bool lread = false, lwrite = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
893
894
      pthread_t threadID = pthread_self();

Oliver Heidmann's avatar
Oliver Heidmann committed
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
      if (pthread_equal(threadID, pstreamptr->rthreadID))
        lread = true;
      else if (pthread_equal(threadID, pstreamptr->wthreadID))
        lwrite = true;
      else
        Error("Internal problem! Close pipe %s", pstreamptr->name);

      if (lread)
        {
          pipe = pstreamptr->pipe;
          pthread_mutex_lock(pipe->mutex);
          pipe->EOP = true;
          if (PSTREAM_Debug)
            Message("%s read closed", pstreamptr->name);
          pthread_mutex_unlock(pipe->mutex);
          pthread_cond_signal(pipe->tsDef);
          pthread_cond_signal(pipe->tsInq);

          pthread_cond_signal(pipe->recInq);

          pthread_mutex_lock(pipe->mutex);
          pstreamptr->isopen = false;
          pthread_mutex_unlock(pipe->mutex);
          pthread_cond_signal(pipe->isclosed);

          pthread_join(pstreamptr->wthreadID, NULL);

          pthread_mutex_lock(pipe->mutex);
          if (pstreamptr->name)
            Free(pstreamptr->name);
          if (pstreamptr->argument)
            {
              argument_t *argument = (argument_t *) (pstreamptr->argument);
              if (argument->argv)
                Free(argument->argv);
              if (argument->args)
                Free(argument->args);
              delete (argument);
            }
934
          vlistDestroy(pstreamptr->m_vlistID);
Oliver Heidmann's avatar
Oliver Heidmann committed
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
          pthread_mutex_unlock(pipe->mutex);

          processAddNvals(pipe->nvals);
          delete (pipe);
          pstream_delete_entry(pstreamptr);
        }
      else if (lwrite)
        {
          pipe = pstreamptr->pipe;
          pthread_mutex_lock(pipe->mutex);
          pipe->EOP = true;
          if (PSTREAM_Debug)
            Message("%s write closed", pstreamptr->name);
          pthread_mutex_unlock(pipe->mutex);
          pthread_cond_signal(pipe->tsDef);
          pthread_cond_signal(pipe->tsInq);

          pthread_mutex_lock(pipe->mutex);
          while (pstreamptr->isopen)
            {
              if (PSTREAM_Debug)
                Message("wait of read close");
              pthread_cond_wait(pipe->isclosed, pipe->mutex);
            }
          pthread_mutex_unlock(pipe->mutex);
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
961
962

      processDelStream(pstreamID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
963
#else
Uwe Schulzweida's avatar
Uwe Schulzweida committed
964
      cdoAbort("Cannot use pipes, pthread support not compiled in!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
965
966
967
968
#endif
    }
  else
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
969
970
      if (PSTREAM_Debug)
        Message("%s fileID %d", pstreamptr->name, pstreamptr->fileID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
971

Oliver Heidmann's avatar
Oliver Heidmann committed
972
973
974
975
      if (pstreamptr->mode == 'r')
        {
          processAddNvals(streamNvals(pstreamptr->fileID));
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
976

Uwe Schulzweida's avatar
Uwe Schulzweida committed
977
#if defined(HAVE_LIBPTHREAD)