Skip to content
Snippets Groups Projects

[hiopy] Subtasks

Open Nils-Arne Dreier requested to merge subtasks into main
1 file
+ 29
18
Compare changes
  • Side-by-side
  • Inline
+ 29
18
@@ -2,6 +2,7 @@ import logging
from time import perf_counter
import numpy as np
from coyote import add_task
class Timer:
@@ -28,25 +29,35 @@ class DataHandler:
if self.use_buffer:
self.buf = np.empty([self.time_chunk_size, *var.shape[1:-1], ma - mi], dtype=var.dtype)
def flush(self):
if self.prune_offset is not None:
fro = max(0, self.t_flushed - self.prune_offset)
to = max(0, self.t - self.prune_offset)
def _create_flush_task(self):
old_buffer = self.buf.copy()
def _flush():
if self.prune_offset is not None:
fro = max(0, self.t_flushed - self.prune_offset)
to = max(0, self.t - self.prune_offset)
with Timer() as t:
self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
logging.info(
f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s"
)
fro = self.t_flushed % self.time_chunk_size
to = fro + (self.t - self.t_flushed)
with Timer() as t:
self.var[fro:to, ..., self.cell_slice] = self.var.fill_value
logging.info(f"pruned {self.var.name} for timesteps {fro}:{to}. Took {t.elapsed_time}s")
self.var[self.t_flushed : self.t, ..., self.cell_slice] = old_buffer[fro:to, ...]
logging.info(
f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
f" Took {t.elapsed_time}s"
)
self.t_flushed = self.t
if self.loco_server is not None:
self.loco_server.set_timestep(self.var.name, self.t - 1)
fro = self.t_flushed % self.time_chunk_size
to = fro + (self.t - self.t_flushed)
with Timer() as t:
self.var[self.t_flushed : self.t, ..., self.cell_slice] = self.buf[fro:to, ...]
logging.info(
f"wrote {self.var.name} for timesteps {self.t_flushed}:{self.t}."
f" Took {t.elapsed_time}s"
)
self.t_flushed = self.t
if self.loco_server is not None:
self.loco_server.set_timestep(self.var.name, self.t - 1)
return _flush
def flush(self):
self._create_flush_task()()
def __call__(self, event):
logging.info(
@@ -59,7 +70,7 @@ class DataHandler:
self.buf = np.squeeze(np.transpose(event.data))[None, ...]
self.t += 1
if self.t % self.time_chunk_size == 0 or not self.use_buffer: # chunk complete
self.flush()
add_task(self._create_flush_task())
def set_loco_server(self, loco_server):
self.loco_server = loco_server
Loading