diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6dca910..4b33a76 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,8 +14,23 @@ Unreleased **Changed** +* ``Ros.close()`` now blocks until the underlying WebSocket has actually torn + down — it waits for the factory's ``"close"`` event (emitted from + ``clientConnectionLost``) instead of returning as soon as ``send_close()`` is + dispatched on the reactor thread. Downstream callers that immediately + construct a new ``Ros`` no longer race the previous socket's cleanup. The + ``"closing"`` event semantics are unchanged. + **Fixed** +* ``TwistedEventLoopManager`` no longer leaks one ``PythonLoggingObserver`` per + ``Ros`` instance. The observer is now a process-wide singleton — there's only + ever one twisted-log → stdlib-logging bridge regardless of how many ``Ros`` + instances come and go. Long-running pytest sessions that create many + short-lived ``Ros`` instances were accumulating one observer per cycle, + slowing every log emission linearly and contributing to the occasional + ``RosTimeoutError`` flakes that motivated this lifecycle pass. + **Deprecated** **Removed** diff --git a/src/roslibpy/comm/comm_autobahn.py b/src/roslibpy/comm/comm_autobahn.py index 3a830fd..5394f1c 100644 --- a/src/roslibpy/comm/comm_autobahn.py +++ b/src/roslibpy/comm/comm_autobahn.py @@ -156,6 +156,37 @@ def set_max_retries(cls, max_retries): cls.maxRetries = max_retries +# The twisted reactor is a process-wide singleton, so the log observer that +# fans twisted's log events into the stdlib `logging` module is one too. Any +# Python process only ever needs one of these registered — registering more +# means every log event walks an O(n) observer list and ``globalLogPublisher`` +# keeps strong references to every observer for the life of the process. 
# Prior to 2.1, every ``TwistedEventLoopManager`` started its own
# ``PythonLoggingObserver`` and only ``manager.terminate()`` stopped it.
# Wrappers that call ``Ros.close()`` rather than ``Ros.terminate()`` never
# reached that code path, so a long-running process accumulated one observer
# per ``Ros`` instance. The bridge is therefore kept as a single module-level
# instance, created on first use and guarded by a lock.
#
# NOTE: comments and docstrings here are kept ASCII-only on purpose; a
# non-ASCII character in a source file without an encoding declaration is a
# syntax error on interpreters that follow PEP 263 strictly (Python 2).
_LOG_OBSERVER = None
_LOG_OBSERVER_LOCK = threading.Lock()


def _ensure_log_observer_started():
    """Lazily start the single process-wide twisted log to stdlib logging bridge.

    The first call creates a ``PythonLoggingObserver``, starts it and stores
    it in the module-level ``_LOG_OBSERVER``; later calls reuse that instance.

    Returns:
        The observer that is registered when the call completes. Callers
        should use this return value instead of re-reading ``_LOG_OBSERVER``
        afterwards, because another thread may reset the global in between.
    """
    global _LOG_OBSERVER

    # Fast path: read the global exactly once so the value that was checked
    # is also the value that is returned.
    observer = _LOG_OBSERVER
    if observer is not None:
        return observer

    with _LOG_OBSERVER_LOCK:
        # Re-check under the lock: another thread may have created the
        # observer while this one was waiting to acquire it.
        if _LOG_OBSERVER is None:
            observer = log.PythonLoggingObserver()
            observer.start()
            # Publish only after start() succeeded, so a failed start does
            # not leave a half-initialised observer behind.
            _LOG_OBSERVER = observer
        return _LOG_OBSERVER
def close(self, timeout=CONNECTION_TIMEOUT):
    """Disconnect from ROS.

    Blocks until the factory emits its ``"close"`` event, i.e. until the
    underlying WebSocket has actually been torn down, instead of returning
    as soon as ``send_close()`` has been dispatched. Returns immediately
    when there is no active connection.

    Args:
        timeout: Maximum time to wait for the ``"close"`` event; passed
            unchanged to ``threading.Event.wait``.

    Raises:
        RosTimeoutError: If the ``"close"`` event has not fired by the time
            the wait gives up.

    NOTE(review): this call blocks the calling thread, so it presumably must
    not be invoked from the thread that delivers the ``"close"`` event
    (the event loop thread) -- confirm against the factory implementation.
    """
    if not self.is_connected:
        return

    wait_disconnect = threading.Event()

    def _on_close(*_args):
        wait_disconnect.set()

    def _wrapper_callback(proto):
        self.emit("closing")
        proto.send_close()
        return proto

    # Register the listener *before* asking the protocol to close, so the
    # "close" event cannot be missed. ``once`` removes the listener after
    # its first invocation, which keeps it from firing again on a later,
    # unrelated connection loss.
    self.factory.once("close", _on_close)

    disconnected = False
    try:
        self.factory.on_ready(_wrapper_callback)
        disconnected = wait_disconnect.wait(timeout)
    finally:
        if not disconnected:
            # Either the wait timed out or ``on_ready`` raised. In both
            # cases the listener may still be registered; detach it so it
            # neither fires later nor keeps this instance referenced.
            try:
                self.factory.off("close", _on_close)
            except Exception:
                # Deliberately best-effort: the listener may already have
                # fired and removed itself. Record it instead of hiding it.
                import logging

                logging.getLogger(__name__).debug(
                    "Could not detach the close listener", exc_info=True
                )

    # The event can land in the small window between ``wait`` giving up and
    # the listener being detached; that is a successful disconnect, not a
    # timeout.
    if disconnected or wait_disconnect.is_set():
        return

    raise RosTimeoutError("Failed to disconnect to ROS")
def test_close_blocks_until_disconnect():
    """Check that ``Ros.close()`` only returns once the socket is closed.

    A listener for the ``"close"`` event is registered before ``close()``
    is called. When ``close()`` returns, that listener must already have
    run and ``is_connected`` must read ``False``; otherwise ``close()``
    returned while the disconnect was still in flight.
    """
    client = Ros(URL)
    client.run()
    assert client.is_connected

    saw_close = threading.Event()

    def _record_close(_proto):
        # The event payload is irrelevant here; only the fact that the
        # event fired is recorded.
        saw_close.set()

    client.on("close", _record_close)
    client.close()

    # Both conditions are checked immediately, with no waiting: the point
    # of the test is that close() itself already did the waiting.
    assert saw_close.is_set(), "close() returned before clientConnectionLost fired"
    assert not client.is_connected, "is_connected still True after close()"