From 3a5db617443d5504cedcab12b2965298481e7d6b Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Fri, 28 Feb 2025 14:20:53 +0100
Subject: [PATCH 1/4] feat: add possibility to add subtsaks to taskqueue

---
 python/coyote_py.cpp    |  1 +
 src/coyote.cpp          |  4 ++++
 src/coyote.hpp          |  1 +
 src/coyoteenv.cpp       |  6 +++++-
 src/coyoteenv.hpp       |  1 +
 tests/CMakeLists.txt    | 19 +++++++++++++++++++
 tests/test_subtasks.cpp | 32 ++++++++++++++++++++++++++++++++
 tests/test_subtasks.py  | 27 +++++++++++++++++++++++++++
 8 files changed, 90 insertions(+), 1 deletion(-)
 create mode 100644 tests/test_subtasks.cpp
 create mode 100644 tests/test_subtasks.py

diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp
index 4f50879..dc57a3f 100644
--- a/python/coyote_py.cpp
+++ b/python/coyote_py.cpp
@@ -110,6 +110,7 @@ PYBIND11_MODULE(coyote, m) {
     m.def("get_field_metadata", &get_field_metadata);
     m.def("run", &coyote_run, py::call_guard<py::gil_scoped_release>());
     m.def("init", &coyote_init, "group_name"_a, "nthreads"_a = 1);
+    m.def("add_task", &coyote_add_task, "task"_a, "prio"_a = 0);
     m.def("start_datetime", &coyote_start_datetime);
     m.def("end_datetime", &coyote_end_datetime);
     m.def("group_comm", []() { return MPI_Comm_c2f(coyote_group_comm()); });
diff --git a/src/coyote.cpp b/src/coyote.cpp
index 63fe40b..de55d08 100644
--- a/src/coyote.cpp
+++ b/src/coyote.cpp
@@ -408,4 +408,8 @@ namespace coyote {
 
     void coyote_run() { CoyoteEnv::getInstance().run(); }
 
+    void coyote_add_task(std::function<void()> task, int prio) {
+        CoyoteEnv::getInstance().add_task(task, prio);
+    }
+
 } // namespace coyote
diff --git a/src/coyote.hpp b/src/coyote.hpp
index 70ac5f9..b172e6a 100644
--- a/src/coyote.hpp
+++ b/src/coyote.hpp
@@ -77,6 +77,7 @@ namespace coyote {
                                    const std::string& source_grid,
                                    const std::string& field_name);
     void coyote_run();
+    void coyote_add_task(std::function<void()> task, int prio = 0);
 
 } // namespace coyote
 #endif
diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp
index 4011fc9..c6a82ac 100644
--- a/src/coyoteenv.cpp
+++ b/src/coyoteenv.cpp
@@ -131,7 +131,7 @@ namespace coyote {
                 _task_queue.enqueue(
                     [f]() mutable {
                         Event e(f);
-                        f->op(Event(e));
+                        f->op(e);
                     },
                     {1, f->datetime()});
 
@@ -158,4 +158,8 @@ namespace coyote {
         if (_finalize_mpi)
             MPI_Finalize();
     }
+
+    void CoyoteEnv::add_task(std::function<void()> task, int prio) {
+        _task_queue.enqueue(task, {prio, ""});
+    }
 } // namespace coyote
diff --git a/src/coyoteenv.hpp b/src/coyoteenv.hpp
index e37bdc6..4123801 100644
--- a/src/coyoteenv.hpp
+++ b/src/coyoteenv.hpp
@@ -41,6 +41,7 @@ namespace coyote {
         void ensure_sync_def();
         void ensure_enddef();
         void run();
+        void add_task(std::function<void()> task, int prio = 0);
 
       private:
         static std::unique_ptr<CoyoteEnv> _instance;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index eb2a870..397feca 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -34,3 +34,22 @@ add_test(
   NAME test_tsqueue
   COMMAND test_tsqueue
 )
+
+add_executable(test_subtasks test_subtasks.cpp)
+target_link_libraries(test_subtasks coyote)
+add_test(
+  NAME test_subtasks
+  COMMAND "${MPIEXEC_EXECUTABLE}"
+  "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/test_subtasks : "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/simple_source
+)
+
+add_test(
+  NAME test_subtasks_py
+  COMMAND
+  "${MPIEXEC_EXECUTABLE}"
+  "${MPIEXEC_NUMPROC_FLAG}" "1" python ${CMAKE_CURRENT_SOURCE_DIR}/test_subtasks.py : "${MPIEXEC_NUMPROC_FLAG}" "1" ${CMAKE_CURRENT_BINARY_DIR}/simple_source
+)
+set_tests_properties(test_subtasks_py PROPERTIES
+  ENVIRONMENT "PYTHONPATH=${CMAKE_BINARY_DIR}/python:$ENV{PYTHONPATH}"
+  TIMEOUT 20
+)
diff --git a/tests/test_subtasks.cpp b/tests/test_subtasks.cpp
new file mode 100644
index 0000000..931863b
--- /dev/null
+++ b/tests/test_subtasks.cpp
@@ -0,0 +1,32 @@
+#include <atomic>
+#include <cassert>
+#include <coyote.hpp>
+#include <iostream>
+
+using namespace coyote;
+
+int main() {
+    coyote_init("", 4); // use 4 threads
+    Coyote coyote("test_simple");
+
+    coyote.def_grid({{4}}, {{0.0, 0.0, 1.0, 1.0}}, {{0.0, 1.0, 1.0, 0.0}}, {{0, 1, 2, 3}});
+
+    std::atomic_int task_count    = 0;
+    std::atomic_int subtask_count = 0;
+    coyote.add_field(
+        "simple_source",
+        "simple_grid",
+        "clock",
+        [&](Event ev) {
+            task_count++;
+            coyote_add_task([&]() { subtask_count++; });
+        },
+        "PT2S",
+        1);
+
+    coyote_run();
+
+    std::cout << task_count << std::endl;
+    assert(task_count == subtask_count);
+    return 0;
+}
diff --git a/tests/test_subtasks.py b/tests/test_subtasks.py
new file mode 100644
index 0000000..c44dc4b
--- /dev/null
+++ b/tests/test_subtasks.py
@@ -0,0 +1,27 @@
+#!/usr/bin/python3
+
+import numpy as np
+from coyote import Coyote, add_task, run
+
+coyote = Coyote("test_simple")
+
+coyote.def_grid([4], [0.0, 0.0, 1.0, 1.0], [0.0, 1.0, 1.0, 0.0], [0, 1, 2, 3], [0.5], [0.5])
+
+s = np.zeros(shape=(1, 1), dtype=np.float64)
+
+
+def handler(event):
+    data = np.asarray(event.data).copy()
+
+    def task():
+        global s
+        s[:] += data
+
+    add_task(task, 0)
+
+
+coyote.add_field("simple_source", "simple_grid", "clock", handler, "PT2S")
+
+run()
+
+print(s)
-- 
GitLab


From c806fe260c51cbd050f5a72ae3b65c7012907e1e Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Wed, 5 Mar 2025 09:31:10 +0100
Subject: [PATCH 2/4] feat: submit flush in hiopy as separate task

---
 apps/hiopy/_data_handler.py | 47 +++++++++++++++++++++++--------------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py
index 67335a6..f504490 100644
--- a/apps/hiopy/_data_handler.py
+++ b/apps/hiopy/_data_handler.py
@@ -2,6 +2,7 @@ import logging
 from time import perf_counter
 
 import numpy as np
+from coyote import add_task
 
 
 class Timer:
@@ -28,25 +29,35 @@ class DataHandler:
         if self.use_buffer:
             self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype)
 
-    def flush(self):
-        if self.prune_offset is not None:
-            fro = max(0, self.t_flushed - self.prune_offset)
-            to = max(0, self.t - self.prune_offset)
+    def _create_flush_task(self):
+        old_buffer = self.buf.copy()
+
+        def _flush():
+            if self.prune_offset is not None:
+                fro = max(0, self.t_flushed - self.prune_offset)
+                to = max(0, self.t - self.prune_offset)
+                with Timer() as t:
+                    self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
+                logging.info(
+                    f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s"
+                )
+
+            fro = self.t_flushed % self.time_chunk_size
+            to = fro + (self.t - self.t_flushed)
             with Timer() as t:
-                self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
-            logging.info(f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s")
+                self.var[self.t_flushed : self.t, ..., self.cell_slice] = old_buffer[fro:to, ...]
+            logging.info(
+                f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
+                f" Took {t.elapsed_time}s"
+            )
+            self.t_flushed = self.t
+            if self.loco_server is not None:
+                self.loco_server.set_timestep(self.var.name, self.t - 1)
 
-        fro = self.t_flushed % self.time_chunk_size
-        to = fro + (self.t - self.t_flushed)
-        with Timer() as t:
-            self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...]
-        logging.info(
-            f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
-            f" Took {t.elapsed_time}s"
-        )
-        self.t_flushed = self.t
-        if self.loco_server is not None:
-            self.loco_server.set_timestep(self.var.name, self.t - 1)
+        return _flush
+
+    def flush(self):
+        self._create_flush_task()()
 
     def __call__(self, event):
         logging.info(
@@ -59,7 +70,7 @@ class DataHandler:
             self.buf = np.squeeze(np.transpose(event.data))[None, ...]
         self.t += 1
         if self.t % self.time_chunk_size == 0 or not self.use_buffer:  # chunk complete
-            self.flush()
+            add_task(self._create_flush_task())
 
     def set_loco_server(self, loco_server):
         self.loco_server = loco_server
-- 
GitLab


From 79673428538c251d5e087f1166c84e32177eeea4 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Wed, 5 Mar 2025 09:31:34 +0100
Subject: [PATCH 3/4] fix: avoid deconstruction of std::function in tsqueue

---
 python/coyote_py.cpp |  9 ++++++++-
 src/coyoteenv.cpp    |  2 +-
 src/tsqueue.hpp      | 31 +++++++++++++++++--------------
 3 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp
index dc57a3f..6cad6c2 100644
--- a/python/coyote_py.cpp
+++ b/python/coyote_py.cpp
@@ -110,7 +110,14 @@ PYBIND11_MODULE(coyote, m) {
     m.def("get_field_metadata", &get_field_metadata);
     m.def("run", &coyote_run, py::call_guard<py::gil_scoped_release>());
     m.def("init", &coyote_init, "group_name"_a, "nthreads"_a = 1);
-    m.def("add_task", &coyote_add_task, "task"_a, "prio"_a = 0);
+    m.def(
+        "add_task",
+        [](std::function<void()> task, int prio) {
+            py::gil_scoped_release release;
+            coyote_add_task(task, prio);
+        },
+        "task"_a,
+        "prio"_a = 0);
     m.def("start_datetime", &coyote_start_datetime);
     m.def("end_datetime", &coyote_end_datetime);
     m.def("group_comm", []() { return MPI_Comm_c2f(coyote_group_comm()); });
diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp
index c6a82ac..34d4e2a 100644
--- a/src/coyoteenv.cpp
+++ b/src/coyoteenv.cpp
@@ -160,6 +160,6 @@ namespace coyote {
     }
 
     void CoyoteEnv::add_task(std::function<void()> task, int prio) {
-        _task_queue.enqueue(task, {prio, ""});
+        _task_queue.enqueue(std::move(task), {prio, ""});
     }
 } // namespace coyote
diff --git a/src/tsqueue.hpp b/src/tsqueue.hpp
index 1433a43..5455887 100644
--- a/src/tsqueue.hpp
+++ b/src/tsqueue.hpp
@@ -13,34 +13,37 @@ class QueueTerminated {};
 template <class T, class P, class Compare = std::less<P>>
 class TSQueue {
   private:
-    typedef std::optional<std::tuple<P, T>> elem_type;
+    typedef std::tuple<P, std::shared_ptr<T>> elem_type;
 
   public:
     TSQueue() : q(), m(), c() {}
 
     ~TSQueue() {}
 
-    void enqueue(T t, P prio) {
+    void enqueue(T&& t, P prio) {
         std::lock_guard<std::mutex> lock(m);
-        q.push(std::make_tuple(prio, t));
+        q.push(std::make_tuple(prio, std::make_shared<T>(std::forward<T>(t))));
         c.notify_one();
     }
 
     void end() {
         std::lock_guard<std::mutex> lock(m);
-        q.push(std::nullopt);
+        q.push({});
         c.notify_all();
     }
 
     T dequeue() {
-        std::unique_lock<std::mutex> lock(m);
-        c.wait(lock, [&] { return !q.empty(); });
-        elem_type val = q.top();
-        if (!val) {
-            throw QueueTerminated();
+        elem_type val;
+        {
+            std::unique_lock<std::mutex> lock(m);
+            c.wait(lock, [&] { return !q.empty(); });
+            val = q.top();
+            if (!std::get<1>(val)) {
+                throw QueueTerminated();
+            }
+            q.pop();
         }
-        q.pop();
-        return std::get<1>(val.value());
+        return *std::get<1>(val);
     }
 
     bool terminated() {
@@ -58,11 +61,11 @@ class TSQueue {
     struct _Compare {
         Compare compare;
         bool operator()(const elem_type& a, const elem_type& b) const {
-            if (!a)
+            if (!std::get<1>(a))
                 return true;
-            if (!b)
+            if (!std::get<1>(b))
                 return false;
-            return compare(std::get<0>(a.value()), std::get<0>(b.value()));
+            return compare(std::get<0>(a), std::get<0>(b));
         }
     };
     std::priority_queue<elem_type, std::vector<elem_type>, _Compare> q;
-- 
GitLab


From 45ff6dcc68f315d5d9a31344b7a9ccc81bf9ecf9 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Wed, 5 Mar 2025 11:29:48 +0100
Subject: [PATCH 4/4] fix: capture t explicitly in flush task

---
 apps/hiopy/_data_handler.py | 31 +++++++++++++++++--------------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py
index f504490..c203dd6 100644
--- a/apps/hiopy/_data_handler.py
+++ b/apps/hiopy/_data_handler.py
@@ -30,30 +30,33 @@ class DataHandler:
             self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype)
 
     def _create_flush_task(self):
-        old_buffer = self.buf.copy()
+        fro = self.t_flushed % self.time_chunk_size
+        to = fro + (self.t - self.t_flushed)
+        buf = self.buf[fro:to, ...].copy()
+        t_flushed = int(self.t_flushed)
+        t = int(self.t)
 
         def _flush():
+            nonlocal t, t_flushed, buf, self
             if self.prune_offset is not None:
-                fro = max(0, self.t_flushed - self.prune_offset)
-                to = max(0, self.t - self.prune_offset)
-                with Timer() as t:
-                    self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
+                prune_from = max(0, t_flushed - self.prune_offset)
+                prune_to = max(0, t - self.prune_offset)
+                with Timer() as timer:
+                    self.var[prune_from:prune_to, ..., self.cell_slice] = self.var.fill_value
                 logging.info(
-                    f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s"
+                    f"pruned {self.var.name} for timesteps {fro}:{to}. Took {timer.elapsed_time}s"
                 )
 
-            fro = self.t_flushed % self.time_chunk_size
-            to = fro + (self.t - self.t_flushed)
-            with Timer() as t:
-                self.var[self.t_flushed : self.t, ..., self.cell_slice] = old_buffer[fro:to, ...]
+            with Timer() as timer:
+                self.var[t_flushed:t, ..., self.cell_slice] = buf
             logging.info(
-                f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
-                f" Took {t.elapsed_time}s"
+                f"wrote {self.var.name} for timesteps {t_flushed}:{t}."
+                f" Took {timer.elapsed_time}s"
             )
-            self.t_flushed = self.t
             if self.loco_server is not None:
-                self.loco_server.set_timestep(self.var.name, self.t - 1)
+                self.loco_server.set_timestep(self.var.name, t - 1)
 
+        self.t_flushed = self.t
         return _flush
 
     def flush(self):
-- 
GitLab