Commit 707c7f54 authored by Oliver Heidmann's avatar Oliver Heidmann
Browse files

moved getOperatorArg to util, added print functions for arguments and processes

parent b095a2bc
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "util.h" #include "util.h"
#include "pstream_int.h" #include "pstream_int.h"
#include "dmemory.h" #include "dmemory.h"
#include "pthread.h"
#if defined(HAVE_LIBPTHREAD) #if defined(HAVE_LIBPTHREAD)
pthread_mutex_t processMutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t processMutex = PTHREAD_MUTEX_INITIALIZER;
...@@ -385,29 +386,6 @@ const char *processOperator(void) ...@@ -385,29 +386,6 @@ const char *processOperator(void)
return Process[processID].xoperator; return Process[processID].xoperator;
} }
static
char *getOperatorArg(const char *xoperator)
{
char *operatorArg = NULL;
if ( xoperator )
{
char *commapos = (char *)strchr(xoperator, ',');
if ( commapos )
{
size_t len = strlen(commapos+1);
if ( len )
{
operatorArg = (char*) Malloc(len+1);
strcpy(operatorArg, commapos+1);
}
}
}
return operatorArg;
}
static int skipInputStreams(int argc, char *argv[], int globArgc, int nstreams); static int skipInputStreams(int argc, char *argv[], int globArgc, int nstreams);
static static
...@@ -1000,3 +978,36 @@ int cdoStreamNumber() ...@@ -1000,3 +978,36 @@ int cdoStreamNumber()
return operatorStreamNumber(Process[processID].operatorName); return operatorStreamNumber(Process[processID].operatorName);
} }
void print_process(int p_process_id)
{
#if defined(HAVE_LIBPTHREAD)
std::cout << " threadID : " << Process[p_process_id].threadID << std::endl;
std::cout << " l_threadID : " << Process[p_process_id].l_threadID << std::endl;
#endif
std::cout << " nchild : " << Process[p_process_id].nchild << std::endl;
std::cout << " nInStream : " << Process[p_process_id].nInStream << std::endl;
std::cout << " nOutStream : " << Process[p_process_id].nOutStream << std::endl;
for(int i = 0; i < Process[p_process_id].nInStream; i++){
std::cout << " " << Process[p_process_id].inputStreams[i] << std::endl;
}
for(int i = 0; i < Process[p_process_id].nOutStream; i++){
std::cout << " " << Process[p_process_id].outputStreams[i] << std::endl;
}
std::cout << " s_utime : " << Process[p_process_id].s_utime << std::endl;
std::cout << " s_stime : " << Process[p_process_id].s_stime << std::endl;
std::cout << " a_utime : " << Process[p_process_id].a_utime << std::endl;
std::cout << " a_stime : " << Process[p_process_id].a_stime << std::endl;
std::cout << " cputime : " << Process[p_process_id].cputime << std::endl;
std::cout << " nvals : " << Process[p_process_id].nvals << std::endl;
std::cout << " nvars : " << Process[p_process_id].nvars << std::endl;
std::cout << " ntimesteps : " << Process[p_process_id].ntimesteps << std::endl;
std::cout << " streamCnt : " << Process[p_process_id].streamCnt << std::endl;
std::cout << " streamNames : " << Process[p_process_id].streamNames << std::endl;
std::cout << " xoperator : " << Process[p_process_id].xoperator << std::endl;
std::cout << " operatorName : " << Process[p_process_id].operatorName << std::endl;
std::cout << " operatorArg : " << Process[p_process_id].operatorArg << std::endl;
std::cout << " oargc : " << Process[p_process_id].oargc << std::endl;
std::cout << " noper : " << Process[p_process_id].noper << std::endl;
}
...@@ -104,4 +104,5 @@ const char *processInqOpername(void); ...@@ -104,4 +104,5 @@ const char *processInqOpername(void);
const char *processInqOpername2(int processID); const char *processInqOpername2(int processID);
const char *processInqPrompt(void); const char *processInqPrompt(void);
void print_process(int p_process_id);
#endif /* _PROCESS_H */ #endif /* _PROCESS_H */
...@@ -290,12 +290,12 @@ static void createPipeName(char *pipename, int pnlen){ ...@@ -290,12 +290,12 @@ static void createPipeName(char *pipename, int pnlen){
snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1); snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1);
} }
static pthread_t pCreateReadThread(char *newarg, argument_t *argument, const char *operatorName){ pthread_t pCreateReadThread(argument_t *argument){
pthread_attr_t attr; pthread_attr_t attr;
int status = pthread_attr_init(&attr); int status = pthread_attr_init(&attr);
if ( status ) SysError("pthread_attr_init failed for '%s'", newarg+1); if ( status ) SysError("pthread_attr_init failed for '%s'", argument->operatorName);
status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); status = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if ( status ) SysError("pthread_attr_setdetachstate failed for '%s'", newarg+1); if ( status ) SysError("pthread_attr_setdetachstate failed for '%s'", argument->operatorName);
/* /*
param.sched_priority = 0; param.sched_priority = 0;
status = pthread_attr_setschedparam(&attr, &param); status = pthread_attr_setschedparam(&attr, &param);
...@@ -320,11 +320,11 @@ static pthread_t pCreateReadThread(char *newarg, argument_t *argument, const cha ...@@ -320,11 +320,11 @@ static pthread_t pCreateReadThread(char *newarg, argument_t *argument, const cha
} }
pthread_t thrID; pthread_t thrID;
int rval = pthread_create(&thrID, &attr, operatorModule(operatorName), argument); int rval = pthread_create(&thrID, &attr, operatorModule(argument->operatorName.c_str()), argument);
if ( rval != 0 ) if ( rval != 0 )
{ {
errno = rval; errno = rval;
SysError("pthread_create failed for '%s'", newarg+1); SysError("pthread_create failed for '%s'", argument->operatorName);
} }
return thrID; return thrID;
} }
...@@ -339,9 +339,10 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr) ...@@ -339,9 +339,10 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
char *pipename = (char*) Malloc(pnlen); char *pipename = (char*) Malloc(pnlen);
// struct sched_param param; // struct sched_param param;
argument_t *newargument = (argument_t*) Malloc(sizeof(argument_t)); argument_t *newargument = new argument_t();
newargument->argc = argument->argc + 1; newargument->argc = argument->argc + 1;
newargument->argv = (char **) Malloc(newargument->argc*sizeof(char *)); newargument->argv = (char **) Malloc(newargument->argc*sizeof(char *));
newargument->operatorName = "";
memcpy(newargument->argv, argument->argv, argument->argc*sizeof(char *)); memcpy(newargument->argv, argument->argv, argument->argc*sizeof(char *));
char *operatorArg = argument->argv[0]; char *operatorArg = argument->argv[0];
...@@ -356,6 +357,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr) ...@@ -356,6 +357,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
newargument->argv[argument->argc] = pipename; newargument->argv[argument->argc] = pipename;
newargument->args = newarg; newargument->args = newarg;
newargument->operatorName = std::string(operatorName,strlen(operatorName));
/* /*
printf("pstreamOpenRead: new args >%s<\n", newargument->args); printf("pstreamOpenRead: new args >%s<\n", newargument->args);
for ( int i = 0; i < newargument->argc; ++i ) for ( int i = 0; i < newargument->argc; ++i )
...@@ -370,7 +372,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr) ...@@ -370,7 +372,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
if ( ! cdoSilentMode ){ if ( ! cdoSilentMode ){
cdoPrint("Started child process \"%s\".", newarg+1); cdoPrint("Started child process \"%s\".", newarg+1);
} }
pCreateReadThread(newarg, newargument, operatorName); pCreateReadThread(newargument);
/* Free(operatorName); */ /* Free(operatorName); */
processAddInputStream(pstreamID); processAddInputStream(pstreamID);
/* pipeInqInfo(pstreamID); */ /* pipeInqInfo(pstreamID); */
...@@ -888,7 +890,7 @@ void pstreamClose(int pstreamID) ...@@ -888,7 +890,7 @@ void pstreamClose(int pstreamID)
argument_t *argument = (argument_t *) (pstreamptr->argument); argument_t *argument = (argument_t *) (pstreamptr->argument);
if ( argument->argv ) Free(argument->argv); if ( argument->argv ) Free(argument->argv);
if ( argument->args ) Free(argument->args); if ( argument->args ) Free(argument->args);
Free(argument); delete(argument);
} }
vlistDestroy(pstreamptr->vlistID); vlistDestroy(pstreamptr->vlistID);
pthread_mutex_unlock(pipe->mutex); pthread_mutex_unlock(pipe->mutex);
......
...@@ -22,5 +22,6 @@ ...@@ -22,5 +22,6 @@
void pstreamDebug(int debug); void pstreamDebug(int debug);
int pstreamIsPipe(int streamID); int pstreamIsPipe(int streamID);
pthread_t pCreateReadThread(argument_t *argument);
#endif /* _PSTREAM_INT_H */ #endif /* _PSTREAM_INT_H */
...@@ -116,8 +116,6 @@ const char *cdoExpName = NULL; ...@@ -116,8 +116,6 @@ const char *cdoExpName = NULL;
int timer_read, timer_write; int timer_read, timer_write;
const char *cdoComment(void) const char *cdoComment(void)
{ {
static char comment[256]; static char comment[256];
...@@ -245,6 +243,28 @@ const char *getOperatorName(const char *operatorArg) ...@@ -245,6 +243,28 @@ const char *getOperatorName(const char *operatorArg)
return operatorName; return operatorName;
} }
char *getOperatorArg(const char *xoperator)
{
char *operatorArg = NULL;
if ( xoperator )
{
char *commapos = (char *)strchr(xoperator, ',');
if ( commapos )
{
size_t len = strlen(commapos+1);
if ( len )
{
operatorArg = (char*) Malloc(len+1);
strcpy(operatorArg, commapos+1);
}
}
}
return operatorArg;
}
argument_t *file_argument_new(const char *filename) argument_t *file_argument_new(const char *filename)
{ {
...@@ -923,3 +943,17 @@ void cdo_check_round(void) ...@@ -923,3 +943,17 @@ void cdo_check_round(void)
} }
} }
} }
void print_argument(argument_t * p_argument)
{
std::cout << "argv with " << p_argument->argc << " arguments:" << std::endl;
for(int i = 0; i < p_argument->argc; i++)
{
std::cout << p_argument->argv[i] << " ";
}
std::cout << std::endl;
std::cout << "OperatorName: "<< p_argument->operatorName << std::endl;
std::cout << "operatorArguments: " << p_argument->operatorArguments << std::endl;
}
...@@ -113,10 +113,12 @@ extern const char *CDO_Version; ...@@ -113,10 +113,12 @@ extern const char *CDO_Version;
typedef struct { typedef struct {
int process_id;
int argc; int argc;
int argl;
char **argv; char **argv;
char *args; char *args;
std::string operatorName;
char * operatorArguments;
} argument_t; } argument_t;
argument_t *file_argument_new(const char *filename); argument_t *file_argument_new(const char *filename);
...@@ -126,8 +128,9 @@ void argument_free(argument_t *argument); ...@@ -126,8 +128,9 @@ void argument_free(argument_t *argument);
void argument_fill(argument_t *argument, int argc, char *argv[]); void argument_fill(argument_t *argument, int argc, char *argv[]);
char *getProgname(char *string); char *getProgname(char *string);
char *getOperator(const char *argument); char *GetOperator(const char *argument);
const char *getOperatorName(const char *xoperator); const char *getOperatorName(const char *xoperator);
char *getOperatorArg(const char *xoperator);
const char *cdoComment(void); const char *cdoComment(void);
argument_t makeArgument(int argc, char *argv[]); argument_t makeArgument(int argc, char *argv[]);
...@@ -235,4 +238,5 @@ int wildcardmatch(const char *w, const char *s); ...@@ -235,4 +238,5 @@ int wildcardmatch(const char *w, const char *s);
void cdo_check_round(void); void cdo_check_round(void);
void print_argument(argument_t *argument);
#endif /* _UTIL_H */ #endif /* _UTIL_H */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment