process.cc 32.6 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1
2
3
4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2017 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
6
7
8
9
10
11
12
13
14
15
16
17
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

Uwe Schulzweida's avatar
Uwe Schulzweida committed
18
#if defined(HAVE_CONFIG_H)
19
#include "config.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
20
21
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
22
#if defined(HAVE_PTHREAD_H)
23
#include <pthread.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
24
25
26
27
#endif

#include <stdio.h>
#include <string.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
28

Uwe Schulzweida's avatar
Uwe Schulzweida committed
29
#if defined(HAVE_GLOB_H)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
30
#include <glob.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
31
#endif
32
33
34
#if defined(HAVE_WORDEXP_H)
#include <wordexp.h>
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
35
36
37
38
39
40

#include "cdo.h"
#include "cdo_int.h"
#include "error.h"
#include "modules.h"
#include "util.h"
41
#include "pstream.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
42
#include "dmemory.h"
43
#include "pthread.h"
44
#include "cdoDebugOutput.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
45

46
#include <algorithm>
47
#include <map>
48
#include <stack>
49

Uwe Schulzweida's avatar
Uwe Schulzweida committed
50
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
51
52
53
pthread_mutex_t processMutex = PTHREAD_MUTEX_INITIALIZER;
#endif

54
static process_t *root_process;
55
std::map<int, process_t> Process;
56
57
58
59

static int NumProcess = 0;
static int NumProcessActive = 0;

60
process_t::process_t(int p_ID, const char *operatorCommand) : m_ID(p_ID)
61
{
62
63
64
65
  initProcess();
  operatorName = getOperatorName(operatorCommand);
  setOperatorArgv(operatorCommand);
  m_operatorCommand = operatorCommand;
66

67
68
69
  defPrompt();  // has to be called after get operatorName

  m_module = getModule(operatorName);
70
71
72
}

void
73
process_t::setOperatorArgv(const char *operatorArguments)
74
{
75
  if (operatorArguments)
76
    {
77
      char *operatorArg = strdup(operatorArguments);
78
79
80
81
82
83
84
85
86
87
88
89
      // fprintf(stderr, "processDefArgument: %d %s\n", oargc, operatorArg);

      while ((operatorArg = strchr(operatorArg, ',')) != NULL)
        {
          *operatorArg = '\0';
          *operatorArg++;
          if (strlen(operatorArg))
            {
              oargv.push_back(operatorArg);
            }
        }
    }
90
  oargc = oargv.size();
91
}
92
93
94

void
process_t::initProcess()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
95
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
96
#if defined(HAVE_LIBPTHREAD)
97
98
  threadID = pthread_self();
  l_threadID = 1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
99
#endif
100
  nchild = 0;
101

102
103
104
105
  cdoProcessTime(&s_utime, &s_stime);
  a_utime = 0;
  a_stime = 0;
  cputime = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
106
107
108
  nvals = 0;
  nvars = 0;
  ntimesteps = 0;
109

110
  m_streamCnt = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
111

112
  oargc = 0;
113
  m_operatorCommand = "UNINITALIZED";
114
  operatorArg = "UNINITALIZED";
Uwe Schulzweida's avatar
Uwe Schulzweida committed
115

116
117
118
119
120
121
122
123
124
125
126
127
128
129
  noper = 0;
}

int
process_t::getInStreamCnt()
{
  return inputStreams.size();
}
int
process_t::getOutStreamCnt()
{
  return outputStreams.size();
}

130
process_t *
131
processCreate(const char *command)
132
133
134
135
136
{
#if defined(HAVE_LIBPTHREAD)
  pthread_mutex_lock(&processMutex);
#endif

137
138
139
  if(CdoDebug::PROCESS){
    MESSAGE("Creating new process for command: ", command);
  }
140
  int processID = NumProcess++;
141
142
143
144
145
  auto success = Process.insert(std::make_pair(processID, process_t(processID, command)));
  if(success.second == false)
  {
    ERROR("Process ", processID," could not be created");
  }
146
147

  NumProcessActive++;
148
149
150
  if(CdoDebug::PROCESS){
    MESSAGE("NumProcessActive: ", NumProcessActive);
  }
151
152
153
154
155
156
157
158

#if defined(HAVE_LIBPTHREAD)
  pthread_mutex_unlock(&processMutex);
#endif

  if (processID >= MAX_PROCESS)
    Error("Limit of %d processes reached!", MAX_PROCESS);

159
  return &success.first->second;
160
161
}

162
163
process_t &
processSelf(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
164
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
165
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
166
  pthread_t thID = pthread_self();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
167

Uwe Schulzweida's avatar
Uwe Schulzweida committed
168
169
  pthread_mutex_lock(&processMutex);

170
  for (auto &id_process_pair : Process)
171
172
173
174
175
176
177
178
    if (id_process_pair.second.l_threadID)
      {
        if (pthread_equal(id_process_pair.second.threadID, thID))
          {
            pthread_mutex_unlock(&processMutex);
            return id_process_pair.second;
          }
      }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
179

180
  pthread_mutex_unlock(&processMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
181

Uwe Schulzweida's avatar
Uwe Schulzweida committed
182
#endif
183
  return Process.find(0)->second;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
184
185
}

186
187
int
processNums(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
188
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
189
#if defined(HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
190
191
192
  pthread_mutex_lock(&processMutex);
#endif

193
  int pnums = Process.size();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
194

Uwe Schulzweida's avatar
Uwe Schulzweida committed
195
#if defined(HAVE_LIBPTHREAD)
196
  pthread_mutex_unlock(&processMutex);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
197
198
#endif

199
  return pnums;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
200
201
}

202
203
int
processNumsActive(void)
204
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
205
#if defined(HAVE_LIBPTHREAD)
206
207
208
  pthread_mutex_lock(&processMutex);
#endif

Uwe Schulzweida's avatar
cleanup    
Uwe Schulzweida committed
209
  int pnums = NumProcessActive;
210

Uwe Schulzweida's avatar
Uwe Schulzweida committed
211
#if defined(HAVE_LIBPTHREAD)
212
  pthread_mutex_unlock(&processMutex);
213
214
#endif

215
  return pnums;
216
217
}

218
void
219
processAddNvals(size_t nvals)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
220
{
221
  processSelf().nvals += nvals;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
222
223
}

224
size_t
225
processInqNvals(int processID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
226
{
227
  return Process.find(processID)->second.nvals;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
228
229
}

230
231
void
processAddOutputStream(pstream_t *p_pstream_ptr)
232
{
233
  process_t &process = processSelf();
234

235
  int sindex = process.getOutStreamCnt();
236

237
238
  if (sindex >= MAX_STREAM)
    Error("Limit of %d output streams per process reached (processID = %d)!", MAX_STREAM, process.m_ID);
239

240
  process.outputStreams.push_back(p_pstream_ptr);
241
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
242

243
244
void
processAddInputStream(pstream_t *p_pstream_ptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
245
{
246
  process_t &process = processSelf();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
247

248
  if (p_pstream_ptr->isPipe())
249
250
251
    {
      process.nchild++;
    }
252
253
254
  else
  {
    }
255
  int sindex = process.getInStreamCnt();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
256

257
258
  if (sindex >= MAX_STREAM)
    Error("Limit of %d input streams per process reached (processID = %d)!", MAX_STREAM, process.m_ID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
259

260
  process.inputStreams.push_back(p_pstream_ptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
261
262
}

263
264
void
processDelStream(int streamID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
265
{
266
  UNUSED(streamID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
267
268
}

269
270
void
processDefCputime(int processID, double cputime)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
271
{
272
  Process.find(processID)->second.cputime = cputime;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
273
274
}

275
276
double
processInqCputime(int processID)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
277
{
278
  return Process.find(processID)->second.cputime;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
279
280
}

281
282
void
processStartTime(double *utime, double *stime)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
283
{
284
  process_t &process = processSelf();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
285

286
287
  *utime = process.s_utime;
  *stime = process.s_stime;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
288
289
}

290
291
void
processEndTime(double *utime, double *stime)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
292
{
293
294
295
  process_t &process = Process.find(0)->second;
  *utime = process.a_utime;
  *stime = process.a_stime;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
296
297
}

298
299
void
processAccuTime(double utime, double stime)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
300
{
301
302
303
  process_t &process = Process.find(0)->second;
  process.a_utime += utime;
  process.a_stime += stime;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
304
305
}

306
307
int
processInqOutputStreamNum(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
308
{
309
  return processSelf().getOutStreamCnt();
310
311
}

312
313
int
processInqInputStreamNum(void)
314
{
315
  return processSelf().getInStreamCnt();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
316
317
}

318
319
int
processInqChildNum(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
320
{
321
  return processSelf().nchild;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
322
323
}

324
325
pstream_t *
processInqOutputStream(int streamindex)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
326
{
327
  return (processSelf().outputStreams[streamindex]);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
328
}
329
330
pstream_t *
processInqInputStream(int streamindex)
331
{
332
  return (processSelf().inputStreams[streamindex]);
333
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
334

335
336
const char *
processInqOpername2(process_t &process)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
337
{
338
  return process.operatorName;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
339
340
}

341
342
const char *
processInqOpername(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
343
{
344
  return processSelf().operatorName;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
345
346
}

347
348
void
process_t::defPrompt()
Uwe Schulzweida's avatar
Uwe Schulzweida committed
349
{
350
  if (m_ID == 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
351
    sprintf(prompt, "%s %s", CDO_progname, operatorName);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
352
  else
Uwe Schulzweida's avatar
Uwe Schulzweida committed
353
    sprintf(prompt, "%s(%d) %s", CDO_progname, m_ID + 1, operatorName);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
354
355
}

356
357
const char *
processInqPrompt(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
358
{
359
  process_t &process = processSelf();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
360

Uwe Schulzweida's avatar
Uwe Schulzweida committed
361
  const char *prompt = "cdo";
362
363
  if (process.prompt[0])
    prompt = process.prompt;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
364
365

  return prompt;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
366
367
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
368
#if defined(HAVE_GLOB_H)
369
370
static int
get_glob_flags(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
371
372
373
{
  int glob_flags = 0;

374
#if defined(GLOB_NOCHECK)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
375
376
  glob_flags |= GLOB_NOCHECK;
#endif
377
#if defined(GLOB_TILDE)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
378
379
380
  glob_flags |= GLOB_TILDE;
#endif

381
  return glob_flags;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
382
}
383
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
384

385
#if defined(HAVE_WORDEXP_H)
386
/* Convert a shell pattern into a list of filenames. */
387
388
static argument_t *
glob_pattern(const char *restrict string)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
389
390
{
  size_t cnt, length = 0;
391
  int flags = WRDE_UNDEF;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
392
  char **p;
393
394

  wordexp_t glob_results;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
395
  glob_results.we_wordc = 0;
396
  argument_t *argument = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
397

398
  // glob the input argument or do even more shell magic
Uwe Schulzweida's avatar
Uwe Schulzweida committed
399
  int status = wordexp(string, &glob_results, flags);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
400

401
  // How much space do we need?
402
  for (p = glob_results.we_wordv, cnt = glob_results.we_wordc; cnt; p++, cnt--)
403
404
405
    {
      length += strlen(*p) + 1;
    }
406
407

  // Allocate the space and generate the list.
408
  argument = argument_new(glob_results.we_wordc, length);
409

410
  // put all generated filenames into the argument_t data structure
411
  for (cnt = 0; cnt < glob_results.we_wordc; cnt++)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
412
    {
413
414
      argument->argv[cnt] = strdupx(glob_results.we_wordv[cnt]);
      strcat(argument->args, glob_results.we_wordv[cnt]);
415
416
      if (cnt < glob_results.we_wordc - 1)
        strcat(argument->args, " ");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
417
418
    }

419
420
  if (status == 0)
    wordfree(&glob_results);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
421

422
  return argument;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
423
}
424
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
425

426
427
int
cdoStreamCnt(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
428
{
429
  int cnt = processSelf().m_streamCnt;
430
  return cnt;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
431
432
}

433
434
const argument_t *
cdoStreamName(int cnt)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
435
{
436
  process_t &process = processSelf();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
437

438
  if (cnt > process.m_streamCnt || cnt < 0)
439
    Error("count %d out of range!", cnt);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
440

441
  return &(process.streamArguments[cnt]);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
442
443
}

444
445
const char *
processOperator(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
446
{
447
  return processSelf().m_operatorCommand;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
448
449
}

450
static int skipInputStreams(int argc, std::vector<char *> &argv, int globArgc, int nstreams);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
451

452
static int
453
getGlobArgc(int argc, std::vector<char *> &argv, int globArgc)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
454
{
455
456
  char *opername = &argv[globArgc][1];
  char *comma_position = strchr(opername, ',');
457
458
  if (comma_position)
    *comma_position = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
459

460
  int streamInCnt = operatorStreamInCnt(opername);
461
  int streamOutCnt = operatorStreamOutCnt(opername);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
462

463
464
  if (streamInCnt == -1)
    streamInCnt = 1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
465

466
  if (streamOutCnt > 1)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
467
    cdoAbort("More than one output stream not allowed in CDO pipes (Operator %s)!", opername);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
468
469
470

  globArgc++;

471
  if (streamInCnt > 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
472
    globArgc = skipInputStreams(argc, argv, globArgc, streamInCnt);
473
474
  if (comma_position)
    *comma_position = ',';
Uwe Schulzweida's avatar
Uwe Schulzweida committed
475

476
  return globArgc;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
477
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
478

479
static int
480
skipInputStreams(int argc, std::vector<char *> &argv, int globArgc, int nstreams)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
481
{
482
  while (nstreams > 0)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
483
    {
484
      if (globArgc >= argc)
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
485
486
487
488
        {
          cdoAbort("Too few arguments. Check command line!");
          break;
        }
489
      if (argv[globArgc][0] == '-')
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
490
491
492
        {
          globArgc = getGlobArgc(argc, argv, globArgc);
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
493
      else
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
494
        globArgc++;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
495
496
497
498

      nstreams--;
    }

499
  return globArgc;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
500
501
}

502
static int
503
getStreamCnt(int argc, std::vector<char *> &argv)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
504
505
506
507
{
  int streamCnt = 0;
  int globArgc = 1;

508
  while (globArgc < argc)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
509
    {
510
      if (argv[globArgc][0] == '-')
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
511
512
513
        {
          globArgc = getGlobArgc(argc, argv, globArgc);
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
514
      else
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
515
        globArgc++;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
516
517
518
519

      streamCnt++;
    }

520
  return streamCnt;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
521
522
}

523
524
525
526
527
528
529
530
531
532
/*
static void setStreamNames(int argc, std::vector<char *> *argv)
{
    //check for output of first
    int current_argv_entry;
    std::vector<pstream_t*> &current_instreams = root_process->inputStreams;
    std::vector<pstream_t*> &current_outstreams = root_process->outputStreams;

}
*/
533
void
534
535
536
537
538
539
540
541
542
543
process_t::setStreams(int argc, std::vector<char *> &argv)
{
  int streamCnt = getStreamCnt(argc, argv);

  nvals = 0;
  nvars = 0;
  ntimesteps = 0;

  m_streamCnt = 0; /* filled in setStreamNames */
  if (streamCnt)
544
    streamArguments = std::vector<argument_t>(streamCnt);
545
546
  for (int i = 0; i < streamCnt; i++)
    {
547
548
      streamArguments[i].argc = 0;
      streamArguments[i].args = NULL;
549
550
551
552
553
554
555
556
557
558
    }

  setStreamNames(argc, argv);

  int status = checkStreamCnt();

  if (status == 0 && streamCnt != streamCnt)
    Error("Internal problem with stream count %d %d", streamCnt, streamCnt);
  /*
  for ( i = 0; i < streamCnt; i++ )
559
    fprintf(stderr, "setStreams: stream %d %s\n", i+1, process.streamArguments[i].args);
560
561
562
  */
}

563
void
564
process_t::setStreamNames(int argc, std::vector<char *> &argv)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
565
{
566
  int i, ac;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
567
568
569
  int globArgc = 1;
  int globArgcStart;
  char *streamname;
570
  int len;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
571

572
  while (globArgc < argc)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
573
    {
574
      if (argv[globArgc][0] == '-')
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
575
576
577
578
579
        {
          globArgcStart = globArgc;

          globArgc = getGlobArgc(argc, argv, globArgc);
          len = 0;
580
          for (i = globArgcStart; i < globArgc; i++)
581
582
583
            {
              len += strlen(argv[i]) + 1;
            }
584
585
          streamname = (char *) Calloc(1, len);
          for (i = globArgcStart; i < globArgc; i++)
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
586
587
            {
              strcat(streamname, argv[i]);
588
              if (i < globArgc - 1)
589
590
591
                {
                  strcat(streamname, " ");
                }
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
592
            }
593
594
595
596
597
598
          for (i = 1; i < len - 1; i++)
            {
              if (streamname[i] == '\0')
                {
                  streamname[i] = ' ';
                }
599
600
            }

601
          streamArguments[m_streamCnt].args = streamname;
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
602
          ac = globArgc - globArgcStart;
603
          // printf("setStreamNames:  ac %d  streamname1: %s\n", ac, streamname);
604
          streamArguments[m_streamCnt].argv.resize(ac);
605
          for (i = 0; i < ac; ++i)
606
607
            streamArguments[m_streamCnt].argv[i] = argv[i + globArgcStart];
          streamArguments[m_streamCnt].argc = ac;
608
          m_streamCnt++;
609
          // printf("setStreamNames:  streamname1: %s\n", streamname);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
610
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
611
      else
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
612
613
        {
          len = strlen(argv[globArgc]) + 1;
614
          streamname = (char *) Malloc(len);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
615
          strcpy(streamname, argv[globArgc]);
616
          streamArguments[m_streamCnt].args = streamname;
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
617
          ac = 1;
618
619
620
621
          streamArguments[m_streamCnt].argv.resize(ac);
          streamArguments[m_streamCnt].argv[0] = argv[globArgc];
          streamArguments[m_streamCnt].argc = ac;
          streamArguments[m_streamCnt].args = streamname;
622
          m_streamCnt++;
623
          // printf("setStreamNames:  streamname2: %s\n", streamname);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
624
625
          globArgc++;
        }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
626
627
628
    }
}

629
630
static int
find_wildcard(const char *string, size_t len)
631
632
633
{
  int status = 0;

634
  if (len > 0)
635
    {
636
637
      if (string[0] == '~')
        status = 1;
638

639
      if (status == 0)
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
640
        {
641
642
          for (size_t i = 0; i < len; ++i)
            if (string[i] == '?' || string[i] == '*' || string[i] == '[')
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
643
644
645
646
647
              {
                status = 1;
                break;
              }
        }
648
649
650
651
652
    }

  return status;
}

653
654
char *
expand_filename(const char *string)
655
656
657
{
  char *filename = NULL;

658
  if (find_wildcard(string, strlen(string)))
659
660
    {
#if defined(HAVE_GLOB_H)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
661
      int glob_flags = get_glob_flags();
662
663
664
665
      glob_t glob_results;

      glob(string, glob_flags, 0, &glob_results);

666
667
      if (glob_results.gl_pathc == 1)
        filename = strdupx(glob_results.gl_pathv[0]);
668
669
670
671
672
673
674
675

      globfree(&glob_results);
#endif
    }

  return filename;
}

676
677
 int
process_t::expand_wildcards(int streamCnt)
678
{
679
  const char *streamname0 = streamArguments[0].args;
680

681
682
  if (streamname0[0] == '-')
    return 1;
683

684
685
#if defined(HAVE_WORDEXP_H)
  argument_t *glob_arg = glob_pattern(streamname0);
686

687
688
689
  // skip if the input argument starts with an operator (starts with -)
  // otherwise adapt streams if there are several files (>1)
  // in case of one filename skip, no adaption needed
690
  if (glob_arg->argc > 1 && glob_arg->argv[0][0] != '-')
691
    {
692
      if (cdoVerbose)
693
        cdoPrint("Replaced >%s< by", streamArguments[0].args);
694

695
      streamCnt = streamCnt - 1 + glob_arg->argc;
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
696

697
      Free(streamArguments[0].args);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
698

699
      streamArguments.resize(streamCnt);
700

701
      // move output streams to the end
702
703
      for (int i = 1; i < m_streamCnt; ++i)
        streamArguments[i + glob_arg->argc - 1] = streamArguments[i];
704

705
      for (int i = 0; i < glob_arg->argc; ++i)
706
        {
707
          argument_t &current_argument = streamArguments[i];
708
709
710
711
          current_argument.argc = 1;
          current_argument.argv.resize(current_argument.argc);
          current_argument.argv[0] = strdupx(glob_arg->argv[i]);
          current_argument.args = strdupx(glob_arg->argv[i]);
712
713
          if (cdoVerbose)
            cdoPrint("         >%s<", glob_arg->argv[i]);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
714
        }
715

716
      m_streamCnt = streamCnt;
717
    }
718

719
  Free(glob_arg);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
720
#endif
721
722
723
724

  return 1;
}

725
int process_t::checkStreamCnt(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
726
{
727
  int wantedStreamInCnt,wantedStreamOutCnt;
728
  int streamInCnt0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
729
730
  int streamCnt = 0;
  int i, j;
731
  int obase = FALSE;
732
  int status = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
733

734
735
  wantedStreamInCnt= operatorStreamInCnt(operatorName);
  wantedStreamOutCnt = operatorStreamOutCnt(operatorName);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
736

737
  streamInCnt0 = wantedStreamInCnt;
738

739
  if (wantedStreamOutCnt == -1)
740
    {
741
      wantedStreamOutCnt = 1;
742
743
744
      obase = TRUE;
    }

745
  if (wantedStreamInCnt== -1 && wantedStreamOutCnt == -1)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
746
    cdoAbort("I/O stream counts unlimited no allowed!");
747

748
749
  // printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n", wantedStreamInCnt,wantedStreamOutCnt);
  if (wantedStreamInCnt== -1)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
750
    {
751
752
      wantedStreamInCnt = m_streamCnt - wantedStreamOutCnt;
      if (wantedStreamInCnt< 1)
753
        cdoAbort("Input streams missing!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
754
755
    }

756
  if (wantedStreamOutCnt == -1)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
757
    {
758
759
      wantedStreamOutCnt = m_streamCnt - wantedStreamInCnt;
      if (wantedStreamOutCnt < 1)
760
        cdoAbort("Output streams missing!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
761
    }
762
  // printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n", wantedStreamInCnt,wantedStreamOutCnt);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
763

764
765
  streamCnt = wantedStreamInCnt+ wantedStreamOutCnt;
  // printf(" streamCnt %d %d\n", m_streamCnt, streamCnt);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
766

767
  if (m_streamCnt > streamCnt)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
768
    cdoAbort("Too many streams!"
769
             " Operator needs %d input and %d output streams.",
770
771
             wantedStreamInCnt,
             wantedStreamOutCnt);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
772

773
  if (m_streamCnt < streamCnt)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
774
    cdoAbort("Too few streams specified!"
775
             " Operator needs %d input and %d output streams.",
776
777
             wantedStreamInCnt,
             wantedStreamOutCnt);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
778

779
  for (i = wantedStreamInCnt;i < streamCnt; i++)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
780
    {
781
      if (streamArguments[i].args[0] == '-')
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
782
        {
783
          cdoAbort("Output file name %s must not begin with \"-\"!", streamArguments[i].args);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
784
        }
785
      else if (!obase)
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
786
        {
787
788
          for (j = 0; j < wantedStreamInCnt;j++) /* does not work with files in pipes */
            if (strcmp(streamArguments[i].args, streamArguments[j].args) == 0)
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
789
              cdoAbort("Output file name %s is equal to input file name"
790
                       " on position %d!\n",
791
                       streamArguments[i].args,
792
                       j + 1);
Yvonne Kuestermann's avatar
layout    
Yvonne Kuestermann committed
793
        }
794
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
795

796
797
  if (wantedStreamInCnt == 1 && streamInCnt0 == -1)
    status = expand_wildcards( streamCnt);
798

799
  return status;
800
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
801

802
803
bool
process_t::hasAllInputs()
804
{
805
806
807
808
809
  if(m_module.streamInCnt == -1)
  {
      return false;
  }
  return  m_module.streamInCnt == (inputStreams.size());
810
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
811

812
#include <fstream>
813
void print_creation_results(std::ofstream &p_outfile)
814
{
815
816
 p_outfile << std::endl << "RESULTS:" << std::endl;
  for (auto &process : Process)
817
    {
818
819
      p_outfile << "process: " << process.second.operatorName << " has children: " << std::endl;
      for (auto child : process.second.childProcesses)
820
        {
821
          p_outfile << child->m_ID << ", ";
822
        }