Commit 28b7821e authored by Oliver Heidmann's avatar Oliver Heidmann
Browse files

split the streams array in process_t into out- and instrams and changed the...

split the streams array in process_t into out- and instrams and changed the functions who used the single array accordingly
parent 226274b2
......@@ -65,7 +65,8 @@ int processCreate(void)
Process[processID].threadID = pthread_self();
Process[processID].l_threadID = 1;
#endif
Process[processID].nstream = 0;
Process[processID].nInStream = 0;
Process[processID].nOutStream = 0;
Process[processID].nchild = 0;
cdoProcessTime(&Process[processID].s_utime, &Process[processID].s_stime);
......@@ -157,18 +158,29 @@ off_t processInqNvals(int processID)
return Process[processID].nvals;
}
void processAddOutputStream(int streamID)
{
int processID = processSelf();
int sindex = Process[processID].nOutStream++;
if ( sindex >= MAX_STREAM )
Error("Limit of %d output streams per process reached (processID = %d)!", MAX_STREAM, processID);
Process[processID].outputStreams[sindex] = streamID;
}
void processAddStream(int streamID)
void processAddInputStream(int streamID)
{
int processID = processSelf();
if ( pstreamIsPipe(streamID) ) Process[processID].nchild++;
int sindex = Process[processID].nstream++;
int sindex = Process[processID].nInStream++;
if ( sindex >= MAX_STREAM )
Error("Limit of %d streams per process reached (processID = %d)!", MAX_STREAM, processID);
Error("Limit of %d input streams per process reached (processID = %d)!", MAX_STREAM, processID);
Process[processID].streams[sindex] = streamID;
Process[processID].inputStreams[sindex] = streamID;
}
......@@ -213,11 +225,18 @@ void processAccuTime(double utime, double stime)
}
int processInqStreamNum(void)
int processInqOutputStreamNum(void)
{
int processID = processSelf();
return Process[processID].nstream;
return Process[processID].nOutStream;
}
int processInqInputStreamNum(void)
{
int processID = processSelf();
return Process[processID].nInStream;
}
......@@ -229,13 +248,18 @@ int processInqChildNum(void)
}
int processInqStreamID(int streamindex)
int processInqOutputStreamID(int streamindex)
{
int processID = processSelf();
return (Process[processID].streams[streamindex]);
return (Process[processID].outputStreams[streamindex]);
}
int processInqInputStreamID(int streamindex)
{
int processID = processSelf();
return (Process[processID].inputStreams[streamindex]);
}
const char *processInqOpername2(int processID)
{
......
......@@ -42,8 +42,10 @@ typedef struct {
int l_threadID;
#endif
short nchild;
short nstream;
short streams[MAX_STREAM];
short nInStream;
short nOutStream;
short inputStreams[MAX_STREAM];
short outputStreams[MAX_STREAM];
double s_utime;
double s_stime;
double a_utime;
......@@ -77,9 +79,12 @@ void processDelete(void);
int processInqTimesteps(void);
void processDefTimesteps(int streamID);
int processInqVarNum(void);
int processInqStreamNum(void);
int processInqStreamID(int streamindex);
void processAddStream(int streamID);
int processInqInputStreamNum(void);
int processInqOutputStreamNum(void);
int processInqInputStreamID(int streamindex);
int processInqOutputStreamID(int streamindex);
void processAddInputStream(int streamID);
void processAddOutputStream(int streamID);
void processDelStream(int streamID);
void processDefVarNum(int nvars, int streamID);
void processDefArgument(void *vargument);
......
......@@ -372,7 +372,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
}
pCreateReadThread(newarg, newargument, operatorName);
/* Free(operatorName); */
processAddStream(pstreamID);
processAddInputStream(pstreamID);
/* pipeInqInfo(pstreamID); */
if ( PSTREAM_Debug ) Message("pipe %s", pipename);
#else
......@@ -659,7 +659,7 @@ int pstreamOpenWritePipe(const argument_t *argument, int filetype)
pstreamptr->wthreadID = pthread_self();
pstreamptr->filetype = filetype;
processAddStream(pstreamID);
processAddOutputStream(pstreamID);
#endif
return pstreamID;
......@@ -1596,10 +1596,22 @@ void pstreamCloseAll(void)
static
void processClosePipes(void)
{
int nstream = processInqStreamNum();
int nstream = processInqInputStreamNum();
for ( int sindex = 0; sindex < nstream; sindex++ )
{
int pstreamID = processInqStreamID(sindex);
int pstreamID = processInqInputStreamID(sindex);
pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
if ( PSTREAM_Debug )
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamID);
if ( pstreamptr ) pstreamClose(pstreamID);
}
nstream = processInqOutputStreamNum();
for ( int sindex = 0; sindex < nstream; sindex++ )
{
int pstreamID = processInqOutputStreamID(sindex);
pstream_t *pstreamptr = pstream_to_pointer(pstreamID);
if ( PSTREAM_Debug )
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment