Commit 7e17fa4e authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

Merge branch 'develop' of git.mpimet.mpg.de:cdo into develop

parents 1fa50ac6 6e633272
......@@ -48,6 +48,8 @@
pthread_mutex_t processMutex = PTHREAD_MUTEX_INITIALIZER;
#endif
constexpr bool PROCESS_DEBUG = false;
static std::map<int, process_t> Process;
static int NumProcess = 0;
......@@ -1081,3 +1083,160 @@ process_t::print_process()
std::cout << " oargc : " << oargc << std::endl;
std::cout << " noper : " << noper << std::endl;
}
static void
processClosePipes(void)
{
int nstream = processInqInputStreamNum();
for (int sindex = 0; sindex < nstream; sindex++)
{
pstream_t *pstreamptr = processInqInputStream(sindex);
if (PROCESS_DEBUG)
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamptr->self);
if (pstreamptr)
pstreamptr->close();
}
nstream = processInqOutputStreamNum();
for (int sindex = 0; sindex < nstream; sindex++)
{
pstream_t *pstreamptr = processInqOutputStream(sindex);
if (PROCESS_DEBUG)
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamptr->self);
if (pstreamptr)
pstreamptr->close();
}
}
void
cdoFinish(void)
{
int processID = processSelf().m_ID;
int nvars, ntimesteps;
char memstring[32] = { "" };
double s_utime, s_stime;
double e_utime, e_stime;
double c_cputime = 0, c_usertime = 0, c_systime = 0;
double p_cputime = 0, p_usertime = 0, p_systime = 0;
#if defined(HAVE_LIBPTHREAD)
if (PROCESS_DEBUG)
Message("process %d thread %ld", processID, pthread_self());
#endif
int64_t nvals = processInqNvals(processID);
nvars = processInqVarNum();
ntimesteps = processInqTimesteps();
if (!cdoSilentMode)
{
set_text_color(stderr, RESET, GREEN);
fprintf(stderr, "%s: ", processInqPrompt());
reset_text_color(stderr);
if (nvals > 0)
{
if (sizeof(int64_t) > sizeof(long))
#if defined(_WIN32)
fprintf(stderr,
"Processed %I64d value%s from %d variable%s",
#else
fprintf(stderr,
"Processed %jd value%s from %d variable%s",
#endif
(intmax_t) nvals,
ADD_PLURAL(nvals),
nvars,
ADD_PLURAL(nvars));
else
fprintf(stderr,
"Processed %ld value%s from %d variable%s",
(long) nvals,
ADD_PLURAL(nvals),
nvars,
ADD_PLURAL(nvars));
}
else if (nvars > 0)
{
fprintf(stderr, "Processed %d variable%s", nvars, ADD_PLURAL(nvars));
}
if (ntimesteps > 0)
fprintf(stderr, " over %d timestep%s", ntimesteps, ADD_PLURAL(ntimesteps));
// fprintf(stderr, ".");
}
/*
fprintf(stderr, "%s: Processed %d variable%s %d timestep%s.",
processInqPrompt(), nvars, nvars > 1 ? "s" : "",
ntimesteps, ntimesteps > 1 ? "s" : "");
*/
processStartTime(&s_utime, &s_stime);
cdoProcessTime(&e_utime, &e_stime);
c_usertime = e_utime - s_utime;
c_systime = e_stime - s_stime;
c_cputime = c_usertime + c_systime;
#if defined(HAVE_LIBPTHREAD)
if (getPthreadScope() == PTHREAD_SCOPE_PROCESS)
{
c_usertime /= processNums();
c_systime /= processNums();
c_cputime /= processNums();
}
#endif
processDefCputime(processID, c_cputime);
processAccuTime(c_usertime, c_systime);
if (processID == 0)
{
int mu[] = { 'b', 'k', 'm', 'g', 't' };
int muindex = 0;
long memmax;
memmax = memTotal();
while (memmax > 9999)
{
memmax /= 1024;
muindex++;
}
if (memmax)
snprintf(memstring, sizeof(memstring), " %ld%c ", memmax, mu[muindex]);
processEndTime(&p_usertime, &p_systime);
p_cputime = p_usertime + p_systime;
if (cdoLogOff == 0)
{
cdologs(processNums());
cdologo(processNums());
cdolog(processInqPrompt(), p_cputime);
}
}
#if defined(HAVE_SYS_TIMES_H)
if (cdoBenchmark)
fprintf(stderr, " ( %.2fs %.2fs %.2fs %s)\n", c_usertime, c_systime, c_cputime, memstring);
else
{
if (!cdoSilentMode)
fprintf(stderr, " ( %.2fs )\n", c_cputime);
}
if (cdoBenchmark && processID == 0)
fprintf(stderr, "total: user %.2fs sys %.2fs cpu %.2fs mem%s\n", p_usertime, p_systime, p_cputime, memstring);
#else
fprintf(stderr, "\n");
#endif
processClosePipes();
processDelete();
}
......@@ -1771,6 +1771,7 @@ void pstreamCloseAll()
{
for(auto pstream_iter : _pstream_map)
{
if(PSTREAM_Debug)
Message("Close file %s id %d", pstream_iter.second.m_name.c_str(), pstream_iter.second.m_fileID);
streamClose(pstream_iter.second.m_fileID);
}
......@@ -1797,161 +1798,7 @@ pstreamCloseAll(void)
}
}
*/
static void
processClosePipes(void)
{
int nstream = processInqInputStreamNum();
for (int sindex = 0; sindex < nstream; sindex++)
{
pstream_t *pstreamptr = processInqInputStream(sindex);
if (PSTREAM_Debug)
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamptr->self);
if (pstreamptr)
pstreamptr->close();
}
nstream = processInqOutputStreamNum();
for (int sindex = 0; sindex < nstream; sindex++)
{
pstream_t *pstreamptr = processInqOutputStream(sindex);
if (PSTREAM_Debug)
Message("process %d stream %d close streamID %d", processSelf(), sindex, pstreamptr->self);
if (pstreamptr)
pstreamptr->close();
}
}
void
cdoFinish(void)
{
int processID = processSelf().m_ID;
int nvars, ntimesteps;
char memstring[32] = { "" };
double s_utime, s_stime;
double e_utime, e_stime;
double c_cputime = 0, c_usertime = 0, c_systime = 0;
double p_cputime = 0, p_usertime = 0, p_systime = 0;
#if defined(HAVE_LIBPTHREAD)
if (PSTREAM_Debug)
Message("process %d thread %ld", processID, pthread_self());
#endif
int64_t nvals = processInqNvals(processID);
nvars = processInqVarNum();
ntimesteps = processInqTimesteps();
if (!cdoSilentMode)
{
set_text_color(stderr, RESET, GREEN);
fprintf(stderr, "%s: ", processInqPrompt());
reset_text_color(stderr);
if (nvals > 0)
{
if (sizeof(int64_t) > sizeof(long))
#if defined(_WIN32)
fprintf(stderr,
"Processed %I64d value%s from %d variable%s",
#else
fprintf(stderr,
"Processed %jd value%s from %d variable%s",
#endif
(intmax_t) nvals,
ADD_PLURAL(nvals),
nvars,
ADD_PLURAL(nvars));
else
fprintf(stderr,
"Processed %ld value%s from %d variable%s",
(long) nvals,
ADD_PLURAL(nvals),
nvars,
ADD_PLURAL(nvars));
}
else if (nvars > 0)
{
fprintf(stderr, "Processed %d variable%s", nvars, ADD_PLURAL(nvars));
}
if (ntimesteps > 0)
fprintf(stderr, " over %d timestep%s", ntimesteps, ADD_PLURAL(ntimesteps));
// fprintf(stderr, ".");
}
/*
fprintf(stderr, "%s: Processed %d variable%s %d timestep%s.",
processInqPrompt(), nvars, nvars > 1 ? "s" : "",
ntimesteps, ntimesteps > 1 ? "s" : "");
*/
processStartTime(&s_utime, &s_stime);
cdoProcessTime(&e_utime, &e_stime);
c_usertime = e_utime - s_utime;
c_systime = e_stime - s_stime;
c_cputime = c_usertime + c_systime;
#if defined(HAVE_LIBPTHREAD)
if (pthreadScope == PTHREAD_SCOPE_PROCESS)
{
c_usertime /= processNums();
c_systime /= processNums();
c_cputime /= processNums();
}
#endif
processDefCputime(processID, c_cputime);
processAccuTime(c_usertime, c_systime);
if (processID == 0)
{
int mu[] = { 'b', 'k', 'm', 'g', 't' };
int muindex = 0;
long memmax;
memmax = memTotal();
while (memmax > 9999)
{
memmax /= 1024;
muindex++;
}
if (memmax)
snprintf(memstring, sizeof(memstring), " %ld%c ", memmax, mu[muindex]);
processEndTime(&p_usertime, &p_systime);
p_cputime = p_usertime + p_systime;
if (cdoLogOff == 0)
{
cdologs(processNums());
cdologo(processNums());
cdolog(processInqPrompt(), p_cputime);
}
}
#if defined(HAVE_SYS_TIMES_H)
if (cdoBenchmark)
fprintf(stderr, " ( %.2fs %.2fs %.2fs %s)\n", c_usertime, c_systime, c_cputime, memstring);
else
{
if (!cdoSilentMode)
fprintf(stderr, " ( %.2fs )\n", c_cputime);
}
if (cdoBenchmark && processID == 0)
fprintf(stderr, "total: user %.2fs sys %.2fs cpu %.2fs mem%s\n", p_usertime, p_systime, p_cputime, memstring);
#else
fprintf(stderr, "\n");
#endif
processClosePipes();
processDelete();
}
int
pstreamInqFiletype(int pstreamID)
......@@ -2044,3 +1891,10 @@ openUnlock(void)
pthread_mutex_unlock(&streamOpenReadMutex);
#endif
}
//TODO remove when processes create the new threads
const int &getPthreadScope()
{
return pthreadScope;
}
......@@ -90,4 +90,6 @@ int pstreamFileID(int pstreamID);
void cdoVlistCopyFlag(int vlistID2, int vlistID1);
const int &getPthreadScope();
#endif /* PSTREAM_H */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment