From e81241f6246157dd30d51f6fdb3544369ad93f84 Mon Sep 17 00:00:00 2001 From: Stella Jiang Date: Thu, 18 Jun 2026 22:57:20 -0700 Subject: [PATCH] fix(pyamber): use monotonic_ns to prevent negative elapsed times time.time_ns() (wall clock) can go backward on NTP adjustments or in VMs, making end_time - start_time negative in _switch_context and _process_dcm. Passing that negative delta to StatisticsManager raised ValueError, which was unhandled by the DataProcessor thread and left the worker in a deadlocked wait. Fix: replace all time.time_ns() in main_loop.py with time.monotonic_ns(), which is guaranteed non-decreasing. Defense-in-depth: update increase_data_processing_time and increase_control_processing_time to log a warning and return early instead of raising ValueError for negative values. This keeps the worker alive in the unlikely event of any remaining clock anomaly. Closes #3768 --- .../architecture/managers/statistics_manager.py | 14 ++++++++++++-- .../src/main/python/core/runnables/main_loop.py | 12 ++++++------ .../managers/test_statistics_manager.py | 17 ++++++++++++----- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/statistics_manager.py b/amber/src/main/python/core/architecture/managers/statistics_manager.py index 8151ca3bf1c..fb9c0f42966 100644 --- a/amber/src/main/python/core/architecture/managers/statistics_manager.py +++ b/amber/src/main/python/core/architecture/managers/statistics_manager.py @@ -77,12 +77,22 @@ def increase_output_statistics(self, port_id: PortIdentity, size: int) -> None: def increase_data_processing_time(self, time: int) -> None: if time < 0: - raise ValueError("Time must be non-negative") + logger.warning( + f"increase_data_processing_time called with negative elapsed time " + f"({time}ns); ignoring. Use time.monotonic_ns() at call sites to " + "prevent clock-skew from producing negative durations." 
+ ) + return self._data_processing_time += time def increase_control_processing_time(self, time: int) -> None: if time < 0: - raise ValueError("Time must be non-negative") + logger.warning( + f"increase_control_processing_time called with negative elapsed time " + f"({time}ns); ignoring. Use time.monotonic_ns() at call sites to " + "prevent clock-skew from producing negative durations." + ) + return self._control_processing_time += time def update_total_execution_time(self, time: int) -> None: diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 1334af12bfe..f2e684e1cb3 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -98,7 +98,7 @@ def complete(self) -> None: # stop the data processing thread self.data_processor.stop() self.context.state_manager.transit_to(WorkerState.COMPLETED) - self.context.statistics_manager.update_total_execution_time(time.time_ns()) + self.context.statistics_manager.update_total_execution_time(time.monotonic_ns()) controller_interface = self._async_rpc_client.controller_stub() controller_interface.worker_execution_completed(EmptyRequest()) self.context.close() @@ -130,7 +130,7 @@ def _check_and_process_control(self) -> None: def pre_start(self) -> None: self.context.state_manager.assert_state(WorkerState.UNINITIALIZED) self.context.state_manager.transit_to(WorkerState.READY) - self.context.statistics_manager.initialize_worker_start_time(time.time_ns()) + self.context.statistics_manager.initialize_worker_start_time(time.monotonic_ns()) @overrides def receive(self, next_entry: QueueElement) -> None: @@ -226,7 +226,7 @@ def _process_dcm(self, dcm_element: DCMElement) -> None: :param dcm_element: DirectControlMessageElement to be handled. 
""" - start_time = time.time_ns() + start_time = time.monotonic_ns() match( (dcm_element.tag, get_one_of(dcm_element.payload, sealed=False)), typing.Tuple[ChannelIdentity, ControlInvocation], @@ -234,7 +234,7 @@ def _process_dcm(self, dcm_element: DCMElement) -> None: typing.Tuple[ChannelIdentity, ReturnInvocation], self._async_rpc_client.receive, ) - end_time = time.time_ns() + end_time = time.monotonic_ns() self.context.statistics_manager.increase_control_processing_time( end_time - start_time ) @@ -431,12 +431,12 @@ def _switch_context(self) -> None: """ Notify the DataProcessor thread and wait here until being switched back. """ - start_time = time.time_ns() + start_time = time.monotonic_ns() with self.context.tuple_processing_manager.context_switch_condition: self.context.tuple_processing_manager.context_switch_condition.notify() self.context.tuple_processing_manager.context_switch_condition.wait() self._post_switch_context_checks() - end_time = time.time_ns() + end_time = time.monotonic_ns() self.context.statistics_manager.increase_data_processing_time( end_time - start_time ) diff --git a/amber/src/test/python/core/architecture/managers/test_statistics_manager.py b/amber/src/test/python/core/architecture/managers/test_statistics_manager.py index 5abf7a36b45..84ca48fe99a 100644 --- a/amber/src/test/python/core/architecture/managers/test_statistics_manager.py +++ b/amber/src/test/python/core/architecture/managers/test_statistics_manager.py @@ -101,13 +101,20 @@ def test_zero_processing_time_is_allowed(self): assert stats.control_processing_time == 0 @pytest.mark.parametrize( - "method", - ["increase_data_processing_time", "increase_control_processing_time"], + "method, attr", + [ + ("increase_data_processing_time", "_data_processing_time"), + ("increase_control_processing_time", "_control_processing_time"), + ], ) - def test_negative_time_raises(self, method): + def test_negative_time_is_clamped_to_zero(self, method, attr): + # Negative elapsed times can arise 
from non-monotonic wall-clock reads + # (e.g. NTP adjustment). The manager should log and ignore them rather + # than raise and potentially hang the worker thread. mgr = StatisticsManager() - with pytest.raises(ValueError, match="Time must be non-negative"): - getattr(mgr, method)(-1) + getattr(mgr, method)(100) # prime the accumulator + getattr(mgr, method)(-1) # must not raise + assert getattr(mgr, attr) == 100 # accumulator unchanged class TestStatisticsManagerExecutionTime: