pstream.c 37.1 KB
Newer Older
Uwe Schulzweida's avatar
Uwe Schulzweida committed
1
2
3
4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

  Copyright (C) 2003-2012 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/

#if  defined  (HAVE_CONFIG_H)
#  include "config.h"
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
22
23
24
25
#if defined (_OPENMP)
#  include <omp.h>
#endif

Uwe Schulzweida's avatar
Uwe Schulzweida committed
26
27
28
29
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <errno.h>
30
#include <sys/stat.h> /* stat */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
31
32
33
34

FILE *popen(const char *command, const char *type);
int pclose(FILE *stream);

Ralf Mueller's avatar
Ralf Mueller committed
35
#include <cdi.h>
Uwe Schulzweida's avatar
Uwe Schulzweida committed
36
37
#include "cdo.h"
#include "cdo_int.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
38
#include "dtypes.h"
Uwe Schulzweida's avatar
Uwe Schulzweida committed
39
40
41
42
43
44
45
46
47
48
49
50
51
#include "modules.h"
#include "pstream_int.h"
#include "cdo_int.h"
#include "util.h"
#include "pipe.h"
#include "error.h"
#include "dmemory.h"


/* Timer handles defined in another translation unit. timer_write is passed
   to timer_start()/timer_stop() around the write/append opens in this file;
   timer_read is presumably the read-side counterpart (not used in the
   portion of the file reviewed here). */
extern int timer_read, timer_write;

/* Non-zero enables the diagnostic Message() calls of this module; it is
   set from the PSTREAM_DEBUG environment variable in pstream_initialize(). */
static int PSTREAM_Debug = 0;

Uwe Schulzweida's avatar
Uwe Schulzweida committed
52
/* Compile-time default for the number of slots in the pstream table
   (initial value of _pstream_max). */
#define  MAX_PSTREAMS  4096
Uwe Schulzweida's avatar
Uwe Schulzweida committed
53
54
55
56
57
58
59
60
61
62
63

/* Number of slots in the pstream table; pstream_initialize() may override
   the default from the PSTREAM_MAX environment variable. */
static int _pstream_max = MAX_PSTREAMS;

/* Forward declaration: referenced by the PSTREAM_INIT macro. */
static void pstream_initialize(void);

/* Set to TRUE at the end of pstream_initialize(). */
static int _pstream_init = FALSE;

#if  defined  (HAVE_LIBPTHREAD)
#include <pthread.h>
#include "pthread_debug.h"

Uwe Schulzweida's avatar
Uwe Schulzweida committed
64
65
/* Contention scope reported by pthread_attr_getscope() for the thread
   attributes used in pstreamOpenRead(). */
static int pthreadScope = 0;

Uwe Schulzweida's avatar
Uwe Schulzweida committed
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/* Held around streamOpenRead() / streamOpenWrite() and the calls that
   directly follow them in pstreamOpenRead() / pstreamOpenWrite(). */
static pthread_mutex_t streamOpenReadMutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t streamOpenWriteMutex = PTHREAD_MUTEX_INITIALIZER;

/* One-time initialisation control and the mutex guarding the pstream
   table; the mutex itself is initialised in pstream_initialize(). */
static pthread_once_t _pstream_init_thread = PTHREAD_ONCE_INIT;
static pthread_mutex_t _pstream_mutex;

/* NOTE: the three macros below already end in ';' and are therefore
   written without a trailing semicolon at their call sites.
   PSTREAM_INIT runs pstream_initialize() through pthread_once() as long
   as _pstream_init is still FALSE. */
#  define PSTREAM_LOCK           pthread_mutex_lock(&_pstream_mutex);
#  define PSTREAM_UNLOCK         pthread_mutex_unlock(&_pstream_mutex);
#  define PSTREAM_INIT                               \
   if ( _pstream_init == FALSE ) pthread_once(&_pstream_init_thread, pstream_initialize);

#else

/* Build without pthread support: locking expands to nothing and the
   initialisation is a plain call to pstream_initialize(). */
#  define PSTREAM_LOCK
#  define PSTREAM_UNLOCK
#  define PSTREAM_INIT                               \
   if ( _pstream_init == FALSE ) pstream_initialize();

#endif


typedef struct _pstreamPtrToIdx {
  int idx;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
89
  pstream_t *ptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
90
91
92
93
  struct _pstreamPtrToIdx *next;
} pstreamPtrToIdx;


Uwe Schulzweida's avatar
Uwe Schulzweida committed
94
95
/* _pstreamList is the slot table allocated by pstream_list_new();
   _pstreamAvail is the head of the list of free slots inside that table
   (set up by pstream_init_pointer()). */
static pstreamPtrToIdx *_pstreamList  = NULL;
static pstreamPtrToIdx *_pstreamAvail = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
96
97


Uwe Schulzweida's avatar
Uwe Schulzweida committed
98
99
static
void pstream_list_new(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
100
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
101
102
  assert(_pstreamList == NULL);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
103
104
105
  _pstreamList = (pstreamPtrToIdx *) malloc(_pstream_max*sizeof(pstreamPtrToIdx));
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
106
107
static
void pstream_list_delete(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
108
109
110
111
{
  if ( _pstreamList ) free(_pstreamList);
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
112
113
static
void pstream_init_pointer(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
{
  int  i;
  
  for ( i = 0; i < _pstream_max; i++ )
    {
      _pstreamList[i].next = _pstreamList + i + 1;
      _pstreamList[i].idx  = i;
      _pstreamList[i].ptr  = 0;
    }

  _pstreamList[_pstream_max-1].next = 0;

  _pstreamAvail = _pstreamList;
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
129
130
static
pstream_t *pstream_to_pointer(int idx)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
131
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
132
  pstream_t *pstreamptr = NULL;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
133
134
135
136
137
138
139
140
141
142
143
144

  PSTREAM_INIT

  if ( idx >= 0 && idx < _pstream_max )
    {
      PSTREAM_LOCK

      pstreamptr = _pstreamList[idx].ptr;

      PSTREAM_UNLOCK
    }
  else
145
    Error("pstream index %d undefined!", idx);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
146
147
148
149
150

  return (pstreamptr);
}

/* Create an index from a pointer */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
151
152
static
int pstream_from_pointer(pstream_t *ptr)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
{
  int      idx = -1;
  pstreamPtrToIdx *newptr;

  if ( ptr )
    {
      PSTREAM_LOCK

      if ( _pstreamAvail )
	{
	  newptr        = _pstreamAvail;
	  _pstreamAvail = _pstreamAvail->next;
	  newptr->next  = 0;
	  idx	        = newptr->idx;
	  newptr->ptr   = ptr;
      
	  if ( PSTREAM_Debug )
170
	    Message("Pointer %p has idx %d from pstream list", ptr, idx);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
171
172
	}
      else
173
	Warning("Too many open pstreams (limit is %d)!", _pstream_max);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
174
175
176
177

      PSTREAM_UNLOCK
    }
  else
178
    Error("Internal problem (pointer %p undefined)", ptr);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
179
180
181
182

  return (idx);
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
183
184
/*
 * Register a freshly allocated descriptor in the pstream table and set
 * its fields to the "nothing opened yet" defaults.
 *
 * `self` receives the table index returned by pstream_from_pointer(),
 * which is -1 when the table is full.  `mode` is cleared as well so that
 * it never carries uninitialised heap contents; the open functions set it
 * to 'r', 'w' or 'a' for file streams.
 */
static
void pstream_init_entry(pstream_t *pstreamptr)
{
  pstreamptr->self       = pstream_from_pointer(pstreamptr);

  pstreamptr->isopen     = TRUE;
  pstreamptr->ispipe     = FALSE;
  pstreamptr->mode       = 0;
  pstreamptr->fileID     = -1;
  pstreamptr->vlistID    = -1;
  pstreamptr->tsID       = -1;
  pstreamptr->filetype   = -1;
  pstreamptr->tsID0      = 0;
  pstreamptr->mfiles     = 0;
  pstreamptr->nfiles     = 0;
  pstreamptr->varID      = -1;
  pstreamptr->name       = NULL;
  pstreamptr->mfnames    = NULL;
  pstreamptr->varlist    = NULL;
#if  defined  (HAVE_LIBPTHREAD)
  pstreamptr->pipe       = NULL;
  /* rthreadID / wthreadID are not touched here; they are assigned from
     pthread_self() when the read and write ends of a pipe are opened. */
#endif
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
209
210
static
pstream_t *pstream_new_entry(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
211
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
212
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
213

Uwe Schulzweida's avatar
Uwe Schulzweida committed
214
  pstreamptr = (pstream_t *) malloc(sizeof(pstream_t));
Uwe Schulzweida's avatar
Uwe Schulzweida committed
215
216
217
218
219
220

  if ( pstreamptr ) pstream_init_entry(pstreamptr);

  return (pstreamptr);
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
221
222
/*
 * Free a stream descriptor and return its table slot to the free list.
 *
 * A descriptor whose `self` index lies outside the table (it is -1 when
 * pstream_from_pointer() found no free slot) was never registered; in that
 * case only the memory is released and the table is left alone, instead of
 * writing outside of _pstreamList.
 */
static
void pstream_delete_entry(pstream_t *pstreamptr)
{
  int idx;

  idx = pstreamptr->self;

  if ( idx < 0 || idx >= _pstream_max )
    {
      free(pstreamptr);
      return;
    }

  PSTREAM_LOCK

  free(pstreamptr);

  /* push the slot onto the head of the free list */
  _pstreamList[idx].next = _pstreamAvail;
  _pstreamList[idx].ptr  = 0;
  _pstreamAvail          = &_pstreamList[idx];

  PSTREAM_UNLOCK

  if ( PSTREAM_Debug )
    Message("Removed idx %d from pstream list", idx);
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
242
243
static
void pstream_initialize(void)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
{
  char *env;

#if  defined  (HAVE_LIBPTHREAD)
  /* initialize global API mutex lock */
  pthread_mutex_init(&_pstream_mutex, NULL);
#endif

  env = getenv("PSTREAM_DEBUG");
  if ( env ) PSTREAM_Debug = atoi(env);

  env = getenv("PSTREAM_MAX");
  if ( env ) _pstream_max = atoi(env);

  if ( PSTREAM_Debug )
259
    Message("PSTREAM_MAX = %d", _pstream_max);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
260
261
262
263
264
265
266
267
268

  pstream_list_new();
  atexit(pstream_list_delete);

  pstream_init_pointer();

  _pstream_init = TRUE;
}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
269
270
static
int pstreamFindID(const char *name)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
271
272
{
  int pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
273
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
274
275
276
277
278
279

  for ( pstreamID = 0; pstreamID < _pstream_max; pstreamID++ )
    {
      pstreamptr = pstream_to_pointer(pstreamID);

      if ( pstreamptr )
280
281
	if ( pstreamptr->name )
	  if ( strcmp(pstreamptr->name, name) == 0 ) break;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
282
283
284
285
286
287
288
289
290
291
    }

  if ( pstreamID == _pstream_max ) pstreamID = -1;

  return (pstreamID);
}


int pstreamIsPipe(int pstreamID)
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
292
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
293
294
295
296
297
298
299
300
301
302
303
304
305
306

  pstreamptr = pstream_to_pointer(pstreamID);

  return (pstreamptr->ispipe);
}


int pstreamOpenRead(const char *argument)
{
  char *operatorArg = NULL;
  char *operatorName = NULL;
  int ispipe = FALSE;
  int fileID;
  int pstreamID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
307
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
308
309
310
311

  PSTREAM_INIT

  pstreamptr = pstream_new_entry();
312
  if ( ! pstreamptr ) Error("No memory");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
313
314
315
316
317
318
319
320
321

  pstreamID = pstreamptr->self;

  ispipe = argument[0] == '-';

  if ( ispipe )
    {
#if  defined  (HAVE_LIBPTHREAD)
      char *newarg;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
322
      char *pipename = (char *) malloc(16);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
323
324
325
      int rval;
      pthread_t thrID;
      pthread_attr_t attr;
326
      struct sched_param param;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
327
328
      size_t len;
      size_t stacksize;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
329
      int status;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
330
331
332
333
334
335

      operatorArg = getOperator(argument);
      operatorName = getOperatorName(operatorArg);
      free(operatorArg);

      len = strlen(argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
336
      newarg = (char *) malloc(len+16);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
337
338
339
340
341
342
343
344
345
346
      strcpy(newarg, argument);
      sprintf(pipename, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1);
      newarg[len] = ' ';
      strcpy(&newarg[len+1], pipename);

      pstreamptr->ispipe = TRUE;
      pstreamptr->name   = pipename;
      pstreamptr->rthreadID = pthread_self();
      pstreamptr->pipe   = pipeNew();
 
347
348
349
      if ( ! cdoSilentMode )
	fprintf(stderr, "%s: Started child process \"%s\".\n", processInqPrompt(), newarg+1);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
350
      status = pthread_attr_init(&attr);
351
      if ( status ) SysError("pthread_attr_init failed for '%s'\n", newarg+1);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
352
      status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
353
      if ( status ) SysError("pthread_attr_setdetachstate failed for '%s'\n", newarg+1);
354
355
356
      /*
      param.sched_priority = 0;
      status = pthread_attr_setschedparam(&attr, &param);
357
      if ( status ) SysError("pthread_attr_setschedparam failed for '%s'\n", newarg+1);
358
      */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
359
      /* status = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); */
360
      /* if ( status ) SysError("pthread_attr_setinheritsched failed for '%s'\n", newarg+1); */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
361
362
363
364

      pthread_attr_getscope(&attr, &pthreadScope);

      /* status = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); */
365
      /* if ( status ) SysError("pthread_attr_setscope failed for '%s'\n", newarg+1); */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
366
367
368
369
      /* If system scheduling scope is specified, then the thread is scheduled against all threads in the system */
      /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */

      status = pthread_attr_getstacksize(&attr, &stacksize);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
370
371
372
373
374
375
376
377
378
      if ( stacksize < 2097152 )
	{
	  stacksize = 2097152;
	  pthread_attr_setstacksize(&attr, stacksize);
	}
      rval = pthread_create(&thrID, &attr, operatorModule(operatorName), newarg);
      if ( rval != 0 )
	{
	  errno = rval;
379
	  SysError("pthread_create failed for '%s'\n", newarg+1);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
380
381
382
383
384
	}

      /* free(operatorName); */
      processAddStream(pstreamID);
      /*      pipeInqInfo(pstreamID); */
385
      if ( PSTREAM_Debug ) Message("pipe %s", pipename);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
386
#else
Uwe Schulzweida's avatar
Uwe Schulzweida committed
387
      cdoAbort("Cannot use pipes, pthread support not compiled in!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
#endif
    }
  else
    {
      extern int cdoDefaultFileType/*, cdoDefaultInstID*/;
      size_t len, i;
      int nfiles = 1, j;
      char *filename = NULL;
      const char *pch;

      len = strlen(argument);

      for ( i = 0; i < len; i++ )
	if ( argument[i] == ':' ) break;

      if ( i < len )
	{
	  pch = &argument[i+1];
	  len -= (i+1);
407
408
	  if ( len && ( memcmp(argument, "filelist:", 9) == 0 || 
			memcmp(argument, "flist:", 6) == 0 ) )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
	    {
	      for ( i = 0; i < len; i++ ) if ( pch[i] == ',' ) nfiles++;

	      if ( nfiles == 1 )
		{
		  char line[4096];
		  FILE *fp, *fp2;
		  fp = fopen(pch, "r");
		  if ( fp == NULL ) cdoAbort("Open failed on %s", pch);

		  if ( cdoVerbose )
		    cdoPrint("Reading file names from %s", pch);

		  /* find number of files */
		  nfiles = 0;
		  while ( readline(fp, line, 4096) )
		    {
		      if ( line[0] == '#' || line[0] == '\0' ||
			   line[0] == ' ' ) continue;

		      fp2 = fopen(line, "r" );
		      if ( fp2 == NULL ) cdoAbort("Open failed on %s", line);
		      fclose(fp2);
		      nfiles++;
		      if ( cdoVerbose )
			cdoPrint("File number %d is %s", nfiles, line);
		    }

		  if ( nfiles == 0 ) cdoAbort("No imput file found in %s", pch);

		  pstreamptr->mfiles = nfiles;
		  pstreamptr->mfnames = (char **) malloc(nfiles*sizeof(char *));
		  
		  rewind(fp);

		  nfiles = 0;
		  while ( readline(fp, line, 4096) )
		    {
		      if ( line[0] == '#' || line[0] == '\0' ||
			   line[0] == ' ' ) continue;

		      pstreamptr->mfnames[nfiles] = strdupx(line);
		      nfiles++;
		    }

		  fclose(fp);
		}
	      else
		{
		  char line[65536];

		  pstreamptr->mfiles = nfiles;
		  pstreamptr->mfnames = (char **) malloc(nfiles*sizeof(char *));
		  
		  strcpy(line, pch);
		  for ( i = 0; i < len; i++ ) if ( line[i] == ',' ) line[i] = 0;

		  i = 0;
		  for ( j = 0; j < nfiles; j++ )
		    {
		      pstreamptr->mfnames[j] = strdupx(&line[i]);
		      i += strlen(&line[i]) + 1;
		    }
		}
	    }
474
	  else if ( len && memcmp(argument, "ls:", 3) == 0 )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
475
476
477
478
479
480
481
482
483
484
485
	    {
	      char line[4096];
	      char command[4096];
	      char *fnames[16384];
	      FILE *pfp;

	      strcpy(command, "ls ");
	      strcat(command, pch);

	      pfp = popen(command, "r");
	      if ( pfp == 0 )
486
		SysError("popen %s failed", command);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
487
488
489
490

	      nfiles = 0;
	      while ( readline(pfp, line, 4096) )
		{
491
		  if ( nfiles >= 16384 ) cdoAbort("Too many input files (limit: 16384)");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
		  fnames[nfiles++] = strdupx(line);
		}

	      pclose(pfp);

	      pstreamptr->mfiles = nfiles;
	      pstreamptr->mfnames = (char **) malloc(nfiles*sizeof(char *));

	      for ( j = 0; j < nfiles; j++ )
		pstreamptr->mfnames[j] = fnames[j];
	    }
	}

      if ( pstreamptr->mfiles )
	{
	  len = strlen(pstreamptr->mfnames[0]);
	  filename = (char *) malloc(len+1);
	  strcpy(filename, pstreamptr->mfnames[0]);
	  pstreamptr->nfiles = 1;
	}
      else
	{
	  len = strlen(argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
515
	  /*
Uwe Schulzweida's avatar
Uwe Schulzweida committed
516
517
518
519
520
521
522
523
524
525
526
	  if ( cdoExpMode == CDO_EXP_REMOTE )
	    {
	      char datapath[] = "/scratch/localA/m214003/data/";
	      len += strlen(datapath);

	      filename = (char *) malloc(len+1);

	      strcpy(filename, datapath);
	      strcat(filename, argument);
	    }
	  else
Uwe Schulzweida's avatar
Uwe Schulzweida committed
527
	  */
Uwe Schulzweida's avatar
Uwe Schulzweida committed
528
529
530
531
532
	    {
	      filename = (char *) malloc(len+1);

	      strcpy(filename, argument);
	    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
533
534
	}

535
      if ( PSTREAM_Debug ) Message("file %s", filename);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
536
537
538
539
540
541
542
543
544
545
546
547
548
549

#if  defined  (HAVE_LIBPTHREAD)
      pthread_mutex_lock(&streamOpenReadMutex);
#endif
      fileID = streamOpenRead(filename);
      if ( fileID < 0 ) cdiError(fileID, "Open failed on >%s<", filename);

      if ( cdoDefaultFileType == CDI_UNDEFID )
	cdoDefaultFileType = streamInqFiletype(fileID);
      /*
      if ( cdoDefaultInstID == CDI_UNDEFID )
	cdoDefaultInstID = streamInqInstID(fileID);
      */
      cdoInqHistory(fileID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
550
551
552
#if  defined  (HAVE_LIBPTHREAD)
      pthread_mutex_unlock(&streamOpenReadMutex);
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
553

Uwe Schulzweida's avatar
Uwe Schulzweida committed
554
      pstreamptr->mode   = 'r';
Uwe Schulzweida's avatar
Uwe Schulzweida committed
555
556
557
558
      pstreamptr->name   = filename;
      pstreamptr->fileID = fileID;
    }

Uwe Schulzweida's avatar
Uwe Schulzweida committed
559
560
  if ( pstreamID < 0 ) cdiError(pstreamID, "Open failed on %s", argument);
  
Uwe Schulzweida's avatar
Uwe Schulzweida committed
561
562
563
  return (pstreamID);
}

564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
/*
 * Ask on stdout/stdin whether the existing file `argument` shall be
 * overwritten.
 *
 * The question is repeated until the reply is exactly one of the letters
 * 'o'/'O' (overwrite: the function returns) or 'e'/'E' (the program exits
 * with EXIT_SUCCESS).  When the number of attempts exceeds
 * USR_RPL_MAX_NBR the program exits with EXIT_FAILURE.
 */
static
void query_user_exit(const char *argument)
{
  /* modified code from NCO */
#define USR_RPL_MAX_LNG 10 /* Maximum length for user reply */
#define USR_RPL_MAX_NBR 10 /* Maximum number of chances for user to reply */
  char reply[USR_RPL_MAX_LNG];
  size_t reply_len = 0;
  short attempts = 0;

  /* start with a reply that is guaranteed to be invalid */
  reply[0] = 'z';
  reply[1] = '\0';

  for ( ;; )
    {
      /* done as soon as the reply is a single valid letter */
      if ( reply_len == 1 &&
           (reply[0] == 'o' || reply[0] == 'O' || reply[0] == 'e' || reply[0] == 'E') )
        break;

      if ( attempts++ > USR_RPL_MAX_NBR )
        {
          fprintf(stdout, "\n%s: ERROR %d failed attempts to obtain valid interactive input.\n",
                  processInqPrompt(), attempts-1);
          exit(EXIT_FAILURE);
        }

      if ( attempts > 1 ) fprintf(stdout, "%s: ERROR Invalid response.\n", processInqPrompt());
      fprintf(stdout, "%s: %s exists ---`e'xit, or `o'verwrite (delete existing file) (e/o)? ",
              processInqPrompt(), argument);
      fflush(stdout);

      if ( fgets(reply, USR_RPL_MAX_LNG, stdin) == NULL ) continue;

      /* strip the trailing newline, if the reply has one */
      reply_len = strlen(reply);
      if ( reply_len >= 1 && reply[reply_len-1] == '\n' )
        reply[--reply_len] = '\0';
    }

  /* 'e' leaves the program, 'o' returns to the caller */
  if ( reply[0] == 'e' || reply[0] == 'E' ) exit(EXIT_SUCCESS);
  if ( reply[0] != 'o' && reply[0] != 'O' ) exit(EXIT_FAILURE);
}


Uwe Schulzweida's avatar
Uwe Schulzweida committed
623
624
625
int pstreamOpenWrite(const char *argument, int filetype)
{
  int fileID;
626
  int pstreamID = -1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
627
  int ispipe;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
628
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
629
630
631

  PSTREAM_INIT

632
  ispipe = memcmp(argument, "(pipe", 5) == 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
633
634
635
636

  if ( ispipe )
    {
#if  defined  (HAVE_LIBPTHREAD)
637
      if ( PSTREAM_Debug ) Message("pipe %s", argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
638
639
      pstreamID = pstreamFindID(argument);
      if ( pstreamID == -1 )
640
	Error("%s not open", argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
641
642
643
644
645
646
647
648
649
650
651
652
653
654

      pstreamptr = pstream_to_pointer(pstreamID);

      pstreamptr->wthreadID = pthread_self();
      pstreamptr->filetype = filetype;
      processAddStream(pstreamID);
#endif
    }
  else
    {
      /* extern int cdoDefaultInstID; */
      char *filename = (char *) malloc(strlen(argument)+1);

      pstreamptr = pstream_new_entry();
655
      if ( ! pstreamptr ) Error("No memory");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
656
657
658

      pstreamID = pstreamptr->self;
  
659
      if ( PSTREAM_Debug ) Message("file %s", argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
660
661
662

      if ( filetype == CDI_UNDEFID ) filetype = FILETYPE_GRB;

663
664
665
666
667
668
669
      if ( cdoInteractive )
	{
	  int rstatus;
	  struct stat stbuf;

	  rstatus = stat(argument, &stbuf);
	  /* If permanent file already exists, query user whether to overwrite or exit */
670
	  if ( rstatus != -1 ) query_user_exit(argument);
671
672
	}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
673
674
675
#if  defined  (HAVE_LIBPTHREAD)
      pthread_mutex_lock(&streamOpenWriteMutex);
#endif
Uwe Schulzweida's avatar
Uwe Schulzweida committed
676
      timer_start(timer_write);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
677
      fileID = streamOpenWrite(argument, filetype);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
678
      timer_stop(timer_write);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
679
680
681
682
683
#if  defined  (HAVE_LIBPTHREAD)
      pthread_mutex_unlock(&streamOpenWriteMutex);
#endif
      if ( fileID < 0 ) cdiError(fileID, "Open failed on %s", argument);

684
      cdoDefHistory(fileID, commandLine());
Uwe Schulzweida's avatar
Uwe Schulzweida committed
685
686
687

      if ( cdoDefaultByteorder != CDI_UNDEFID )
	streamDefByteorder(fileID, cdoDefaultByteorder);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
688

Uwe Schulzweida's avatar
Uwe Schulzweida committed
689
690
691
692
      if ( cdoCompress )
	{
	  if      ( filetype == FILETYPE_GRB )
	    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
693
694
	      cdoCompType  = COMPRESS_SZIP;
	      cdoCompLevel = 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
695
	    }
696
	  else if ( filetype == FILETYPE_NC4 || filetype == FILETYPE_NC4C )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
697
	    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
698
699
	      cdoCompType  = COMPRESS_ZIP;
	      cdoCompLevel = 1;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
700
701
702
	    }
	}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
703
      if ( cdoCompType != COMPRESS_NONE )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
704
	{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
705
706
	  streamDefCompType(fileID, cdoCompType);
	  streamDefCompLevel(fileID, cdoCompLevel);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
707

708
	  if ( cdoCompType == COMPRESS_SZIP && (filetype != FILETYPE_GRB && filetype != FILETYPE_NC4 && filetype != FILETYPE_NC4C) )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
709
710
	    cdoWarning("SZIP compression not available for non GRIB1/netCDF4 data!");

Uwe Schulzweida's avatar
Uwe Schulzweida committed
711
	  if ( cdoCompType == COMPRESS_JPEG && filetype != FILETYPE_GRB2 )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
712
	    cdoWarning("SZIP compression not available for non GRIB2 data!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
713

714
	  if ( cdoCompType == COMPRESS_ZIP && (filetype != FILETYPE_NC4 && filetype != FILETYPE_NC4C) )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
715
	    cdoWarning("Deflate compression not available for non netCDF4 data!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
716
	}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
717
718
719
720
721
      /*
      if ( cdoDefaultInstID != CDI_UNDEFID )
	streamDefInstID(fileID, cdoDefaultInstID);
      */
      strcpy(filename, argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
722

723
724
725
726
727
      pstreamptr->mode     = 'w';
      pstreamptr->name     = filename;
      pstreamptr->fileID   = fileID;
      pstreamptr->filetype = filetype;
   }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
728
729
730
731
732
733
734
735
736
737

  return (pstreamID);
}


int pstreamOpenAppend(const char *argument)
{
  int fileID;
  int pstreamID = -1;
  int ispipe;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
738
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
739

740
  ispipe = memcmp(argument, "(pipe", 5) == 0;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
741
742
743

  if ( ispipe )
    {
744
      if ( PSTREAM_Debug ) Message("pipe %s", argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
745
746
747
748
749
750
751
      cdoAbort("this operator doesn't work with pipes!");
    }
  else
    {
      char *filename = (char *) malloc(strlen(argument)+1);

      pstreamptr = pstream_new_entry();
752
      if ( ! pstreamptr ) Error("No memory");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
753
754
755

      pstreamID = pstreamptr->self;
  
756
      if ( PSTREAM_Debug ) Message("file %s", argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
757

Uwe Schulzweida's avatar
Uwe Schulzweida committed
758
      timer_start(timer_write);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
759
      fileID = streamOpenAppend(argument);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
760
      timer_stop(timer_write);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
761
762
763
764
765
766
      if ( fileID < 0 ) cdiError(fileID, "Open failed on %s", argument);
      /*
      cdoInqHistory(fileID);
      cdoDefHistory(fileID, commandLine());
      */
      strcpy(filename, argument);
767
768

      pstreamptr->mode   = 'a';
Uwe Schulzweida's avatar
Uwe Schulzweida committed
769
770
771
772
773
774
775
776
777
778
      pstreamptr->name   = filename;
      pstreamptr->fileID = fileID;
    }

  return (pstreamID);
}


void pstreamClose(int pstreamID)
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
779
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
780
781
782

  pstreamptr = pstream_to_pointer(pstreamID);

783
  if ( pstreamptr == NULL )
784
    Error("Internal problem, stream %d not open!", pstreamID);
785

Uwe Schulzweida's avatar
Uwe Schulzweida committed
786
787
788
  if ( pstreamptr->ispipe )
    {
#if  defined  (HAVE_LIBPTHREAD)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
789
      pipe_t *pipe;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
790
791
792
793
794
      int lread = FALSE, lwrite = FALSE;
      pthread_t threadID = pthread_self();

      if      ( pthread_equal(threadID, pstreamptr->rthreadID) ) lread  = TRUE;
      else if ( pthread_equal(threadID, pstreamptr->wthreadID) ) lwrite = TRUE;
795
      else Error("Internal problem! Close pipe %s", pstreamptr->name);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
796
797
798
799
800
801
802

      if ( lread )
	{
	  pipe = pstreamptr->pipe;
	  pthread_mutex_lock(pipe->mutex);
	  pipe->EOP = TRUE;
	  if ( PSTREAM_Debug )
803
	    Message("%s read closed", pstreamptr->name);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
804
805
806
	  pthread_mutex_unlock(pipe->mutex);     
	  pthread_cond_signal(pipe->tsDef);
	  pthread_cond_signal(pipe->tsInq);
807
808
809
	 
	  pthread_cond_signal(pipe->recInq);
	 
Uwe Schulzweida's avatar
Uwe Schulzweida committed
810
811
812
813
814
815
816
	  pthread_mutex_lock(pipe->mutex);
	  pstreamptr->isopen = FALSE;
	  pthread_mutex_unlock(pipe->mutex);     
	  pthread_cond_signal(pipe->isclosed);

	  pthread_join(pstreamptr->wthreadID, NULL);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
817
	  processAddNvals(pipe->nvals);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
818
819
820
821
822
823
824
825
826
	  pipeDelete(pipe);
	  pstream_delete_entry(pstreamptr);
	}
      else
	{
	  pipe = pstreamptr->pipe;
	  pthread_mutex_lock(pipe->mutex);
	  pipe->EOP = TRUE;
	  if ( PSTREAM_Debug )
827
	    Message("%s write closed", pstreamptr->name);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
828
829
830
831
832
833
834
835
	  pthread_mutex_unlock(pipe->mutex);     
	  pthread_cond_signal(pipe->tsDef);
	  pthread_cond_signal(pipe->tsInq);

	  pthread_mutex_lock(pipe->mutex);
	  while ( pstreamptr->isopen )
	    {
	      if ( PSTREAM_Debug )
836
		Message("wait of read close");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
837
838
839
840
841
	      pthread_cond_wait(pipe->isclosed, pipe->mutex);
	    }
	  pthread_mutex_unlock(pipe->mutex);
	}
#else
Uwe Schulzweida's avatar
Uwe Schulzweida committed
842
      cdoAbort("Cannot use pipes, pthread support not compiled in!");
Uwe Schulzweida's avatar
Uwe Schulzweida committed
843
844
845
846
847
#endif
    }
  else
    {
      if ( PSTREAM_Debug )
848
	Message("%s fileID %d\n", pstreamptr->name, pstreamptr->fileID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
849

Uwe Schulzweida's avatar
Uwe Schulzweida committed
850
      if ( pstreamptr->mode == 'r' )
Uwe Schulzweida's avatar
Uwe Schulzweida committed
851
852
853
	{
	  processAddNvals(streamNvals(pstreamptr->fileID));
	}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
854

Uwe Schulzweida's avatar
Uwe Schulzweida committed
855
856
      streamClose(pstreamptr->fileID);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
857
858
859
860
      if ( cdoExpMode == CDO_EXP_REMOTE )
	{
	  if ( pstreamptr->mode == 'w' )
	    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
861
862
863
864
	      extern const char *cdojobfiles;
	      FILE *fp = fopen(cdojobfiles, "a");
	      fprintf(fp, "%s\n", pstreamptr->name);
	      fclose(fp);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
865
866
867
	    }
	}

868
869
870
871
872
873
      if ( pstreamptr->name )
	{
	  free(pstreamptr->name);
	  pstreamptr->name = NULL;
	}

874
875
876
877
878
879
      if ( pstreamptr->varlist )
	{
	  free(pstreamptr->varlist);
	  pstreamptr->varlist = NULL;
	}

Uwe Schulzweida's avatar
Uwe Schulzweida committed
880
881
882
883
884
885
886
887
      pstream_delete_entry(pstreamptr);
    }
}


int pstreamInqVlist(int pstreamID)
{
  int vlistID;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
888
  pstream_t *pstreamptr;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
889
890
891
892
893

  pstreamptr = pstream_to_pointer(pstreamID);

#if  defined  (HAVE_LIBPTHREAD)
  if ( pstreamptr->ispipe )
894
895
896
897
898
    {
      vlistID = pipeInqVlist(pstreamptr);
      if ( vlistID == -1 )
	cdoAbort("Couldn't read data from input stream %s!", pstreamptr->name);
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
899
900
901
902
903
  else
#endif
    {
      extern int cdoDefaultTimeType;

Uwe Schulzweida's avatar
Uwe Schulzweida committed
904
      timer_start(timer_read);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
905
      vlistID = streamInqVlist(pstreamptr->fileID);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
906
      timer_stop(timer_read);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
907
908
909
910

      if ( cdoDefaultTimeType != CDI_UNDEFID )
	taxisDefType(vlistInqTaxis(vlistID), cdoDefaultTimeType);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
911
912
      pstreamptr->vlistID = vlistID;
    }
Uwe Schulzweida's avatar
Uwe Schulzweida committed
913

Uwe Schulzweida's avatar
Uwe Schulzweida committed
914
915
916
917
918
919
  if ( vlistNumber(vlistID) == CDI_COMP && cdoStreamNumber() == CDI_REAL )
    cdoAbort("Complex fields are not supported by this operator!");

  if ( vlistNumber(vlistID) == CDI_REAL && cdoStreamNumber() == CDI_COMP )
    cdoAbort("This operator needs complex fields!");

Uwe Schulzweida's avatar
Uwe Schulzweida committed
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
  processDefVarNum(vlistNvars(vlistID), pstreamID);

  return (vlistID);
}


/*
 * cdoComment - return the comment string for this CDO build.
 *
 * The string is built once, on the first call, by copying the global
 * CDO_Version into a static buffer; every call returns a pointer to
 * that same buffer, so the caller must not free it.
 *
 * The copy is bounded by the size of the buffer: a version string that
 * does not fit is truncated instead of being written past the end.
 */
const char *cdoComment(void)
{
  static char comment[256];
  static int init = 0;
  extern char CDO_Version[];

  if ( ! init )
    {
      init = 1;

      /* snprintf limits the write to sizeof(comment) bytes and always
         NUL-terminates.  The former strncat call was bounded by the
         length of the source, which gave no protection at all. */
      snprintf(comment, sizeof(comment), "%s", CDO_Version);
    }

  return (comment);
}

945
static
Uwe Schulzweida's avatar
Uwe Schulzweida committed
946
void pstreamDefVarlist(pstream_t *pstreamptr, int vlistID)
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
{
  int varID, nvars;
  int laddoffset, lscalefactor;
  int datatype, filetype;
  varlist_t *varlist;

  filetype = pstreamptr->filetype;

  if ( pstreamptr->vlistID != -1 )
    cdoAbort("Internal problem, vlist already defined!");

  if ( pstreamptr->varlist != NULL )
    cdoAbort("Internal problem, varlist already allocated!");

  nvars = vlistNvars(vlistID);
  varlist = (varlist_t *) malloc(nvars*sizeof(varlist_t));

  for ( varID = 0; varID < nvars; ++varID )
    {
      varlist[varID].gridsize    = gridInqSize(vlistInqVarGrid(vlistID, varID));
      varlist[varID].datatype    = vlistInqVarDatatype(vlistID, varID);
      varlist[varID].missval     = vlistInqVarMissval(vlistID, varID);
      varlist[varID].addoffset   = vlistInqVarAddoffset(vlistID, varID);
      varlist[varID].scalefactor = vlistInqVarScalefactor(vlistID, varID);

      varlist[varID].check_datarange = FALSE;

      laddoffset   = IS_NOT_EQUAL(varlist[varID].addoffset, 0);
      lscalefactor = IS_NOT_EQUAL(varlist[varID].scalefactor, 1);

      datatype = varlist[varID].datatype;

979
      if ( filetype == FILETYPE_NC || filetype == FILETYPE_NC2 || filetype == FILETYPE_NC4 || filetype == FILETYPE_NC4C )
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
	{
	  if ( datatype == DATATYPE_UINT8 && (filetype == FILETYPE_NC || filetype == FILETYPE_NC2) )
	    {
	      datatype = DATATYPE_INT16;
	      varlist[varID].datatype = datatype;
	    }

	  if ( datatype == DATATYPE_UINT16 && (filetype == FILETYPE_NC || filetype == FILETYPE_NC2) )
	    {
	      datatype = DATATYPE_INT32;
	      varlist[varID].datatype = datatype;
	    }

	  if ( laddoffset || lscalefactor )
	    {
	      if ( datatype == DATATYPE_INT8   ||
		   datatype == DATATYPE_UINT8  ||
		   datatype == DATATYPE_INT16  ||
		   datatype == DATATYPE_UINT16 )
		varlist[varID].check_datarange = TRUE;
	    }
For faster browsing, not all history is shown. View entire blame