diff --git a/amber/src/main/python/core/architecture/managers/statistics_manager.py b/amber/src/main/python/core/architecture/managers/statistics_manager.py index 8151ca3bf1c..fb9c0f42966 100644 --- a/amber/src/main/python/core/architecture/managers/statistics_manager.py +++ b/amber/src/main/python/core/architecture/managers/statistics_manager.py @@ -77,12 +77,22 @@ def increase_output_statistics(self, port_id: PortIdentity, size: int) -> None: def increase_data_processing_time(self, time: int) -> None: if time < 0: - raise ValueError("Time must be non-negative") + logger.warning( + f"increase_data_processing_time called with negative elapsed time " + f"({time}ns); ignoring. Use time.monotonic_ns() at call sites to " + "prevent clock-skew from producing negative durations." + ) + return self._data_processing_time += time def increase_control_processing_time(self, time: int) -> None: if time < 0: - raise ValueError("Time must be non-negative") + logger.warning( + f"increase_control_processing_time called with negative elapsed time " + f"({time}ns); ignoring. Use time.monotonic_ns() at call sites to " + "prevent clock-skew from producing negative durations." + ) + return self._control_processing_time += time def update_total_execution_time(self, time: int) -> None: diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 1334af12bfe..f2e684e1cb3 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -98,7 +98,7 @@ def complete(self) -> None: # stop the data processing thread self.data_processor.stop() self.context.state_manager.transit_to(WorkerState.COMPLETED) - self.context.statistics_manager.update_total_execution_time(time.time_ns()) + self.context.statistics_manager.update_total_execution_time(time.monotonic_ns()) controller_interface = self._async_rpc_client.controller_stub() controller_interface.worker_execution_completed(EmptyRequest()) self.context.close() @@ -130,7 +130,7 @@ def _check_and_process_control(self) -> None: def pre_start(self) -> None: self.context.state_manager.assert_state(WorkerState.UNINITIALIZED) self.context.state_manager.transit_to(WorkerState.READY) - self.context.statistics_manager.initialize_worker_start_time(time.time_ns()) + self.context.statistics_manager.initialize_worker_start_time(time.monotonic_ns()) @overrides def receive(self, next_entry: QueueElement) -> None: @@ -226,7 +226,7 @@ def _process_dcm(self, dcm_element: DCMElement) -> None: :param dcm_element: DirectControlMessageElement to be handled. """ - start_time = time.time_ns() + start_time = time.monotonic_ns() match( (dcm_element.tag, get_one_of(dcm_element.payload, sealed=False)), typing.Tuple[ChannelIdentity, ControlInvocation], @@ -234,7 +234,7 @@ def _process_dcm(self, dcm_element: DCMElement) -> None: typing.Tuple[ChannelIdentity, ReturnInvocation], self._async_rpc_client.receive, ) - end_time = time.time_ns() + end_time = time.monotonic_ns() self.context.statistics_manager.increase_control_processing_time( end_time - start_time ) @@ -431,12 +431,12 @@ def _switch_context(self) -> None: """ Notify the DataProcessor thread and wait here until being switched back. """ - start_time = time.time_ns() + start_time = time.monotonic_ns() with self.context.tuple_processing_manager.context_switch_condition: self.context.tuple_processing_manager.context_switch_condition.notify() self.context.tuple_processing_manager.context_switch_condition.wait() self._post_switch_context_checks() - end_time = time.time_ns() + end_time = time.monotonic_ns() self.context.statistics_manager.increase_data_processing_time( end_time - start_time ) diff --git a/amber/src/test/python/core/architecture/managers/test_statistics_manager.py b/amber/src/test/python/core/architecture/managers/test_statistics_manager.py index 5abf7a36b45..84ca48fe99a 100644 --- a/amber/src/test/python/core/architecture/managers/test_statistics_manager.py +++ b/amber/src/test/python/core/architecture/managers/test_statistics_manager.py @@ -101,13 +101,20 @@ def test_zero_processing_time_is_allowed(self): assert stats.control_processing_time == 0 @pytest.mark.parametrize( - "method", - ["increase_data_processing_time", "increase_control_processing_time"], + "method, attr", + [ + ("increase_data_processing_time", "_data_processing_time"), + ("increase_control_processing_time", "_control_processing_time"), + ], ) - def test_negative_time_raises(self, method): + def test_negative_time_is_clamped_to_zero(self, method, attr): + # Negative elapsed times can arise from non-monotonic wall-clock reads + # (e.g. NTP adjustment). The manager should absorb them silently rather + # than raise and potentially hang the worker thread. mgr = StatisticsManager() - with pytest.raises(ValueError, match="Time must be non-negative"): - getattr(mgr, method)(-1) + getattr(mgr, method)(100) # prime the accumulator + getattr(mgr, method)(-1) # must not raise + assert getattr(mgr, attr) == 100 # accumulator unchanged class TestStatisticsManagerExecutionTime: