diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index fd9d0d827..1580ee599 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -52,6 +52,7 @@ import sys import threading import time +import weakref from pathlib import Path from typing import Any @@ -63,7 +64,9 @@ if str(_PLUGIN_DIR) not in sys.path: sys.path.insert(0, str(_PLUGIN_DIR)) -from bridge_client import BridgeError, MemosBridgeClient # noqa: E402 +import bridge_supervisor # noqa: E402 + +from bridge_client import MemosBridgeClient # noqa: E402 from daemon_manager import ensure_bridge_running, ensure_viewer_daemon # noqa: E402 @@ -269,8 +272,9 @@ def __init__(self) -> None: # show the model's thinking like OpenClaw does. self._turn_thinking: str = "" self._hook_registered = False - self._bridge_keepalive_stop = threading.Event() - self._bridge_keepalive_thread: threading.Thread | None = None + # Set while `shutdown()` runs so concurrent reconnect attempts + # don't respawn a bridge we are in the middle of releasing. + self._closing = threading.Event() # Hermes runs background memory/skill reviewers by forking an agent and # appending a synthetic user turn. That turn is instruction plumbing, # not a human utterance, so it must not become a MemOS trace. @@ -309,6 +313,7 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov self._hermes_home = str(kwargs.get("hermes_home") or "") self._platform = str(kwargs.get("platform") or "cli") self._agent_identity = str(kwargs.get("agent_identity") or "hermes") + self._closing.clear() try: ensure_bridge_running() except Exception as err: @@ -318,29 +323,35 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov ensure_viewer_daemon() except Exception as err: logger.warning("MemOS: viewer daemon check failed — %s", err) - new_bridge: MemosBridgeClient | None = None + # The bridge client is a process-wide shared resource owned by the + # supervisor: hosts may construct any number of provider instances + # (and may never call shutdown() on some of them), but only one + # Node subprocess exists per host process. See bridge_supervisor. + client: MemosBridgeClient | None = None + created = False try: - new_bridge = MemosBridgeClient() + client, created = bridge_supervisor.supervisor.acquire( + self, lambda: MemosBridgeClient() + ) # Register the fallback LLM handler BEFORE we open the # session so it is available the very first time the # plugin's facade asks for help (e.g. on the first # `turn.start` retrieval call). - new_bridge.register_host_handler( - "host.llm.complete", - self._handle_host_llm_complete, - ) - self._bridge = new_bridge + self._bind_host_handlers(client) + self._bridge = client self._open_session(session_id) logger.info( - "MemOS: bridge ready session=%s platform=%s (episode deferred)", + "MemOS: bridge ready session=%s platform=%s shared_pid=%s (episode deferred)", self._session_id, self._platform, + getattr(client, "pid", "?"), ) except Exception as err: logger.warning("MemOS: bridge init failed — %s", err) - if new_bridge is not None: - with contextlib.suppress(Exception): - new_bridge.close() + if created: + bridge_supervisor.supervisor.discard(self, client) + else: + bridge_supervisor.supervisor.release(self) self._bridge = None # Register a Hermes plugin hook to capture tool calls as they # happen. The `post_tool_call` hook fires after every tool @@ -348,7 +359,6 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov # tool name, arguments, and result. We accumulate them and # flush in `sync_turn`. self._register_tool_call_hook() - self._start_bridge_keepalive() def system_prompt_block(self) -> str: # type: ignore[override] return ( @@ -421,25 +431,40 @@ def _record_namespace(self) -> dict[str, Any]: return ns def _register_tool_call_hook(self) -> None: - if self._hook_registered: - return - try: - from hermes_cli.plugins import ( - get_plugin_manager, # pyright: ignore[reportMissingImports] - ) - - mgr = get_plugin_manager() - mgr._hooks.setdefault("post_tool_call", []).append(self._on_post_tool_call) - mgr._hooks.setdefault("post_llm_call", []).append(self._on_post_llm_call) - mgr._hooks.setdefault("transform_tool_result", []).append( - self._on_transform_tool_result - ) + # The host's plugin manager is process-global; appending bound + # methods per instance both duplicated hook work and strongly + # pinned abandoned provider instances (and their bridge + # subprocesses) forever. Dispatchers are installed once per + # process; this instance is reachable only through a weakref + # registry, so dropping the provider really frees it. + bridge_supervisor.register_provider(self) + if bridge_supervisor.install_host_hooks(): self._hook_registered = True - logger.debug( - "MemOS: registered post_tool_call + post_llm_call + transform_tool_result hooks" - ) - except Exception as err: - logger.debug("MemOS: could not register tool hook — %s", err) + + def _bind_host_handlers(self, client: Any) -> None: + """Register reverse-RPC handlers without strongly pinning ``self``. + + The shared client outlives any single provider instance; a bound + method stored in its handler table would keep an abandoned + provider (and everything it references) alive for the client's + whole lifetime. + """ + handler_ref = weakref.WeakMethod(self._handle_host_llm_complete) + + def _weak_host_llm_complete(params: dict[str, Any]) -> dict[str, Any]: + handler = handler_ref() + if handler is None: + # The registering provider was dropped by the host. The + # handler is session-agnostic (it resolves the host's + # main runtime, not provider state), so any surviving + # provider can serve the fallback instead of turning + # the whole process's LLM fallback path red. + for provider in bridge_supervisor.live_providers(): + return provider._handle_host_llm_complete(params) + raise RuntimeError("memory provider has been released") + return handler(params) + + client.register_host_handler("host.llm.complete", _weak_host_llm_complete) def _on_transform_tool_result( self, @@ -1510,20 +1535,17 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor self._bridge.request("session.close", {"sessionId": self._session_id}) def shutdown(self) -> None: # type: ignore[override] - self._bridge_keepalive_stop.set() - if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive(): - self._bridge_keepalive_thread.join( - timeout=12.0 - ) # Increased to cover health check timeout (10s) + margin + self._closing.set() if self._prefetch_thread and self._prefetch_thread.is_alive(): self._prefetch_thread.join(timeout=5.0) - if self._bridge: - pid = getattr(self._bridge, "pid", "?") - logger.info("MemOS: shutting down bridge (pid=%s)", pid) - with contextlib.suppress(Exception): - self._bridge.close() - self._bridge = None - logger.info("MemOS: bridge shutdown complete (pid=%s)", pid) + bridge_supervisor.unregister_provider(self) + pid = getattr(self._bridge, "pid", "?") if self._bridge else "-" + self._bridge = None + # The supervisor closes the shared client (and stops the + # process-wide keepalive) only when the last holder releases. + logger.info("MemOS: releasing shared bridge (pid=%s)", pid) + bridge_supervisor.supervisor.release(self) + logger.info("MemOS: bridge release complete (pid=%s)", pid) # ─── Host LLM bridge (fallback for plugin-side model failures) ──────── @@ -1719,58 +1741,58 @@ def _open_session(self, session_id: str = "", *, timeout: float = 30.0) -> None: self._session_id = resp.get("sessionId") or requested_session def _is_transport_closed(self, err: Exception) -> bool: - if isinstance(err, BridgeError) and err.code == "transport_closed": - return True - msg = str(err).lower() - return "broken pipe" in msg or "bridge closed" in msg or "transport_closed" in msg + return bridge_supervisor.transport_closed(err) def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> None: # Don't reconnect if we're shutting down - if self._bridge_keepalive_stop.is_set(): + if self._closing.is_set(): logger.debug("MemOS: skipping reconnect during shutdown") return with self._reconnect_lock: # Double-check after acquiring lock - if self._bridge_keepalive_stop.is_set(): + if self._closing.is_set(): logger.debug("MemOS: skipping reconnect during shutdown (after lock)") return - old_bridge = self._bridge - old_pid = getattr(old_bridge, "pid", None) if old_bridge else None - - if old_bridge: - logger.info("MemOS: closing old bridge (pid=%s)", old_pid) - with contextlib.suppress(Exception): - old_bridge.close() - logger.info("MemOS: old bridge closed (pid=%s)", old_pid) - + stale = self._bridge ensure_bridge_running() try: ensure_viewer_daemon() except Exception as err: logger.warning("MemOS: viewer daemon check failed during reconnect — %s", err) - new_bridge: MemosBridgeClient | None = None + # Generation-aware replacement: if another provider already + # replaced the dead shared client, adopt its replacement + # instead of spawning yet another process. + client, created = bridge_supervisor.supervisor.replace( + self, stale, lambda: MemosBridgeClient() + ) try: - new_bridge = MemosBridgeClient() - logger.info("MemOS: new bridge created (pid=%s)", getattr(new_bridge, "pid", "?")) - - new_bridge.register_host_handler( - "host.llm.complete", - self._handle_host_llm_complete, - ) - self._bridge = new_bridge + self._bind_host_handlers(client) + self._bridge = client self._open_session(session_id, timeout=timeout) + # The episode we were tracking lived in the replaced + # binding; clear it so the next turn.start (or the + # sync_turn retry) re-establishes it on this client. + self._episode_id = "" except Exception: - if new_bridge is not None: - with contextlib.suppress(Exception): - new_bridge.close() - if self._bridge is new_bridge: + if created: + bridge_supervisor.supervisor.discard(self, client) + else: + bridge_supervisor.supervisor.release(self) + if self._bridge is client: self._bridge = None raise def _ensure_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> bool: - if self._bridge: + # A binding goes stale when the supervisor holds a *newer* shared + # client — e.g. the keepalive (or another provider) respawned it + # after a crash. Rebind eagerly then, re-opening our session. + # When the supervisor has no current client, keep trusting the + # binding we hold: request failures will route through the + # reactive transport-closed reconnect in `sync_turn`. + current = bridge_supervisor.supervisor.peek() + if self._bridge is not None and (current is None or self._bridge is current): return True try: self._reconnect_bridge(session_id or self._session_id, timeout=timeout) @@ -1785,33 +1807,6 @@ def _ensure_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> bool self._bridge = None return False - def _start_bridge_keepalive(self) -> None: - if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive(): - return - self._bridge_keepalive_stop.clear() - - def _run() -> None: - while not self._bridge_keepalive_stop.wait(5.0): - if not self._ensure_bridge(self._session_id, timeout=10.0): - continue - try: - assert self._bridge is not None - self._bridge.request("core.health", {}, timeout=10.0) - except Exception as err: - if self._is_transport_closed(err): - logger.info("MemOS: bridge keepalive reconnecting after transport close") - with contextlib.suppress(Exception): - self._reconnect_bridge(self._session_id, timeout=10.0) - else: - logger.debug("MemOS: bridge keepalive failed — %s", err) - - self._bridge_keepalive_thread = threading.Thread( - target=_run, - daemon=True, - name="memos-bridge-keepalive", - ) - self._bridge_keepalive_thread.start() - def _turn_start(self, query: str, *, session_id: str = "") -> str: assert self._bridge is not None host_runtime = self._host_runtime_context() @@ -1876,11 +1871,15 @@ def _turn_end( if agent_thinking: payload["agentThinking"] = agent_thinking result = self._bridge.request("turn.end", payload) - # Capture the trace ID for feedback submission + # Capture the trace ID for feedback submission. The core contract + # (memory-core.ts onTurnEnd) returns a singular `traceId`; accept + # the plural form too for forward compatibility. if result and isinstance(result, dict): - trace_ids = result.get("traceIds", []) - if trace_ids and len(trace_ids) > 0: - trace_id = trace_ids[-1] # Last trace is the current turn + trace_id = result.get("traceId") or "" + if not trace_id: + trace_ids = result.get("traceIds") or [] + trace_id = trace_ids[-1] if trace_ids else "" + if trace_id: self._last_trace_id = trace_id return trace_id return "" @@ -1950,5 +1949,10 @@ def register(ctx: Any) -> None: ctx.register_memory_provider(MemTensorProvider()) +def _reset_bridge_runtime_for_tests() -> None: + """Tear down the process-wide bridge supervisor state. Test-only.""" + bridge_supervisor.reset_for_tests() + + # Pattern 2: exported class — fallback via `issubclass(MemoryProvider)`. __all__ = ["PLUGIN_ID", "PLUGIN_VERSION", "MemTensorProvider", "register"] diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index 5b986ec66..d84ad0eaa 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -19,6 +19,7 @@ import shutil import subprocess import threading +import weakref from pathlib import Path from typing import TYPE_CHECKING, Any @@ -143,18 +144,28 @@ def __init__( env=env, cwd=str(plugin_root), ) + # Reader threads hold a weakref to the client, never a strong + # reference: a bound-method target would pin the client (and its + # subprocess pipes) for as long as the thread blocks on stdout, + # making an abandoned client immortal. With weakrefs, dropping + # the last strong reference lets GC run the finalizer below, + # which closes the child's stdin — the bridge's graceful-exit + # signal — and that EOF in turn ends both reader threads. self._reader = threading.Thread( - target=self._read_loop, + target=_pump_stdout, + args=(weakref.ref(self), self._proc.stdout), daemon=True, name="memos-bridge-reader", ) self._reader.start() self._stderr_reader = threading.Thread( - target=self._stderr_loop, + target=_pump_stderr, + args=(self._proc.stderr,), daemon=True, name="memos-bridge-stderr", ) self._stderr_reader.start() + self._finalizer = weakref.finalize(self, _release_process_pipes, self._proc) @property def pid(self) -> int: @@ -285,76 +296,92 @@ def close(self) -> None: entry["event"].set() self._pending.clear() + # 6. Release the remaining pipe fds now rather than at GC time — + # under reconnect churn the wrappers would otherwise accumulate + # until a collection cycle (and warn via ResourceWarning). + for stream in (self._proc.stdout, self._proc.stderr): + with contextlib.suppress(Exception): + if stream is not None: + stream.close() + + # Orderly shutdown released the process and all pipes; the GC + # backstop has nothing left to do. + finalizer = getattr(self, "_finalizer", None) + if finalizer is not None: + finalizer.detach() + # ─── Internals ── - def _read_loop(self) -> None: - assert self._proc.stdout is not None - for line in self._proc.stdout: - line = line.strip() - if not line: - continue - try: - msg = json.loads(line) - except json.JSONDecodeError: - logger.debug("bridge: malformed line: %r", line[:120]) - continue - if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): - self._resolve(msg) - continue - if msg.get("method") == "events.notify": - for cb in list(self._events): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("event listener threw", exc_info=True) - continue - if msg.get("method") == "logs.forward": - for cb in list(self._logs): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("log listener threw", exc_info=True) - continue - # Reverse-direction request: the bridge is asking the - # adapter to do something (e.g. run a fallback LLM call - # via `host.llm.complete`). Dispatch to the registered - # handler and write the response back synchronously. - method = msg.get("method") - rpc_id = msg.get("id") - if ( - isinstance(method, str) - and rpc_id is not None - and "result" not in msg - and "error" not in msg - ): - handler = self._host_handler_for(method) - if handler is None: - self._send_response( - rpc_id, - error={ - "code": -32601, - "message": f"method not found: {method}", - "data": {"code": "unknown_method"}, - }, - ) - continue - params = msg.get("params") or {} - if not isinstance(params, dict): - params = {} + def _handle_line(self, line: str) -> None: + line = line.strip() + if not line: + return + try: + msg = json.loads(line) + except json.JSONDecodeError: + logger.debug("bridge: malformed line: %r", line[:120]) + return + if not isinstance(msg, dict): + # Valid JSON but not an object (null, list, string, …) — + # dict-key access below would raise and kill the reader thread. + logger.debug("bridge: non-object line: %r", line[:120]) + return + if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): + self._resolve(msg) + return + if msg.get("method") == "events.notify": + for cb in list(self._events): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("event listener threw", exc_info=True) + return + if msg.get("method") == "logs.forward": + for cb in list(self._logs): try: - result = handler(params) - self._send_response(rpc_id, result=result) - except Exception as err: - logger.warning("host handler %s failed: %s", method, err) - self._send_response( - rpc_id, - error={ - "code": -32000, - "message": str(err) or err.__class__.__name__, - "data": {"code": "host_handler_failed"}, - }, - ) - continue + cb(msg.get("params") or {}) + except Exception: + logger.debug("log listener threw", exc_info=True) + return + # Reverse-direction request: the bridge is asking the + # adapter to do something (e.g. run a fallback LLM call + # via `host.llm.complete`). Dispatch to the registered + # handler and write the response back synchronously. + method = msg.get("method") + rpc_id = msg.get("id") + if ( + isinstance(method, str) + and rpc_id is not None + and "result" not in msg + and "error" not in msg + ): + handler = self._host_handler_for(method) + if handler is None: + self._send_response( + rpc_id, + error={ + "code": -32601, + "message": f"method not found: {method}", + "data": {"code": "unknown_method"}, + }, + ) + return + params = msg.get("params") or {} + if not isinstance(params, dict): + params = {} + try: + result = handler(params) + self._send_response(rpc_id, result=result) + except Exception as err: + logger.warning("host handler %s failed: %s", method, err) + self._send_response( + rpc_id, + error={ + "code": -32000, + "message": str(err) or err.__class__.__name__, + "data": {"code": "host_handler_failed"}, + }, + ) def _host_handler_for( self, @@ -400,13 +427,6 @@ def _send_response( except (BrokenPipeError, OSError): pass - def _stderr_loop(self) -> None: - assert self._proc.stderr is not None - for line in self._proc.stderr: - line = line.rstrip() - if line: - logger.debug("bridge.stderr: %s", line) - def _resolve(self, msg: dict[str, Any]) -> None: rpc_id = msg.get("id") if not isinstance(rpc_id, int): @@ -420,3 +440,51 @@ def _resolve(self, msg: dict[str, Any]) -> None: else: entry["result"] = msg.get("result") entry["event"].set() + + +# ─── Module-level pump targets (no strong reference to the client) ───────── + + +def _pump_stdout(client_ref: weakref.ref, stdout: Any) -> None: + """Forward bridge stdout lines to the client while it is alive. + + Holds only a weakref between lines so an abandoned client can be + garbage-collected; its finalizer then closes the pipes, which ends + this loop via EOF (or a closed-file error). + """ + try: + for line in stdout: + client = client_ref() + if client is None: + break + client._handle_line(line) + del client + except (ValueError, OSError): + pass # pipe closed underneath us — finalizer or close() ran + + +def _pump_stderr(stderr: Any) -> None: + try: + for line in stderr: + line = line.rstrip() + if line: + logger.debug("bridge.stderr: %s", line) + except (ValueError, OSError): + pass + + +def _release_process_pipes(proc: subprocess.Popen) -> None: + """GC backstop: close the subprocess pipes once the client is gone. + + Closing stdin is the bridge's graceful-exit signal (it exits on + stdin EOF in ``--no-viewer`` mode), so even a client that was never + ``close()``d cannot strand a live Node process. The child is not + waited on here — finalizers must not block; the exited child is + reaped by the stdlib's cooperative ``subprocess._cleanup`` sweep. + """ + for stream in (proc.stdin, proc.stdout, proc.stderr): + with contextlib.suppress(Exception): + if stream is not None: + stream.close() + with contextlib.suppress(Exception): + proc.poll() diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_supervisor.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_supervisor.py new file mode 100644 index 000000000..17cac3ff3 --- /dev/null +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_supervisor.py @@ -0,0 +1,446 @@ +"""Process-wide supervision of the MemOS stdio bridge (issue #1910). + +Why this module exists +---------------------- +The Hermes host gives no guarantee about provider lifecycles: it builds a +fresh ``MemTensorProvider`` on every ``load_memory_provider()`` call, and +older hosts forked review/delegate agents that rebuilt the provider per +turn and abandoned it without ever calling ``shutdown()``. When each +provider instance owned its own Node bridge subprocess, every abandoned +instance leaked one live ``bridge.cjs`` process — and its private +keepalive thread resurrected the process even after a manual ``kill``. + +The invariant enforced here: **at most one live bridge client per host +process**, regardless of how many provider instances exist. Ownership is +moved off the provider instances onto a module-level supervisor: + +* ``BridgeSupervisor`` — refcounted (via ``WeakSet`` of holders) shared + client with generation-aware replacement, plus a single keepalive + thread that respawns the *shared* client after a crash and stops once + the last holder releases. +* a weakref provider registry + once-per-process host hook installation, + so N provider instances no longer append N copies of bound-method + hooks to the host's global plugin manager (which both duplicated hook + work and pinned abandoned instances forever). + +The bridge JSON-RPC protocol is multi-session (``session.open`` carries +``sessionId``), so sharing one client across providers is semantically +free — the ``--daemon`` viewer process already shares a core the same way. +""" + +from __future__ import annotations + +import contextlib +import logging +import threading +import weakref + +from typing import TYPE_CHECKING, Any + + +if TYPE_CHECKING: + from collections.abc import Callable + + +logger = logging.getLogger(__name__) + +KEEPALIVE_INTERVAL_SEC = 5.0 +KEEPALIVE_PING_TIMEOUT_SEC = 10.0 + + +def transport_closed(err: Exception) -> bool: + """Classify an exception as "the bridge pipe is gone".""" + if getattr(err, "code", "") == "transport_closed": + return True + msg = str(err).lower() + return "broken pipe" in msg or "bridge closed" in msg or "transport_closed" in msg + + +class BridgeSupervisor: + """Owns the single shared bridge client for this host process. + + Holders are provider instances tracked in a ``WeakSet`` — an + abandoned provider that gets garbage-collected silently drops its + hold, so a leaky host can no longer keep the refcount pinned. + The client is closed when the last holder releases; the host + process exiting closes the child's stdin either way, which the + bridge treats as a graceful shutdown signal. + """ + + def __init__(self) -> None: + self._lock = threading.RLock() + self._client: Any = None + self._holders: weakref.WeakSet[Any] = weakref.WeakSet() + self._factory: Callable[[], Any] | None = None + self._keepalive_stop = threading.Event() + self._keepalive_thread: threading.Thread | None = None + + # ─── Client lifecycle ────────────────────────────────────────────── + + def peek(self) -> Any: + """Return the current shared client (or None) without side effects.""" + with self._lock: + return self._client + + def acquire(self, holder: Any, factory: Callable[[], Any]) -> tuple[Any, bool]: + """Register ``holder`` and return ``(client, created)``. + + Reuses the existing shared client when one is live; otherwise + creates it via ``factory``. Idempotent per holder. + """ + with self._lock: + self._factory = factory + created = False + if self._client is None: + self._client = factory() + created = True + logger.info( + "MemOS: shared bridge client started (pid=%s)", + getattr(self._client, "pid", "?"), + ) + self._holders.add(holder) + self._ensure_keepalive_locked() + return self._client, created + + def replace(self, holder: Any, stale: Any, factory: Callable[[], Any]) -> tuple[Any, bool]: + """Replace the shared client iff ``stale`` is still the current one. + + If another caller already replaced it, adopt the newer client + instead of spawning a third. Returns ``(client, created)``. + ``stale`` is always closed (idempotent) once it is no longer the + shared client. + """ + to_close: Any = None + with self._lock: + self._factory = factory + current = self._client + if current is not None and current is not stale: + client, created = current, False + to_close = stale + else: + self._client = None + client = factory() + self._client = client + created = True + to_close = current if current is not None else stale + logger.info( + "MemOS: shared bridge client replaced (old pid=%s, new pid=%s)", + getattr(to_close, "pid", "?") if to_close is not None else "-", + getattr(client, "pid", "?"), + ) + self._holders.add(holder) + self._ensure_keepalive_locked() + if to_close is not None and to_close is not client: + self._close_quietly(to_close) + return client, created + + def release(self, holder: Any) -> None: + """Drop ``holder``; close the shared client when no holders remain.""" + to_close: Any = None + with self._lock: + self._holders.discard(holder) + if len(self._holders) == 0: + to_close = self._client + self._client = None + self._stop_keepalive_locked() + if to_close is not None: + logger.info( + "MemOS: last holder released — closing shared bridge (pid=%s)", + getattr(to_close, "pid", "?"), + ) + self._close_quietly(to_close) + + def discard(self, holder: Any, client: Any) -> None: + """Failure path: drop ``holder`` and close ``client`` it created.""" + with self._lock: + self._holders.discard(holder) + if self._client is client: + self._client = None + if len(self._holders) == 0: + self._stop_keepalive_locked() + if client is not None: + self._close_quietly(client) + + def respawn(self, stale: Any) -> Any: + """Keepalive path: replace a dead shared client, holders unchanged. + + Returns the replacement (or the already-newer current client), + or None when no factory is known, no holders remain, or the + spawn failed. + """ + to_close: Any = None + with self._lock: + factory = self._factory + if factory is None: + return None + # Hard invariant: zero holders ⇒ no live client. Without this + # guard, a respawn racing the last release() (the health ping + # can be mid-flight for up to its timeout) would revive a + # client that no holder will ever close — the exact leak + # class this module exists to eliminate. + if len(self._holders) == 0: + return None + current = self._client + if current is not None and current is not stale: + return current + self._client = None + try: + client = factory() + except Exception as err: + logger.warning("MemOS: bridge respawn failed — %s", err) + return None + self._client = client + to_close = current if current is not None else stale + logger.info( + "MemOS: keepalive respawned shared bridge (old pid=%s, new pid=%s)", + getattr(to_close, "pid", "?") if to_close is not None else "-", + getattr(client, "pid", "?"), + ) + if to_close is not None: + self._close_quietly(to_close) + return client + + @staticmethod + def _close_quietly(client: Any) -> None: + with contextlib.suppress(Exception): + client.close() + + # ─── Keepalive (one thread per process, not per provider) ───────── + + def _ensure_keepalive_locked(self) -> None: + if self._keepalive_thread is not None and self._keepalive_thread.is_alive(): + return + self._keepalive_stop = threading.Event() + self._keepalive_thread = threading.Thread( + target=self._keepalive_loop, + args=(self._keepalive_stop,), + daemon=True, + name="memos-bridge-keepalive", + ) + self._keepalive_thread.start() + + def _stop_keepalive_locked(self) -> None: + self._keepalive_stop.set() + # Drop the reference so the next _ensure spawns a fresh thread + # (with a fresh stop event) even while the old one is still on + # its way out — checking is_alive() alone races with a rapid + # release()→acquire() and would leave the new holder without + # any keepalive. The old thread exits on its own captured event. + self._keepalive_thread = None + + def _keepalive_loop(self, stop: threading.Event) -> None: + respawn_failures = 0 + backoff_cycles = 0 + while not stop.wait(KEEPALIVE_INTERVAL_SEC): + client = self.peek() + if client is None: + # A failed reconnect can discard the shared client while + # holders remain (e.g. session.open timed out on a fresh + # spawn). Recreate proactively so recovery doesn't have + # to wait for the next turn's reactive reconnect. + with self._lock: + idle = len(self._holders) == 0 + if idle: + respawn_failures = 0 + backoff_cycles = 0 + continue + # Exponential backoff when the factory keeps failing + # (e.g. Node uninstalled) so a permanently-broken + # environment doesn't warn every interval forever. + if backoff_cycles > 0: + backoff_cycles -= 1 + continue + logger.log( + logging.INFO if respawn_failures == 0 else logging.DEBUG, + "MemOS: shared bridge missing with live holders; respawning", + ) + replacement = self.respawn(None) + if replacement is None: + respawn_failures += 1 + backoff_cycles = min(2**respawn_failures, 12) - 1 + continue + else: + try: + client.request("core.health", {}, timeout=KEEPALIVE_PING_TIMEOUT_SEC) + respawn_failures = 0 + backoff_cycles = 0 + continue + except Exception as err: + if not transport_closed(err): + logger.debug("MemOS: bridge keepalive ping failed — %s", err) + continue + logger.info("MemOS: shared bridge transport closed; respawning") + replacement = self.respawn(client) + if replacement is None: + continue + respawn_failures = 0 + backoff_cycles = 0 + # Re-register host handlers eagerly so reverse RPC + # (host.llm.complete) works during the replacement's startup + # recovery; providers re-open their sessions lazily via + # `_ensure_bridge` on next use. + for provider in live_providers(): + with contextlib.suppress(Exception): + provider._bind_host_handlers(replacement) + + +supervisor = BridgeSupervisor() + + +# ─── Provider registry + once-per-process host hook installation ───────── + +_registry_lock = threading.Lock() +_provider_refs: list[weakref.ref[Any]] = [] +_hooks_installed = False +_hooked_manager_ref: weakref.ref[Any] | None = None + +_HOOK_NAMES = ("post_tool_call", "post_llm_call", "transform_tool_result") + + +def register_provider(provider: Any) -> None: + """Track a live provider (weakly) for hook dispatch + handler rebinds.""" + with _registry_lock: + _prune_locked() + if not any(ref() is provider for ref in _provider_refs): + _provider_refs.append(weakref.ref(provider)) + + +def unregister_provider(provider: Any) -> None: + with _registry_lock: + _provider_refs[:] = [ + ref for ref in _provider_refs if ref() is not None and ref() is not provider + ] + + +def live_providers() -> list[Any]: + with _registry_lock: + _prune_locked() + return [p for p in (ref() for ref in _provider_refs) if p is not None] + + +def _prune_locked() -> None: + _provider_refs[:] = [ref for ref in _provider_refs if ref() is not None] + + +def install_host_hooks() -> bool: + """Ensure our dispatchers sit in the hermes plugin manager exactly once. + + The previous design appended three bound methods *per provider + instance* and never removed them — every abandoned instance stayed + strongly referenced by the host forever. Dispatchers are module-level + functions; provider instances are reached weakly via the registry. + + Installation is idempotent *and self-healing*: instead of a boolean + short-circuit, each call verifies the dispatchers are actually + present in the *current* manager's hook lists — hosts can rebuild + the plugin manager or clear its ``_hooks`` on plugin reload, which + would otherwise leave us silently unhooked. + """ + global _hooks_installed, _hooked_manager_ref + with _registry_lock: + try: + from hermes_cli.plugins import ( + get_plugin_manager, # pyright: ignore[reportMissingImports] + ) + + mgr = get_plugin_manager() + hooks = mgr._hooks + installed_any = False + for name, dispatcher in ( + ("post_tool_call", _dispatch_post_tool_call), + ("post_llm_call", _dispatch_post_llm_call), + ("transform_tool_result", _dispatch_transform_tool_result), + ): + callbacks = hooks.setdefault(name, []) + if dispatcher not in callbacks: + callbacks.append(dispatcher) + installed_any = True + _hooks_installed = True + _hooked_manager_ref = weakref.ref(mgr) + if installed_any: + logger.debug( + "MemOS: installed post_tool_call + post_llm_call + " + "transform_tool_result dispatchers (process-wide, once)" + ) + return True + except Exception as err: + logger.debug("MemOS: could not install host hooks — %s", err) + return False + + +def _dispatch_post_tool_call(*args: Any, **kwargs: Any) -> None: + for provider in live_providers(): + try: + provider._on_post_tool_call(*args, **kwargs) + except Exception: + logger.debug("MemOS: post_tool_call dispatch failed", exc_info=True) + + +def _dispatch_post_llm_call(*args: Any, **kwargs: Any) -> None: + for provider in live_providers(): + try: + provider._on_post_llm_call(*args, **kwargs) + except Exception: + logger.debug("MemOS: post_llm_call dispatch failed", exc_info=True) + + +def _dispatch_transform_tool_result(*args: Any, **kwargs: Any) -> str | None: + """First non-None transform wins. + + Deliberate convergence from the old per-instance chain: providers for + other sessions return None via their ``_matches_session`` guard, and + the only non-None transform (the repeated-failure repair hint) is + identical and de-duplicated across providers, so first-wins matches + the old chain's net effect without N copies of the hook. + """ + for provider in live_providers(): + try: + result = provider._on_transform_tool_result(*args, **kwargs) + except Exception: + logger.debug("MemOS: transform_tool_result dispatch failed", exc_info=True) + continue + if result is not None: + return result + return None + + +# ─── Test support ───────────────────────────────────────────────────────── + + +def reset_for_tests() -> None: + """Tear down all process-wide state. Test-only.""" + global _hooks_installed, _hooked_manager_ref + with supervisor._lock: + client = supervisor._client + # Capture before _stop_keepalive_locked clears the reference, + # or the join below would silently never run. + thread = supervisor._keepalive_thread + supervisor._client = None + supervisor._factory = None + supervisor._holders = weakref.WeakSet() + supervisor._stop_keepalive_locked() + if client is not None: + with contextlib.suppress(Exception): + client.close() + if thread is not None and thread.is_alive(): + thread.join(timeout=2.0) + with _registry_lock: + _provider_refs.clear() + mgr = _hooked_manager_ref() if _hooked_manager_ref is not None else None + if mgr is not None: + hooks = getattr(mgr, "_hooks", {}) + for name in _HOOK_NAMES: + callbacks = hooks.get(name) + if isinstance(callbacks, list): + hooks[name] = [ + cb + for cb in callbacks + if cb + not in ( + _dispatch_post_tool_call, + _dispatch_post_llm_call, + _dispatch_transform_tool_result, + ) + ] + _hooks_installed = False + _hooked_manager_ref = None diff --git a/apps/memos-local-plugin/tests/python/test_bridge_client.py b/apps/memos-local-plugin/tests/python/test_bridge_client.py index b47d5660e..7c2b51d41 100644 --- a/apps/memos-local-plugin/tests/python/test_bridge_client.py +++ b/apps/memos-local-plugin/tests/python/test_bridge_client.py @@ -295,6 +295,17 @@ def test_close_is_idempotent(self) -> None: client.close() client.close() # second call must not raise + def test_handle_line_ignores_non_object_json(self) -> None: + """Valid-but-non-object JSON (null, list, …) must not kill the + reader thread via dict-key access on a non-dict.""" + client = MemosBridgeClient(bridge_path="/tmp/bridge.cts") + for raw in ("null", "[1, 2]", '"just a string"', "42"): + client._handle_line(raw) # must not raise + # Reader path still functional afterwards. + res = client.request("core.health") + self.assertTrue(res["ok"]) + client.close() + def test_stdio_bridge_starts_without_viewer_by_default(self) -> None: client = MemosBridgeClient(bridge_path="/tmp/bridge.cts") assert self._fake is not None @@ -353,6 +364,7 @@ def setUp(self) -> None: import memos_provider self._provider_mod = memos_provider + self._reset_bridge_runtime() self._patches = [ patch("memos_provider.ensure_bridge_running", return_value=True), @@ -364,6 +376,13 @@ def setUp(self) -> None: def tearDown(self) -> None: for p in self._patches: p.stop() + self._reset_bridge_runtime() + + def _reset_bridge_runtime(self) -> None: + # The bridge client is shared process-wide; isolate tests. + reset = getattr(self._provider_mod, "_reset_bridge_runtime_for_tests", None) + if callable(reset): + reset() def test_is_available_returns_true_when_bridge_ok(self) -> None: p = self._provider_mod.MemTensorProvider() diff --git a/apps/memos-local-plugin/tests/python/test_hermes_bridge_lifecycle.py b/apps/memos-local-plugin/tests/python/test_hermes_bridge_lifecycle.py new file mode 100644 index 000000000..0a7c6e9b1 --- /dev/null +++ b/apps/memos-local-plugin/tests/python/test_hermes_bridge_lifecycle.py @@ -0,0 +1,435 @@ +"""Bridge lifecycle tests for issue #1910 — process leak hardening. + +The invariant under test: no matter how many ``MemTensorProvider`` +instances a host constructs (old hermes-agent versions rebuilt one per +background-review fork, i.e. per turn), the adapter must keep **at most +one** live bridge client per host process, must not pin abandoned +provider instances in memory, and must stop all of its threads once the +last provider shuts down. + +These tests run against fake bridges (no Node subprocess), mirroring the +conventions in ``test_hermes_provider_pipeline.py``. +""" + +from __future__ import annotations + +import gc +import sys +import threading +import time +import types +import unittest +import weakref + +from pathlib import Path +from unittest.mock import MagicMock, patch + + +_ADAPTER_ROOT = Path(__file__).resolve().parent.parent.parent / "adapters" / "hermes" +_PLUGIN_DIR = _ADAPTER_ROOT / "memos_provider" +for _p in (_ADAPTER_ROOT, _PLUGIN_DIR): + if str(_p) not in sys.path: + sys.path.insert(0, str(_p)) + +import memos_provider # noqa: E402 + +from bridge_client import BridgeError # noqa: E402 + + +class FakeBridge: + """Minimal JSON-RPC bridge double matching the pipeline-test fake.""" + + def __init__(self) -> None: + self.calls: list[tuple[str, dict]] = [] + self.closed = False + self.host_handlers: dict[str, object] = {} + + def register_host_handler(self, method: str, handler: object) -> None: + self.host_handlers[method] = handler + + def request(self, method: str, params: dict | None = None, **_kwargs: object) -> dict: + payload = params or {} + self.calls.append((method, payload)) + if method == "session.open": + return {"sessionId": payload.get("sessionId") or "hermes:test-session"} + if method == "turn.start": + return {"query": {"episodeId": "ep-1"}, "injectedContext": "ctx"} + if method == "turn.end": + return {"traceId": "tr-1", "episodeId": "ep-1"} + return {"ok": True} + + def close(self) -> None: + self.closed = True + + +class DeadOnHealthBridge(FakeBridge): + """Simulates a crashed bridge process: every health ping pipe-breaks.""" + + def request(self, method: str, params: dict | None = None, **_kwargs: object) -> dict: + if method == "core.health": + raise BridgeError("transport_closed", "[Errno 32] Broken pipe") + return super().request(method, params, **_kwargs) + + +def _reset_bridge_runtime() -> None: + reset = getattr(memos_provider, "_reset_bridge_runtime_for_tests", None) + if callable(reset): + reset() + + +def _keepalive_threads() -> list[threading.Thread]: + return [t for t in threading.enumerate() if t.name == "memos-bridge-keepalive" and t.is_alive()] + + +class _FakeHermesPluginHost: + """Stands in for hermes_cli.plugins.get_plugin_manager().""" + + def __init__(self) -> None: + self.manager = types.SimpleNamespace(_hooks={}) + + +class BridgeLifecycleTests(unittest.TestCase): + def setUp(self) -> None: + _reset_bridge_runtime() + self.addCleanup(_reset_bridge_runtime) + + self._daemon_patches = [ + patch("memos_provider.ensure_bridge_running", return_value=True), + patch("memos_provider.ensure_viewer_daemon", return_value=True), + ] + for p in self._daemon_patches: + p.start() + self.addCleanup(p.stop) + + # Fake hermes host plugin manager so hook registration exercises + # the real (global, deduplicated) registration path. + self.plugin_host = _FakeHermesPluginHost() + hermes_plugins = types.ModuleType("hermes_cli.plugins") + hermes_plugins.get_plugin_manager = lambda: self.plugin_host.manager + hermes_cli = types.ModuleType("hermes_cli") + hermes_cli.plugins = hermes_plugins + self._saved_modules = { + name: sys.modules.get(name) for name in ("hermes_cli", "hermes_cli.plugins") + } + sys.modules["hermes_cli"] = hermes_cli + sys.modules["hermes_cli.plugins"] = hermes_plugins + self.addCleanup(self._restore_modules) + + def _restore_modules(self) -> None: + for name, mod in self._saved_modules.items(): + if mod is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = mod + + def _factory(self, bridges: list[FakeBridge] | None = None) -> MagicMock: + made: list[FakeBridge] = [] + + def _make() -> FakeBridge: + bridge = bridges.pop(0) if bridges else FakeBridge() + made.append(bridge) + return bridge + + mock = MagicMock(side_effect=_make) + mock.made = made # type: ignore[attr-defined] + return mock + + def _initialized(self, session: str) -> memos_provider.MemTensorProvider: + provider = memos_provider.MemTensorProvider() + provider.initialize(session, hermes_home="/tmp/hermes-test", platform="cli") + return provider + + # ─── One bridge per process, no matter how many providers ──────────── + + def test_many_provider_instances_share_single_bridge_client(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + providers = [self._initialized(f"session-{i}") for i in range(3)] + + self.assertEqual(factory.call_count, 1) + bridges = {id(p._bridge) for p in providers} + self.assertEqual(len(bridges), 1) + # Every provider still opened its own session on the shared bridge. + shared = providers[0]._bridge + opened = [params for method, params in shared.calls if method == "session.open"] + self.assertEqual( + sorted(p.get("sessionId") for p in opened), + ["session-0", "session-1", "session-2"], + ) + for p in providers: + p.shutdown() + + def test_reinitialize_same_provider_does_not_spawn_second_client(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + provider = self._initialized("session-a") + first = provider._bridge + provider.initialize("session-a-reinit", hermes_home="/tmp/hermes-test") + + self.assertEqual(factory.call_count, 1) + self.assertIs(provider._bridge, first) + self.assertFalse(first.closed) + provider.shutdown() + + # ─── Orderly shutdown of the shared client ──────────────────────────── + + def test_shutdown_of_last_provider_closes_shared_client_and_keepalive(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + a = self._initialized("session-a") + b = self._initialized("session-b") + shared = a._bridge + + a.shutdown() + self.assertFalse(shared.closed, "client must survive while b is live") + + b.shutdown() + self.assertTrue(shared.closed) + + deadline = time.time() + 3.0 + while time.time() < deadline and _keepalive_threads(): + time.sleep(0.02) + self.assertEqual(_keepalive_threads(), []) + + # ─── Host hook hygiene ──────────────────────────────────────────────── + + def test_global_hooks_registered_once_for_many_providers(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + providers = [self._initialized(f"session-{i}") for i in range(3)] + + hooks = self.plugin_host.manager._hooks + for hook_name in ("post_tool_call", "post_llm_call", "transform_tool_result"): + self.assertEqual( + len(hooks.get(hook_name, [])), + 1, + f"hook {hook_name} must be installed exactly once, " + f"got {len(hooks.get(hook_name, []))}", + ) + for p in providers: + p.shutdown() + + def test_hooks_reinstall_after_host_manager_hooks_cleared(self) -> None: + """Hosts can rebuild the plugin manager or clear its _hooks on + plugin reload; the next provider initialize must self-heal the + dispatcher installation instead of short-circuiting forever.""" + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + a = self._initialized("session-a") + # Simulate `hermes plugins reload`: hook lists wiped. + self.plugin_host.manager._hooks.clear() + + b = self._initialized("session-b") + + hooks = self.plugin_host.manager._hooks + for hook_name in ("post_tool_call", "post_llm_call", "transform_tool_result"): + self.assertEqual( + len(hooks.get(hook_name, [])), + 1, + f"hook {hook_name} must be re-installed exactly once after a wipe", + ) + a.shutdown() + b.shutdown() + + def test_reset_for_tests_joins_keepalive_deterministically(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + self._initialized("session-a") + self.assertTrue(_keepalive_threads(), "keepalive must be running") + _reset_bridge_runtime() + # No polling loop: reset itself must have joined the thread. + self.assertEqual(_keepalive_threads(), []) + + def test_hook_dispatch_still_reaches_the_matching_provider(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + provider = self._initialized("session-a") + provider.on_turn_start(1, "task") + + for cb in self.plugin_host.manager._hooks.get("post_tool_call", []): + cb( + tool_name="terminal", + args={"cmd": "ls"}, + result="ok", + tool_call_id="t1", + session_id="session-a", + ) + + self.assertEqual([tc["name"] for tc in provider._tool_calls], ["terminal"]) + provider.shutdown() + + # ─── Abandoned providers must be collectable, not immortal ─────────── + + def test_abandoned_provider_is_garbage_collectable(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + provider = self._initialized("session-abandoned") + ref = weakref.ref(provider) + # Host drops the provider without ever calling shutdown() — + # exactly what pre-#27190 hermes review forks did every turn. + del provider + gc.collect() + + self.assertIsNone( + ref(), + "abandoned provider must be garbage-collectable; something " + "(global hooks / keepalive thread / host handler) is pinning it", + ) + + # ─── Crash recovery without per-orphan resurrection ────────────────── + + def test_keepalive_respawns_dead_shared_bridge_exactly_once(self) -> None: + import bridge_supervisor as supervisor_mod + + factory = self._factory(bridges=[DeadOnHealthBridge(), FakeBridge()]) + with ( + patch.object(supervisor_mod, "KEEPALIVE_INTERVAL_SEC", 0.05), + patch("memos_provider.MemosBridgeClient", factory), + ): + provider = self._initialized("session-a") + dead = provider._bridge + + deadline = time.time() + 3.0 + while time.time() < deadline and factory.call_count < 2: + time.sleep(0.02) + + self.assertEqual(factory.call_count, 2) + self.assertTrue(dead.closed) + # The replacement is healthy, so no further respawn happens. + time.sleep(0.3) + self.assertEqual(factory.call_count, 2) + provider.shutdown() + + def test_respawn_after_last_release_does_not_revive_client(self) -> None: + """Race guard: a health ping can be mid-flight while the last + provider shuts down; the subsequent respawn must not revive a + zero-holder client that nobody will ever close.""" + import bridge_supervisor as supervisor_mod + + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + provider = self._initialized("session-a") + client = provider._bridge + provider.shutdown() + + revived = supervisor_mod.supervisor.respawn(client) + + self.assertIsNone(revived) + self.assertIsNone(supervisor_mod.supervisor.peek()) + self.assertEqual(factory.call_count, 1) + + def test_keepalive_recreates_client_lost_while_holders_remain(self) -> None: + """A failed reconnect can discard the shared client while other + holders survive; the keepalive must recreate it proactively + instead of idling on None forever.""" + import bridge_supervisor as supervisor_mod + + factory = self._factory() + with ( + patch.object(supervisor_mod, "KEEPALIVE_INTERVAL_SEC", 0.05), + patch("memos_provider.MemosBridgeClient", factory), + ): + a = self._initialized("session-a") + b = self._initialized("session-b") + shared = a._bridge + + # B's reconnect created-and-failed path discards the client. + supervisor_mod.supervisor.discard(b, shared) + self.assertIsNone(supervisor_mod.supervisor.peek()) + + deadline = time.time() + 3.0 + while time.time() < deadline and supervisor_mod.supervisor.peek() is None: + time.sleep(0.02) + + self.assertIsNotNone( + supervisor_mod.supervisor.peek(), + "keepalive must respawn a client while holders remain", + ) + self.assertEqual(factory.call_count, 2) + a.shutdown() + b.shutdown() + + def test_rapid_release_then_acquire_keeps_keepalive_alive(self) -> None: + """Stop-event reuse race: shutdown() of the last holder followed + immediately by a fresh initialize() (same keepalive interval) + must leave the new holder with a working keepalive — the old + thread may still report is_alive() while exiting.""" + import bridge_supervisor as supervisor_mod + + factory = self._factory(bridges=[FakeBridge(), DeadOnHealthBridge(), FakeBridge()]) + with ( + patch.object(supervisor_mod, "KEEPALIVE_INTERVAL_SEC", 0.05), + patch("memos_provider.MemosBridgeClient", factory), + ): + first = self._initialized("session-a") + first.shutdown() + # Immediately re-enter: the previous keepalive thread can + # still be alive at this point with its stop event set. + second = self._initialized("session-b") + + self.assertFalse(supervisor_mod.supervisor._keepalive_stop.is_set()) + self.assertTrue( + supervisor_mod.supervisor._keepalive_thread is not None + and supervisor_mod.supervisor._keepalive_thread.is_alive() + ) + + # The fresh keepalive must actually guard the new client: + # its bridge dies on health ping and gets respawned. + deadline = time.time() + 3.0 + while time.time() < deadline and factory.call_count < 3: + time.sleep(0.02) + self.assertEqual(factory.call_count, 3) + second.shutdown() + + def test_host_llm_handler_falls_through_when_registrant_was_dropped(self) -> None: + """host.llm.complete is last-registrant-wins on the shared client; + if that registrant is GC'd, the weak wrapper must route to a + surviving provider instead of failing the whole fallback path.""" + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + survivor = self._initialized("session-a") + survivor._handle_host_llm_complete = lambda params: {"text": "from-survivor"} + + dropped = self._initialized("session-b") # registers last, wins + shared = dropped._bridge + del dropped + gc.collect() + + handler = shared.host_handlers["host.llm.complete"] + result = handler({"messages": [{"role": "user", "content": "ping"}]}) + + self.assertEqual(result, {"text": "from-survivor"}) + survivor.shutdown() + + def test_stale_provider_binding_rebinds_to_current_shared_client(self) -> None: + factory = self._factory() + with patch("memos_provider.MemosBridgeClient", factory): + a = self._initialized("session-a") + b = self._initialized("session-b") + stale = b._bridge + + # A detects a dead pipe and reconnects: the shared client is + # replaced exactly once. + a._reconnect_bridge("session-a") + self.assertEqual(factory.call_count, 2) + current = a._bridge + self.assertIsNot(current, stale) + self.assertTrue(stale.closed) + + # B lazily rebinds to the replacement instead of spawning a third. + self.assertTrue(b._ensure_bridge("session-b")) + self.assertIs(b._bridge, current) + self.assertEqual(factory.call_count, 2) + reopened = [ + params + for method, params in current.calls + if method == "session.open" and params.get("sessionId") == "session-b" + ] + self.assertEqual(len(reopened), 1) + + a.shutdown() + b.shutdown() + + +if __name__ == "__main__": + unittest.main() diff --git a/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py b/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py index 149cea586..8933ae436 100644 --- a/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py +++ b/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py @@ -67,7 +67,18 @@ def request(self, method: str, params: dict | None = None, **_kwargs: object) -> return super().request(method, params, **_kwargs) +def _reset_bridge_runtime() -> None: + reset = getattr(memos_provider, "_reset_bridge_runtime_for_tests", None) + if callable(reset): + reset() + + class HermesProviderPipelineTests(unittest.TestCase): + def setUp(self) -> None: + # The bridge client is shared process-wide; isolate tests. + _reset_bridge_runtime() + self.addCleanup(_reset_bridge_runtime) + def test_lifecycle_persists_turn_and_closes_real_episode(self) -> None: bridge = FakeBridge() with ( @@ -114,6 +125,9 @@ def test_lifecycle_persists_turn_and_closes_real_episode(self) -> None: turn_end = next(params for method, params in bridge.calls if method == "turn.end") self.assertEqual(turn_end["agent"], "hermes") + # The core returns a singular `traceId` (memory-core.ts onTurnEnd); + # it must be captured for verifier-feedback trace binding. + self.assertEqual(provider._last_trace_id, "trace-1") self.assertEqual(turn_end["sessionId"], "host-session") self.assertEqual(turn_end["episodeId"], "episode-from-turn-start") self.assertIn("HERMES_MEMOS_E2E_0428", turn_end["userText"])