Skip to content
Snippets Groups Projects
Commit 9fb94ce4 authored by Nils-Arne Dreier's avatar Nils-Arne Dreier
Browse files

refactor: internal data structures and introduce release function

parent 92ee69e7
No related branches found
No related tags found
1 merge request!23refactor: internal data structures and introduce release function
Pipeline #91676 passed with warnings
......@@ -88,13 +88,19 @@ PYBIND11_MODULE(coyote, m) {
.def("get_field_metadata", &Coyote::get_field_metadata)
.def("get_all_comp_grid_fields", &Coyote::get_all_comp_grid_fields);
py::class_<Event>(m, "Event").def_property_readonly("data", [](const Event& self) {
return py::memoryview::from_buffer(
self.data.data(), // buffer pointer
{(py::ssize_t)self.point_size, (py::ssize_t)self.collection_size}, // shape (rows, cols)
{sizeof(double), sizeof(double) * self.point_size} // strides
);
});
py::class_<Event>(m, "Event")
.def_property_readonly(
"data",
[](const Event& self) {
return py::memoryview::from_buffer(
self.data().data(), // buffer pointer
{(py::ssize_t)self.point_size, (py::ssize_t)self.collection_size}, // shape
// (rows,
// cols)
{sizeof(double), sizeof(double) * self.point_size} // strides
);
})
.def("release", &Event::release);
m.def("run", &coyote_run, py::call_guard<py::gil_scoped_release>());
m.def("init", &coyote_init, "group_name"_a);
......
......@@ -26,7 +26,7 @@ namespace coyote {
const std::string& source_comp,
const std::string& source_grid,
const std::string& field_name,
const std::function<void(const Event& event)>& eventHandler,
const std::function<void(Event event)>& eventHandler,
std::optional<std::string> iso_dt,
std::optional<int> collection_size,
std::string interpolation_stack_yaml,
......@@ -324,7 +324,7 @@ namespace coyote {
const std::string& source_comp,
const std::string& source_grid,
const std::string& field_name,
const std::function<void(const Event& event)>& eventHandler,
const std::function<void(Event)>& eventHandler,
std::optional<std::string> iso_dt,
std::optional<int> collection_size,
std::string interpolation_stack_yaml,
......
......@@ -9,12 +9,30 @@
#include <span>
#include <string>
#include <field.hpp>
namespace coyote {
struct Event {
std::vector<double> data;
int collection_size;
size_t point_size;
class Event {
std::shared_ptr<Field>& _ptr_fieldBuffer;
public:
Event(std::shared_ptr<Field>& ptr_fieldBuffer) : _ptr_fieldBuffer(ptr_fieldBuffer) {
collection_size = ptr_fieldBuffer->collection_size;
point_size = data().size() / collection_size;
}
void release() { _ptr_fieldBuffer.reset(); }
const std::vector<double>& data() const {
if (_ptr_fieldBuffer)
return _ptr_fieldBuffer->buffer;
else
throw std::runtime_error("Buffer alread freed.");
}
size_t collection_size;
int point_size;
};
class Coyote {
......@@ -33,7 +51,7 @@ namespace coyote {
const std::string& source_comp,
const std::string& source_grid,
const std::string& field_name,
const std::function<void(const Event& event)>& eventHandler,
const std::function<void(Event event)>& eventHandler,
std::optional<std::string> iso_dt = std::nullopt,
std::optional<int> collection_size = std::nullopt,
std::string interpolation_stack_yaml = "[nnn]",
......
......@@ -86,8 +86,8 @@ namespace coyote {
put_thread = std::thread([&]() {
while (true) {
try {
auto event = _put_queue.dequeue();
int field_id = event->field.yac_source_field_id.value();
std::shared_ptr<Field> field = _put_queue.dequeue();
int field_id = field->yac_source_field_id.value();
int info = -1, ierror = 0;
int collection_size = yac_cget_collection_size_from_field_id(field_id);
......@@ -102,17 +102,16 @@ namespace coyote {
do {
yac_ctest(field_id, &flag);
} while (!flag);
if (!event->field.frac_mask.empty()) {
if (!field->frac_mask.empty()) {
yac_cput_frac_(
field_id,
collection_size,
event->field.buffer.data(),
event->field.frac_mask.data(),
field->buffer.data(),
field->frac_mask.data(),
&info,
&ierror);
} else {
yac_cput_(
field_id, collection_size, event->field.buffer.data(), &info, &ierror);
yac_cput_(field_id, collection_size, field->buffer.data(), &info, &ierror);
}
} catch (QueueTerminated e) {
break;
......@@ -148,12 +147,9 @@ namespace coyote {
continue;
field->get_started = false;
// fire event
std::shared_ptr<FieldBuffer> e(
new FieldBuffer({.data = field->buffer, .field = *field}), [&](auto p) {
field->semaphore.release(); // release the semaphore if all references
// of the shared_ptr are desctructed
delete p;
});
std::shared_ptr<Field> e(field.get(), [](auto p) {
p->semaphore.release(); // release the semaphore if all references
});
_main_queue.enqueue(e);
if (field->yac_source_field_id)
_put_queue.enqueue(e);
......@@ -170,12 +166,8 @@ namespace coyote {
while (true) {
try {
auto event = _main_queue.dequeue();
Event ce = {
.data = event->data,
.collection_size = event->field.collection_size,
.point_size = event->data.size() / (size_t)event->field.collection_size};
event->field.op(ce);
std::shared_ptr<Field> field = _main_queue.dequeue();
field->op(Event(field));
} catch (QueueTerminated e) {
break;
}
......
......@@ -8,7 +8,7 @@
namespace coyote {
using EventQueue = TSQueue<std::shared_ptr<FieldBuffer>>;
using EventQueue = TSQueue<std::shared_ptr<Field>>;
class CoyoteEnv {
friend void coyote_init(std::string group_name);
......
......@@ -11,17 +11,17 @@ extern "C" {
#include <yac.h>
}
#include <coyote.hpp>
namespace coyote {
class Event;
struct Field {
Field(const int& target_field_id,
const std::optional<int>& source_field_id,
const std::vector<double>& buffer,
const std::vector<double>& frac_mask,
const int& collection_size,
const std::function<void(const Event&)>& eventHandle)
const std::function<void(Event)>& eventHandle)
: yac_target_field_id(target_field_id),
yac_source_field_id(source_field_id),
buffer(buffer),
......@@ -34,18 +34,12 @@ namespace coyote {
std::vector<double> buffer;
std::vector<double> frac_mask;
int collection_size;
std::function<void(const Event&)> op;
std::function<void(Event)> op;
std::binary_semaphore semaphore = std::binary_semaphore(1);
int info = YAC_ACTION_NONE;
bool get_started = false;
};
// internally used Event class
struct FieldBuffer {
const std::vector<double>& data;
Field& field;
};
} // namespace coyote
#endif
......@@ -34,12 +34,12 @@ int main(int argc, char** argv) {
"simple_source",
"simple_grid",
"x",
[&](const Event& ev) {
[&](Event ev) {
for (size_t i = 0; i < idx.size(); ++i) {
vec3 p = hb.pix2vec(static_cast<int>(idx_offset + i));
double err = std::abs(ev.data[i] - p.x);
double err = std::abs(ev.data()[i] - p.x);
if (err > tol) {
std::cerr << "Error " << i << ": |" << ev.data[i] << " - " << p.x
std::cerr << "Error " << i << ": |" << ev.data()[i] << " - " << p.x
<< "| = " << err << std::endl;
MPI_Abort(MPI_COMM_WORLD, 1);
}
......@@ -52,12 +52,12 @@ int main(int argc, char** argv) {
"simple_source",
"simple_grid",
"y",
[&](const Event& ev) {
[&](Event ev) {
for (size_t i = 0; i < idx.size(); ++i) {
vec3 p = hb.pix2vec(static_cast<int>(idx_offset + i));
double err = std::abs(ev.data[i] - p.y);
double err = std::abs(ev.data()[i] - p.y);
if (err > tol) {
std::cerr << "Error " << i << ": |" << ev.data[i] << " - " << p.y
std::cerr << "Error " << i << ": |" << ev.data()[i] << " - " << p.y
<< "| = " << err << std::endl;
MPI_Abort(MPI_COMM_WORLD, 2);
}
......@@ -70,12 +70,12 @@ int main(int argc, char** argv) {
"simple_source",
"simple_grid",
"z",
[&](const Event& ev) {
[&](Event ev) {
for (size_t i = 0; i < idx.size(); ++i) {
vec3 p = hb.pix2vec(static_cast<int>(idx_offset + i));
double err = std::abs(ev.data[i] - p.z);
double err = std::abs(ev.data()[i] - p.z);
if (err > tol) {
std::cerr << "Error " << i << ": |" << ev.data[i] << " - " << p.z
std::cerr << "Error " << i << ": |" << ev.data()[i] << " - " << p.z
<< "| = " << err << std::endl;
MPI_Abort(MPI_COMM_WORLD, 3);
}
......
......@@ -18,10 +18,11 @@ int main() {
"simple_source",
"simple_grid",
"clock",
[&](const Event& ev) {
[&](Event ev) {
using namespace std::chrono_literals;
std::cout << i << "==" << ev.data[0] << std::endl;
assert(fabs(i - ev.data[0]) / i < 1e-10);
std::cout << i << "==" << ev.data()[0] << std::endl;
assert(fabs(i - ev.data()[0]) / i < 1e-10);
ev.release(); // We dont need the buffer here anymore
i += 2;
std::this_thread::sleep_for(10ms); // simulate workload
},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment