Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,23 @@ Unreleased

**Changed**

* ``Ros.close()`` now blocks until the underlying WebSocket has actually torn
down — it waits for the factory's ``"close"`` event (emitted from
``clientConnectionLost``) instead of returning as soon as ``send_close()`` is
dispatched on the reactor thread. Downstream callers that immediately
construct a new ``Ros`` no longer race the previous socket's cleanup. The
``"closing"`` event semantics are unchanged.

**Fixed**

* ``TwistedEventLoopManager`` no longer leaks one ``PythonLoggingObserver`` per
``Ros`` instance. The observer is now a process-wide singleton — there's only
ever one twisted-log → stdlib-logging bridge regardless of how many ``Ros``
instances come and go. Long-running pytest sessions that create many
short-lived ``Ros`` instances were accumulating one observer per cycle,
slowing every log emission linearly and contributing to the occasional
``RosTimeoutError`` flakes that motivated this lifecycle pass.

**Deprecated**

**Removed**
Expand Down
49 changes: 46 additions & 3 deletions src/roslibpy/comm/comm_autobahn.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,37 @@ def set_max_retries(cls, max_retries):
cls.maxRetries = max_retries


# The twisted reactor is a process-wide singleton, and so is the log observer
# that forwards twisted's log events to the stdlib `logging` module. A process
# needs exactly one such observer: every extra registration lengthens the
# observer list each log event has to walk, and ``globalLogPublisher`` keeps a
# strong reference to each observer until that observer is stopped.
#
# Prior to 2.1, ``TwistedEventLoopManager.__init__`` started a fresh
# ``PythonLoggingObserver`` per instance, and downstream wrappers that call
# ``Ros.close()`` (rather than ``Ros.terminate()``, which stops the reactor
# permanently) never invoked ``manager.terminate()`` and so never stopped the
# observer. Long-running pytest sessions that create many ``Ros`` instances
# would accumulate one observer per ``Ros``, slowing every log emission
# linearly and contributing to occasional ``RosTimeoutError`` flakes on the
# next ``Ros.run()``.
# The single twisted-log -> stdlib-logging observer shared by the whole
# process. Stays ``None`` until ``_ensure_log_observer_started()`` creates and
# starts it.
_LOG_OBSERVER = None
# Guards every assignment to ``_LOG_OBSERVER`` so that concurrent callers
# cannot create (or stop) the observer twice.
_LOG_OBSERVER_LOCK = threading.Lock()


def _ensure_log_observer_started():
    """Start the shared twisted-log -> stdlib-logging observer if it is not running.

    The first call creates a ``PythonLoggingObserver``, starts it and stores
    it in the module-level ``_LOG_OBSERVER``. Later calls find the stored
    observer and do nothing. Returns ``None`` in every case.
    """
    global _LOG_OBSERVER
    # Cheap unlocked look first: once the observer exists there is nothing to do.
    if _LOG_OBSERVER is None:
        with _LOG_OBSERVER_LOCK:
            # Look again now that the lock is held; another thread may have
            # created the observer between the first check and the acquire.
            if _LOG_OBSERVER is None:
                bridge = log.PythonLoggingObserver()
                bridge.start()
                # Publish only after start() has returned, so a failed start
                # leaves the module state untouched.
                _LOG_OBSERVER = bridge


class TwistedEventLoopManager(object):
"""Manage the main event loop using Twisted reactor.

Expand All @@ -166,8 +197,11 @@ class TwistedEventLoopManager(object):
"""

def __init__(self):
self._log_observer = log.PythonLoggingObserver()
self._log_observer.start()
# Twisted's reactor is a process singleton — so is the observer that
# bridges twisted's log events into stdlib `logging`. See the module-
# level comment for the lifecycle details.
_ensure_log_observer_started()
self._log_observer = _LOG_OBSERVER

def run(self):
"""Kick-starts a non-blocking event loop.
Expand Down Expand Up @@ -273,4 +307,13 @@ def terminate(self):
if self._thread:
self._thread.join()

self._log_observer.stop()
# The observer is the process-wide singleton (see module top). Stop
# it once on terminate — the reactor cannot be restarted after stop,
# so no future TwistedEventLoopManager will need the bridge in this
# process. Subsequent terminates no-op.
global _LOG_OBSERVER
with _LOG_OBSERVER_LOCK:
if _LOG_OBSERVER is not None:
_LOG_OBSERVER.stop()
_LOG_OBSERVER = None
self._log_observer = None
46 changes: 35 additions & 11 deletions src/roslibpy/ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,44 @@ def _unset_connecting_flag(*args):
self.factory.connect()

def close(self, timeout=CONNECTION_TIMEOUT):
"""Disconnect from ROS."""
if self.is_connected:
wait_disconnect = threading.Event()
"""Disconnect from ROS.

Blocks until the underlying WebSocket has actually torn down
(``clientConnectionLost`` fired). Prior to 2.1, ``close()``
returned as soon as ``send_close()`` had been dispatched on the
reactor thread; callers that immediately constructed a new ``Ros``
could race the previous socket's cleanup, occasionally causing
the next ``run()`` to time out under contention.
"""
if not self.is_connected:
return

wait_disconnect = threading.Event()

def _wrapper_callback(proto):
self.emit("closing")
proto.send_close()
wait_disconnect.set()
return proto
def _on_close(*_args):
wait_disconnect.set()

self.factory.on_ready(_wrapper_callback)
# `clientConnectionLost` on the factory emits the "close" event once
# the websocket has actually been torn down. Use `once` so the
# listener is auto-removed on first fire — protects against the
# ReconnectingClientFactory triggering a later non-manual close.
self.factory.once("close", _on_close)

def _wrapper_callback(proto):
self.emit("closing")
proto.send_close()
return proto

self.factory.on_ready(_wrapper_callback)

if not wait_disconnect.wait(timeout):
raise RosTimeoutError("Failed to disconnect to ROS")
if not wait_disconnect.wait(timeout):
# Detach the listener so it doesn't fire on a later reconnect
# cycle and leak a reference to this Ros instance.
try:
self.factory.off("close", _on_close)
except Exception:
pass
raise RosTimeoutError("Failed to disconnect to ROS")

def run(self, timeout=CONNECTION_TIMEOUT):
"""Kick-starts a non-blocking event loop.
Expand Down
73 changes: 73 additions & 0 deletions tests/ros1/test_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Lifecycle regression tests for the 2.1 fixes.

Targets bugs that emerged from downstream pytest sessions (``compas_fab``)
where many sequential ``Ros`` instances are constructed, run, and closed
in the same Python process.

If these tests flake, the corresponding fix is regressing — diagnose,
don't retry-loop.
"""

from __future__ import print_function

import threading

from roslibpy import Ros

# WebSocket endpoint that every test in this module connects to via
# ``Ros(URL)`` (presumably a locally running rosbridge server -- confirm
# against the test environment setup).
HOST = "127.0.0.1"
PORT = 9090
URL = "ws://%s:%d" % (HOST, PORT)


def test_log_observer_does_not_leak():
    """Repeated ``Ros`` run/close cycles must not pile up twisted log observers.

    Background: each ``TwistedEventLoopManager.__init__`` (created lazily,
    one per ``Ros``) used to call ``PythonLoggingObserver().start()``, and
    the observer was only removed by an explicit ``manager.terminate()``.
    Downstream wrappers (compas_fab's ``RosClient.__exit__``) call
    ``close()`` rather than ``terminate()`` -- deliberately, because
    ``terminate()`` stops the reactor permanently -- so the observers
    accumulated for the life of the process.

    With the observer shared process-wide, the number of observers
    registered on ``globalLogPublisher`` must stay flat no matter how many
    ``Ros`` instances are created and closed.
    """
    from twisted.logger import globalLogPublisher

    def _observer_count():
        # Snapshot the publisher's private observer collection and count it.
        return len(list(globalLogPublisher._observers))

    before = _observer_count()

    for _ in range(10):
        client = Ros(URL)
        client.run()
        client.close()

    # A growth of one is tolerated: the very first cycle may be the one that
    # registers the shared observer. A leak would grow by one per cycle.
    leaked = _observer_count() - before
    assert leaked <= 1, (
        "Leaked %d twisted log observers across 10 lifecycle cycles "
        "(expected at most 1)" % leaked
    )


def test_close_blocks_until_disconnect():
    """``Ros.close()`` must return only after the WebSocket is really closed.

    Background: ``close()`` used to return as soon as ``send_close`` had been
    dispatched on the reactor thread. The protocol's ``onClose`` fired
    shortly afterwards, which left a short window in which ``is_connected``
    could still read True. ``close()`` is now expected to block until
    ``clientConnectionLost``, so that its behaviour matches its name.
    """
    client = Ros(URL)
    client.run()
    assert client.is_connected

    saw_close = threading.Event()

    def _record_close(_payload):
        # Invoked by the "close" event; only the fact that it fired matters.
        saw_close.set()

    client.on("close", _record_close)

    client.close()

    # By the time close() has returned, the "close" event must already have
    # been delivered and the connection flag must be cleared.
    assert saw_close.is_set(), "close() returned before clientConnectionLost fired"
    assert not client.is_connected, "is_connected still True after close()"
Loading