fileStream.cc 9.71 KB
Newer Older
1 2 3 4
/*
  This file is part of CDO. CDO is a collection of Operators to
  manipulate and analyse Climate model Data.

Uwe Schulzweida's avatar
Uwe Schulzweida committed
5
  Copyright (C) 2003-2020 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
6 7 8 9 10 11 12 13 14 15 16
  See COPYING file for copying and redistribution conditions.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.
*/
17 18 19 20
#ifdef HAVE_CONFIG_H
#include "config.h"   /* HAVE_NC4HDF5_THREADSAFE */
#endif

21 22 23 24 25 26 27 28
#include <sys/stat.h> /* stat */
#include <cdi.h>

#include "fileStream.h"

#include "cdi_lockedIO.h"
#include "cdo_options.h"
#include "cdo_output.h"
29
#include "cdo_default_values.h"
30 31 32 33 34
#include "cdo_history.h"

#include "timer.h"
#include "commandline.h"

35
// Definition of the static class flag; timing starts out disabled.
// NOTE(review): presumably the value reported by FileStream::timersEnabled() — confirm in fileStream.h.
bool FileStream::TimerEnabled = false;
36
// File-local flag, set to true by FileStream::openRead() once an input file of type
// CDI_FILETYPE_NC4 or CDI_FILETYPE_NC4C has been opened. It is never reset in this file;
// its only other use here is in the disabled thread-safety warning in openWrite().
static bool inputFileTypeIsNetCDF4 = false;
37

38
// Builds a stream object for the given file name. Nothing is opened here:
// the CDI file ID stays undefined until one of the open*() methods is called.
FileStream::FileStream(const std::string &p_fileName) : m_filename(p_fileName)
{
  // No CDI stream is attached yet.
  m_fileID = CDI_UNDEFID;
  // The stream name is the file name itself.
  m_name = p_fileName;
}
43 44 45 46 47

int
FileStream::openRead()
{
  openLock();
48

Uwe Schulzweida's avatar
Uwe Schulzweida committed
49
  const auto fileID = streamOpenRead(m_filename.c_str());
50 51
  if (fileID < 0) cdiOpenError(fileID, "Open failed on >%s<", m_filename.c_str());
  isopen = true;
52 53 54

  m_filetype = streamInqFiletype(fileID);
  if (CdoDefault::FileType == CDI_UNDEFID) CdoDefault::FileType = m_filetype;
55
  m_fileID = fileID;
56

57 58
  if (m_filetype == CDI_FILETYPE_NC4 || m_filetype == CDI_FILETYPE_NC4C) inputFileTypeIsNetCDF4 = true;

59
  Debug(FILE_STREAM, "Set number of worker to %d", Options::numStreamWorker);
60 61
  if (Options::numStreamWorker > 0) streamDefNumWorker(fileID, Options::numStreamWorker);

62
  openUnlock();
63

64 65
  return fileID;
}
66

67 68 69 70 71 72
int
FileStream::openWrite(int p_filetype)
{
  if (Options::cdoInteractive)
    {
      struct stat stbuf;
Uwe Schulzweida's avatar
Uwe Schulzweida committed
73
      const auto rstatus = stat(m_name.c_str(), &stbuf);
74
      // If permanent file already exists, query user whether to overwrite or exit
75 76
      if (rstatus != -1) query_user_exit(m_name.c_str());
    }
77
  if (p_filetype == CDI_UNDEFID) p_filetype = CDI_FILETYPE_GRB;
78 79 80 81 82 83 84 85 86 87 88
  /*
#ifndef HAVE_NC4HDF5_THREADSAFE
  bool outputFileTypeIsNetCDF4 = (p_filetype == CDI_FILETYPE_NC4 || p_filetype == CDI_FILETYPE_NC4C);
  if (inputFileTypeIsNetCDF4 && outputFileTypeIsNetCDF4 && processNums() > 1)
    {
      cdoWarning("Using a non-thread-safe NetCDF4/HDF5 library in a multi-threaded environment may lead to erroneous results!");
      cdoWarning("Use a thread-safe NetCDF4/HDF5 library or the CDO option -L to avoid such errors.");
    }
#endif
  */
  // TODO FIX THIS: if (FileStream::timersEnabled()) timer_start(timer_write);
89 90

  openLock();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
91
  const auto fileID = streamOpenWrite(m_filename.c_str(), p_filetype);
92 93
  openUnlock();

94
  // TODO FIX THIS: if(FileStream::timersEnabled()) timer_stop(timer_write);
95 96 97 98 99
  if (fileID < 0) cdiOpenError(fileID, "Open failed on >%s<", m_name.c_str());
  isopen = true;

  if (CdoDefault::Byteorder != CDI_UNDEFID) streamDefByteorder(fileID, CdoDefault::Byteorder);

100
  setCompression(fileID, p_filetype);
101 102 103 104 105 106

  m_fileID = fileID;
  m_filetype = p_filetype;

  return m_cdoStreamID;
}
107

108 109 110
int
FileStream::openAppend()
{
111
  if (FileStream::timersEnabled()) timer_start(timer_write);
112 113

  openLock();
Uwe Schulzweida's avatar
Uwe Schulzweida committed
114
  const auto fileID = streamOpenAppend(m_filename.c_str());
115 116
  openUnlock();

117
  if (FileStream::timersEnabled()) timer_stop(timer_write);
118 119 120 121 122

  if (fileID < 0) cdiOpenError(fileID, "Open failed on >%s<", m_filename.c_str());

  isopen = true;

123
  m_filetype = streamInqFiletype(fileID);
124
  setCompression(fileID, m_filetype);
125 126 127 128 129 130

  m_fileID = fileID;

  return m_fileID;
}

131
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
132
FileStream::defVlist(const int p_vlistID)
133
{
134
  cdo_append_history(p_vlistID, commandLine());
135

136 137
  if (CdoDefault::DataType != CDI_UNDEFID)
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
138
      const auto nvars = vlistNvars(p_vlistID);
139
      for (int varID = 0; varID < nvars; ++varID) vlistDefVarDatatype(p_vlistID, varID, CdoDefault::DataType);
140 141 142

      if (CdoDefault::DataType == CDI_DATATYPE_FLT64 || CdoDefault::DataType == CDI_DATATYPE_FLT32)
        {
143
          for (int varID = 0; varID < nvars; varID++)
144 145 146 147 148 149 150 151 152
            {
              vlistDefVarAddoffset(p_vlistID, varID, 0.0);
              vlistDefVarScalefactor(p_vlistID, varID, 1.0);
            }
        }
    }

  if (Options::cdoChunkType != CDI_UNDEFID)
    {
Uwe Schulzweida's avatar
Uwe Schulzweida committed
153
      const auto nvars = vlistNvars(p_vlistID);
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
      for (int varID = 0; varID < nvars; ++varID) vlistDefVarChunkType(p_vlistID, varID, Options::cdoChunkType);
    }

  if (Options::CMOR_Mode)
    {
      cdo_def_tracking_id(p_vlistID, "tracking_id");
      cdo_def_creation_date(p_vlistID);
    }

  if (Options::VersionInfo) cdiDefAttTxt(p_vlistID, CDI_GLOBAL, "CDO", (int) strlen(cdoComment()), cdoComment());

#ifdef _OPENMP
  if (Threading::ompNumThreads > 1)
    cdiDefAttInt(p_vlistID, CDI_GLOBAL, "cdo_openmp_thread_number", CDI_DATATYPE_INT32, 1, &Threading::ompNumThreads);
#endif
169
  defDatarangeList(p_vlistID);
170

171
  if (FileStream::timersEnabled()) timer_start(timer_write);
172
  streamDefVlistLocked(m_fileID, p_vlistID);
173
  if (FileStream::timersEnabled()) timer_stop(timer_write);
174
}
175

176 177 178
int
FileStream::inqVlist()
{
179
  if (FileStream::timersEnabled()) timer_start(timer_read);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
180
  const auto vlistID = streamInqVlistLocked(m_fileID);
181
  if (FileStream::timersEnabled()) timer_stop(timer_read);
182
  if (vlistID == -1) cdoAbort("Couldn't read data from input fileID %d!", m_fileID);
183

Uwe Schulzweida's avatar
Uwe Schulzweida committed
184
  const auto nsubtypes = vlistNsubtypes(vlistID);
185 186 187 188 189 190 191
  if (nsubtypes > 1) cdoWarning("Subtypes are unsupported, the processing results are possibly wrong!");

  if (CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(vlistID), CdoDefault::TaxisType);

  m_vlistID = vlistID;
  return m_vlistID;
}
192

193
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
194
FileStream::inqRecord(int *const varID, int *const levelID)
195
{
196
  if (FileStream::timersEnabled()) timer_start(timer_read);
197
  streamInqRecLocked(m_fileID, varID, levelID);
198 199
  if (FileStream::timersEnabled()) timer_stop(timer_read);
  m_varID = *varID;
200
}
201

202
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
203
FileStream::defRecord(const int varID, const int levelID)
204
{
205
  if (FileStream::timersEnabled()) timer_start(timer_write);
206
  streamDefRecLocked(m_fileID, varID, levelID);
207 208
  if (FileStream::timersEnabled()) timer_stop(timer_write);
  m_varID = varID;
209 210
}

211
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
212
FileStream::readRecord(float *const p_data, size_t *const nmiss)
213
{
214
  if (FileStream::timersEnabled()) timer_start(timer_read);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
215
  streamReadrecordFloatLocked(m_fileID, p_data, nmiss);
216
  if (FileStream::timersEnabled()) timer_stop(timer_read);
217
}
218

219
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
220
FileStream::readRecord(double *const p_data, size_t *const nmiss)
221
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
222 223 224
  if (FileStream::timersEnabled()) timer_start(timer_read);
  streamReadrecordDoubleLocked(m_fileID, p_data, nmiss);
  if (FileStream::timersEnabled()) timer_stop(timer_read);
225 226
}

227
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
228
FileStream::readRecord(Field *const p_field, size_t *const nmiss)
229
{
230
  readRecord(p_field->vec_d.data(), nmiss);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
231 232 233
}

void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
234
FileStream::writeRecord(float *const p_data, const size_t p_nmiss)
Uwe Schulzweida's avatar
Uwe Schulzweida committed
235 236 237 238
{
  if (FileStream::timersEnabled()) timer_start(timer_write);
  streamWriteRecordFloatLocked(m_fileID, p_data, p_nmiss);
  if (FileStream::timersEnabled()) timer_stop(timer_write);
239 240
}

241
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
242
FileStream::writeRecord(double *const p_data, const size_t p_nmiss)
243
{
Uwe Schulzweida's avatar
Uwe Schulzweida committed
244
  auto varID = m_varID;
245
  if (FileStream::timersEnabled()) timer_start(timer_write);
246

247 248
  if (varID < (int) m_datarangelist.size())
    if (m_datarangelist[varID].check_datarange) m_datarangelist[varID].checkDatarange(p_data, p_nmiss);
249 250 251

  streamWriteRecordDoubleLocked(m_fileID, p_data, p_nmiss);

252
  if (FileStream::timersEnabled()) timer_stop(timer_write);
253
}
Uwe Schulzweida's avatar
Uwe Schulzweida committed
254

255
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
256
FileStream::writeRecord(Field *const p_field, const size_t p_nmiss)
257
{
258
  writeRecord(p_field->vec_d.data(), p_nmiss);
259
}
260

261
void
262
FileStream::copyRecord(CdoStreamID p_destination)
263
{
264
  FileStream *fStream = dynamic_cast<FileStream *>(p_destination.get());
265
  streamCopyRecordLocked(m_fileID, fStream->getFileID());
266
}
267 268 269 270 271 272 273 274 275
/*
 * FileStream::inqTimestep(int p_tsID)
 * Sets the internal state of the CDI data structure to work on the given timestep (p_tsID) and returns the number of records
 * that the timestep contains.
 * Defines the time axis type if the timestep ID is 0 AND a default taxis type (CdoDefault::TaxisType) is set.
 * When the timestep inquiry was successful, m_tsID is set to the requested p_tsID if p_tsID != m_tsID.
 * When only one process is running, the timers are enabled.
 * -- last Documentation update (2019-06-14) --
 */
276
int
Uwe Schulzweida's avatar
Uwe Schulzweida committed
277
FileStream::inqTimestep(const int p_tsID)
278
{
279
  if (FileStream::timersEnabled()) timer_start(timer_read);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
280
  const auto nrecs = streamInqTimeStepLocked(m_fileID, p_tsID);
281
  if (FileStream::timersEnabled()) timer_stop(timer_read);
282 283 284

  if (p_tsID == 0 && CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(m_vlistID), CdoDefault::TaxisType);

Uwe Schulzweida's avatar
Uwe Schulzweida committed
285
  if (nrecs && p_tsID != m_tsID) m_tsID = p_tsID;
286
  Debug(FILE_STREAM, "Current TsID: %d,  nrecs: %d", m_tsID, nrecs);
Uwe Schulzweida's avatar
Uwe Schulzweida committed
287

288 289 290
  return nrecs;
}

291
void
Uwe Schulzweida's avatar
Uwe Schulzweida committed
292
FileStream::defTimestep(const int p_tsID)
293
{
294
  if (FileStream::timersEnabled()) timer_start(timer_write);
295 296 297 298 299
  // don't use sync -> very slow on GPFS
  //  if ( p_tsID > 0 ) streamSync(fileID);

  streamDefTimeStepLocked(m_fileID, p_tsID);

300
  if (FileStream::timersEnabled()) timer_stop(timer_write);
301 302 303 304 305
}

int
FileStream::inqFileType()
{
306
  return streamInqFiletype(m_fileID);
307
}
308

309 310 311 312 313 314 315 316 317
// Returns the byte order that CDI reports for the stream m_fileID.
int
FileStream::inqByteorder()
{
  const auto byteOrder = streamInqByteorder(m_fileID);
  return byteOrder;
}

size_t
FileStream::getNvals()
{
318 319
  // set when the stream is closed
  // see: FileStream::close()
320
  return m_nvals;
321 322 323 324 325 326 327 328
}

// Returns the CDI file ID of this stream; it is CDI_UNDEFID until one of the
// open*() methods has stored an ID.
int
FileStream::getFileID()
{
  const int cdiFileID = m_fileID;
  return cdiFileID;
}

329 330 331
void
FileStream::close()
{
332
  Debug(FILE_STREAM, "%s fileID %d", m_name, m_fileID);
333

334
  m_nvals = streamNvals(m_fileID);
335
  streamCloseLocked(m_fileID);
336

337
  isopen = false;
338
  m_vlistID = -1;
339

340
  if (m_datarangelist.size())
341
    {
342 343
      m_datarangelist.clear();
      m_datarangelist.shrink_to_fit();
344
    }
345
}