pstream.cc 46.2 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1
2
3
4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2017 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
6
7
8
9
10
11
12
13
14
15
16
17
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

18

19
#ifdef  HAVE_CONFIG_H
20
#include "config.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
21
22
#endif

23
24
25
#include <thread>

#ifdef  _OPENMP
26
#include <omp.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
27
28
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
29
30
31
32
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <errno.h>
33
#include <sys/stat.h> /* stat */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
34
35
36
37

FILE *popen(const char *command, const char *type);
int pclose(FILE *stream);

Ralf Mueller's avatar
Ralf Mueller committed
38
#include <cdi.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
39
40
41
#include "cdo.h"
#include "cdo_int.h"
#include "modules.h"
42
#include "pstream.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
43
44
45
46
#include "pstream_int.h"
#include "util.h"
#include "pipe.h"
#include "error.h"
47
#include "cdoDebugOutput.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
48
49


Uwe Schulzweida's avatar
Uwe Schulzweida committed
50
//#define MAX_PSTREAMS 4096
Uwe Schulzweida's avatar
Uwe Schulzweida committed
51

Uwe Schulzweida's avatar
Uwe Schulzweida committed
52
//static int _pstream_max = MAX_PSTREAMS;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
53

54

Uwe Schulzweida's avatar
Uwe Schulzweida committed
55
//static void pstream_initialize(void);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
56

Uwe Schulzweida's avatar
Uwe Schulzweida committed
57
//static bool _pstream_init = false;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
58

Uwe Schulzweida's avatar
Uwe Schulzweida committed
59
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
60
61
62
#include <pthread.h>
#include "pthread_debug.h"

63
// TODO: make threadsafe
Uwe Schulzweida's avatar
Uwe Schulzweida committed
64
65
static int pthreadScope = 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
66
static pthread_mutex_t streamOpenReadMutex = PTHREAD_MUTEX_INITIALIZER;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
67
static pthread_mutex_t streamOpenWriteMutex = PTHREAD_MUTEX_INITIALIZER;
Oliver Heidmann's avatar
Oliver Heidmann committed
68
static pthread_mutex_t streamMutex = PTHREAD_MUTEX_INITIALIZER;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
69

Uwe Schulzweida's avatar
Uwe Schulzweida committed
70
//static pthread_once_t _pstream_init_thread = PTHREAD_ONCE_INIT;
71
//static pthread_mutex_t _pstream_mutex;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
72

73
74
75
76
static std::mutex _pstream_map_mutex;
#define PSTREAM_LOCK() _pstream_map_mutex.lock();
#define PSTREAM_UNLOCK() _pstream_map_mutex.unlock();
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
77
78
79
80
81
#define PSTREAM_LOCK() pthread_mutex_lock(&_pstream_mutex)
#define PSTREAM_UNLOCK() pthread_mutex_unlock(&_pstream_mutex)
#define PSTREAM_INIT() \
  if (!_pstream_init)  \
  pthread_once(&_pstream_init_thread, pstream_initialize)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
82

83
*/
Uwe Schulzweida's avatar
Uwe Schulzweida committed
84
85
#else

Oliver Heidmann's avatar
Oliver Heidmann committed
86
87
88
89
90
#define PSTREAM_LOCK()
#define PSTREAM_UNLOCK()
#define PSTREAM_INIT() \
  if (!_pstream_init)  \
  pstream_initialize()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
91
92
93

#endif

94
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
95
96
typedef struct _pstreamPtrToIdx
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
97
  int idx;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
98
  pstream_t *ptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
99
100
  struct _pstreamPtrToIdx *next;
} pstreamPtrToIdx;
101
102
*/
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
103
static pstreamPtrToIdx *_pstreamList = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
104
static pstreamPtrToIdx *_pstreamAvail = NULL;
105
106
107
108
*/
static std::map<int,pstream_t> _pstream_map;
static int next_pstream_id = 1;
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
109
110
static void
pstream_list_new(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
111
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
112
113
  assert(_pstreamList == NULL);

Oliver Heidmann's avatar
Oliver Heidmann committed
114
  _pstreamList = (pstreamPtrToIdx *) Malloc(_pstream_max * sizeof(pstreamPtrToIdx));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
115
}
Oliver Heidmann's avatar
Oliver Heidmann committed
116
117
static void
pstream_list_delete(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
118
{
Oliver Heidmann's avatar
Oliver Heidmann committed
119
120
  if (_pstreamList)
    Free(_pstreamList);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
121
122
}

Oliver Heidmann's avatar
Oliver Heidmann committed
123
124
125
126
static void
pstream_init_pointer(void)
{
  for (int i = 0; i < _pstream_max; ++i)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
127
128
    {
      _pstreamList[i].next = _pstreamList + i + 1;
Oliver Heidmann's avatar
Oliver Heidmann committed
129
130
      _pstreamList[i].idx = i;
      _pstreamList[i].ptr = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
131
132
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
133
  _pstreamList[_pstream_max - 1].next = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
134
135
136

  _pstreamAvail = _pstreamList;
}
137
138
*/

139
 pstream_t *create_pstream()
140
141
{
    PSTREAM_LOCK();
142
143
144
    auto new_entry  = _pstream_map.insert(
            std::make_pair(next_pstream_id, pstream_t(next_pstream_id))
            );
145
    next_pstream_id++;
146
    PSTREAM_UNLOCK();
147

148
    return &new_entry.first->second;
149
}
150

151
152
153
154
155
156
157
158
159
static pstream_t * pstream_to_pointer(int idx)
{
    PSTREAM_LOCK();
    auto pstream_iterator = _pstream_map.find(idx);
    PSTREAM_UNLOCK();
    if(pstream_iterator == _pstream_map.end())
    {
        Error("pstream index %d undefined!", idx);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
160

161
162
163
    return &pstream_iterator->second;
}
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
164
165
static pstream_t *
pstream_to_pointer(int idx)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
166
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
167
  pstream_t *pstreamptr = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
168

169
  PSTREAM_INIT();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
170

Oliver Heidmann's avatar
Oliver Heidmann committed
171
  if (idx >= 0 && idx < _pstream_max)
172
    //{
173
      PSTREAM_LOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
174
175
176

      pstreamptr = _pstreamList[idx].ptr;

177
      PSTREAM_UNLOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
178
179
    }
  else
180
    Error("pstream index %d undefined!", idx);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
181

182
  return pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
183
}
184
*/
Uwe Schulzweida's avatar
Uwe Schulzweida committed
185
/* Create an index from a pointer */
186
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
187
188
static int
pstream_from_pointer(pstream_t *ptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
189
{
190
  int idx = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
191

Oliver Heidmann's avatar
Oliver Heidmann committed
192
  if (ptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
193
    {
194
      PSTREAM_LOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
195

Oliver Heidmann's avatar
Oliver Heidmann committed
196
197
198
199
200
201
202
203
      if (_pstreamAvail)
        {
          pstreamPtrToIdx *newptr = _pstreamAvail;
          _pstreamAvail = _pstreamAvail->next;
          newptr->next = 0;
          idx = newptr->idx;
          newptr->ptr = ptr;

204
          if (CdoDebug::PSTREAM)
Oliver Heidmann's avatar
Oliver Heidmann committed
205
206
            Message("Pointer %p has idx %d from pstream list", ptr, idx);
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
207
      else
Oliver Heidmann's avatar
Oliver Heidmann committed
208
        Error("Too many open pstreams (limit is %d)!", _pstream_max);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
209

210
      PSTREAM_UNLOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
211
212
    }
  else
213
    Error("Internal problem (pointer %p undefined)", ptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
214

215
  return idx;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
216
}
217
*/
Uwe Schulzweida's avatar
Uwe Schulzweida committed
218

219
void pstream_t::init()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
220
{
221
222
223
224
225
226
  isopen = true;
  ispipe = false;
  m_fileID = -1;
  m_vlistID = -1;
  tsID = -1;
  m_filetype = -1;
227
  m_name = "";
228
229
230
231
232
  tsID0 = 0;
  mfiles = 0;
  nfiles = 0;
  varID = -1;
  m_varlist = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
233
#if defined(HAVE_LIBPTHREAD)
234
235
  argument = NULL;
  pipe = NULL;
Oliver Heidmann's avatar
Oliver Heidmann committed
236
237
//  pstreamptr->rthreadID  = 0;
//  pstreamptr->wthreadID  = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
238
239
#endif
}
240
pstream_t::pstream_t(int p_id) : self(p_id) { init(); }
241
242
243
pstream_t::~pstream_t(){
  //vlistDestroy(m_vlistID);
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
244

Oliver Heidmann's avatar
Oliver Heidmann committed
245
246
static void
pstream_delete_entry(pstream_t *pstreamptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
247
{
248
  int idx = pstreamptr->self;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
249

250
  PSTREAM_LOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
251

252
  _pstream_map.erase(idx);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
253

254
  PSTREAM_UNLOCK();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
255

256
257
  if (CdoDebug::PSTREAM)
    MESSAGE("Removed idx ", idx," from pstream list");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
258
}
259
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
260
261
static void
pstream_initialize(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
262
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
263
#if defined(HAVE_LIBPTHREAD)
264
  // initialize global API mutex lock 
Uwe Schulzweida's avatar
Uwe Schulzweida committed
265
266
267
  pthread_mutex_init(&_pstream_mutex, NULL);
#endif

268
  char *env = getenv("PSTREAM_DEBUG");
Oliver Heidmann's avatar
Oliver Heidmann committed
269
  if (env)
270
    CdoDebug::PSTREAM = atoi(env);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
271
272

  env = getenv("PSTREAM_MAX");
Oliver Heidmann's avatar
Oliver Heidmann committed
273
274
  if (env)
    _pstream_max = atoi(env);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
275

276
  if (CdoDebug::PSTREAM)
277
    Message("PSTREAM_MAX = %d", _pstream_max);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
278
279
280
281
282
283

  pstream_list_new();
  atexit(pstream_list_delete);

  pstream_init_pointer();

284
  _pstream_init = true;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
285
}
286
*/
287
static int pstreamFindID(const char *p_name)
288
{
289
    std::string cur_name;
290
291
    for(auto map_pair :  _pstream_map)
    {
292
293
294
        cur_name = map_pair.second.m_name;
        if(!(cur_name.empty())){
            if(cur_name.compare(p_name) == 0)
295
296
297
298
299
300
301
302
            {
                return map_pair.first;
            }
        }
    }
    return -1;
}
/*
Oliver Heidmann's avatar
Oliver Heidmann committed
303
304
static int
pstreamFindID(const char *name)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
305
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
306
  pstream_t *pstreamptr;
307
  int pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
308

Oliver Heidmann's avatar
Oliver Heidmann committed
309
  for (pstreamID = 0; pstreamID < _pstream_max; ++pstreamID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
310
311
312
    {
      pstreamptr = pstream_to_pointer(pstreamID);

Oliver Heidmann's avatar
Oliver Heidmann committed
313
      if (pstreamptr)
314
315
        if (pstreamptr->m_name)
          if (strcmp(pstreamptr->m_name, m_name) == 0)
Oliver Heidmann's avatar
Oliver Heidmann committed
316
            break;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
317
318
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
319
320
  if (pstreamID == _pstream_max)
    pstreamID = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
321

322
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
323
}
324
*/
325
326
bool
pstream_t::isPipe()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
327
{
328
  return ispipe;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
329
330
}

Oliver Heidmann's avatar
Oliver Heidmann committed
331
332
333
pthread_t
pCreateReadThread(argument_t *argument)
{
334
335
  pthread_attr_t attr;
  int status = pthread_attr_init(&attr);
Oliver Heidmann's avatar
Oliver Heidmann committed
336
337
  if (status)
    SysError("pthread_attr_init failed for '%s'", argument->operatorName.c_str());
338
  status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
Oliver Heidmann's avatar
Oliver Heidmann committed
339
340
  if (status)
    SysError("pthread_attr_setdetachstate failed for '%s'", argument->operatorName.c_str());
341
342
343
344
345
346
347
  /*
    param.sched_priority = 0;
    status = pthread_attr_setschedparam(&attr, &param);
    if ( status ) SysError("pthread_attr_setschedparam failed for '%s'", newarg+1);
  */
  /* status = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); */
  /* if ( status ) SysError("pthread_attr_setinheritsched failed for '%s'", newarg+1); */
348

349
  pthread_attr_getscope(&attr, &pthreadScope);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
350

351
352
353
354
355
356
357
  /* status = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); */
  /* if ( status ) SysError("pthread_attr_setscope failed for '%s'", newarg+1); */
  /* If system scheduling scope is specified, then the thread is scheduled against all threads in the system */
  /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */

  size_t stacksize = 0;
  status = pthread_attr_getstacksize(&attr, &stacksize);
Oliver Heidmann's avatar
Oliver Heidmann committed
358
  if (stacksize < 2097152)
359
360
361
    {
      stacksize = 2097152;
      pthread_attr_setstacksize(&attr, stacksize);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
362
    }
363
364

  pthread_t thrID;
365
  int rval = pthread_create(&thrID, &attr, operatorModule(argument->operatorName.c_str()), argument);
Oliver Heidmann's avatar
Oliver Heidmann committed
366
  if (rval != 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
367
    {
368
      errno = rval;
369
      SysError("pthread_create failed for '%s'", argument->operatorName.c_str());
370
    }
371
372
373
  return thrID;
}

374
void
375
pstream_t::pstreamOpenReadPipe(const char *pipename)
376
377
{
#if defined(HAVE_LIBPTHREAD)
378
  // int pstreamID = pstreamptr->self;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
379

380
  ispipe = true;
381
  m_name = pipename;
382
383
384
  rthreadID = pthread_self();
  pipe = new pipe_t();
  pipe->name = std::string(pipename);
Oliver Heidmann's avatar
Oliver Heidmann committed
385

386
    /* Free(operatorName); */
387
  /*      pipeInqInfo(pstreamID); */
388
389
  if (CdoDebug::PSTREAM)
    MESSAGE("pipe ", pipename, " %s");
390
391
392
393
394
#else
  cdoAbort("Cannot use pipes, pthread support not compiled in!");
#endif
}

395
void pstream_t::createFilelist(const char * p_args)
396
397
{
  size_t i;
398
  size_t len = strlen(p_args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
399

Oliver Heidmann's avatar
Oliver Heidmann committed
400
  for (i = 0; i < len; i++)
401
    if (p_args[i] == ':')
Oliver Heidmann's avatar
Oliver Heidmann committed
402
      break;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
403

Oliver Heidmann's avatar
Oliver Heidmann committed
404
  if (i < len)
405
406
407
    {
      int nfiles = 1, j;

408
      const char *pch = &p_args[i + 1];
Oliver Heidmann's avatar
Oliver Heidmann committed
409
      len -= (i + 1);
410
      if (len && (strncmp(p_args, "filelist:", 9) == 0 || strncmp(p_args, "flist:", 6) == 0))
411
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
412
413
414
          for (i = 0; i < len; i++)
            if (pch[i] == ',')
              nfiles++;
415

Oliver Heidmann's avatar
Oliver Heidmann committed
416
          if (nfiles == 1)
417
418
419
420
            {
              char line[4096];
              FILE *fp, *fp2;
              fp = fopen(pch, "r");
Oliver Heidmann's avatar
Oliver Heidmann committed
421
422
              if (fp == NULL)
                {
423
                  cdoAbort("Open failed on %s", pch);
Oliver Heidmann's avatar
Oliver Heidmann committed
424
425
426
427
428
                }
              if (cdoVerbose)
                {
                  cdoPrint("Reading file names from %s", pch);
                }
429
430
              /* find number of files */
              nfiles = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
431
              while (readline(fp, line, 4096))
432
                {
Oliver Heidmann's avatar
Oliver Heidmann committed
433
434
                  if (line[0] == '#' || line[0] == '\0' || line[0] == ' ')
                    continue;
435

Oliver Heidmann's avatar
Oliver Heidmann committed
436
437
438
                  fp2 = fopen(line, "r");
                  if (fp2 == NULL)
                    cdoAbort("Open failed on %s", line);
439
440
                  fclose(fp2);
                  nfiles++;
Oliver Heidmann's avatar
Oliver Heidmann committed
441
                  if (cdoVerbose)
442
443
444
                    cdoPrint("File number %d is %s", nfiles, line);
                }

Oliver Heidmann's avatar
Oliver Heidmann committed
445
446
              if (nfiles == 0)
                cdoAbort("No imput file found in %s", pch);
447

448
              mfiles = nfiles;
449
              m_mfnames.resize(nfiles);
450
451
452
              rewind(fp);

              nfiles = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
453
              while (readline(fp, line, 4096))
454
                {
Oliver Heidmann's avatar
Oliver Heidmann committed
455
456
                  if (line[0] == '#' || line[0] == '\0' || line[0] == ' ')
                    continue;
457

458
                  m_mfnames[nfiles] = line;
459
460
461
462
463
464
465
466
467
                  nfiles++;
                }

              fclose(fp);
            }
          else
            {
              char line[65536];

468
              mfiles = nfiles;
469
              m_mfnames.resize(nfiles);
470

471
              strcpy(line, pch);
Oliver Heidmann's avatar
Oliver Heidmann committed
472
473
474
475
              for (i = 0; i < len; i++)
                {
                  if (line[i] == ',')
                    {
476
                      line[i] = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
477
478
                    }
                }
479
              i = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
480
              for (j = 0; j < nfiles; j++)
481
                {
482
                  m_mfnames[j] = line[i];
483
484
485
486
                  i += strlen(&line[i]) + 1;
                }
            }
        }
487
      else if (len && strncmp(p_args, "ls:", 3) == 0)
488
489
490
491
492
        {
          char line[4096];
          char command[4096];
          char *fnames[16384];
          FILE *pfp;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
493

494
495
496
497
          strcpy(command, "ls ");
          strcat(command, pch);

          pfp = popen(command, "r");
Oliver Heidmann's avatar
Oliver Heidmann committed
498
          if (pfp == 0)
499
500
501
            SysError("popen %s failed", command);

          nfiles = 0;
Oliver Heidmann's avatar
Oliver Heidmann committed
502
          while (readline(pfp, line, 4096))
503
            {
Oliver Heidmann's avatar
Oliver Heidmann committed
504
505
              if (nfiles >= 16384)
                cdoAbort("Too many input files (limit: 16384)");
506
507
508
509
510
              fnames[nfiles++] = strdupx(line);
            }

          pclose(pfp);

511
          mfiles = nfiles;
512
          m_mfnames.resize(nfiles);
513

Oliver Heidmann's avatar
Oliver Heidmann committed
514
          for (j = 0; j < nfiles; j++)
515
            m_mfnames[j] = std::string(fnames[j]);
516
517
518
519
        }
    }
}

520
void
521
pstream_t::pstreamOpenReadFile(const char* p_args)
Oliver Heidmann's avatar
Oliver Heidmann committed
522
{
523
  createFilelist(p_args);
524

525
  std::string filename; 
Uwe Schulzweida's avatar
Uwe Schulzweida committed
526

527
  if (mfiles)
528
    {
529
      filename = m_mfnames[0];
530
      nfiles = 1;
531
532
533
    }
  else
    {
534
     filename = std::string(p_args);
535
536
    }

537
538
  if (CdoDebug::PSTREAM)
    MESSAGE("file ", filename.c_str());
Uwe Schulzweida's avatar
Uwe Schulzweida committed
539

Uwe Schulzweida's avatar
Uwe Schulzweida committed
540
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
541
  if (cdoLockIO)
542
543
544
    pthread_mutex_lock(&streamMutex);
  else
    pthread_mutex_lock(&streamOpenReadMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
545
#endif
546
  int fileID = streamOpenRead(filename.c_str());
Oliver Heidmann's avatar
Oliver Heidmann committed
547
  if (fileID < 0)
548
    {
549
      isopen = false;
550
      cdiOpenError(fileID, "Open failed on >%s<", filename.c_str());
551
    }
Oliver Heidmann's avatar
Oliver Heidmann committed
552
553

  if (cdoDefaultFileType == CDI_UNDEFID)
554
555
556
557
558
559
    cdoDefaultFileType = streamInqFiletype(fileID);
  /*
    if ( cdoDefaultInstID == CDI_UNDEFID )
    cdoDefaultInstID = streamInqInstID(fileID);
  */
  cdoInqHistory(fileID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
560
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
561
  if (cdoLockIO)
562
563
564
    pthread_mutex_unlock(&streamMutex);
  else
    pthread_mutex_unlock(&streamOpenReadMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
565
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
566

567
  mode = 'r';
568
  m_name = filename;
569
  m_fileID = fileID;
570
571
}

572
573
574
575
576
577
void createPipeName(char *pipename, int pnlen)
{

  snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf().m_ID + 1, processInqChildNum() + 1);
}

Oliver Heidmann's avatar
Oliver Heidmann committed
578
579
int
pstreamOpenRead(const argument_t *argument)
580
{
581
  if(CdoDebug::PSTREAM)
582
  {
583
584
      MESSAGE("Opening new pstream for reading with argument:");
      MESSAGE(print_argument((argument_t*)argument));
585
  }
586

587
  pstream_t *pstreamptr = create_pstream();
Oliver Heidmann's avatar
Oliver Heidmann committed
588
589
  if (!pstreamptr)
    Error("No memory");
590
591
592
593
594
595
596
597
598

  int pstreamID = pstreamptr->self;

  int ispipe = argument->args[0] == '-';
  /*
  printf("pstreamOpenRead: args >%s<\n", argument->args);
  for ( int i = 0; i < argument->argc; ++i )
    printf("pstreamOpenRead: arg %d >%s<\n", i, argument->argv[i]);
  */
Oliver Heidmann's avatar
Oliver Heidmann committed
599
  if (ispipe)
600
    {
601
602
603
604
605
606
607
608
609
610
611
612
      size_t pnlen = 16;
      char *pipename = (char *) Malloc(pnlen);
      createPipeName(pipename, pnlen);
      argument_t * newargument = pipe_argument_new(argument, pipename, pnlen);
      pstreamptr->pstreamOpenReadPipe(pipename);
      pCreateReadThread(newargument);
      if (!cdoSilentMode)
      {
        cdoPrint("Started child process \"%s\".", newargument->args + 1);
      }

      processAddInputStream(pstreamptr);
613
614
615
    }
  else
    {
616
      pstreamptr->pstreamOpenReadFile(argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
617
618
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
619
620
621
  if (pstreamID < 0)
    cdiOpenError(pstreamID, "Open failed on >%s<", argument->args);

622
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
623
624
}

Oliver Heidmann's avatar
Oliver Heidmann committed
625
626
static void
query_user_exit(const char *argument)
627
{
Oliver Heidmann's avatar
Oliver Heidmann committed
628
/* modified code from NCO */
629
630
631
632
#define USR_RPL_MAX_LNG 10 /* Maximum length for user reply */
#define USR_RPL_MAX_NBR 10 /* Maximum number of chances for user to reply */
  char usr_rpl[USR_RPL_MAX_LNG];
  int usr_rpl_int;
Oliver Heidmann's avatar
Oliver Heidmann committed
633
  short nbr_itr = 0;
634
635
636
  size_t usr_rpl_lng = 0;

  /* Initialize user reply string */
Oliver Heidmann's avatar
Oliver Heidmann committed
637
638
  usr_rpl[0] = 'z';
  usr_rpl[1] = '\0';
639

Oliver Heidmann's avatar
Oliver Heidmann committed
640
  while (!(usr_rpl_lng == 1 && (*usr_rpl == 'o' || *usr_rpl == 'O' || *usr_rpl == 'e' || *usr_rpl == 'E')))
641
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
      if (nbr_itr++ > USR_RPL_MAX_NBR)
        {
          (void) fprintf(stdout,
                         "\n%s: ERROR %d failed attempts to obtain valid interactive input.\n",
                         processInqPrompt(),
                         nbr_itr - 1);
          exit(EXIT_FAILURE);
        }

      if (nbr_itr > 1)
        (void) fprintf(stdout, "%s: ERROR Invalid response.\n", processInqPrompt());
      (void) fprintf(stdout,
                     "%s: %s exists ---`e'xit, or `o'verwrite (delete existing file) (e/o)? ",
                     processInqPrompt(),
                     argument);
      (void) fflush(stdout);
      if (fgets(usr_rpl, USR_RPL_MAX_LNG, stdin) == NULL)
        continue;
660
661
662

      /* Ensure last character in input string is \n and replace that with \0 */
      usr_rpl_lng = strlen(usr_rpl);
Oliver Heidmann's avatar
Oliver Heidmann committed
663
664
665
666
667
668
      if (usr_rpl_lng >= 1)
        if (usr_rpl[usr_rpl_lng - 1] == '\n')
          {
            usr_rpl[usr_rpl_lng - 1] = '\0';
            usr_rpl_lng--;
          }
669
670
671
    }

  /* Ensure one case statement for each exit condition in preceding while loop */
Oliver Heidmann's avatar
Oliver Heidmann committed
672
673
  usr_rpl_int = (int) usr_rpl[0];
  switch (usr_rpl_int)
674
675
    {
    case 'E':
Oliver Heidmann's avatar
Oliver Heidmann committed
676
    case 'e': exit(EXIT_SUCCESS); break;
677
    case 'O':
Oliver Heidmann's avatar
Oliver Heidmann committed
678
679
    case 'o': break;
    default: exit(EXIT_FAILURE); break;
680
681
682
    } /* end switch */
}

Oliver Heidmann's avatar
Oliver Heidmann committed
683
684
static int
pstreamOpenWritePipe(const argument_t *argument, int filetype)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
685
{
686
  int pstreamID = -1;
Oliver Heidmann's avatar
Oliver Heidmann committed
687

Uwe Schulzweida's avatar
Uwe Schulzweida committed
688
#if defined(HAVE_LIBPTHREAD)
689
  if (CdoDebug::PSTREAM)
690
    {
691
      MESSAGE("pipe ", argument->args);
692
    }
693
  pstreamID = pstreamFindID(argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
694
  if (pstreamID == -1)
695
696
697
    {
      Error("%s is not open!", argument->args);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
698

699
  pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
700

701
  pstreamptr->wthreadID = pthread_self();
702
  pstreamptr->m_filetype = filetype;
703
  processAddOutputStream(pstreamptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
704
705
#endif

706
707
  return pstreamID;
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
708

Oliver Heidmann's avatar
Oliver Heidmann committed
709
710
static void
set_comp(int fileID, int filetype)
711
{
Oliver Heidmann's avatar
Oliver Heidmann committed
712
  if (cdoCompress)
713
    {
Oliver Heidmann's avatar
Oliver Heidmann committed
714
      if (filetype == CDI_FILETYPE_GRB)
715
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
716
          cdoCompType = CDI_COMPRESS_SZIP;
717
718
          cdoCompLevel = 0;
        }
Oliver Heidmann's avatar
Oliver Heidmann committed
719
      else if (filetype == CDI_FILETYPE_NC4 || filetype == CDI_FILETYPE_NC4C)
720
        {
Oliver Heidmann's avatar
Oliver Heidmann committed
721
          cdoCompType = CDI_COMPRESS_ZIP;
722
723
724
725
          cdoCompLevel = 1;
        }
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
726
  if (cdoCompType != CDI_COMPRESS_NONE)
727
728
729
730
    {
      streamDefCompType(fileID, cdoCompType);
      streamDefCompLevel(fileID, cdoCompLevel);

Oliver Heidmann's avatar
Oliver Heidmann committed
731
732
733
      if (cdoCompType == CDI_COMPRESS_SZIP
          && (filetype != CDI_FILETYPE_GRB && filetype != CDI_FILETYPE_GRB2 && filetype != CDI_FILETYPE_NC4
              && filetype != CDI_FILETYPE_NC4C))
734
735
        cdoWarning("SZIP compression not available for non GRIB/NetCDF4 data!");

Oliver Heidmann's avatar
Oliver Heidmann committed
736
      if (cdoCompType == CDI_COMPRESS_JPEG && filetype != CDI_FILETYPE_GRB2)
737
738
        cdoWarning("JPEG compression not available for non GRIB2 data!");

Oliver Heidmann's avatar
Oliver Heidmann committed
739
      if (cdoCompType == CDI_COMPRESS_ZIP && (filetype != CDI_FILETYPE_NC4 && filetype != CDI_FILETYPE_NC4C))
740
741
742
743
        cdoWarning("Deflate compression not available for non NetCDF4 data!");
    }
}

Oliver Heidmann's avatar
Oliver Heidmann committed
744
745
static int
pstreamOpenWriteFile(const argument_t *argument, int filetype)
746
{
Oliver Heidmann's avatar
Oliver Heidmann committed
747
  char *filename = (char *) Malloc(strlen(argument->args) + 1);
748

749
  pstream_t *pstreamptr = create_pstream();
Oliver Heidmann's avatar
Oliver Heidmann committed
750
751
  if (!pstreamptr)
    Error("No memory");
752
753

  int pstreamID = pstreamptr->self;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
754

755
756
  if (CdoDebug::PSTREAM)
    MESSAGE("file ", argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
757
758
759

  if (filetype == CDI_UNDEFID)
    filetype = CDI_FILETYPE_GRB;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
760

Oliver Heidmann's avatar
Oliver Heidmann committed
761
  if (cdoInteractive)
762
763
    {
      struct stat stbuf;
764

765
766
      int rstatus = stat(argument->args, &stbuf);
      /* If permanent file already exists, query user whether to overwrite or exit */
Oliver Heidmann's avatar
Oliver Heidmann committed
767
768
      if (rstatus != -1)
        query_user_exit(argument->args);
769
770
    }

Oliver Heidmann's avatar
Oliver Heidmann committed
771
772
  if (processNums() == 1 && ompNumThreads == 1)
    timer_start(timer_write);
773

Uwe Schulzweida's avatar
Uwe Schulzweida committed
774
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
775
  if (cdoLockIO)
776
777
778
    pthread_mutex_lock(&streamMutex);
  else
    pthread_mutex_lock(&streamOpenWriteMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
779
#endif
780
781

  int fileID = streamOpenWrite(argument->args, filetype);
Oliver Heidmann's avatar
Oliver Heidmann committed
782

Uwe Schulzweida's avatar
Uwe Schulzweida committed
783
#if defined(HAVE_LIBPTHREAD)
Oliver Heidmann's avatar
Oliver Heidmann committed
784
  if (cdoLockIO)
785
786
787
    pthread_mutex_unlock(&streamMutex);
  else
    pthread_mutex_unlock(&streamOpenWriteMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
788
#endif
Oliver Heidmann's avatar
Oliver Heidmann committed
789
790
791
792
793

  if (processNums() == 1 && ompNumThreads == 1)
    timer_stop(timer_write);
  if (fileID < 0)
    cdiOpenError(fileID, "Open failed on >%s<", argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
794

795
  cdoDefHistory(fileID, commandLine());
Uwe Schulzweida's avatar
Uwe Schulzweida committed
796

Oliver Heidmann's avatar
Oliver Heidmann committed
797
  if (cdoDefaultByteorder != CDI_UNDEFID)
798
    streamDefByteorder(fileID, cdoDefaultByteorder);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
799

800
  set_comp(fileID, filetype);
801
802
803
804
805
  /*
    if ( cdoDefaultInstID != CDI_UNDEFID )
    streamDefInstID(fileID, cdoDefaultInstID);
  */
  strcpy(filename, argument->args);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
806

Oliver Heidmann's avatar
Oliver Heidmann committed
807
  pstreamptr->mode = 'w';
808
  pstreamptr->m_name = filename;
809
  pstreamptr->m_fileID = fileID;
810
  pstreamptr->m_filetype = filetype;
811
812
813
814

  return pstreamID;
}

Oliver Heidmann's avatar
Oliver Heidmann committed
815
816
int
pstreamOpenWrite(const argument_t *argument, int filetype)
817
818
819
{
  int pstreamID = -1;

820
  //PSTREAM_INIT();
821
822
823

  int ispipe = strncmp(argument->args, "(pipe", 5) == 0;

Oliver Heidmann's avatar
Oliver Heidmann committed
824
  if (ispipe)
825
826
827
828
829
830
831
    {
      pstreamID = pstreamOpenWritePipe(argument, filetype);
    }
  else
    {
      pstreamID = pstreamOpenWriteFile(argument, filetype);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
832

833
  return pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
834
835
}

Oliver Heidmann's avatar
Oliver Heidmann committed
836
837
int
pstreamOpenAppend(const argument_t *argument)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
838
{
839
  int ispipe = strncmp(argument->args, "(pipe", 5) == 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
840

Oliver Heidmann's avatar
Oliver Heidmann committed
841
  if (ispipe)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
842
    {
843
      if (CdoDebug::PSTREAM)
844
        {
845
          MESSAGE("pipe ", argument->args);
846
        }
847
      cdoAbort("this operator doesn't work with pipes!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
848
849
    }

850
  pstream_t *pstreamptr = create_pstream();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
851

852
  if (!pstreamptr)
853
    ERROR("No memory");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
854

855
856
  if (CdoDebug::PSTREAM)
    MESSAGE("file ", argument->args);
Oliver Heidmann's avatar
Oliver Heidmann committed
857

858
859
860
861
862
863
864
865
866
867
868
  pstreamptr->openAppend(argument->args);

  return pstreamptr->self;
}
void
pstream_t::openAppend(const char *p_filename)
{
  if (processNums() == 1 && ompNumThreads == 1)
    {
      timer_start(timer_write);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
869
#if defined(HAVE_LIBPTHREAD)
870
871
872
873
874
875
876
877
  if (cdoLockIO)
    {
      pthread_mutex_lock(&streamMutex);
    }
  else
    {
      pthread_mutex_lock(&streamOpenReadMutex);
    }
878
#endif
Oliver Heidmann's avatar
Oliver Heidmann committed
879

880
  int fileID = streamOpenAppend(p_filename);
Oliver Heidmann's avatar
Oliver Heidmann committed
881

Uwe Schulzweida's avatar
Uwe Schulzweida committed
882
#if defined(HAVE_LIBPTHREAD)
883
884
885
886
887
888
889
890
  if (cdoLockIO)
    {
      pthread_mutex_unlock(&streamMutex);
    }
  else
    {
      pthread_mutex_unlock(&streamOpenReadMutex);
    }
891
#endif
892
893
894
895
896
897
898
  if (processNums() == 1 && ompNumThreads == 1)
    {
      timer_stop(timer_write);
    }
  if (fileID < 0)
    {
      cdiOpenError(fileID, "Open failed on >%s<", p_filename);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
899
    }
900
901
902
903
904
905
  /*
  cdoInqHistory(fileID);
  cdoDefHistory(fileID, commandLine());
  */
  int filetype = streamInqFiletype(fileID);
  set_comp(fileID, filetype);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
906

Oliver Heidmann's avatar