Commit 226274b2 authored by Oliver Heidmann's avatar Oliver Heidmann
Browse files

moved thread creation from pstreamOpenReadPipe into its own function

parent 0dcc5586
...@@ -285,47 +285,12 @@ int pstreamIsPipe(int pstreamID) ...@@ -285,47 +285,12 @@ int pstreamIsPipe(int pstreamID)
return pstreamptr->ispipe; return pstreamptr->ispipe;
} }
static static void createPipeName(char *pipename, int pnlen){
void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
{
#if defined(HAVE_LIBPTHREAD)
int pstreamID = pstreamptr->self;
size_t pnlen = 16;
char *pipename = (char*) Malloc(pnlen);
// struct sched_param param;
argument_t *newargument = (argument_t*) Malloc(sizeof(argument_t));
newargument->argc = argument->argc + 1;
newargument->argv = (char **) Malloc(newargument->argc*sizeof(char *));
memcpy(newargument->argv, argument->argv, argument->argc*sizeof(char *));
char *operatorArg = argument->argv[0];
const char *operatorName = getOperatorName(operatorArg);
size_t len = strlen(argument->args);
char *newarg = (char*) Malloc(len+pnlen);
strcpy(newarg, argument->args);
snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1); snprintf(pipename, pnlen, "(pipe%d.%d)", processSelf() + 1, processInqChildNum() + 1);
newarg[len] = ' '; }
strcpy(&newarg[len+1], pipename);
newargument->argv[argument->argc] = pipename;
newargument->args = newarg;
/*
printf("pstreamOpenRead: new args >%s<\n", newargument->args);
for ( int i = 0; i < newargument->argc; ++i )
printf("pstreamOpenRead: new arg %d >%s<\n", i, newargument->argv[i]);
*/
pstreamptr->ispipe = TRUE;
pstreamptr->name = pipename;
pstreamptr->rthreadID = pthread_self();
pstreamptr->pipe = pipeNew();
pstreamptr->argument = (void *) newargument;
if ( ! cdoSilentMode )
cdoPrint("Started child process \"%s\".", newarg+1);
static int pCreateReadThread(char *newarg, argument_t *argument, const char *operatorName){
pthread_attr_t attr; pthread_attr_t attr;
int status = pthread_attr_init(&attr); int status = pthread_attr_init(&attr);
if ( status ) SysError("pthread_attr_init failed for '%s'", newarg+1); if ( status ) SysError("pthread_attr_init failed for '%s'", newarg+1);
...@@ -355,13 +320,57 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr) ...@@ -355,13 +320,57 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
} }
pthread_t thrID; pthread_t thrID;
int rval = pthread_create(&thrID, &attr, operatorModule(operatorName), newargument); int rval = pthread_create(&thrID, &attr, operatorModule(operatorName), argument);
if ( rval != 0 ) if ( rval != 0 )
{ {
errno = rval; errno = rval;
SysError("pthread_create failed for '%s'", newarg+1); SysError("pthread_create failed for '%s'", newarg+1);
} }
return thrID;
}
static
void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
{
#if defined(HAVE_LIBPTHREAD)
int pstreamID = pstreamptr->self;
size_t pnlen = 16;
char *pipename = (char*) Malloc(pnlen);
// struct sched_param param;
argument_t *newargument = (argument_t*) Malloc(sizeof(argument_t));
newargument->argc = argument->argc + 1;
newargument->argv = (char **) Malloc(newargument->argc*sizeof(char *));
memcpy(newargument->argv, argument->argv, argument->argc*sizeof(char *));
char *operatorArg = argument->argv[0];
const char *operatorName = getOperatorName(operatorArg);
size_t len = strlen(argument->args);
char *newarg = (char*) Malloc(len+pnlen);
strcpy(newarg, argument->args);
createPipeName(pipename, pnlen);
newarg[len] = ' ';
strcpy(&newarg[len+1], pipename);
newargument->argv[argument->argc] = pipename;
newargument->args = newarg;
/*
printf("pstreamOpenRead: new args >%s<\n", newargument->args);
for ( int i = 0; i < newargument->argc; ++i )
printf("pstreamOpenRead: new arg %d >%s<\n", i, newargument->argv[i]);
*/
pstreamptr->ispipe = TRUE;
pstreamptr->name = pipename;
pstreamptr->rthreadID = pthread_self();
pstreamptr->pipe = pipeNew();
pstreamptr->argument = (void *) newargument;
if ( ! cdoSilentMode ){
cdoPrint("Started child process \"%s\".", newarg+1);
}
pCreateReadThread(newarg, newargument, operatorName);
/* Free(operatorName); */ /* Free(operatorName); */
processAddStream(pstreamID); processAddStream(pstreamID);
/* pipeInqInfo(pstreamID); */ /* pipeInqInfo(pstreamID); */
...@@ -369,6 +378,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr) ...@@ -369,6 +378,7 @@ void pstreamOpenReadPipe(const argument_t *argument, pstream_t *pstreamptr)
#else #else
cdoAbort("Cannot use pipes, pthread support not compiled in!"); cdoAbort("Cannot use pipes, pthread support not compiled in!");
#endif #endif
} }
static static
...@@ -396,11 +406,14 @@ void pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr) ...@@ -396,11 +406,14 @@ void pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr)
char line[4096]; char line[4096];
FILE *fp, *fp2; FILE *fp, *fp2;
fp = fopen(pch, "r"); fp = fopen(pch, "r");
if ( fp == NULL ) cdoAbort("Open failed on %s", pch); if ( fp == NULL )
{
cdoAbort("Open failed on %s", pch);
}
if ( cdoVerbose ) if ( cdoVerbose )
{
cdoPrint("Reading file names from %s", pch); cdoPrint("Reading file names from %s", pch);
}
/* find number of files */ /* find number of files */
nfiles = 0; nfiles = 0;
while ( readline(fp, line, 4096) ) while ( readline(fp, line, 4096) )
...@@ -419,7 +432,7 @@ void pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr) ...@@ -419,7 +432,7 @@ void pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr)
pstreamptr->mfiles = nfiles; pstreamptr->mfiles = nfiles;
pstreamptr->mfnames = (char **) Malloc(nfiles*sizeof(char *)); pstreamptr->mfnames = (char **) Malloc(nfiles*sizeof(char *));
rewind(fp); rewind(fp);
nfiles = 0; nfiles = 0;
...@@ -440,10 +453,15 @@ void pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr) ...@@ -440,10 +453,15 @@ void pstreamCreateFilelist(const argument_t *argument, pstream_t *pstreamptr)
pstreamptr->mfiles = nfiles; pstreamptr->mfiles = nfiles;
pstreamptr->mfnames = (char **) Malloc(nfiles*sizeof(char *)); pstreamptr->mfnames = (char **) Malloc(nfiles*sizeof(char *));
strcpy(line, pch); strcpy(line, pch);
for ( i = 0; i < len; i++ ) if ( line[i] == ',' ) line[i] = 0; for ( i = 0; i < len; i++ )
{
if ( line[i] == ',' )
{
line[i] = 0;
}
}
i = 0; i = 0;
for ( j = 0; j < nfiles; j++ ) for ( j = 0; j < nfiles; j++ )
{ {
...@@ -1699,7 +1717,6 @@ void cdoFinish(void) ...@@ -1699,7 +1717,6 @@ void cdoFinish(void)
if ( ! cdoSilentMode ) if ( ! cdoSilentMode )
fprintf(stderr, " ( %.2fs )\n", c_cputime); fprintf(stderr, " ( %.2fs )\n", c_cputime);
} }
if ( cdoBenchmark && processID == 0 ) if ( cdoBenchmark && processID == 0 )
fprintf(stderr, "total: user %.2fs sys %.2fs cpu %.2fs mem%s\n", fprintf(stderr, "total: user %.2fs sys %.2fs cpu %.2fs mem%s\n",
p_usertime, p_systime, p_cputime, memstring); p_usertime, p_systime, p_cputime, memstring);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment