Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,22 @@ def increase_output_statistics(self, port_id: PortIdentity, size: int) -> None:

def increase_data_processing_time(self, time: int) -> None:
if time < 0:
raise ValueError("Time must be non-negative")
logger.warning(
f"increase_data_processing_time called with negative elapsed time "
f"({time}ns); ignoring. Use time.monotonic_ns() at call sites to "
"prevent clock-skew from producing negative durations."
)
return
self._data_processing_time += time

def increase_control_processing_time(self, time: int) -> None:
if time < 0:
raise ValueError("Time must be non-negative")
logger.warning(
f"increase_control_processing_time called with negative elapsed time "
f"({time}ns); ignoring. Use time.monotonic_ns() at call sites to "
"prevent clock-skew from producing negative durations."
)
return
self._control_processing_time += time

def update_total_execution_time(self, time: int) -> None:
Expand Down
12 changes: 6 additions & 6 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def complete(self) -> None:
# stop the data processing thread
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
self.context.statistics_manager.update_total_execution_time(time.monotonic_ns())
controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
self.context.close()
Expand Down Expand Up @@ -130,7 +130,7 @@ def _check_and_process_control(self) -> None:
def pre_start(self) -> None:
self.context.state_manager.assert_state(WorkerState.UNINITIALIZED)
self.context.state_manager.transit_to(WorkerState.READY)
self.context.statistics_manager.initialize_worker_start_time(time.time_ns())
self.context.statistics_manager.initialize_worker_start_time(time.monotonic_ns())

@overrides
def receive(self, next_entry: QueueElement) -> None:
Expand Down Expand Up @@ -226,15 +226,15 @@ def _process_dcm(self, dcm_element: DCMElement) -> None:

:param dcm_element: DirectControlMessageElement to be handled.
"""
start_time = time.time_ns()
start_time = time.monotonic_ns()
match(
(dcm_element.tag, get_one_of(dcm_element.payload, sealed=False)),
typing.Tuple[ChannelIdentity, ControlInvocation],
self._async_rpc_server.receive,
typing.Tuple[ChannelIdentity, ReturnInvocation],
self._async_rpc_client.receive,
)
end_time = time.time_ns()
end_time = time.monotonic_ns()
self.context.statistics_manager.increase_control_processing_time(
end_time - start_time
)
Expand Down Expand Up @@ -431,12 +431,12 @@ def _switch_context(self) -> None:
"""
Notify the DataProcessor thread and wait here until being switched back.
"""
start_time = time.time_ns()
start_time = time.monotonic_ns()
with self.context.tuple_processing_manager.context_switch_condition:
self.context.tuple_processing_manager.context_switch_condition.notify()
self.context.tuple_processing_manager.context_switch_condition.wait()
self._post_switch_context_checks()
end_time = time.time_ns()
end_time = time.monotonic_ns()
self.context.statistics_manager.increase_data_processing_time(
end_time - start_time
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,20 @@ def test_zero_processing_time_is_allowed(self):
assert stats.control_processing_time == 0

@pytest.mark.parametrize(
"method",
["increase_data_processing_time", "increase_control_processing_time"],
"method, attr",
[
("increase_data_processing_time", "_data_processing_time"),
("increase_control_processing_time", "_control_processing_time"),
],
)
Comment on lines 103 to 109
def test_negative_time_raises(self, method):
def test_negative_time_is_clamped_to_zero(self, method, attr):
# Negative elapsed times can arise from non-monotonic wall-clock reads
# (e.g. NTP adjustment). The manager should absorb them silently rather
# than raise and potentially hang the worker thread.
mgr = StatisticsManager()
with pytest.raises(ValueError, match="Time must be non-negative"):
getattr(mgr, method)(-1)
getattr(mgr, method)(100) # prime the accumulator
getattr(mgr, method)(-1) # must not raise
assert getattr(mgr, attr) == 100 # accumulator unchanged


class TestStatisticsManagerExecutionTime:
Expand Down
Loading