diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py index 67335a69612dc29624dfed8578ffb87c7cba6f4c..c203dd6ee317ffc27829078570adef3c4657405f 100644 --- a/apps/hiopy/_data_handler.py +++ b/apps/hiopy/_data_handler.py @@ -2,6 +2,7 @@ import logging from time import perf_counter import numpy as np +from coyote import add_task class Timer: @@ -28,25 +29,38 @@ class DataHandler: if self.use_buffer: self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype) - def flush(self): - if self.prune_offset is not None: - fro = max(0, self.t_flushed - self.prune_offset) - to = max(0, self.t - self.prune_offset) - with Timer() as t: - self.var[fro:to, ..., self.cell_slice] = self.var.fill_value - logging.info(f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s") - + def _create_flush_task(self): fro = self.t_flushed % self.time_chunk_size to = fro + (self.t - self.t_flushed) - with Timer() as t: - self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...] - logging.info( - f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}." - f" Took {t.elapsed_time}s" - ) + buf = self.buf[fro:to, ...].copy() + t_flushed = int(self.t_flushed) + t = int(self.t) + + def _flush(): + nonlocal t, t_flushed, buf, self + if self.prune_offset is not None: + prune_from = max(0, t_flushed - self.prune_offset) + prune_to = max(0, t - self.prune_offset) + with Timer() as timer: + self.var[prune_from:prune_to, ..., self.cell_slice] = self.var.fill_value + logging.info( + f"pruned {self.var.name} for timesteps {fro}:{to}. Took {timer.elapsed_time}s" + ) + + with Timer() as timer: + self.var[t_flushed:t, ..., self.cell_slice] = buf + logging.info( + f"wrote {self.var.name} for timesteps {t_flushed}:{t}." + f" Took {timer.elapsed_time}s" + ) + if self.loco_server is not None: + self.loco_server.set_timestep(self.var.name, t - 1) + self.t_flushed = self.t - if self.loco_server is not None: - self.loco_server.set_timestep(self.var.name, self.t - 1) + return _flush + + def flush(self): + self._create_flush_task()() def __call__(self, event): logging.info( @@ -59,7 +73,7 @@ class DataHandler: self.buf = np.squeeze(np.transpose(event.data))[None, ...] self.t += 1 if self.t % self.time_chunk_size == 0 or not self.use_buffer: # chunk complete - self.flush() + add_task(self._create_flush_task()) def set_loco_server(self, loco_server): self.loco_server = loco_server diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp index 4f5087903f135e299c52001419649c8484e1172d..6cad6c2cd2655c3dca124983ddde7ccd8e6adb47 100644 --- a/python/coyote_py.cpp +++ b/python/coyote_py.cpp @@ -110,6 +110,14 @@ PYBIND11_MODULE(coyote, m) { m.def("get_field_metadata", &get_field_metadata); m.def("run", &coyote_run, py::call_guard<py::gil_scoped_release>()); m.def("init", &coyote_init, "group_name"_a, "nthreads"_a = 1); + m.def( + "add_task", + [](std::function<void()> task, int prio) { + py::gil_scoped_release release; + coyote_add_task(task, prio); + }, + "task"_a, + "prio"_a = 0); m.def("start_datetime", &coyote_start_datetime); m.def("end_datetime", &coyote_end_datetime); m.def("group_comm", []() { return MPI_Comm_c2f(coyote_group_comm()); }); diff --git a/src/coyote.cpp b/src/coyote.cpp index 63fe40b74d0887c9205c4680ce96513c03a6e79c..de55d08f1f2afe1bfe15c997ae397624a15206af 100644 --- a/src/coyote.cpp +++ b/src/coyote.cpp @@ -408,4 +408,8 @@ namespace coyote { void coyote_run() { CoyoteEnv::getInstance().run(); } + void coyote_add_task(std::function<void()> task, int prio) { + CoyoteEnv::getInstance().add_task(task, prio); + } + } // namespace coyote diff --git a/src/coyote.hpp b/src/coyote.hpp index 70ac5f9ef6a72f69a2e653d229c972ff94bb6f90..b172e6a0f743c77007782abfbc538a02ac1beafb 100644 --- a/src/coyote.hpp +++ b/src/coyote.hpp @@ -77,6 +77,7 @@ namespace coyote { const std::string& source_grid, const std::string& field_name); void coyote_run(); + void coyote_add_task(std::function<void()> task, int prio = 0); } // namespace coyote #endif diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp index 4011fc9e4bcb4cbe73421935009c1f0a26c0dd51..34d4e2a35cc95c678ef4ccf3eb37780bc75b7cac 100644 --- a/src/coyoteenv.cpp +++ b/src/coyoteenv.cpp @@ -131,7 +131,7 @@ namespace coyote { _task_queue.enqueue( [f]() mutable { Event e(f); - f->op(Event(e)); + f->op(e); }, {1, f->datetime()}); @@ -158,4 +158,8 @@ namespace coyote { if (_finalize_mpi) MPI_Finalize(); } + + void CoyoteEnv::add_task(std::function<void()> task, int prio) { + _task_queue.enqueue(std::move(task), {prio, ""}); + } } // namespace coyote diff --git a/src/coyoteenv.hpp b/src/coyoteenv.hpp index e37bdc6ddce45f75934810014573a4bb9d256c18..41238014df7b1ec9ec3bd3c1c05aea60c3f387dc 100644 --- a/src/coyoteenv.hpp +++ b/src/coyoteenv.hpp @@ -41,6 +41,7 @@ namespace coyote { void ensure_sync_def(); void ensure_enddef(); void run(); + void add_task(std::function<void()> task, int prio = 0); private: static std::unique_ptr<CoyoteEnv> _instance; diff --git a/src/tsqueue.hpp b/src/tsqueue.hpp index 1433a4305184b0ac732230e26f3fc4e4f93b9181..54558879a0f80194b95be497e53fca474463ae0e 100644 --- a/src/tsqueue.hpp +++ b/src/tsqueue.hpp @@ -13,34 +13,37 @@ class QueueTerminated {}; template <class T, class P, class Compare = std::less<P>> class TSQueue { private: - typedef std::optional<std::tuple<P, T>> elem_type; + typedef std::tuple<P, std::shared_ptr<T>> elem_type; public: TSQueue() : q(), m(), c() {} ~TSQueue() {} - void enqueue(T t, P prio) { + void enqueue(T&& t, P prio) { std::lock_guard<std::mutex> lock(m); - q.push(std::make_tuple(prio, t)); + q.push(std::make_tuple(prio, std::make_shared<T>(std::forward<T>(t)))); c.notify_one(); } void end() { std::lock_guard<std::mutex> lock(m); - q.push(std::nullopt); + q.push({}); c.notify_all(); } T dequeue() { - std::unique_lock<std::mutex> lock(m); - c.wait(lock, [&] { return !q.empty(); }); - elem_type val = q.top(); - if (!val) { - throw QueueTerminated(); + elem_type val; + { + std::unique_lock<std::mutex> lock(m); + c.wait(lock, [&] { return !q.empty(); }); + val = q.top(); + if (!std::get<1>(val)) { + throw QueueTerminated(); + } + q.pop(); } - q.pop(); - return std::get<1>(val.value()); + return *std::get<1>(val); } bool terminated() { @@ -58,11 +61,11 @@ class TSQueue { struct _Compare { Compare compare; bool operator()(const elem_type& a, const elem_type& b) const { - if (!a) + if (!std::get<1>(a)) return true; - if (!b) + if (!std::get<1>(b)) return false; - return compare(std::get<0>(a.value()), std::get<0>(b.value())); + return compare(std::get<0>(a), std::get<0>(b)); } }; std::priority_queue<elem_type, std::vector<elem_type>, _Compare> q; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eb2a870a8d33be4f64e2df293f572c8b0159249f..397feca3b01e2760fdce6a7c8219430d75e8fd2a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -34,3 +34,22 @@ add_test( NAME test_tsqueue COMMAND test_tsqueue ) + +add_executable(test_subtasks test_subtasks.cpp) +target_link_libraries(test_subtasks coyote) +add_test( + NAME test_subtasks + COMMAND "${MPIEXEC_EXECUTABLE}" + "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/test_subtasks : "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/simple_source +) + +add_test( + NAME test_subtasks_py + COMMAND + "${MPIEXEC_EXECUTABLE}" + "${MPIEXEC_NUMPROC_FLAG}" "1" python ${CMAKE_CURRENT_SOURCE_DIR}/test_subtasks.py : "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/simple_source +) +set_tests_properties(test_subtasks_py PROPERTIES + ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:$ENV{PYTHONPATH}" + TIMEOUT 20 +) diff --git a/tests/test_subtasks.cpp b/tests/test_subtasks.cpp new file mode 100644 index 0000000000000000000000000000000000000000..931863b36416a25fa0d4068a58497cc2a67025c7 --- /dev/null +++ b/tests/test_subtasks.cpp @@ -0,0 +1,32 @@ +#include <atomic> +#include <cassert> +#include <coyote.hpp> +#include <iostream> + +using namespace coyote; + +int main() { + coyote_init("", 4); // use 4 threads + Coyote coyote("test_simple"); + + coyote.def_grid({{4}}, {{0.0, 0.0, 1.0, 1.0}}, {{0.0, 1.0, 1.0, 0.0}}, {{0, 1, 2, 3}}); + + std::atomic_int task_count = 0; + std::atomic_int subtask_count = 0; + coyote.add_field( + "simple_source", + "simple_grid", + "clock", + [&](Event ev) { + task_count++; + coyote_add_task([&]() { subtask_count++; }); + }, + "PT2S", + 1); + + coyote_run(); + + std::cout << task_count << std::endl; + assert(task_count == subtask_count); + return 0; +} diff --git a/tests/test_subtasks.py b/tests/test_subtasks.py new file mode 100644 index 0000000000000000000000000000000000000000..c44dc4b940bcdcf187cfce9aeda1ea15a0a360ec --- /dev/null +++ b/tests/test_subtasks.py @@ -0,0 +1,27 @@ +#!/usr/bin/python3 + +import numpy as np +from coyote import Coyote, add_task, run + +coyote = Coyote("test_simple") + +coyote.def_grid([4], [0.0, 0.0, 1.0, 1.0], [0.0, 1.0, 1.0, 0.0], [0, 1, 2, 3], [0.5], [0.5]) + +s = np.zeros(shape=(1, 1), dtype=np.float64) + + +def handler(event): + data = np.asarray(event.data).copy() + + def task(): + global s + s[:] += data + + add_task(task, 0) + + +coyote.add_field("simple_source", "simple_grid", "clock", handler, "PT2S") + +run() + +print(s)