From 928e214fba665d8aa18c0248039201262a84bf44 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Thu, 20 Feb 2025 12:55:48 +0100
Subject: [PATCH 01/16] feat: one_shot fields

---
 python/coyote_py.cpp |  3 ++-
 src/coyote.cpp       | 30 +++++++++++++++++++-----------
 src/coyote.hpp       |  3 ++-
 src/coyoteenv.cpp    | 25 ++++++++++++++-----------
 src/field.cpp        |  7 ++++++-
 src/field.hpp        |  5 +++--
 tests/test_simple.py | 15 +++++++++++++++
 7 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp
index 70e8e4f..4c5844d 100644
--- a/python/coyote_py.cpp
+++ b/python/coyote_py.cpp
@@ -38,7 +38,8 @@ PYBIND11_MODULE(coyote, m) {
              "frac_mask"_a                = std::vector<double>(),
              "pass_through"_a             = true,
              "src_lag"_a                  = 1,
-             "weight_filename"_a          = std::nullopt)
+             "weight_filename"_a          = std::nullopt,
+             "one_shot"_a                 = false)
         .def_property_readonly(
             "comp_comm", [](const Coyote& c) -> int { return MPI_Comm_c2f(c.get_mpi_comm()); })
         .def_property_readonly("comp_rank",
diff --git a/src/coyote.cpp b/src/coyote.cpp
index 727bbe6..98696b1 100644
--- a/src/coyote.cpp
+++ b/src/coyote.cpp
@@ -33,7 +33,8 @@ namespace coyote {
             std::vector<double> frac_mask,
             bool pass_through,
             int src_lag,
-            std::optional<std::string> weight_filename) {
+            std::optional<std::string> weight_filename,
+            bool one_shot) {
             auto& cenv = CoyoteEnv::getInstance();
             cenv.ensure_def_comps();
 
@@ -46,13 +47,18 @@ namespace coyote {
                     field_name.c_str());
             }
 
-            if (!iso_dt) {
-                cenv.ensure_sync_def();
-                iso_dt = yac_cget_field_timestep_instance(
-                    cenv.get_yac_instance_id(),
-                    source_comp.c_str(),
-                    source_grid.c_str(),
-                    field_name.c_str());
+            if (!one_shot) {
+                if (!iso_dt) {
+                    cenv.ensure_sync_def();
+                    iso_dt = yac_cget_field_timestep_instance(
+                        cenv.get_yac_instance_id(),
+                        source_comp.c_str(),
+                        source_grid.c_str(),
+                        field_name.c_str());
+                }
+            } else {
+                iso_dt  = "P9999999Y";
+                src_lag = 0;
             }
 
             std::string tgt_field_name = field_name + "_target";
@@ -105,7 +111,7 @@ namespace coyote {
                 yac_time_reduction,
                 yac_interpolation_stack,
                 src_lag,
-                /*tgt_lag*/ 1,
+                /*tgt_lag*/ one_shot ? 0 : 1,
                 ext_couple_config_id);
 
             yac_cfree_interp_stack_config(yac_interpolation_stack);
@@ -346,7 +352,8 @@ namespace coyote {
         std::vector<double> frac_mask,
         bool pass_through,
         int src_lag,
-        std::optional<std::string> weight_filename) {
+        std::optional<std::string> weight_filename,
+        bool one_shot) {
         pImpl->add_field(
             source_comp,
             source_grid,
@@ -359,7 +366,8 @@ namespace coyote {
             frac_mask,
             pass_through,
             src_lag,
-            weight_filename);
+            weight_filename,
+            one_shot);
     }
 
     MPI_Comm Coyote::get_mpi_comm() const { return pImpl->get_mpi_comm(); }
diff --git a/src/coyote.hpp b/src/coyote.hpp
index 4578846..b4461b2 100644
--- a/src/coyote.hpp
+++ b/src/coyote.hpp
@@ -39,7 +39,8 @@ namespace coyote {
             std::vector<double> frac_mask              = {},
             bool pass_through                          = true,
             int src_lag                                = 1,
-            std::optional<std::string> weight_filename = std::nullopt);
+            std::optional<std::string> weight_filename = std::nullopt,
+            bool one_shot                              = false);
 
         MPI_Comm get_mpi_comm() const;
 
diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp
index 4011fc9..259f948 100644
--- a/src/coyoteenv.cpp
+++ b/src/coyoteenv.cpp
@@ -102,20 +102,18 @@ namespace coyote {
         std::vector<std::thread> worker(_nthreads - 1);
         std::generate(worker.begin(), worker.end(), [&]() { return std::thread(do_work); });
 
-        int done = 0;
-        do {
-            done = 0;
+        while (_fields.size() > 0) {
             std::string curr_datetime =
                 (*std::min_element(_fields.begin(), _fields.end(), [](const auto& a, const auto& b) {
-                    return a->datetime() < b->datetime();
+                    std::string a_dt = a->datetime();
+                    std::string b_dt = b->datetime();
+                    if (a_dt.size() < b_dt.size())
+                        return true;
+                    if (a_dt.size() > b_dt.size())
+                        return false;
+                    return a_dt < b_dt;
                 }))->datetime();
             for (auto& field : _fields) {
-                // skip if done
-                if (field->is_done()) {
-                    done++;
-                    continue;
-                }
-
                 // ensure that fields are updated chronologically
                 if (field->datetime() != curr_datetime)
                     continue;
@@ -142,7 +140,12 @@ namespace coyote {
             if (_nthreads == 1 && !_task_queue.empty()) {
                 _task_queue.dequeue()();
             }
-        } while (done < _fields.size());
+
+            // filter out finished fields
+            _fields.erase(std::remove_if(
+                              _fields.begin(), _fields.end(), [](auto& f) { return f->is_done(); }),
+                          _fields.end());
+        }
 
         _task_queue.end();
 
diff --git a/src/field.cpp b/src/field.cpp
index 4e9a267..cece9b5 100644
--- a/src/field.cpp
+++ b/src/field.cpp
@@ -42,7 +42,11 @@ namespace coyote {
         }
     }
 
-    std::string Field::datetime() const { return yac_cget_field_datetime(_target_field_id); }
+    std::string Field::datetime() const {
+        if (_datetime == "")
+            _datetime = yac_cget_field_datetime(_target_field_id);
+        return _datetime;
+    }
 
     bool Field::is_done() const { return _info == YAC_ACTION_OUT_OF_BOUND; }
     bool Field::is_ready() {
@@ -59,6 +63,7 @@ namespace coyote {
             return false;
         yac_cget_action(_target_field_id, &_info);
         _get_started = false;
+        _datetime    = yac_cget_field_datetime(_target_field_id);
         return true;
     }
 
diff --git a/src/field.hpp b/src/field.hpp
index 9918f89..efb3a43 100644
--- a/src/field.hpp
+++ b/src/field.hpp
@@ -42,8 +42,9 @@ namespace coyote {
         std::vector<double> _frac_mask;
         int _collection_size;
         size_t _point_size;
-        int _info         = YAC_ACTION_NONE;
-        bool _get_started = false;
+        int _info                     = YAC_ACTION_NONE;
+        bool _get_started             = false;
+        mutable std::string _datetime = "";
     };
 } // namespace coyote
 
diff --git a/tests/test_simple.py b/tests/test_simple.py
index b709524..6698824 100644
--- a/tests/test_simple.py
+++ b/tests/test_simple.py
@@ -21,4 +21,19 @@ def handler(event):
 
 coyote.add_field("simple_source", "simple_grid", "clock", handler, "PT2S")
 
+
+one_shot_called = False
+
+
+def one_shot_handler(event):
+    global one_shot_called
+    print("FOO")
+    assert not one_shot_called
+    one_shot_called = True
+
+
+coyote.add_field("simple_source", "simple_grid", "x", one_shot_handler, one_shot=True)
+
 run()
+
+assert one_shot_called
-- 
GitLab


From ff0176846de0e90040cbf12cef56c29fddab9a5e Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Thu, 20 Feb 2025 20:15:18 +0100
Subject: [PATCH 02/16] feat: implement one_shot fields in hiopy

---
 apps/hiopy/_data_handler.py | 53 ++++++++++++++++++++++++++-----------
 apps/hiopy/worker.py        | 37 +++++++++++++++-----------
 tests/simple_source.c       | 37 +++++++++++++++-----------
 3 files changed, 79 insertions(+), 48 deletions(-)

diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py
index 67335a6..5ee288c 100644
--- a/apps/hiopy/_data_handler.py
+++ b/apps/hiopy/_data_handler.py
@@ -24,7 +24,10 @@ class DataHandler:
         self.prune_offset = var.attrs.get("loco::num_retained_timesteps", None)
         self.t_flushed = self.t
         self.time_chunk_size = var.chunks[0]
-        self.use_buffer = var.attrs.get("hiopy::use_buffer", self.time_chunk_size > 1)
+        self.use_buffer = var.attrs.get(
+            "hiopy::use_buffer", self.time_chunk_size > 1 and self.t is not None
+        )
+        assert not (self.t is None and self.use_buffer)
         if self.use_buffer:
             self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype)
 
@@ -36,29 +39,47 @@ class DataHandler:
                 self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
             logging.info(f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s")
 
-        fro = self.t_flushed % self.time_chunk_size
-        to = fro + (self.t - self.t_flushed)
-        with Timer() as t:
-            self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...]
-        logging.info(
-            f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
-            f" Took {t.elapsed_time}s"
-        )
-        self.t_flushed = self.t
-        if self.loco_server is not None:
+        if self.t is None:
+            if self.t_flushed is None:  # one_shot data is written only on the first flush
+                with Timer() as t:
+                    self.var[..., self.cell_slice] = np.squeeze(self.buf)
+                logging.info(
+                    f"wrote {self.var.name}. Took {t.elapsed_time}s"
+                )
+            self.t_flushed = 1  # any non-None value marks the one_shot write as done
+        else:
+            fro = self.t_flushed % self.time_chunk_size
+            to = fro + (self.t - self.t_flushed)
+            with Timer() as t:
+                self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...]
+            logging.info(
+                f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
+                f" Took {t.elapsed_time}s"
+            )
+
+            self.t_flushed = self.t
+        if self.loco_server is not None and self.t is not None:
             self.loco_server.set_timestep(self.var.name, self.t - 1)
 
     def __call__(self, event):
-        logging.info(
-            f"received {self.var.name} at {self.t % self.time_chunk_size+1}/{self.time_chunk_size}"
-        )
+        if self.t is not None:
+            logging.info(
+                f"received {self.var.name} at "
+                f"{self.t % self.time_chunk_size+1}/{self.time_chunk_size}"
+            )
+        else:
+            logging.info(f"received {self.var.name}")
+
         if self.use_buffer:
             self.buf[self.t % self.time_chunk_size, ...] = np.transpose(event.data)
             event.release()
         else:
             self.buf = np.squeeze(np.transpose(event.data))[None, ...]
-        self.t += 1
-        if self.t % self.time_chunk_size == 0 or not self.use_buffer:  # chunk complete
+        if self.t is not None:
+            self.t += 1
+        if (
+            (self.t is None) or (self.t % self.time_chunk_size == 0) or (not self.use_buffer)
+        ):  # chunk complete
             self.flush()
 
     def set_loco_server(self, loco_server):
diff --git a/apps/hiopy/worker.py b/apps/hiopy/worker.py
index 794f769..14236ca 100755
--- a/apps/hiopy/worker.py
+++ b/apps/hiopy/worker.py
@@ -118,20 +118,23 @@ def main():
         for v in data_vars:
             # compute timestep
             time_dim_name = v.attrs["_ARRAY_DIMENSIONS"][0]
-            time_coordinate = v.group[time_dim_name]
-            assert (
-                "seconds since " in time_coordinate.attrs["units"]
-            ), "Currently the time must be given in seconds"
-
-            # compute time start index
-            t0 = (
-                np.datetime64(start_datetime())
-                - np.datetime64(v.group.time.attrs["units"][len("seconds since ") :])
-            ) / np.timedelta64(1, "s")
-            t0_idx = np.searchsorted(v.group.time, t0)
-            assert v.group.time[t0_idx] == t0, "start_datetime not found in time axis"
-
-            dt = time_coordinate[t0_idx + 1] - time_coordinate[t0_idx]
+            if time_dim_name in v.group and "T" in v.group[time_dim_name].attrs["axis"]:
+                time_coordinate = v.group[time_dim_name]
+                assert (
+                    "seconds since " in time_coordinate.attrs["units"]
+                ), "Currently the time must be given in seconds"
+
+                # compute time start index
+                t0 = (
+                    np.datetime64(start_datetime())
+                    - np.datetime64(time_coordinate.attrs["units"][len("seconds since ") :])
+                ) / np.timedelta64(1, "s")
+                t0_idx = np.searchsorted(time_coordinate, t0)
+                assert time_coordinate[t0_idx] == t0, "start_datetime not found in time axis"
+
+                dt = time_coordinate[t0_idx + 1] - time_coordinate[t0_idx]
+            else:
+                t0_idx = None  # no time axis: the variable is registered as a one_shot field
 
             # see YAC_REDUCTION_TIME_NONE etc. (TODO: pass constants through coyote)
             time_methods2yac = {"point": 0, "sum": 1, "mean": 2, "min": 3, "max": 4}
@@ -172,16 +175,18 @@ def main():
             if args.loco:
                 data_handlers[-1].set_loco_server(loco_server)
 
+            kwargs = {"one_shot": True} if t0_idx is None else {"iso_dt": f"PT{dt}S"}
+
             coyote.add_field(
                 src_comp,
                 src_grid,
                 src_name,
                 data_handlers[-1],
-                f"PT{dt}S",
-                collection_size,
+                collection_size=collection_size,
                 yac_time_reduction=time_methods2yac[time_method],
                 interpolation_stack_yaml=f"- nnn:\n    n: {nnn}",
                 frac_mask=frac_mask,
+                **kwargs,
             )
 
     run()
diff --git a/tests/simple_source.c b/tests/simple_source.c
index d3ebfde..acdf866 100644
--- a/tests/simple_source.c
+++ b/tests/simple_source.c
@@ -72,6 +72,9 @@ int main(int argc, char** argv) {
     int clock_field_id = -1;
     yac_cdef_field(
         "clock", comp_id, &point_id, 1, 1, "PT1S", YAC_TIME_UNIT_ISO_FORMAT, &clock_field_id);
+    int mask_field_id = -1;
+    yac_cdef_field(
+        "mask", comp_id, &point_id, 1, 1, "PT1S", YAC_TIME_UNIT_ISO_FORMAT, &mask_field_id);
 
     yac_cenddef();
 
@@ -79,7 +82,8 @@ int main(int argc, char** argv) {
     double *x_data     = malloc(datasize * sizeof(*x_data)),
            *y_data     = malloc(datasize * sizeof(*y_data)),
            *z_data     = malloc(datasize * sizeof(*z_data)),
-           *clock_data = malloc(datasize * sizeof(*clock_data));
+           *clock_data = malloc(datasize * sizeof(*clock_data)),
+           *mask_data  = malloc(datasize * sizeof(*mask_data));
     double dx          = M_PI / resolution;
 
     // this seems to be the mapping lon,lat -> xyz healpix uses
@@ -92,27 +96,27 @@ int main(int argc, char** argv) {
             y_data[idx]     = sin(lon) * cos(lat);
             z_data[idx]     = sin(lat);
             clock_data[idx] = 0;
+            mask_data[idx]  = (i + j) % 3 == 0;
         }
     }
 
-    int info = 0, ierror = -1;
+    int info = 0, ierror = -1, max_info = 0;
     int step = 0;
-    while (info != YAC_ACTION_PUT_FOR_RESTART) {
-        info = YAC_ACTION_PUT_FOR_RESTART;
-        printf("step: %d", step);
-        if (yac_cget_role_from_field_id(x_field_id) != YAC_EXCHANGE_TYPE_NONE) {
-            printf(" time: %s\n", yac_cget_field_datetime(x_field_id));
-            yac_cput_(x_field_id, 1, x_data, &info, &ierror);
-        }
-        if (yac_cget_role_from_field_id(y_field_id) != YAC_EXCHANGE_TYPE_NONE)
-            yac_cput_(y_field_id, 1, y_data, &info, &ierror);
-        if (yac_cget_role_from_field_id(z_field_id) != YAC_EXCHANGE_TYPE_NONE)
-            yac_cput_(z_field_id, 1, z_data, &info, &ierror);
-        if (yac_cget_role_from_field_id(clock_field_id) != YAC_EXCHANGE_TYPE_NONE)
-            yac_cput_(clock_field_id, 1, clock_data, &info, &ierror);
+    while (max_info < YAC_ACTION_PUT_FOR_RESTART) {
+        printf("step: %d\n", step);
+        yac_cput_(x_field_id, 1, x_data, &info, &ierror);
+        max_info = info;
+        yac_cput_(y_field_id, 1, y_data, &info, &ierror);
+        max_info = info > max_info ? info : max_info;
+        yac_cput_(z_field_id, 1, z_data, &info, &ierror);
+        max_info = info > max_info ? info : max_info;
+        yac_cput_(clock_field_id, 1, clock_data, &info, &ierror);
+        max_info = info > max_info ? info : max_info;
+        yac_cput_(mask_field_id, 1, mask_data, &info, &ierror);
+        max_info = info > max_info ? info : max_info;
+
         for (size_t i = 0; i < datasize; ++i)
             clock_data[i] += 1;
-        printf("\n");
         ++step;
     }
 
@@ -122,4 +126,5 @@ int main(int argc, char** argv) {
     free(y_data);
     free(z_data);
     free(clock_data);
+    free(mask_data);
 }
-- 
GitLab


From 1f1a1eecf35269bb32e91d61603c7f7806b5b77c Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Fri, 21 Feb 2025 08:48:14 +0100
Subject: [PATCH 03/16] feat: change frac_mask to span instead of vector

---
 python/coyote_py.cpp | 65 +++++++++++++++++++++++++++++++++-----------
 src/coyote.cpp       |  4 +--
 src/coyote.hpp       |  2 +-
 src/field.cpp        |  9 ++++--
 src/field.hpp        |  5 ++--
 5 files changed, 62 insertions(+), 23 deletions(-)

diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp
index 4c5844d..14610ec 100644
--- a/python/coyote_py.cpp
+++ b/python/coyote_py.cpp
@@ -24,22 +24,55 @@ PYBIND11_MODULE(coyote, m) {
     py::class_<Coyote>(m, "Coyote")
         .def(py::init<std::string>(), "name"_a)
         // we need to wrap add_fields to turn the frac_mask buffer into a double*
-        .def("add_field",
-             &Coyote::add_field,
-             "Add a field to the coyote instance",
-             "source_comp"_a,
-             "source_grid"_a,
-             "field_name"_a,
-             "eventHandler"_a,
-             "iso_dt"_a                   = std::nullopt,
-             "collection_size"_a          = std::nullopt,
-             "interpolation_stack_yaml"_a = "[nnn]",
-             "yac_time_reduction"_a       = 0,
-             "frac_mask"_a                = std::vector<double>(),
-             "pass_through"_a             = true,
-             "src_lag"_a                  = 1,
-             "weight_filename"_a          = std::nullopt,
-             "one_shot"_a                 = false)
+        .def(
+            "add_field",
+            [](Coyote& c,
+               const std::string& source_comp,
+               const std::string& source_grid,
+               const std::string& field_name,
+               const std::function<void(Event event)>& eventHandler,
+               std::optional<std::string> iso_dt,
+               std::optional<int> collection_size,
+               std::string interpolation_stack_yaml,
+               int yac_time_reduction,
+               std::optional<py::array_t<double, py::array::c_style | py::array::forcecast>>
+                   frac_mask,  // defaults are supplied by the "name"_a list of this .def()
+               bool pass_through,
+               int src_lag,
+               std::optional<std::string> weight_filename,
+               bool one_shot) {
+                std::span<const double> frac_mask_span;
+                if (frac_mask)  // NOTE: non-owning view, the array must outlive the field
+                    frac_mask_span = _array2span(frac_mask.value());
+                c.add_field(
+                    source_comp,
+                    source_grid,
+                    field_name,
+                    eventHandler,
+                    iso_dt,
+                    collection_size,
+                    interpolation_stack_yaml,
+                    yac_time_reduction,
+                    frac_mask_span,
+                    pass_through,
+                    src_lag,
+                    weight_filename,
+                    one_shot);
+            },
+            "Add a field to the coyote instance",
+            "source_comp"_a,
+            "source_grid"_a,
+            "field_name"_a,
+            "eventHandler"_a,
+            "iso_dt"_a                   = std::nullopt,
+            "collection_size"_a          = std::nullopt,
+            "interpolation_stack_yaml"_a = "[nnn]",
+            "yac_time_reduction"_a       = 0,
+            "frac_mask"_a                = std::nullopt,
+            "pass_through"_a             = true,
+            "src_lag"_a                  = 1,
+            "weight_filename"_a          = std::nullopt,
+            "one_shot"_a                 = false)
         .def_property_readonly(
             "comp_comm", [](const Coyote& c) -> int { return MPI_Comm_c2f(c.get_mpi_comm()); })
         .def_property_readonly("comp_rank",
diff --git a/src/coyote.cpp b/src/coyote.cpp
index 98696b1..3fdf004 100644
--- a/src/coyote.cpp
+++ b/src/coyote.cpp
@@ -30,7 +30,7 @@ namespace coyote {
             std::optional<int> collection_size,
             std::string interpolation_stack_yaml,
             int yac_time_reduction,
-            std::vector<double> frac_mask,
+            std::span<const double> frac_mask,
             bool pass_through,
             int src_lag,
             std::optional<std::string> weight_filename,
@@ -349,7 +349,7 @@ namespace coyote {
         std::optional<int> collection_size,
         std::string interpolation_stack_yaml,
         int yac_time_reduction,
-        std::vector<double> frac_mask,
+        std::span<const double> frac_mask,
         bool pass_through,
         int src_lag,
         std::optional<std::string> weight_filename,
diff --git a/src/coyote.hpp b/src/coyote.hpp
index b4461b2..558d8b1 100644
--- a/src/coyote.hpp
+++ b/src/coyote.hpp
@@ -36,7 +36,7 @@ namespace coyote {
             std::optional<int> collection_size         = std::nullopt,
             std::string interpolation_stack_yaml       = "[nnn]",
             int yac_time_reduction                     = 0,
-            std::vector<double> frac_mask              = {},
+            std::span<const double> frac_mask          = {},
             bool pass_through                          = true,
             int src_lag                                = 1,
             std::optional<std::string> weight_filename = std::nullopt,
diff --git a/src/field.cpp b/src/field.cpp
index cece9b5..4edc838 100644
--- a/src/field.cpp
+++ b/src/field.cpp
@@ -5,7 +5,7 @@ namespace coyote {
     Field::Field(const int& target_field_id,
                  const std::optional<int>& source_field_id,
                  const std::vector<double>& buffer,
-                 const std::vector<double>& frac_mask,
+                 std::span<const double> frac_mask,
                  const int& collection_size,
                  const std::function<void(Event)>& eventHandle)
         : _target_field_id(target_field_id),
@@ -35,7 +35,12 @@ namespace coyote {
             int ierror = 0;
             if (!_frac_mask.empty()) {
                 yac_cput_frac_(
-                    field_id, _collection_size, _buffer.data(), _frac_mask.data(), &info, &ierror);
+                    field_id,
+                    _collection_size,
+                    _buffer.data(),
+                    const_cast<double*>(_frac_mask.data()),
+                    &info,
+                    &ierror);
             } else {
                 yac_cput_(field_id, _collection_size, _buffer.data(), &info, &ierror);
             }
diff --git a/src/field.hpp b/src/field.hpp
index efb3a43..e17c7cd 100644
--- a/src/field.hpp
+++ b/src/field.hpp
@@ -4,6 +4,7 @@
 #include <functional>
 #include <optional>
 #include <semaphore>
+#include <span>
 #include <vector>
 
 #include <mpi.h>
@@ -22,7 +23,7 @@ namespace coyote {
         Field(const int& target_field_id,
               const std::optional<int>& source_field_id,
               const std::vector<double>& buffer,
-              const std::vector<double>& frac_mask,
+              std::span<const double> frac_mask,
               const int& collection_size,
               const std::function<void(Event)>& eventHandle);
 
@@ -39,7 +40,7 @@ namespace coyote {
         int _target_field_id;
         std::optional<int> _source_field_id;
         std::vector<double> _buffer;
-        std::vector<double> _frac_mask;
+        std::span<const double> _frac_mask;
         int _collection_size;
         size_t _point_size;
         int _info                     = YAC_ACTION_NONE;
-- 
GitLab


From 7d60474fe6456d610e1395a7ef6b557a758cde0b Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Fri, 21 Feb 2025 08:49:51 +0100
Subject: [PATCH 04/16] feat: support frac_mask in hiopy

---
 apps/hiopy/_data_handler.py |  7 ++++++-
 apps/hiopy/worker.py        | 25 ++++++++++++++++---------
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py
index 5ee288c..b8cdf91 100644
--- a/apps/hiopy/_data_handler.py
+++ b/apps/hiopy/_data_handler.py
@@ -14,7 +14,7 @@ class Timer:
 
 
 class DataHandler:
-    def __init__(self, var, t0, cell_slice):
+    def __init__(self, var, t0, cell_slice, mem=None):
         self.var = var
         self.cell_slice = cell_slice
         self.loco_server = None
@@ -30,6 +30,7 @@ class DataHandler:
         assert not (self.t is None and self.use_buffer)
         if self.use_buffer:
             self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype)
+        self.mem = mem
 
     def flush(self):
         if self.prune_offset is not None:
@@ -75,6 +76,10 @@ class DataHandler:
             event.release()
         else:
             self.buf = np.squeeze(np.transpose(event.data))[None, ...]
+
+        if self.mem is not None:
+            self.mem[...] = self.buf
+
         if self.t is not None:
             self.t += 1
         if (
diff --git a/apps/hiopy/worker.py b/apps/hiopy/worker.py
index 14236ca..eed90ef 100755
--- a/apps/hiopy/worker.py
+++ b/apps/hiopy/worker.py
@@ -101,7 +101,14 @@ def main():
     if group_comm_rank() == 0:
         logging.debug(distributed_data_vars)
 
-    frac_masks = {}  # the numpy array must keep alive until the end of the program
+    # collect the fields that act as frac_mask
+    frac_masks = {
+        v.name: np.array(v[..., chunk_slice])
+        for gid, vgroup, chunk_slice, data_vars in my_data_vars
+        for v in data_vars
+        for w in data_vars
+        if "hiopy::frac_mask" in w.attrs and w.attrs["hiopy::frac_mask"] == v.basename
+    }
 
     coyote_instances = {
         (gid, vgroup): Coyote(f"{args.process_group}_{gid}_{vgroup}")
@@ -155,12 +162,12 @@ def main():
                 src_comp = src_grid = f"{args.process_group}_{gid}_{src_vgroup}"
             time_method = v.attrs.get("hiopy::time_method", "point")
             nnn = v.attrs.get("hiopy::nnn", 1)
+
+            kwargs = {"one_shot": True} if t0_idx is None else {"iso_dt": f"PT{dt}S"}
+
             frac_mask_name = v.attrs.get("hiopy::frac_mask", None)
-            frac_mask = []
             if frac_mask_name is not None:
-                if frac_mask_name not in frac_masks:
-                    frac_masks[frac_mask_name] = np.array(v.group[frac_mask_name][chunk_slice])
-                frac_mask = frac_masks[frac_mask_name]
+                kwargs["frac_mask"] = frac_masks[v.group.name + frac_mask_name]
 
             logging.info(
                 f"registering {v.name} with "
@@ -170,13 +177,14 @@ def main():
                 f"{src_name=}, "
                 f"{time_method=}"
             )
-            data_handlers.append(DataHandler(v, t0_idx, chunk_slice))
+
+            data_handlers.append(
+                DataHandler(v, t0_idx, chunk_slice, mem=frac_masks.get(v.name, None))
+            )
 
             if args.loco:
                 data_handlers[-1].set_loco_server(loco_server)
 
-            kwargs = {"one_shot": True} if t0_idx is None else {"iso_dt": f"PT{dt}S"}
-
             coyote.add_field(
                 src_comp,
                 src_grid,
@@ -185,7 +193,6 @@ def main():
                 collection_size=collection_size,
                 yac_time_reduction=time_methods2yac[time_method],
                 interpolation_stack_yaml=f"- nnn:\n    n: {nnn}",
-                frac_mask=frac_mask,
                 **kwargs,
             )
 
-- 
GitLab


From 433aec6ccc556124321a5df6d4039169d1bdaa87 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:22:03 +0100
Subject: [PATCH 05/16] feat: set cell indices for healpix grids

---
 apps/hiopy/_grids.py |  4 +++-
 python/coyote_py.cpp | 15 ++++++++++-----
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/apps/hiopy/_grids.py b/apps/hiopy/_grids.py
index da2220c..5ebf7bc 100644
--- a/apps/hiopy/_grids.py
+++ b/apps/hiopy/_grids.py
@@ -78,4 +78,6 @@ def def_healpix_grid(coyote, nside, nest=True, cell_idx=None):
     verts_xyz, quads = np.unique(boundaries_xyz, return_inverse=True, axis=0)
     vlon, vlat = _xyz2lonlat(verts_xyz)
     vertex_of_cell = quads.reshape(-1, 4).T
-    coyote.def_grid(4 * np.ones(len(cell_idx)), vlon, vlat, vertex_of_cell.T.flatten(), clon, clat)
+    coyote.def_grid(
+        4 * np.ones(len(cell_idx)), vlon, vlat, vertex_of_cell.T.flatten(), clon, clat, cell_idx
+    )
diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp
index 14610ec..4ab3d86 100644
--- a/python/coyote_py.cpp
+++ b/python/coyote_py.cpp
@@ -95,27 +95,32 @@ PYBIND11_MODULE(coyote, m) {
                py::array_t<double, py::array::c_style | py::array::forcecast> y_vertices,
                py::array_t<int, py::array::c_style | py::array::forcecast> cell_to_vertex,
                std::optional<py::array_t<double, py::array::c_style | py::array::forcecast>> x_cells,
-               std::optional<py::array_t<double, py::array::c_style | py::array::forcecast>>
-                   y_cells) {
+               std::optional<py::array_t<double, py::array::c_style | py::array::forcecast>> y_cells,
+               std::optional<py::array_t<int, py::array::c_style | py::array::forcecast>> cells_idx) {
                 std::optional<std::span<const double>> _x_cells = std::nullopt;
                 if (x_cells)
                     _x_cells = _array2span(x_cells.value());
                 std::optional<std::span<const double>> _y_cells = std::nullopt;
                 if (y_cells)
                     _y_cells = _array2span(y_cells.value());
+                std::optional<std::span<const int>> _cells_idx = std::nullopt;
+                if (cells_idx)
+                    _cells_idx = _array2span(cells_idx.value());
                 c.def_grid(_array2span(num_vertices_per_cell),
                            _array2span(x_vertices),
                            _array2span(y_vertices),
                            _array2span(cell_to_vertex),
                            _x_cells,
-                           _y_cells);
+                           _y_cells,
+                           _cells_idx);
             },
             "num_vertices_per_cell"_a,
             "x_vertices"_a,
             "y_vertices"_a,
             "cell_to_vertex"_a,
-            py::arg("x_cells") = std::nullopt,
-            py::arg("y_cells") = std::nullopt)
+            "x_cells"_a   = std::nullopt,
+            "y_cells"_a   = std::nullopt,
+            "cells_idx"_a = std::nullopt)
         .def("get_field_collection_size", &Coyote::get_field_collection_size)
         .def("get_field_timestep", &Coyote::get_field_timestep)
         .def("get_field_metadata", &Coyote::get_field_metadata)
-- 
GitLab


From 076ffd59f60fcdbff209d9d2e89dd426d0f58efa Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:36:07 +0100
Subject: [PATCH 06/16] refactor: specify field name explicitly

---
 apps/hiopy/configure/configure.py |  4 ++--
 apps/hiopy/worker.py              | 11 +++++++----
 python/coyote_py.cpp              |  3 +++
 python/examples/netcdf_output.py  |  2 +-
 src/coyote.cpp                    |  7 +++++--
 src/coyote.hpp                    |  1 +
 tests/test_simple.cxx             |  1 +
 tests/test_simple.py              |  4 ++--
 8 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/apps/hiopy/configure/configure.py b/apps/hiopy/configure/configure.py
index f53774a..71b3c45 100755
--- a/apps/hiopy/configure/configure.py
+++ b/apps/hiopy/configure/configure.py
@@ -89,10 +89,10 @@ def add_variable(
             v.attrs[key] = val
         if time_method is not None:
             v.attrs["hiopy::time_method"] = time_method
-        if yac_name is not None:
-            v.attrs["hiopy::src_name"] = yac_name
         if "hiopy::parent" not in g.attrs:
             v.attrs["hiopy::yac_source"] = (yac_source_comp, yac_source_grid)
+            if yac_name is not None:
+                v.attrs["hiopy::src_name"] = yac_name
         else:
             v.attrs["hiopy::nnn"] = 4
         v.attrs["hiopy::enable"] = True
diff --git a/apps/hiopy/worker.py b/apps/hiopy/worker.py
index eed90ef..b201bcc 100755
--- a/apps/hiopy/worker.py
+++ b/apps/hiopy/worker.py
@@ -147,7 +147,8 @@ def main():
             time_methods2yac = {"point": 0, "sum": 1, "mean": 2, "min": 3, "max": 4}
 
             collection_size = v.shape[1] if len(v.shape) == 3 else 1
-            src_name = v.attrs.get("hiopy::src_name", default=v.name.split("/")[-1])
+            name = v.basename
+            src_name = v.attrs.get("hiopy::src_name", name)
             if "hiopy::yac_source" in v.attrs:
                 src_comp, src_grid = v.attrs["hiopy::yac_source"]
             else:
@@ -157,7 +158,7 @@ def main():
                     for gid, vg, _, gvars in chain(*distributed_data_vars)
                     for groupvar in gvars
                     if groupvar.name.split("/")[-2] == v.group.attrs["hiopy::parent"]
-                    and groupvar.name.split("/")[-1] == src_name
+                    and groupvar.name.split("/")[-1] == name
                 ][0]
                 src_comp = src_grid = f"{args.process_group}_{gid}_{src_vgroup}"
             time_method = v.attrs.get("hiopy::time_method", "point")
@@ -170,12 +171,13 @@ def main():
                 kwargs["frac_mask"] = frac_masks[v.group.name + frac_mask_name]
 
             logging.info(
-                f"registering {v.name} with "
+                f"registering {name} with "
                 f"{collection_size=}, "
                 f"{src_comp=}, "
                 f"{src_grid=}, "
                 f"{src_name=}, "
-                f"{time_method=}"
+                f"{time_method=}, "
+                f"{nnn=}"
             )
 
             data_handlers.append(
@@ -186,6 +188,7 @@ def main():
                 data_handlers[-1].set_loco_server(loco_server)
 
             coyote.add_field(
+                name,
                 src_comp,
                 src_grid,
                 src_name,
diff --git a/python/coyote_py.cpp b/python/coyote_py.cpp
index 4ab3d86..c031171 100644
--- a/python/coyote_py.cpp
+++ b/python/coyote_py.cpp
@@ -27,6 +27,7 @@ PYBIND11_MODULE(coyote, m) {
         .def(
             "add_field",
             [](Coyote& c,
+               const std::string& name,
                const std::string& source_comp,
                const std::string& source_grid,
                const std::string& field_name,
@@ -45,6 +46,7 @@ PYBIND11_MODULE(coyote, m) {
                 if (frac_mask)
                     frac_mask_span = _array2span(frac_mask.value());
                 c.add_field(
+                    name,
                     source_comp,
                     source_grid,
                     field_name,
@@ -60,6 +62,7 @@ PYBIND11_MODULE(coyote, m) {
                     one_shot);
             },
             "Add a field to the coyote instance",
+            "name"_a,
             "source_comp"_a,
             "source_grid"_a,
             "field_name"_a,
diff --git a/python/examples/netcdf_output.py b/python/examples/netcdf_output.py
index 12cac57..a4beb33 100644
--- a/python/examples/netcdf_output.py
+++ b/python/examples/netcdf_output.py
@@ -58,6 +58,6 @@ class handler:
 for vn in args.vars:
     var = output_ds.createVariable(vn, "f4", (time_dim, height_dim, cell_dim))
 
-    coyote.add_field(args.src_comp, args.src_grid, vn, handler(var), args.timestep)
+    coyote.add_field(vn, args.src_comp, args.src_grid, vn, handler(var), args.timestep)
 
 coyote.run()
diff --git a/src/coyote.cpp b/src/coyote.cpp
index 3fdf004..2219544 100644
--- a/src/coyote.cpp
+++ b/src/coyote.cpp
@@ -22,6 +22,7 @@ namespace coyote {
         }
 
         void add_field(
+            const std::string& name,
             const std::string& source_comp,
             const std::string& source_grid,
             const std::string& field_name,
@@ -61,7 +62,7 @@ namespace coyote {
                 src_lag = 0;
             }
 
-            std::string tgt_field_name = field_name + "_target";
+            std::string tgt_field_name = name + "_target";
             int target_field_id        = -1;
             yac_cdef_field(
                 tgt_field_name.c_str(),
@@ -76,7 +77,7 @@ namespace coyote {
             if (pass_through) {
                 int sfid = -1;
                 yac_cdef_field(
-                    field_name.c_str(),
+                    name.c_str(),
                     _yac_comp_id,
                     &_yac_point_id,
                     1,
@@ -341,6 +342,7 @@ namespace coyote {
     Coyote& Coyote::operator=(Coyote&&) = default;
 
     void Coyote::add_field(
+        const std::string& name,
         const std::string& source_comp,
         const std::string& source_grid,
         const std::string& field_name,
@@ -355,6 +357,7 @@ namespace coyote {
         std::optional<std::string> weight_filename,
         bool one_shot) {
         pImpl->add_field(
+            name,
             source_comp,
             source_grid,
             field_name,
diff --git a/src/coyote.hpp b/src/coyote.hpp
index 558d8b1..3570281 100644
--- a/src/coyote.hpp
+++ b/src/coyote.hpp
@@ -28,6 +28,7 @@ namespace coyote {
         ~Coyote();
 
         void add_field(
+            const std::string& name,
             const std::string& source_comp,
             const std::string& source_grid,
             const std::string& field_name,
diff --git a/tests/test_simple.cxx b/tests/test_simple.cxx
index f21c31b..bfaebfa 100644
--- a/tests/test_simple.cxx
+++ b/tests/test_simple.cxx
@@ -15,6 +15,7 @@ int main() {
 
     double i = 1.0;
     coyote.add_field(
+        "clock",
         "simple_source",
         "simple_grid",
         "clock",
diff --git a/tests/test_simple.py b/tests/test_simple.py
index 6698824..b191876 100644
--- a/tests/test_simple.py
+++ b/tests/test_simple.py
@@ -19,7 +19,7 @@ def handler(event):
     assert data.base is not None  # ensures that this is a view onto the data
 
 
-coyote.add_field("simple_source", "simple_grid", "clock", handler, "PT2S")
+coyote.add_field("clock", "simple_source", "simple_grid", "clock", handler, "PT2S")
 
 
 one_shot_called = False
@@ -32,7 +32,7 @@ def one_shot_handler(event):
     one_shot_called = True
 
 
-coyote.add_field("simple_source", "simple_grid", "x", one_shot_handler, one_shot=True)
+coyote.add_field("x", "simple_source", "simple_grid", "x", one_shot_handler, one_shot=True)
 
 run()
 
-- 
GitLab


From fe589d3816d356ab536e1003fc1595596873f69b Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:37:13 +0100
Subject: [PATCH 07/16] fix: set frac_mask properly in configure

---
 apps/hiopy/configure/configure.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/apps/hiopy/configure/configure.py b/apps/hiopy/configure/configure.py
index 71b3c45..7a9c1cd 100755
--- a/apps/hiopy/configure/configure.py
+++ b/apps/hiopy/configure/configure.py
@@ -95,6 +95,8 @@ def add_variable(
                 v.attrs["hiopy::src_name"] = yac_name
         else:
             v.attrs["hiopy::nnn"] = 4
+        if frac_mask is not None:
+            v.attrs["hiopy::frac_mask"] = frac_mask
         v.attrs["hiopy::enable"] = True
 
 
-- 
GitLab


From f20d703a94c3aefea5dfb0ac9f578d00fd4bdd67 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:38:04 +0100
Subject: [PATCH 08/16] fix: set collection_size properly for one_shot fields

---
 apps/hiopy/worker.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/apps/hiopy/worker.py b/apps/hiopy/worker.py
index b201bcc..fa0cfe8 100755
--- a/apps/hiopy/worker.py
+++ b/apps/hiopy/worker.py
@@ -140,13 +140,14 @@ def main():
                 assert v.group.time[t0_idx] == t0, "start_datetime not found in time axis"
 
                 dt = time_coordinate[t0_idx + 1] - time_coordinate[t0_idx]
+                collection_size = v.shape[1] if len(v.shape) == 3 else 1
             else:
                 t0_idx = None
+                collection_size = v.shape[0] if len(v.shape) == 2 else 1
 
             # see YAC_REDUCTION_TIME_NONE etc. (TODO: pass constants through coyote)
             time_methods2yac = {"point": 0, "sum": 1, "mean": 2, "min": 3, "max": 4}
 
-            collection_size = v.shape[1] if len(v.shape) == 3 else 1
             name = v.basename
             src_name = v.attrs.get("hiopy::src_name", name)
             if "hiopy::yac_source" in v.attrs:
-- 
GitLab


From e00e62faea1063161400c7c40d5607cd1d8f797d Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:38:38 +0100
Subject: [PATCH 09/16] fix: enable frac_mask if used

---
 src/coyote.cpp | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/coyote.cpp b/src/coyote.cpp
index 2219544..fbf12e2 100644
--- a/src/coyote.cpp
+++ b/src/coyote.cpp
@@ -73,6 +73,10 @@ namespace coyote {
                 iso_dt.value().c_str(),
                 YAC_TIME_UNIT_ISO_FORMAT,
                 &target_field_id);
+
+            const char* comp_name = yac_cget_component_name_from_field_id(target_field_id);
+            const char* grid_name = yac_cget_grid_name_from_field_id(target_field_id);
+
             std::optional<int> source_field_id;
             if (pass_through) {
                 int sfid = -1;
@@ -86,9 +90,14 @@ namespace coyote {
                     YAC_TIME_UNIT_ISO_FORMAT,
                     &sfid);
                 source_field_id = sfid;
+                if (!frac_mask.empty())
+                    yac_cenable_field_frac_mask_instance(
+                        cenv.get_yac_instance_id(),
+                        comp_name,
+                        grid_name,
+                        name.c_str(),
+                        std::numeric_limits<double>::quiet_NaN());
             }
-            const char* comp_name = yac_cget_component_name_from_field_id(target_field_id);
-            const char* grid_name = yac_cget_grid_name_from_field_id(target_field_id);
 
             int yac_interpolation_stack = -1;
             yac_cget_interp_stack_config_from_string_yaml(
-- 
GitLab


From 3435c62029388989b093f494a134feaedfd994b0 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:38:55 +0100
Subject: [PATCH 10/16] fix: avoid deallocation of field before program
 terminates

---
 src/coyote.cpp    | 2 +-
 src/coyoteenv.cpp | 8 ++++----
 src/coyoteenv.hpp | 4 ++--
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/coyote.cpp b/src/coyote.cpp
index fbf12e2..636e3f4 100644
--- a/src/coyote.cpp
+++ b/src/coyote.cpp
@@ -127,7 +127,7 @@ namespace coyote {
             yac_cfree_interp_stack_config(yac_interpolation_stack);
 
             size_t size = collection_size.value() * yac_cget_points_size(_yac_point_id);
-            cenv.add_field(std::make_unique<Field>(
+            cenv.add_field(std::make_shared<Field>(
                 target_field_id,
                 source_field_id,
                 std::vector<double>(size),
diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp
index 259f948..d6b9c8e 100644
--- a/src/coyoteenv.cpp
+++ b/src/coyoteenv.cpp
@@ -58,7 +58,7 @@ namespace coyote {
 
     MPI_Comm CoyoteEnv::get_group_comm() const { return group_comm; }
 
-    void CoyoteEnv::add_field(std::unique_ptr<Field> field) { _fields.push_back(std::move(field)); }
+    void CoyoteEnv::add_field(std::shared_ptr<Field> field) { _fields.push_back(std::move(field)); }
 
     void CoyoteEnv::ensure_def_comps() {
         if (state_def_comps)
@@ -127,11 +127,11 @@ namespace coyote {
                 });
 
                 _task_queue.enqueue(
-                    [f]() mutable {
+                    [field, f]() mutable {
                         Event e(f);
-                        f->op(Event(e));
+                        field->op(Event(e));
                     },
-                    {1, f->datetime()});
+                    {1, field->datetime()});
 
                 // actually we want to offload this also to the
                 // task_queue but yac is not thread-safe.
diff --git a/src/coyoteenv.hpp b/src/coyoteenv.hpp
index e37bdc6..42de2dd 100644
--- a/src/coyoteenv.hpp
+++ b/src/coyoteenv.hpp
@@ -36,7 +36,7 @@ namespace coyote {
         int get_yac_instance_id() const;
         std::string start_datetime();
         std::string end_datetime();
-        void add_field(std::unique_ptr<Field> field);
+        void add_field(std::shared_ptr<Field> field);
         void ensure_def_comps();
         void ensure_sync_def();
         void ensure_enddef();
@@ -49,7 +49,7 @@ namespace coyote {
         bool state_def_comps, state_sync_def, state_enddef;
         std::thread put_thread, get_thread;
         TaskQueue _task_queue;
-        std::vector<std::unique_ptr<Field>> _fields;
+        std::vector<std::shared_ptr<Field>> _fields;
         bool _finalize_mpi = false;
         uint _nthreads     = 1;
     };
-- 
GitLab


From 72234f8e86922872c73f38eaf9bc8e2d11181061 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Mon, 24 Feb 2025 12:41:38 +0100
Subject: [PATCH 11/16] fix: path to frac_mask

---
 apps/hiopy/worker.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/apps/hiopy/worker.py b/apps/hiopy/worker.py
index fa0cfe8..2cf8ff1 100755
--- a/apps/hiopy/worker.py
+++ b/apps/hiopy/worker.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 
 import logging
+import os.path as path
 from argparse import ArgumentParser
 from itertools import chain, groupby
 
@@ -169,7 +170,7 @@ def main():
 
             frac_mask_name = v.attrs.get("hiopy::frac_mask", None)
             if frac_mask_name is not None:
-                kwargs["frac_mask"] = frac_masks[v.group.name + frac_mask_name]
+                kwargs["frac_mask"] = frac_masks[path.join(v.group.name, frac_mask_name)]
 
             logging.info(
                 f"registering {name} with "
-- 
GitLab


From a2c1fda66dd3341410f6126b86f9e61c163ba9bc Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Tue, 25 Feb 2025 12:00:58 +0100
Subject: [PATCH 12/16] fix: remove debug output

---
 apps/hiopy/_data_handler.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/apps/hiopy/_data_handler.py b/apps/hiopy/_data_handler.py
index b8cdf91..08c35c7 100644
--- a/apps/hiopy/_data_handler.py
+++ b/apps/hiopy/_data_handler.py
@@ -44,9 +44,7 @@ class DataHandler:
             if self.t_flushed is None:
                 with Timer() as t:
                     self.var[..., self.cell_slice] = np.squeeze(self.buf)
-                logging.info(
-                    f"wrote {self.var.name}." f" Took {t.elapsed_time}s" f"{self.t_flushed}"
-                )
+                logging.info(f"wrote {self.var.name}." f" Took {t.elapsed_time}s")
             self.t_flushed = 1
         else:
             fro = self.t_flushed % self.time_chunk_size
-- 
GitLab


From 529861796dc170dced17b315f12a7ce8ae69020d Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Tue, 25 Feb 2025 12:01:20 +0100
Subject: [PATCH 13/16] fix: frac_mask dtype

---
 apps/hiopy/worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/hiopy/worker.py b/apps/hiopy/worker.py
index 2cf8ff1..6d9e958 100755
--- a/apps/hiopy/worker.py
+++ b/apps/hiopy/worker.py
@@ -104,7 +104,7 @@ def main():
 
     # collect the fields that act as frac_mask
     frac_masks = {
-        v.name: np.array(v[..., chunk_slice])
+        v.name: np.array(v[..., chunk_slice], dtype=np.float64)
         for gid, vgroup, chunk_slice, data_vars in my_data_vars
         for v in data_vars
         for w in data_vars
-- 
GitLab


From 424109c25bab5c926725d63a8c61572b09d6f591 Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Tue, 25 Feb 2025 12:01:43 +0100
Subject: [PATCH 14/16] fix: process all one-shot fields first

---
 src/coyoteenv.cpp | 9 +++++++++
 src/tsqueue.hpp   | 4 +++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/coyoteenv.cpp b/src/coyoteenv.cpp
index d6b9c8e..1100c11 100644
--- a/src/coyoteenv.cpp
+++ b/src/coyoteenv.cpp
@@ -101,6 +101,7 @@ namespace coyote {
         assert(_nthreads > 0);
         std::vector<std::thread> worker(_nthreads - 1);
         std::generate(worker.begin(), worker.end(), [&]() { return std::thread(do_work); });
+        std::string startdate = yac_cget_start_datetime_instance(yac_instance_id);
 
         while (_fields.size() > 0) {
             std::string curr_datetime =
@@ -141,6 +142,14 @@ namespace coyote {
                 _task_queue.dequeue()();
             }
 
+            // ensure that the one_shot fields are received before starting with the non-one_shot
+            // fields. This ensures that the frac_masks are in-place
+            while (startdate == curr_datetime && !_task_queue.empty()) {
+                auto task = _task_queue.dequeue(false);
+                if (task)
+                    task();
+            }
+
             // filter out finished fields
             _fields.erase(std::remove_if(
                               _fields.begin(), _fields.end(), [](auto& f) { return f->is_done(); }),
diff --git a/src/tsqueue.hpp b/src/tsqueue.hpp
index 1433a43..ec5c8e9 100644
--- a/src/tsqueue.hpp
+++ b/src/tsqueue.hpp
@@ -32,8 +32,10 @@ class TSQueue {
         c.notify_all();
     }
 
-    T dequeue() {
+    T dequeue(bool wait = true) {
         std::unique_lock<std::mutex> lock(m);
+        if (!wait && q.empty())
+            return T{};
         c.wait(lock, [&] { return !q.empty(); });
         elem_type val = q.top();
         if (!val) {
-- 
GitLab


From e514a6ffd8cc5b2eb6bde960ef3ffb9752b7ae7c Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Tue, 25 Feb 2025 12:02:24 +0100
Subject: [PATCH 15/16] ci: add test for frac_masks

---
 apps/hiopy/tests/check_hierarchy.py          | 14 +++++++++++---
 apps/hiopy/tests/create_simple_dataset.sh.in |  3 ++-
 tests/simple_source.c                        |  4 +++-
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/apps/hiopy/tests/check_hierarchy.py b/apps/hiopy/tests/check_hierarchy.py
index 16aab00..966808c 100755
--- a/apps/hiopy/tests/check_hierarchy.py
+++ b/apps/hiopy/tests/check_hierarchy.py
@@ -9,7 +9,7 @@ import zarr
 logging.basicConfig(level=logging.INFO)
 
 
-def check_interpolation(source_var, target_var):
+def check_interpolation(source_var, target_var, source_group):
     logging.info(f"Checking {source_var.name} to {target_var.name}")
     if target_var.attrs.get("hiopy::nnn", None) != 4:
         logging.info(
@@ -17,7 +17,15 @@ def check_interpolation(source_var, target_var):
             f", it doesnt use 4-NNN interpolation."
         )
         return
-    assert np.allclose(sum([source_var[..., i::4] for i in range(4)]) / 4, target_var)
+    if "hiopy::frac_mask" in source_var.attrs:
+        mask_name = source_var.attrs["hiopy::frac_mask"]
+        mask = source_group[mask_name]
+        coarse = np.nansum([(source_var[..., i::4] * mask[i::4]) for i in range(4)], axis=0) / (
+            np.sum([mask[i::4] for i in range(4)], axis=0)
+        )
+        assert np.allclose(coarse, target_var)
+    else:
+        assert np.allclose(sum([source_var[..., i::4] for i in range(4)]) / 4, target_var)
 
 
 def _collect_groups(dataset, parent=None):
@@ -36,7 +44,7 @@ def main():
             parent = p[g.attrs["hiopy::parent"]]
             for name, item in g.arrays():
                 if {"cell", "time"} <= set(item.attrs["_ARRAY_DIMENSIONS"]):
-                    check_interpolation(parent[name], item)
+                    check_interpolation(parent[name], item, parent)
 
 
 if __name__ == "__main__":
diff --git a/apps/hiopy/tests/create_simple_dataset.sh.in b/apps/hiopy/tests/create_simple_dataset.sh.in
index 398166c..1a29401 100755
--- a/apps/hiopy/tests/create_simple_dataset.sh.in
+++ b/apps/hiopy/tests/create_simple_dataset.sh.in
@@ -12,7 +12,8 @@ z = zarr.open("${dataset}")
 hc.add_healpix_hierarchy(z, order=3)
 hc.add_time(z, "2000-01-01", "2000-01-01T00:01:00", 3)
 
-hc.add_variable(z, "x", "simple_source", "simple_grid")
+hc.add_variable(z, "frac_mask", "simple_source", "simple_grid", yac_name="mask", taxis=None)
+hc.add_variable(z, "x", "simple_source", "simple_grid", frac_mask="frac_mask")
 hc.add_variable(z, "y", "simple_source", "simple_grid")
 hc.add_variable(z, "z", "simple_source", "simple_grid")
 hc.add_variable(z, "clock", "simple_source", "simple_grid")
diff --git a/tests/simple_source.c b/tests/simple_source.c
index acdf866..1e39785 100644
--- a/tests/simple_source.c
+++ b/tests/simple_source.c
@@ -96,7 +96,9 @@ int main(int argc, char** argv) {
             y_data[idx]     = sin(lon) * cos(lat);
             z_data[idx]     = sin(lat);
             clock_data[idx] = 0;
-            mask_data[idx]  = (i + j) % 3 == 0;
+            mask_data[idx]  = ((i + j) % 3) != 0;
+            if (mask_data[idx] == 0)
+                x_data[idx] = NAN;
         }
     }
 
-- 
GitLab


From d7d2fc09a624919401b930245f8cfaa91218c8ac Mon Sep 17 00:00:00 2001
From: Nils-Arne Dreier <dreier@dkrz.de>
Date: Tue, 25 Feb 2025 15:26:45 +0100
Subject: [PATCH 16/16] doc: improve documentation for frac_masks

---
 doc/hiopy.rst | 24 ++++++++++--------------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/doc/hiopy.rst b/doc/hiopy.rst
index 9a1ed99..0101656 100644
--- a/doc/hiopy.rst
+++ b/doc/hiopy.rst
@@ -79,11 +79,12 @@ All of the above steps can be done using the `hiopy-configure` tool.
 Assumptions
 ~~~~~~~~~~~
 
-hiopy assumes that the variables it should write have the time axis as
-the first dimension and the horizontal axis as its last
-dimension. The variables are enabled to be written using the boolean value of attribute hiopy::enable.
+Variables that are processed by hiopy need to have the attribute `hiopy::enable` set.
 
-hiopy specific parameters are added in the attributes and are usually prefixed by `hiopy::`:
+hiopy examines the dimensions of the variable. If the first dimension has a coordinate, and the `axis` attribute of the coordinate is `T`, hiopy assumes that this variable is time dependent with the respective time coordinates. If such a coordinate is not found, hiopy considers the field a "one_shot" field, which is handled once at the very beginning of the run.
+Furthermore, hiopy assumes that the last dimension is the spatial dimension. It examines the attribute `grid_mapping` to determine the corresponding grid.
+
+Further hiopy specific parameters can be added in the attributes and are prefixed with `hiopy::`:
 
 .. csv-table::
    :header: "attribute", "entity", "description"
@@ -93,7 +94,7 @@ hiopy specific parameters are added in the attributes and are usually prefixed b
    "hiopy::src_name", "Array", "yac field name of the source field. Default is the name of the array itself."
    "hiopy::time_method", "Array", "Time interpolation method (point, mean). Default: point"
    "hiopy::nnn", "Array", "Number of nearest neighbors to use. Default is 1."
-   "hiopy::frac_mask", "Array", "Array name of the fractional mask that is used when forwarding the data. (see `yac_cput_frac`)"
+   "hiopy::frac_mask", "Array", "Array name of the fractional mask that is used when forwarding the data (see `yac_cput_frac`). This can also be a field that is processed by hiopy itself in the same run."
    "hiopy::enable", "Boolean", "Enable or disable the variable to be handled by the worker"
 
 `hiopy.configure` can also be used as a python
@@ -105,16 +106,9 @@ from level 6 to 0 and one 3d variable "ta":
    import zarr
    import hiopy.configure as hc
 
-   dates = {"startdate": np.datetime64("2020-01-01"),
-         "enddate": np.datetime64("2020-01-10")}
-
    z = zarr.open("dataset.zarr")
-   hc.create_hierachy(z,
-                      order=6,
-                      startdate=np.datetime64("2020-01-01"),
-                      enddate=np.datetime64("2020-01-10"),
-                      dt=3*60*60)  # 3 hourly time axis
-
+   hc.add_healpix_hierarchy(z, order=6)
+   hc.add_time(z, "2020-01-01", "2020-01-10", dt=3*60*60)  # 3 hourly time axis
    hc.add_height(z, "level", 72)
    hc.add_variable(z,
                    name="ta",
@@ -124,6 +118,8 @@ from level 6 to 0 and one 3d variable "ta":
                    zaxis="level",
                    chunk_shape=(8, 16, 2**12))  # 8M (uncompressed)
 
+   zarr.consolidate_metadata(z.store)  # optional
+
 Setup
 -----
 
-- 
GitLab