From 50cc828fb06a7b24a1f13fdab2a1182cf4eb1da6 Mon Sep 17 00:00:00 2001 From: Gonzalo Casas Date: Sat, 30 May 2026 12:50:27 +0200 Subject: [PATCH 1/3] fix: stop leaking a twisted log observer per Ros instance TwistedEventLoopManager.__init__ instantiated PythonLoggingObserver and started it; the observer was only stopped in manager.terminate(), which in turn was only invoked by Ros.terminate() (which stops the reactor permanently). Downstream code that follows the documented RosClient.__exit__ -> Ros.close() pattern never called terminate(), so every Ros instance added a permanent observer to twisted's globalLogPublisher. In long-running pytest sessions that create many short-lived Ros instances (e.g. compas_fab's integration suite), this accumulated tens of observers and slowed every twisted log emission linearly. It also contributed to occasional RosTimeoutError flakes on the next Ros.run() after many cycles. Make the observer a process-wide singleton: TwistedEventLoopManager lazily starts a single module-level PythonLoggingObserver via a thread-safe initializer, and terminate() now stops it once and clears the sentinel. Verified by a new tests/ros1/test_lifecycle.py case that constructs 10 Ros instances and asserts at most one observer is added. Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.rst | 8 +++++ src/roslibpy/comm/comm_autobahn.py | 49 +++++++++++++++++++++++++++-- tests/ros1/test_lifecycle.py | 50 ++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 tests/ros1/test_lifecycle.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6dca910..1049c32 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,14 @@ Unreleased **Fixed** +* ``TwistedEventLoopManager`` no longer leaks one ``PythonLoggingObserver`` per + ``Ros`` instance. The observer is now a process-wide singleton — there's only + ever one twisted-log → stdlib-logging bridge regardless of how many ``Ros`` + instances come and go. 
Long-running pytest sessions that create many + short-lived ``Ros`` instances were accumulating one observer per cycle, + slowing every log emission linearly and contributing to the occasional + ``RosTimeoutError`` flakes that motivated this lifecycle pass. + **Deprecated** **Removed** diff --git a/src/roslibpy/comm/comm_autobahn.py b/src/roslibpy/comm/comm_autobahn.py index 3a830fd..5394f1c 100644 --- a/src/roslibpy/comm/comm_autobahn.py +++ b/src/roslibpy/comm/comm_autobahn.py @@ -156,6 +156,37 @@ def set_max_retries(cls, max_retries): cls.maxRetries = max_retries +# The twisted reactor is a process-wide singleton, so the log observer that +# fans twisted's log events into the stdlib `logging` module is one too. Any +# Python process only ever needs one of these registered — registering more +# means every log event walks an O(n) observer list and ``globalLogPublisher`` +# keeps strong references to every observer for the life of the process. +# +# Prior to 2.1, ``TwistedEventLoopManager.__init__`` started a fresh +# ``PythonLoggingObserver`` per instance, and downstream wrappers that call +# ``Ros.close()`` (rather than ``Ros.terminate()``, which stops the reactor +# permanently) never invoked ``manager.terminate()`` and so never stopped the +# observer. Long-running pytest sessions that create many ``Ros`` instances +# would accumulate one observer per ``Ros``, slowing every log emission +# linearly and contributing to occasional ``RosTimeoutError`` flakes on the +# next ``Ros.run()``. 
+_LOG_OBSERVER = None +_LOG_OBSERVER_LOCK = threading.Lock() + + +def _ensure_log_observer_started(): + """Lazily start the single process-wide twisted log → stdlib logging bridge.""" + global _LOG_OBSERVER + if _LOG_OBSERVER is not None: + return + with _LOG_OBSERVER_LOCK: + if _LOG_OBSERVER is not None: + return + observer = log.PythonLoggingObserver() + observer.start() + _LOG_OBSERVER = observer + + class TwistedEventLoopManager(object): """Manage the main event loop using Twisted reactor. @@ -166,8 +197,11 @@ class TwistedEventLoopManager(object): """ def __init__(self): - self._log_observer = log.PythonLoggingObserver() - self._log_observer.start() + # Twisted's reactor is a process singleton — so is the observer that + # bridges twisted's log events into stdlib `logging`. See the module- + # level comment for the lifecycle details. + _ensure_log_observer_started() + self._log_observer = _LOG_OBSERVER def run(self): """Kick-starts a non-blocking event loop. @@ -273,4 +307,13 @@ def terminate(self): if self._thread: self._thread.join() - self._log_observer.stop() + # The observer is the process-wide singleton (see module top). Stop + # it once on terminate — the reactor cannot be restarted after stop, + # so no future TwistedEventLoopManager will need the bridge in this + # process. Subsequent terminates no-op. + global _LOG_OBSERVER + with _LOG_OBSERVER_LOCK: + if _LOG_OBSERVER is not None: + _LOG_OBSERVER.stop() + _LOG_OBSERVER = None + self._log_observer = None diff --git a/tests/ros1/test_lifecycle.py b/tests/ros1/test_lifecycle.py new file mode 100644 index 0000000..c6d84b4 --- /dev/null +++ b/tests/ros1/test_lifecycle.py @@ -0,0 +1,50 @@ +"""Lifecycle regression tests for the 2.1 fixes. + +Targets bugs that emerged from downstream pytest sessions (``compas_fab``) +where many sequential ``Ros`` instances are constructed, run, and closed +in the same Python process. + +If these tests flake, the corresponding fix is regressing — diagnose, +don't retry-loop. 
+""" + +from __future__ import print_function + +from roslibpy import Ros + +HOST = "127.0.0.1" +PORT = 9090 +URL = "ws://%s:%d" % (HOST, PORT) + + +def test_log_observer_does_not_leak(): + """Constructing many ``Ros`` instances must not leak twisted log observers. + + Before A1 every ``TwistedEventLoopManager.__init__`` (created lazily + one-per-``Ros``) called ``PythonLoggingObserver().start()`` and never + cleaned up unless ``manager.terminate()`` was explicitly called. + Downstream wrappers (compas_fab's ``RosClient.__exit__``) call + ``close()``, not ``terminate()`` — by design, since ``terminate()`` + stops the reactor permanently. So observers piled up forever. + + After A1 there's exactly one process-wide observer regardless of how + many ``Ros()`` instances come and go. + """ + from twisted.logger import globalLogPublisher + + initial = len(list(globalLogPublisher._observers)) + + for _ in range(10): + ros = Ros(URL) + ros.run() + ros.close() + + final = len(list(globalLogPublisher._observers)) + # Allow a single-observer drift in case the very first cycle is the one + # that registers the singleton. The leak case grows by len(cycles). + assert final - initial <= 1, ( + "Leaked %d twisted log observers across 10 lifecycle cycles " + "(expected at most 1)" % (final - initial) + ) + + From 45a3a87c9f8a6caacdb16fa8ae8f9d8a3c367ff4 Mon Sep 17 00:00:00 2001 From: Gonzalo Casas Date: Sat, 30 May 2026 12:51:33 +0200 Subject: [PATCH 2/3] fix(Ros.close): block until the WebSocket is actually torn down Previously Ros.close() returned as soon as proto.send_close() had been dispatched onto the reactor thread: it set wait_disconnect inside the on_ready callback, before clientConnectionLost ever fired. Downstream callers (compas_fab's RosClient.__exit__ -> Ros.close() -> immediately construct a new Ros) could then race the previous socket's TCP teardown, and on contended reactors the next Ros.run() occasionally tripped its 10s connection timeout. 
close() now registers a one-shot listener on the factory's "close" event (emitted from clientConnectionLost when the underlying TCP connection has actually been lost) and only sets wait_disconnect from there. The "closing" event still fires synchronously inside the on_ready callback before send_close(), so handlers that depend on that ordering are unaffected. If the disconnect doesn't complete within the timeout the listener is detached before raising RosTimeoutError, so a subsequent reconnect cycle on the same factory won't leak a reference to this Ros instance. Verified by a new test_close_blocks_until_disconnect case in tests/ros1/test_lifecycle.py. Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.rst | 7 ++++++ src/roslibpy/ros.py | 46 +++++++++++++++++++++++++++--------- tests/ros1/test_lifecycle.py | 25 ++++++++++++++++++++ 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1049c32..4b33a76 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,13 @@ Unreleased **Changed** +* ``Ros.close()`` now blocks until the underlying WebSocket has actually torn + down — it waits for the factory's ``"close"`` event (emitted from + ``clientConnectionLost``) instead of returning as soon as ``send_close()`` is + dispatched on the reactor thread. Downstream callers that immediately + construct a new ``Ros`` no longer race the previous socket's cleanup. The + ``"closing"`` event semantics are unchanged. + **Fixed** * ``TwistedEventLoopManager`` no longer leaks one ``PythonLoggingObserver`` per diff --git a/src/roslibpy/ros.py b/src/roslibpy/ros.py index 78a29c8..5670c81 100644 --- a/src/roslibpy/ros.py +++ b/src/roslibpy/ros.py @@ -76,20 +76,44 @@ def _unset_connecting_flag(*args): self.factory.connect() def close(self, timeout=CONNECTION_TIMEOUT): - """Disconnect from ROS.""" - if self.is_connected: - wait_disconnect = threading.Event() + """Disconnect from ROS. 
+ + Blocks until the underlying WebSocket has actually torn down + (``clientConnectionLost`` fired). Prior to 2.1, ``close()`` + returned as soon as ``send_close()`` had been dispatched on the + reactor thread; callers that immediately constructed a new ``Ros`` + could race the previous socket's cleanup, occasionally causing + the next ``run()`` to time out under contention. + """ + if not self.is_connected: + return + + wait_disconnect = threading.Event() - def _wrapper_callback(proto): - self.emit("closing") - proto.send_close() - wait_disconnect.set() - return proto + def _on_close(*_args): + wait_disconnect.set() - self.factory.on_ready(_wrapper_callback) + # `clientConnectionLost` on the factory emits the "close" event once + # the websocket has actually been torn down. Use `once` so the + # listener is auto-removed on first fire — protects against the + # ReconnectingClientFactory triggering a later non-manual close. + self.factory.once("close", _on_close) + + def _wrapper_callback(proto): + self.emit("closing") + proto.send_close() + return proto + + self.factory.on_ready(_wrapper_callback) - if not wait_disconnect.wait(timeout): - raise RosTimeoutError("Failed to disconnect to ROS") + if not wait_disconnect.wait(timeout): + # Detach the listener so it doesn't fire on a later reconnect + # cycle and leak a reference to this Ros instance. + try: + self.factory.off("close", _on_close) + except Exception: + pass + raise RosTimeoutError("Failed to disconnect to ROS") def run(self, timeout=CONNECTION_TIMEOUT): """Kick-starts a non-blocking event loop. 
diff --git a/tests/ros1/test_lifecycle.py b/tests/ros1/test_lifecycle.py index c6d84b4..0b22e0e 100644 --- a/tests/ros1/test_lifecycle.py +++ b/tests/ros1/test_lifecycle.py @@ -10,6 +10,8 @@ from __future__ import print_function +import threading + from roslibpy import Ros HOST = "127.0.0.1" @@ -48,3 +50,26 @@ def test_log_observer_does_not_leak(): ) +def test_close_blocks_until_disconnect(): + """``Ros.close()`` must not return until the WebSocket is actually closed. + + Before A2 the call returned once ``send_close`` had been dispatched on + the reactor thread — the protocol's ``onClose`` would fire shortly + after, leaving a brief window where ``is_connected`` could still read + True. Now ``close()`` blocks on ``clientConnectionLost`` so the + contract matches the name. + """ + ros = Ros(URL) + ros.run() + assert ros.is_connected + + closed_event = threading.Event() + ros.on("close", lambda _: closed_event.set()) + + ros.close() + + # The "close" event must have fired by the time close() returned. + assert closed_event.is_set(), "close() returned before clientConnectionLost fired" + assert not ros.is_connected, "is_connected still True after close()" + + From c7bb147c7de8ea9c621ca729abaafb6914801017 Mon Sep 17 00:00:00 2001 From: Gonzalo Casas Date: Sat, 30 May 2026 13:32:04 +0200 Subject: [PATCH 3/3] lint --- tests/ros1/test_lifecycle.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/ros1/test_lifecycle.py b/tests/ros1/test_lifecycle.py index 0b22e0e..c6a2fb3 100644 --- a/tests/ros1/test_lifecycle.py +++ b/tests/ros1/test_lifecycle.py @@ -71,5 +71,3 @@ def test_close_blocks_until_disconnect(): # The "close" event must have fired by the time close() returned. assert closed_event.is_set(), "close() returned before clientConnectionLost fired" assert not ros.is_connected, "is_connected still True after close()" - -