From 3a5db617443d5504cedcab12b2965298481e7d6b Mon Sep 17 00:00:00 2001 From: Nils-Arne Dreier <dreier@dkrz.de> Date: Fri, 28 Feb 2025 14:20:53 +0100 Subject: [PATCH 1/4] feat: add possibility to add subtasks to taskqueue --- python/coyote_py.cpp | 1 + src/coyote.cpp | 4 ++++ src/coyote.hpp | 1 + src/coyoteenv.cpp | 6 +++++- src/coyoteenv.hpp | 1 + tests/CMakeLists.txt | 19 +++++++++++++++++++ tests/test_subtasks.cpp | 32 ++++++++++++++++++++++++++++++++ tests/test_subtasks.py | 27 +++++++++++++++++++++++++++ 8 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 tests/test_subtasks.cpp create mode 100644 tests/test_subtasks.py diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp index 4f50879..dc57a3f 100644 --- a/python/coyote_py.cpp +++ b/python/coyote_py.cpp @@ -110,6 +110,7 @@ PYBIND11_MODULE(coyote, m) { m.def("get_field_metadata", &get_field_metadata); m.def("run", &coyote_run, py::call_guard<py::gil_scoped_release>()); m.def("init", &coyote_init, "group_name"_a, "nthreads"_a = 1); + m.def("add_task", &coyote_add_task, "task"_a, "prio"_a = 0); m.def("start_datetime", &coyote_start_datetime); m.def("end_datetime", &coyote_end_datetime); m.def("group_comm", []() { return MPI_Comm_c2f(coyote_group_comm()); }); diff --git a/src/coyote.cpp b/src/coyote.cpp index 63fe40b..de55d08 100644 --- a/src/coyote.cpp +++ b/src/coyote.cpp @@ -408,4 +408,8 @@ namespace coyote { void coyote_run() { CoyoteEnv::getInstance().run(); } + void coyote_add_task(std::function<void()> task, int prio) { + CoyoteEnv::getInstance().add_task(task, prio); + } + } // namespace coyote diff --git a/src/coyote.hpp b/src/coyote.hpp index 70ac5f9..b172e6a 100644 --- a/src/coyote.hpp +++ b/src/coyote.hpp @@ -77,6 +77,7 @@ namespace coyote { const std::string& source_grid, const std::string& field_name); void coyote_run(); + void coyote_add_task(std::function<void()> task, int prio = 0); } // namespace coyote #endif diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp index 
4011fc9..c6a82ac 100644 --- a/src/coyoteenv.cpp +++ b/src/coyoteenv.cpp @@ -131,7 +131,7 @@ namespace coyote { _task_queue.enqueue( [f]() mutable { Event e(f); - f->op(Event(e)); + f->op(e); }, {1, f->datetime()}); @@ -158,4 +158,8 @@ namespace coyote { if (_finalize_mpi) MPI_Finalize(); } + + void CoyoteEnv::add_task(std::function<void()> task, int prio) { + _task_queue.enqueue(task, {prio, ""}); + } } // namespace coyote diff --git a/src/coyoteenv.hpp b/src/coyoteenv.hpp index e37bdc6..4123801 100644 --- a/src/coyoteenv.hpp +++ b/src/coyoteenv.hpp @@ -41,6 +41,7 @@ namespace coyote { void ensure_sync_def(); void ensure_enddef(); void run(); + void add_task(std::function<void()> task, int prio = 0); private: static std::unique_ptr<CoyoteEnv> _instance; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eb2a870..397feca 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -34,3 +34,22 @@ add_test( NAME test_tsqueue COMMAND test_tsqueue ) + +add_executable(test_subtasks test_subtasks.cpp) +target_link_libraries(test_subtasks coyote) +add_test( + NAME test_subtasks + COMMAND "${MPIEXEC_EXECUTABLE}" + "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/test_subtasks : "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/simple_source +) + +add_test( + NAME test_subtasks_py + COMMAND + "${MPIEXEC_EXECUTABLE}" + "${MPIEXEC_NUMPROC_FLAG}" "1" python ${CMAKE_CURRENT_SOURCE_DIR}/test_subtasks.py : "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/simple_source +) +set_tests_properties(test_subtasks_py PROPERTIES + ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:$ENV{PYTHONPATH}" + TIMEOUT 20 +) diff --git a/tests/test_subtasks.cpp b/tests/test_subtasks.cpp new file mode 100644 index 0000000..931863b --- /dev/null +++ b/tests/test_subtasks.cpp @@ -0,0 +1,32 @@ +#include <atomic> +#include <cassert> +#include <coyote.hpp> +#include <iostream> + +using namespace coyote; + +int main() { + coyote_init("", 4); // use 4 threads + 
Coyote coyote("test_simple"); + + coyote.def_grid({{4}}, {{0.0, 0.0, 1.0, 1.0}}, {{0.0, 1.0, 1.0, 0.0}}, {{0, 1, 2, 3}}); + + std::atomic_int task_count = 0; + std::atomic_int subtask_count = 0; + coyote.add_field( + "simple_source", + "simple_grid", + "clock", + [&](Event ev) { + task_count++; + coyote_add_task([&]() { subtask_count++; }); + }, + "PT2S", + 1); + + coyote_run(); + + std::cout << task_count << std::endl; + assert(task_count == subtask_count); + return 0; +} diff --git a/tests/test_subtasks.py b/tests/test_subtasks.py new file mode 100644 index 0000000..c44dc4b --- /dev/null +++ b/tests/test_subtasks.py @@ -0,0 +1,27 @@ +#!/usr/bin/python3 + +import numpy as np +from coyote import Coyote, add_task, run + +coyote = Coyote("test_simple") + +coyote.def_grid([4], [0.0, 0.0, 1.0, 1.0], [0.0, 1.0, 1.0, 0.0], [0, 1, 2, 3], [0.5], [0.5]) + +s = np.zeros(shape=(1, 1), dtype=np.float64) + + +def handler(event): + data = np.asarray(event.data).copy() + + def task(): + global s + s[:] += data + + add_task(task, 0) + + +coyote.add_field("simple_source", "simple_grid", "clock", handler, "PT2S") + +run() + +print(s) -- GitLab From c806fe260c51cbd050f5a72ae3b65c7012907e1e Mon Sep 17 00:00:00 2001 From: Nils-Arne Dreier <dreier@dkrz.de> Date: Wed, 5 Mar 2025 09:31:10 +0100 Subject: [PATCH 2/4] feat: submit flush in hiopy as separate task --- apps/hiopy/_data_handler.py | 47 +++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py index 67335a6..f504490 100644 --- a/apps/hiopy/_data_handler.py +++ b/apps/hiopy/_data_handler.py @@ -2,6 +2,7 @@ import logging from time import perf_counter import numpy as np +from coyote import add_task class Timer: @@ -28,25 +29,35 @@ class DataHandler: if self.use_buffer: self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype) - def flush(self): - if self.prune_offset is not None: - fro = max(0, 
self.t_flushed - self.prune_offset) - to = max(0, self.t - self.prune_offset) + def _create_flush_task(self): + old_buffer = self.buf.copy() + + def _flush(): + if self.prune_offset is not None: + fro = max(0, self.t_flushed - self.prune_offset) + to = max(0, self.t - self.prune_offset) + with Timer() as t: + self.var[fro:to, ..., self.cell_slice] = self.var.fill_value + logging.info( + f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s" + ) + + fro = self.t_flushed % self.time_chunk_size + to = fro + (self.t - self.t_flushed) with Timer() as t: - self.var[fro:to, ..., self.cell_slice] = self.var.fill_value - logging.info(f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s") + self.var[self.t_flushed : self.t, ..., self.cell_slice] = old_buffer[fro:to, ...] + logging.info( + f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}." + f" Took {t.elapsed_time}s" + ) + self.t_flushed = self.t + if self.loco_server is not None: + self.loco_server.set_timestep(self.var.name, self.t - 1) - fro = self.t_flushed % self.time_chunk_size - to = fro + (self.t - self.t_flushed) - with Timer() as t: - self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...] - logging.info( - f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}." - f" Took {t.elapsed_time}s" - ) - self.t_flushed = self.t - if self.loco_server is not None: - self.loco_server.set_timestep(self.var.name, self.t - 1) + return _flush + + def flush(self): + self._create_flush_task()() def __call__(self, event): logging.info( @@ -59,7 +70,7 @@ class DataHandler: self.buf = np.squeeze(np.transpose(event.data))[None, ...] 
self.t += 1 if self.t % self.time_chunk_size == 0 or not self.use_buffer: # chunk complete - self.flush() + add_task(self._create_flush_task()) def set_loco_server(self, loco_server): self.loco_server = loco_server -- GitLab From 79673428538c251d5e087f1166c84e32177eeea4 Mon Sep 17 00:00:00 2001 From: Nils-Arne Dreier <dreier@dkrz.de> Date: Wed, 5 Mar 2025 09:31:34 +0100 Subject: [PATCH 3/4] fix: avoid deconstruction of std::function in tsqueue --- python/coyote_py.cpp | 9 ++++++++- src/coyoteenv.cpp | 2 +- src/tsqueue.hpp | 31 +++++++++++++++++-------------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp index dc57a3f..6cad6c2 100644 --- a/python/coyote_py.cpp +++ b/python/coyote_py.cpp @@ -110,7 +110,14 @@ PYBIND11_MODULE(coyote, m) { m.def("get_field_metadata", &get_field_metadata); m.def("run", &coyote_run, py::call_guard<py::gil_scoped_release>()); m.def("init", &coyote_init, "group_name"_a, "nthreads"_a = 1); - m.def("add_task", &coyote_add_task, "task"_a, "prio"_a = 0); + m.def( + "add_task", + [](std::function<void()> task, int prio) { + py::gil_scoped_release release; + coyote_add_task(task, prio); + }, + "task"_a, + "prio"_a = 0); m.def("start_datetime", &coyote_start_datetime); m.def("end_datetime", &coyote_end_datetime); m.def("group_comm", []() { return MPI_Comm_c2f(coyote_group_comm()); }); diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp index c6a82ac..34d4e2a 100644 --- a/src/coyoteenv.cpp +++ b/src/coyoteenv.cpp @@ -160,6 +160,6 @@ namespace coyote { } void CoyoteEnv::add_task(std::function<void()> task, int prio) { - _task_queue.enqueue(task, {prio, ""}); + _task_queue.enqueue(std::move(task), {prio, ""}); } } // namespace coyote diff --git a/src/tsqueue.hpp b/src/tsqueue.hpp index 1433a43..5455887 100644 --- a/src/tsqueue.hpp +++ b/src/tsqueue.hpp @@ -13,34 +13,37 @@ class QueueTerminated {}; template <class T, class P, class Compare = std::less<P>> class TSQueue { private: - typedef 
std::optional<std::tuple<P, T>> elem_type; + typedef std::tuple<P, std::shared_ptr<T>> elem_type; public: TSQueue() : q(), m(), c() {} ~TSQueue() {} - void enqueue(T t, P prio) { + void enqueue(T&& t, P prio) { std::lock_guard<std::mutex> lock(m); - q.push(std::make_tuple(prio, t)); + q.push(std::make_tuple(prio, std::make_shared<T>(std::move(t)))); c.notify_one(); } void end() { std::lock_guard<std::mutex> lock(m); - q.push(std::nullopt); + q.push({}); c.notify_all(); } T dequeue() { - std::unique_lock<std::mutex> lock(m); - c.wait(lock, [&] { return !q.empty(); }); - elem_type val = q.top(); - if (!val) { - throw QueueTerminated(); + elem_type val; + { + std::unique_lock<std::mutex> lock(m); + c.wait(lock, [&] { return !q.empty(); }); + val = q.top(); + if (!std::get<1>(val)) { + throw QueueTerminated(); + } + q.pop(); } - q.pop(); - return std::get<1>(val.value()); + return std::move(*std::get<1>(val)); } bool terminated() { @@ -58,11 +61,11 @@ class TSQueue { struct _Compare { Compare compare; bool operator()(const elem_type& a, const elem_type& b) const { - if (!a) + if (!std::get<1>(a)) return true; - if (!b) + if (!std::get<1>(b)) return false; - return compare(std::get<0>(a.value()), std::get<0>(b.value())); + return compare(std::get<0>(a), std::get<0>(b)); } }; std::priority_queue<elem_type, std::vector<elem_type>, _Compare> q; -- GitLab From 45ff6dcc68f315d5d9a31344b7a9ccc81bf9ecf9 Mon Sep 17 00:00:00 2001 From: Nils-Arne Dreier <dreier@dkrz.de> Date: Wed, 5 Mar 2025 11:29:48 +0100 Subject: [PATCH 4/4] fix: capture t explicitly in flush task --- apps/hiopy/_data_handler.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py index f504490..c203dd6 100644 --- a/apps/hiopy/_data_handler.py +++ b/apps/hiopy/_data_handler.py @@ -30,30 +30,33 @@ class DataHandler: self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype) def 
_create_flush_task(self): - old_buffer = self.buf.copy() + fro = self.t_flushed % self.time_chunk_size + to = fro + (self.t - self.t_flushed) + buf = self.buf[fro:to, ...].copy() + t_flushed = int(self.t_flushed) + t = int(self.t) def _flush(): + nonlocal t, t_flushed, buf, self if self.prune_offset is not None: - fro = max(0, self.t_flushed - self.prune_offset) - to = max(0, self.t - self.prune_offset) - with Timer() as t: - self.var[fro:to, ..., self.cell_slice] = self.var.fill_value + prune_from = max(0, t_flushed - self.prune_offset) + prune_to = max(0, t - self.prune_offset) + with Timer() as timer: + self.var[prune_from:prune_to, ..., self.cell_slice] = self.var.fill_value logging.info( - f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s" + f"pruned {self.var.name} for timesteps {prune_from}:{prune_to}. Took {timer.elapsed_time}s" ) - fro = self.t_flushed % self.time_chunk_size - to = fro + (self.t - self.t_flushed) - with Timer() as t: - self.var[self.t_flushed : self.t, ..., self.cell_slice] = old_buffer[fro:to, ...] + with Timer() as timer: + self.var[t_flushed:t, ..., self.cell_slice] = buf logging.info( - f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}." - f" Took {t.elapsed_time}s" + f"wrote {self.var.name} for timesteps {t_flushed}:{t}." + f" Took {timer.elapsed_time}s" ) - self.t_flushed = self.t if self.loco_server is not None: - self.loco_server.set_timestep(self.var.name, self.t - 1) + self.loco_server.set_timestep(self.var.name, t - 1) + self.t_flushed = self.t return _flush def flush(self): -- GitLab