Commit 1fa50ac6 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

Merge branch 'develop' of git.mpimet.mpg.de:cdo into develop

parents 03a0fec7 a95a47ac
......@@ -1448,7 +1448,7 @@ void *Echam5ini(void *argument)
int operatorID = cdoOperatorID();
int operfunc = cdoOperatorF1(operatorID);
if ( operatorID == EXPORT_E5ML && processSelf() != 0 )
if ( operatorID == EXPORT_E5ML && processSelf().m_ID != 0 )
cdoAbort("This operator can't be linked with other operators!");
if ( operfunc == func_read )
......
......@@ -41,7 +41,7 @@ void *Eofcoeff(void * argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
cdoOperatorAdd("eofcoeff", 0, 0, NULL);
......
......@@ -40,7 +40,7 @@ void *Eofcoeff3d(void * argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
cdoOperatorAdd("eofcoeff3d", 0, 0, NULL);
......
This diff is collapsed.
......@@ -35,7 +35,7 @@ void *Selrec(void *argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
operatorInputArg("records");
......
......@@ -61,7 +61,7 @@ void *Split(void *argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
bool lcopy = UNCHANGED_RECORD;
......
......@@ -40,7 +40,7 @@ void *Splitrec(void *argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
bool lcopy = UNCHANGED_RECORD;
......
......@@ -48,7 +48,7 @@ void *Splitsel(void *argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
bool lcopy = UNCHANGED_RECORD;
......
......@@ -73,7 +73,7 @@ void *Splittime(void *argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
bool lcopy = UNCHANGED_RECORD;
......
......@@ -54,7 +54,7 @@ void *Splityear(void *argument)
cdoInitialize(argument);
if ( processSelf() != 0 ) cdoAbort("This operator can't be combined with other operators!");
if ( processSelf().m_ID != 0 ) cdoAbort("This operator can't be combined with other operators!");
bool lcopy = UNCHANGED_RECORD;
......
This diff is collapsed.
......@@ -23,6 +23,8 @@
#include "util.h"
#include "pstream.h"
#include <vector>
constexpr int MAX_PROCESS = 128;
constexpr int MAX_STREAM = 64;
constexpr int MAX_OPERATOR = 128;
......@@ -38,16 +40,16 @@ typedef struct {
}
oper_t;
typedef struct {
class process_t {
public:
int m_ID;
#if defined(HAVE_LIBPTHREAD)
pthread_t threadID;
int l_threadID;
#endif
short nchild;
short nInStream;
short nOutStream;
short inputStreams[MAX_STREAM];
short outputStreams[MAX_STREAM];
std::vector<pstream_t*> inputStreams;
std::vector<pstream_t*> outputStreams;
double s_utime;
double s_stime;
double a_utime;
......@@ -67,21 +69,27 @@ typedef struct {
char prompt[64];
short noper;
oper_t oper[MAX_OPERATOR];
}
process_t;
int getInStreamCnt();
int getOutStreamCnt();
void initProcess();
void print_process();
process_t(int ID);
private:
process_t();
};
int processSelf(void);
pstream_t* processInqInputStream(int streamindex);
pstream_t* processInqOutputStream(int streamindex);
process_t& processSelf(void);
int processCreate(void);
void processDelete(void);
int processInqTimesteps(void);
void processDefTimesteps(int streamID);
int processInqVarNum(void);
int processInqInputStreamNum(void);
int processInqOutputStreamNum(void);
int processInqInputStreamID(int streamindex);
int processInqOutputStreamID(int streamindex);
int processInqInputStreamNum(void);
int processInqOutputStreamNum(void);
void processAddInputStream(pstream_t *p_pstream_ptr);
void processAddOutputStream(pstream_t *p_pstream_ptr);
void processDelStream(int streamID);
......@@ -106,7 +114,5 @@ const char *processInqOpername(void);
const char *processInqOpername2(int processID);
const char *processInqPrompt(void);
void print_process(int p_process_id);
const argument_t *cdoStreamName(int cnt);
#endif /* _PROCESS_H */
......@@ -332,7 +332,7 @@ pstream_t::isPipe()
void pstream_t::createPipeName(char *pipename, int pnlen)
{
snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1);
snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf().m_ID + 1, processInqChildNum() + 1);
}
pthread_t
......@@ -625,18 +625,10 @@ pstreamOpenRead(const argument_t *argument)
*/
if (ispipe)
{
if(PSTREAM_Debug)
{
//TODO << "opening pipe for reading" << std::this_thread::get_id() << std::endl;
}
pstreamptr->pstreamOpenReadPipe(argument);
}
else
{
if(PSTREAM_Debug)
{
//TODO << "opening file for reading" << std::this_thread::get_id() << std::endl;
}
pstreamptr->pstreamOpenReadFile(argument);
}
......@@ -935,7 +927,6 @@ pstream_t::openAppend(const char *p_filename)
void
pstreamCloseChildStream(pstream_t *pstreamptr)
{
//TODO << "pCCS"<< std::endl;
pipe_t *pipe = pstreamptr->pipe;
pthread_mutex_lock(pipe->m_mutex);
pipe->EOP = true;
......@@ -967,13 +958,11 @@ pstreamCloseChildStream(pstream_t *pstreamptr)
pthread_mutex_unlock(pipe->m_mutex);
processAddNvals(pipe->nvals);
//TODO << "HERE WE ARE !!!|" << std::endl;
}
void
pstreamCloseParentStream(pstream_t *pstreamptr)
{
//TODO << "pCPS"<< std::endl;
pipe_t *pipe = pstreamptr->pipe;
pthread_mutex_lock(pipe->m_mutex);
pipe->EOP = true;
......@@ -1001,19 +990,28 @@ pstreamClose(int pstreamID)
if (pstreamptr == NULL)
Error("Internal problem, stream %d not open!", pstreamID);
if (pstreamptr->ispipe)
pstreamptr->close();
if(!pstreamptr->ispipe)
{
pstream_delete_entry(pstreamptr);
}
}
void pstream_t::close(){
if (ispipe)
{
#if defined(HAVE_LIBPTHREAD)
pthread_t threadID = pthread_self();
if (pthread_equal(threadID, pstreamptr->rthreadID))
pstreamCloseChildStream(pstreamptr);
else if (pthread_equal(threadID, pstreamptr->wthreadID))
pstreamCloseParentStream(pstreamptr);
if (pthread_equal(threadID, rthreadID))
pstreamCloseChildStream(this);
else if (pthread_equal(threadID, wthreadID))
pstreamCloseParentStream(this);
else
Error("Internal problem! Close pipe %s", pstreamptr->m_name.c_str());
Error("Internal problem! Close pipe %s", m_name.c_str());
processDelStream(pstreamID);
// processDelStream(pstreamID);
#else
cdoAbort("Cannot use pipes, pthread support not compiled in!");
#endif
......@@ -1021,18 +1019,18 @@ pstreamClose(int pstreamID)
else
{
if (PSTREAM_Debug)
Message("%s fileID %d", pstreamptr->m_name.c_str(), pstreamptr->m_fileID);
Message("%s fileID %d", m_name.c_str(), m_fileID);
if (pstreamptr->mode == 'r')
if (mode == 'r')
{
processAddNvals(streamNvals(pstreamptr->m_fileID));
processAddNvals(streamNvals(m_fileID));
}
#if defined(HAVE_LIBPTHREAD)
if (cdoLockIO)
pthread_mutex_lock(&streamMutex);
#endif
streamClose(pstreamptr->m_fileID);
streamClose(m_fileID);
#if defined(HAVE_LIBPTHREAD)
if (cdoLockIO)
pthread_mutex_unlock(&streamMutex);
......@@ -1040,22 +1038,21 @@ pstreamClose(int pstreamID)
if (cdoExpMode == CDO_EXP_REMOTE)
{
if (pstreamptr->mode == 'w')
if (mode == 'w')
{
extern const char *cdojobfiles;
FILE *fp = fopen(cdojobfiles, "a");
fprintf(fp, "%s\n", pstreamptr->m_name.c_str());
fprintf(fp, "%s\n", m_name.c_str());
fclose(fp);
}
}
if (pstreamptr->m_varlist)
if (m_varlist)
{
Free(pstreamptr->m_varlist);
pstreamptr->m_varlist = NULL;
Free(m_varlist);
m_varlist = NULL;
}
pstream_delete_entry(pstreamptr);
}
}
......@@ -1090,6 +1087,10 @@ pstream_t::inqVlist()
pthread_mutex_lock(&streamMutex);
#endif
vlistID = streamInqVlist(m_fileID);
if (vlistID == -1){
cdoAbort("Couldn't read data from input fileID %d!", m_fileID);
}
#if defined(HAVE_LIBPTHREAD)
if (cdoLockIO)
pthread_mutex_unlock(&streamMutex);
......@@ -1761,11 +1762,10 @@ cdoInitialize(void *argument)
#if defined(HAVE_LIBPTHREAD)
if (PSTREAM_Debug)
Message("process %d thread %ld", processSelf(), pthread_self());
Message("process %d thread %ld", processSelf().m_ID, pthread_self());
#endif
processDefArgument(argument);
//TODO << "we got through this" << std::endl;
}
void pstreamCloseAll()
{
......@@ -1803,34 +1803,32 @@ processClosePipes(void)
int nstream = processInqInputStreamNum();
for (int sindex = 0; sindex < nstream; sindex++)
{
int pstreamID = processInqInputStreamID(sindex);
pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
pstream_t *pstreamptr = processInqInputStream(sindex);
if (PSTREAM_Debug)
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamID);
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamptr->self);
if (pstreamptr)
pstreamClose(pstreamID);
pstreamptr->close();
}
nstream = processInqOutputStreamNum();
for (int sindex = 0; sindex < nstream; sindex++)
{
int pstreamID = processInqOutputStreamID(sindex);
pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
pstream_t *pstreamptr = processInqOutputStream(sindex);
if (PSTREAM_Debug)
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamID);
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamptr->self);
if (pstreamptr)
pstreamClose(pstreamID);
pstreamptr->close();
}
}
void
cdoFinish(void)
{
int processID = processSelf();
int processID = processSelf().m_ID;
int nvars, ntimesteps;
char memstring[32] = { "" };
double s_utime, s_stime;
......
......@@ -38,6 +38,7 @@ public:
void openAppend(const char * p_filename);
void init();
void defVlist(int p_vlistID);
void close();
int self; //aka the id of the pstream
int mode;
int m_fileID;
......
......@@ -41,7 +41,7 @@
#define ADD_PLURAL(n) ((n)!=1 ? "s" : "")
#define UNCHANGED_RECORD (processSelf() == 0 && cdoStreamName(0)->argv[0][0] != '-' && cdoRegulargrid == FALSE && cdoDefaultFileType == -1 && cdoDefaultDataType == -1 && cdoDefaultByteorder == -1 )
#define UNCHANGED_RECORD (processSelf().m_ID == 0 && cdoStreamName(0)->argv[0][0] != '-' && cdoRegulargrid == FALSE && cdoDefaultFileType == -1 && cdoDefaultDataType == -1 && cdoDefaultByteorder == -1 )
#include <string>
extern char *Progname;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment