Commit a3957536 authored by Oliver Heidmann's avatar Oliver Heidmann
Browse files

added prototypes for pstream replacements

parent 0520935f
/*
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2019 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string>
#include <cassert>
#include "cdoStream.h"
#include "cdo_options.h"
CdoStream::CdoStream(int p_id) : m_cdoStreamID(p_id)
{
isopen = false;
m_vlistID = -1;
m_tsID = -1;
m_filetype = -1;
m_name = "";
m_varID = -1;
}
void
CdoStream::defVarList(int p_vlistID)
{
int filetype = m_filetype;
if (m_vlistID != -1) cdoAbort("Internal problem, vlist already defined!");
if (m_varlist.size() != 0) cdoAbort("Internal problem, varlist already allocated!");
const int nvars = vlistNvars(p_vlistID);
assert(nvars > 0);
m_varlist.resize(nvars);
for (int varID = 0; varID < nvars; ++varID)
{
m_varlist[varID].gridsize = gridInqSize(vlistInqVarGrid(p_vlistID, varID));
m_varlist[varID].datatype = vlistInqVarDatatype(p_vlistID, varID);
m_varlist[varID].missval = vlistInqVarMissval(p_vlistID, varID);
m_varlist[varID].addoffset = vlistInqVarAddoffset(p_vlistID, varID);
m_varlist[varID].scalefactor = vlistInqVarScalefactor(p_vlistID, varID);
m_varlist[varID].check_datarange = false;
const bool laddoffset = IS_NOT_EQUAL(m_varlist[varID].addoffset, 0);
const bool lscalefactor = IS_NOT_EQUAL(m_varlist[varID].scalefactor, 1);
int datatype = m_varlist[varID].datatype;
if (filetype == CDI_FILETYPE_NC || filetype == CDI_FILETYPE_NC2 || filetype == CDI_FILETYPE_NC4
|| filetype == CDI_FILETYPE_NC4C || filetype == CDI_FILETYPE_NC5)
{
if (datatype == CDI_DATATYPE_UINT8
&& (filetype == CDI_FILETYPE_NC || filetype == CDI_FILETYPE_NC2 || filetype == CDI_FILETYPE_NC5))
{
datatype = CDI_DATATYPE_INT16;
m_varlist[varID].datatype = datatype;
}
if (datatype == CDI_DATATYPE_UINT16
&& (filetype == CDI_FILETYPE_NC || filetype == CDI_FILETYPE_NC2 || filetype == CDI_FILETYPE_NC5))
{
datatype = CDI_DATATYPE_INT32;
m_varlist[varID].datatype = datatype;
}
if (laddoffset || lscalefactor)
{
if (datatype == CDI_DATATYPE_INT8 || datatype == CDI_DATATYPE_UINT8 || datatype == CDI_DATATYPE_INT16
|| datatype == CDI_DATATYPE_UINT16)
m_varlist[varID].check_datarange = true;
}
else if (Options::CheckDatarange)
{
m_varlist[varID].check_datarange = true;
}
}
}
m_vlistID = p_vlistID; /* used for -r/-a */
}
void
pstreamDebug(int debug)
{
PSTREAM = debug;
}
/*
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2019 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#ifndef PSTREAM_H
#define PSTREAM_H
#ifdef HAVE_CONFIG_H
#include "config.h" /* _FILE_OFFSET_BITS influence off_t */
#endif
#include "varlist.h"
#include <sys/types.h> /* off_t */
#include <vector>
static int processNum = 0;
class FileStream;
class CdoStream
{
public:
CdoStream(int id);
// Constructors
virtual int openRead();
virtual int openWrite(int p_filetype);
virtual int openAppend();
virtual int inqVlist();
virtual int defVlist(int p_vlistID);
virtual int inqRecord(int *varID, int *levelID);
virtual int defRecord(int varID, int levelID);
virtual int readRecord(double *data, size_t *nmiss);
virtual int readRecord(float *data, size_t *nmiss);
virtual int writeRecord(double *data, size_t nmiss);
virtual int writeRecord(float *data, size_t nmiss);
virtual int copyRecord(FileStream *dest);
virtual int inqTimestep(int tsID);
virtual int defTimestep(int tsID);
virtual int inqFileType();
virtual int inqByteorder();
virtual int close();
virtual size_t getNvals();
int m_cdoStreamID; // aka the id of the pstream
int m_filetype;
bool isopen;
std::string m_name;
std::vector<varlist_t> m_varlist;
int m_vlistID;
int m_tsID;
int m_varID; /* next varID defined with streamDefVar */
protected:
void defVarList(int p_vlistID);
private:
CdoStream();
};
CdoStream *pstreamToPointer(int pstreamID);
void closeAllStreams();
void setProcessNum(int p_num);
#endif /* PSTREAM_H */
/*
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2019 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#include <sys/stat.h> /* stat */
#include <cdi.h>
#include "fileStream.h"
#include "cdi_lockedIO.h"
#include "cdo_options.h"
#include "cdo_output.h"
#include "cdo_defaultValues.h"
#include "cdo_history.h"
#include "timer.h"
#include "commandline.h"
FileStream::FileStream(int p_cdoStreamID,std::string p_fileName) :CdoStream(p_cdoStreamID), m_filename(p_fileName) {}
int
FileStream::openRead()
{
openLock();
int fileID = streamOpenRead(m_filename.c_str());
if (fileID < 0) cdiOpenError(fileID, "Open failed on >%s<", m_filename.c_str());
isopen = true;
if (CdoDefault::FileType == CDI_UNDEFID) CdoDefault::FileType = streamInqFiletype(fileID);
cdoInqHistory(fileID);
m_fileID = fileID;
openUnlock();
return fileID;
}
int
FileStream::openWrite(int p_filetype)
{
if (Options::cdoInteractive)
{
struct stat stbuf;
const int rstatus = stat(m_name.c_str(), &stbuf);
/* If permanent file already exists, query user whether to overwrite or exit */
if (rstatus != -1) query_user_exit(m_name.c_str());
}
if (m_filetype == CDI_UNDEFID) m_filetype = CDI_FILETYPE_GRB;
//TODO FIX THIS: if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
openLock();
int fileID = streamOpenWrite(m_filename.c_str(), m_filetype);
openUnlock();
//TODO FIX THIS: if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
if (fileID < 0) cdiOpenError(fileID, "Open failed on >%s<", m_name.c_str());
isopen = true;
cdoDefHistory(fileID, commandLine());
if (CdoDefault::Byteorder != CDI_UNDEFID) streamDefByteorder(fileID, CdoDefault::Byteorder);
set_comp(fileID, p_filetype);
m_fileID = fileID;
m_filetype = p_filetype;
return m_cdoStreamID;
}
int
FileStream::openAppend()
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
openLock();
int fileID = streamOpenAppend(m_filename.c_str());
openUnlock();
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
if (fileID < 0) cdiOpenError(fileID, "Open failed on >%s<", m_filename.c_str());
isopen = true;
int filetype = streamInqFiletype(fileID);
set_comp(fileID, filetype);
m_fileID = fileID;
return m_fileID;
}
int
FileStream::defVlist(int p_vlistID)
{
if (CdoDefault::DataType != CDI_UNDEFID)
{
int varID, nvars = vlistNvars(p_vlistID);
for (varID = 0; varID < nvars; ++varID) vlistDefVarDatatype(p_vlistID, varID, CdoDefault::DataType);
if (CdoDefault::DataType == CDI_DATATYPE_FLT64 || CdoDefault::DataType == CDI_DATATYPE_FLT32)
{
for (varID = 0; varID < nvars; varID++)
{
vlistDefVarAddoffset(p_vlistID, varID, 0.0);
vlistDefVarScalefactor(p_vlistID, varID, 1.0);
}
}
}
if (Options::cdoChunkType != CDI_UNDEFID)
{
const int nvars = vlistNvars(p_vlistID);
for (int varID = 0; varID < nvars; ++varID) vlistDefVarChunkType(p_vlistID, varID, Options::cdoChunkType);
}
if (Options::CMOR_Mode)
{
cdo_def_tracking_id(p_vlistID, "tracking_id");
cdo_def_creation_date(p_vlistID);
}
if (Options::VersionInfo) cdiDefAttTxt(p_vlistID, CDI_GLOBAL, "CDO", (int) strlen(cdoComment()), cdoComment());
#ifdef _OPENMP
if (Threading::ompNumThreads > 1)
cdiDefAttInt(p_vlistID, CDI_GLOBAL, "cdo_openmp_thread_number", CDI_DATATYPE_INT32, 1, &Threading::ompNumThreads);
#endif
defVarList(p_vlistID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
streamDefVlistLocked(m_fileID, p_vlistID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
}
int
FileStream::inqVlist()
{
int vlistID;
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_read);
vlistID = streamInqVlistLocked(m_fileID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_read);
if (vlistID == -1)
{
cdoAbort("Couldn't read data from input fileID %d!", m_fileID);
}
const int nsubtypes = vlistNsubtypes(vlistID);
if (nsubtypes > 1) cdoWarning("Subtypes are unsupported, the processing results are possibly wrong!");
if (CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(vlistID), CdoDefault::TaxisType);
m_vlistID = vlistID;
return m_vlistID;
}
int
FileStream::inqRecord(int *varID, int *levelID)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_read);
streamInqRecLocked(m_fileID, varID, levelID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_read);
return 1;
}
int
FileStream::defRecord(int varID, int levelID)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
streamDefRecLocked(m_fileID, varID, levelID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
return 1;
}
int
FileStream::readRecord(double *data, size_t *nmiss)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_read);
streamReadrecordDoubleLocked(m_fileID, data, nmiss);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_read);
}
int
FileStream::readRecord(float *data, size_t *nmiss)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_read);
streamReadrecordFloatLocked(m_fileID, data, nmiss);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_read);
}
int
FileStream::writeRecord(double *p_data, size_t p_nmiss)
{
int varID = m_varID;
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
if (varID < (int) m_varlist.size())
if (m_varlist[varID].check_datarange) m_varlist[varID].checkDatarange(p_data, p_nmiss);
streamWriteRecordDoubleLocked(m_fileID, p_data, p_nmiss);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
}
int
FileStream::writeRecord(float *p_data, size_t p_nmiss)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
streamWriteRecordFloatLocked(m_fileID, p_data, p_nmiss);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
}
int
FileStream::copyRecord(FileStream *p_destination)
{
return streamCopyRecordLocked(m_fileID, p_destination->getFileID());
}
int
FileStream::inqTimestep(int p_tsID)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_read);
int nrecs = streamInqTimeStepLocked(m_fileID, p_tsID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_read);
if (p_tsID == 0 && CdoDefault::TaxisType != CDI_UNDEFID) taxisDefType(vlistInqTaxis(m_vlistID), CdoDefault::TaxisType);
if (nrecs && p_tsID != m_tsID)
{
m_tsID = p_tsID;
}
return nrecs;
}
int
FileStream::defTimestep(int p_tsID)
{
if (processNum == 1 && Threading::ompNumThreads == 1) timer_start(timer_write);
// don't use sync -> very slow on GPFS
// if ( p_tsID > 0 ) streamSync(fileID);
streamDefTimeStepLocked(m_fileID, p_tsID);
if (processNum == 1 && Threading::ompNumThreads == 1) timer_stop(timer_write);
return 1;
}
int
FileStream::inqFileType()
{
return streamInqByteorder(m_fileID);
}
int
FileStream::inqByteorder()
{
return streamInqByteorder(m_fileID);
}
size_t
FileStream::getNvals()
{
return streamNvals(m_fileID);
}
int
FileStream::getFileID()
{
return m_fileID;
}
/*
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2019 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#ifndef FILESTREAM_H
#define FILESTREAM_H
#include <string>
#include "cdoStream.h"
class FileStream : public CdoStream
{
public:
// Constructors
FileStream(int p_cdoStreamID, std::string p_fileStream);
// ---
// CdoStream Interface functions
int openRead() override;
int openWrite(int p_filetype) override;
int openAppend() override;
int inqVlist() override;
int defVlist(int p_vlistID) override;
int inqRecord(int *varID, int *levelID) override;
int defRecord(int varID, int levelID) override;
int readRecord(double *data, size_t *nmiss) override;
int readRecord(float *data, size_t *nmiss) override;
int writeRecord(double *data, size_t nmiss) override;
int writeRecord(float *data, size_t nmiss) override;
int copyRecord(FileStream *p_fileStream) override;
int inqTimestep(int tsID) override;
int defTimestep(int tsID) override;
int inqFileType() override;
int inqByteorder() override;
int close() override;
size_t getNvals() override;
// ---
// FileStreamOnly
int getFileID();
// ---
private:
FileStream() = delete;
void checkDatarange(int varID, double *array, size_t nmiss);
int m_fileID;
std::string m_filename;
};
#endif
/*
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2019 Uwe Schulzweida, <uwe.schulzweida AT mpimet.mpg.de>
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
#include <pthread.h>
#include <cdi.h>
#include "pthread_debug.h"
#include "pipeStream.h"
#include "cdo_options.h"
#include "cdo_output.h"
PipeStream::PipeStream(int p_cdoStreamID, int p_processID) : CdoStream(p_cdoStreamID)
{
m_pipe->pipeSetName(p_processID, p_cdoStreamID);
}
int
PipeStream::openRead()
{
rthreadID = pthread_self();
isopen = true;
return m_cdoStreamID;
}
int
PipeStream::openWrite(int filetype)
{
Debug(PSTREAM, "pipe %s", m_pipe->name);
wthreadID = pthread_self();
m_filetype = filetype;
isopen = true;
return m_cdoStreamID;
}
int
PipeStream::openAppend()
{
cdoWarning("Operator does not suport pipes!");
return -1;
}
int
PipeStream::inqVlist()
{
// m_vlist is changed when the whlist was successfully defined by another pipe!!
int vlistID = m_pipe->pipeInqVlist(m_vlistID);
//
if (vlistID == -1) cdoAbort("Couldn't read data from input stream %s!", m_name.c_str());
return vlistID;
}
int
PipeStream::defVlist(int p_vlistID)
{
Debug(PSTREAM, "%s pstreamID %d", m_pipe->name, m_cdoStreamID);
int vlistIDcp = vlistDuplicate(p_vlistID);
m_pipe->pipeDefVlist(m_vlistID, vlistIDcp);
}
int
PipeStream::inqRecord(int *varID, int *levelID)
{
m_pipe->pipeInqRecord(varID, levelID);
}
int
PipeStream::defRecord(int varID, int levelID)
{
m_pipe->pipeDefRecord(varID, levelID);
}
int
PipeStream::readRecord(float *data, size_t *nmiss)
{
cdoAbort("pipeReadRecord not implemented for memtype float!");
return -1;
}