Commit 56d2ebd0 authored by Oliver Heidmann's avatar Oliver Heidmann
Browse files

fixed hasAllInputs( and createProcesses to work with variable number of input...

fixed hasAllInputs( and createProcesses to work with variable number of input files, moved expand_wildcards and checkStreamCnt into pstream_t
parent b40bce58
......@@ -43,6 +43,7 @@
#include "pthread.h"
#include "cdoDebugOutput.h"
#include <algorithm>
#include <map>
#include <stack>
......@@ -672,10 +673,10 @@ expand_filename(const char *string)
return filename;
}
static int
expand_wildcards(process_t &process, int streamCnt)
int
process_t::expand_wildcards(int streamCnt)
{
const char *streamname0 = process.streamArguments[0].args;
const char *streamname0 = streamArguments[0].args;
if (streamname0[0] == '-')
return 1;
......@@ -689,21 +690,21 @@ expand_wildcards(process_t &process, int streamCnt)
if (glob_arg->argc > 1 && glob_arg->argv[0][0] != '-')
{
if (cdoVerbose)
cdoPrint("Replaced >%s< by", process.streamArguments[0].args);
cdoPrint("Replaced >%s< by", streamArguments[0].args);
streamCnt = streamCnt - 1 + glob_arg->argc;
Free(process.streamArguments[0].args);
Free(streamArguments[0].args);
process.streamArguments.resize(streamCnt);
streamArguments.resize(streamCnt);
// move output streams to the end
for (int i = 1; i < process.m_streamCnt; ++i)
process.streamArguments[i + glob_arg->argc - 1] = process.streamArguments[i];
for (int i = 1; i < m_streamCnt; ++i)
streamArguments[i + glob_arg->argc - 1] = streamArguments[i];
for (int i = 0; i < glob_arg->argc; ++i)
{
argument_t &current_argument = process.streamArguments[i];
argument_t &current_argument = streamArguments[i];
current_argument.argc = 1;
current_argument.argv.resize(current_argument.argc);
current_argument.argv[0] = strdupx(glob_arg->argv[i]);
......@@ -712,7 +713,7 @@ expand_wildcards(process_t &process, int streamCnt)
cdoPrint(" >%s<", glob_arg->argv[i]);
}
process.m_streamCnt = streamCnt;
m_streamCnt = streamCnt;
}
Free(glob_arg);
......@@ -721,81 +722,79 @@ expand_wildcards(process_t &process, int streamCnt)
return 1;
}
int
checkStreamCnt(void)
int process_t::checkStreamCnt(void)
{
process_t &process = processSelf();
int streamInCnt, streamOutCnt;
int wantedStreamInCnt,wantedStreamOutCnt;
int streamInCnt0;
int streamCnt = 0;
int i, j;
int obase = FALSE;
int status = 0;
streamInCnt = operatorStreamInCnt(process.operatorName);
streamOutCnt = operatorStreamOutCnt(process.operatorName);
wantedStreamInCnt= operatorStreamInCnt(operatorName);
wantedStreamOutCnt = operatorStreamOutCnt(operatorName);
streamInCnt0 = streamInCnt;
streamInCnt0 = wantedStreamInCnt;
if (streamOutCnt == -1)
if (wantedStreamOutCnt == -1)
{
streamOutCnt = 1;
wantedStreamOutCnt = 1;
obase = TRUE;
}
if (streamInCnt == -1 && streamOutCnt == -1)
if (wantedStreamInCnt== -1 && wantedStreamOutCnt == -1)
cdoAbort("I/O stream counts unlimited no allowed!");
// printf(" streamInCnt, streamOutCnt %d %d\n", streamInCnt, streamOutCnt);
if (streamInCnt == -1)
// printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n", wantedStreamInCnt,wantedStreamOutCnt);
if (wantedStreamInCnt== -1)
{
streamInCnt = process.m_streamCnt - streamOutCnt;
if (streamInCnt < 1)
wantedStreamInCnt = m_streamCnt - wantedStreamOutCnt;
if (wantedStreamInCnt< 1)
cdoAbort("Input streams missing!");
}
if (streamOutCnt == -1)
if (wantedStreamOutCnt == -1)
{
streamOutCnt = process.m_streamCnt - streamInCnt;
if (streamOutCnt < 1)
wantedStreamOutCnt = m_streamCnt - wantedStreamInCnt;
if (wantedStreamOutCnt < 1)
cdoAbort("Output streams missing!");
}
// printf(" streamInCnt, streamOutCnt %d %d\n", streamInCnt, streamOutCnt);
// printf(" wantedStreamInCnt,wantedStreamOutCnt %d %d\n", wantedStreamInCnt,wantedStreamOutCnt);
streamCnt = streamInCnt + streamOutCnt;
// printf(" streamCnt %d %d\n", process.m_streamCnt, streamCnt);
streamCnt = wantedStreamInCnt+ wantedStreamOutCnt;
// printf(" streamCnt %d %d\n", m_streamCnt, streamCnt);
if (process.m_streamCnt > streamCnt)
if (m_streamCnt > streamCnt)
cdoAbort("Too many streams!"
" Operator needs %d input and %d output streams.",
streamInCnt,
streamOutCnt);
wantedStreamInCnt,
wantedStreamOutCnt);
if (process.m_streamCnt < streamCnt)
if (m_streamCnt < streamCnt)
cdoAbort("Too few streams specified!"
" Operator needs %d input and %d output streams.",
streamInCnt,
streamOutCnt);
wantedStreamInCnt,
wantedStreamOutCnt);
for (i = streamInCnt; i < streamCnt; i++)
for (i = wantedStreamInCnt;i < streamCnt; i++)
{
if (process.streamArguments[i].args[0] == '-')
if (streamArguments[i].args[0] == '-')
{
cdoAbort("Output file name %s must not begin with \"-\"!", process.streamArguments[i].args);
cdoAbort("Output file name %s must not begin with \"-\"!", streamArguments[i].args);
}
else if (!obase)
{
for (j = 0; j < streamInCnt; j++) /* does not work with files in pipes */
if (strcmp(process.streamArguments[i].args, process.streamArguments[j].args) == 0)
for (j = 0; j < wantedStreamInCnt;j++) /* does not work with files in pipes */
if (strcmp(streamArguments[i].args, streamArguments[j].args) == 0)
cdoAbort("Output file name %s is equal to input file name"
" on position %d!\n",
process.streamArguments[i].args,
streamArguments[i].args,
j + 1);
}
}
if (streamInCnt == 1 && streamInCnt0 == -1)
status = expand_wildcards(process, streamCnt);
if (wantedStreamInCnt == 1 && streamInCnt0 == -1)
status = expand_wildcards( streamCnt);
return status;
}
......@@ -803,8 +802,11 @@ checkStreamCnt(void)
bool
process_t::hasAllInputs()
{
// std::cout << m_module.streamInCnt << " " << childProcesses.size() + inputStreams.size() << std::endl;
return m_module.streamInCnt == (childProcesses.size() + inputStreams.size());
if(m_module.streamInCnt == -1)
{
return false;
}
return m_module.streamInCnt == (inputStreams.size());
}
#include <fstream>
......@@ -845,7 +847,8 @@ createProcesses(int argc, const char **argv)
call_stack.push(root_process);
current_process = call_stack.top();
for(int i = 0; i < root_process->m_module.streamOutCnt; i++)
int cntOutFiles = std::max(0, (int)current_process->m_module.streamOutCnt);
for(int i = 0; i < cntOutFiles; i++)
{
if(CdoDebug::PROCESS)
{
......@@ -892,7 +895,7 @@ createProcesses(int argc, const char **argv)
}
idx++;
}
while ((current_process != root_process || !root_process->hasAllInputs()) && idx < argc - 1);
while ((current_process != root_process || !root_process->hasAllInputs()) && idx < argc - cntOutFiles);
if(CdoDebug::PROCESS)
{
......@@ -1402,6 +1405,7 @@ process_t::addChild(process_t *childProcess)
{
childProcesses.push_back(childProcess);
nchild = childProcesses.size();
inputStreams.push_back(create_pstream());
}
void
......
......@@ -32,6 +32,7 @@ constexpr int MAX_OPERATOR = 128;
constexpr int MAX_OARGC = 4096;
constexpr int MAX_FILES = 65536;
enum class ProcessCheckResult{UNLIMITED_STREAM_COUNTS, INPUT_STREAM_MISSING, OUTPUT_STREAM_MISSING, TOO_MANY_STREAMS, TOO_FEW_STREAMS, FILENAME_HAS_OPERATOR_MARKER, OUTFILE_IS_INFILE, SUCCESS};
typedef struct
{
......@@ -67,7 +68,7 @@ public:
short nvars;
int ntimesteps;
short m_streamCnt;
int m_streamCnt;
std::vector<argument_t> streamArguments;
const char *m_operatorCommand;
const char *operatorName;
......@@ -99,6 +100,8 @@ private:
void OpenWrite(int p_input_idx);
void OpenAppend(int p_input_idx);
void setStreamNames(int argc, std::vector<char *> &argv);
int expand_wildcards(int streamCnt);
int checkStreamCnt();
};
extern std::map<int, process_t> Process;
......@@ -139,9 +142,9 @@ const char *processInqOpername2(int processID);
const char *processInqPrompt(void);
const argument_t *cdoStreamName(int cnt);
int checkStreamCnt();
void createProcesses(int argc, const char **argv);
void clearProcesses();
int processNumsActive();
int checkStreamCnt();
#endif /* _PROCESS_H */
#include "bandit/bandit/bandit.h"
#include "../../src/modules.h"
#include "../../src/operator_help.h"
#include "../../src/process.h"
#include <iostream>
void *Test(void *ptr) { return ptr; }
std::vector<std::string> TestHelp = {"TEST", "HELP"};
std::vector<const char *> test_argv{"-test", "in_file", "out_file"};
go_bandit([]() {
bandit::describe("Process: 1 input, 1 output", []() {
add_module("Test", {Test, TestHelp, {"test"}, 1, 0, 1, 2});
createProcesses(test_argv.size(), &test_argv[0]);
process_t test_process = Process.find(0)->second;
bandit::it("should have appropriate number of in streams", [&]() {
AssertThat(test_process.getInStreamCnt(), snowhouse::Equals(1));
});
bandit::it("should have appropriate number of out streams", [&]() {
AssertThat(test_process.getOutStreamCnt(), snowhouse::Equals(2));
});
});
});
int main(int argc, char **argv) { return bandit::run(argc, argv); }
#include "bandit/bandit/bandit.h"
#include "../../src/modules.h"
#include "../../src/operator_help.h"
#include "../../src/process.h"
#include <iostream>
void *Test(void *ptr) { return ptr; }
std::vector<std::string> TestHelp = {"TEST", "HELP"};
std::vector<const char *> test_argv{"-test", "in_file", "out_file"};
go_bandit([]() {
bandit::describe("Process: 1 input, 1 output", []() {
add_module("Test", {Test, TestHelp, {"test"}, 1, 0, 1, 1});
createProcesses(test_argv.size(), &test_argv[0]);
process_t test_process = Process.find(0)->second;
bandit::it("should have appropriate number of in streams", [&]() {
AssertThat(test_process.getInStreamCnt(), snowhouse::Equals(1));
});
bandit::it("should have appropriate number of out streams", [&]() {
AssertThat(test_process.getOutStreamCnt(), snowhouse::Equals(1));
});
});
});
int main(int argc, char **argv) { return bandit::run(argc, argv); }
#include "bandit/bandit/bandit.h"
#include "../../src/modules.h"
#include "../../src/operator_help.h"
#include "../../src/process.h"
#include <iostream>
void *Info(void *test) { return test; }
go_bandit([]() {
bandit::describe("Process creation", []() {
std::vector<const char *> test_argv{"-info", "some_test_bs"};
add_module("Info", {Info, InfoHelp, {"info"}, 1, 0, -1, 0});
process_t *test_process = processCreate(test_argv[0]);
bandit::it("should have the name of the operator", [&]() {
AssertThat(test_process->operatorName, snowhouse::Equals("info"));
});
bandit::it("should update the count for active processes", [&]() {
AssertThat(processNumsActive(), snowhouse::Equals(1));
});
bandit::it("should update the cound for existing processes",
[&]() { AssertThat(processNums(), snowhouse::Equals(1)); });
bandit::it("new size of Process should be updated",
[&]() { AssertThat(Process.size(), snowhouse::Equals(1ul)); });
bandit::it("ID should be set right",
[&]() { AssertThat(test_process->m_ID, snowhouse::Equals(0)); });
});
});
int main(int argc, char **argv) { return bandit::run(argc, argv); }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment