Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 109 additions & 105 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import sys
import threading
import time
import weakref

from pathlib import Path
from typing import Any
Expand All @@ -63,7 +64,9 @@
if str(_PLUGIN_DIR) not in sys.path:
sys.path.insert(0, str(_PLUGIN_DIR))

from bridge_client import BridgeError, MemosBridgeClient # noqa: E402
import bridge_supervisor # noqa: E402

from bridge_client import MemosBridgeClient # noqa: E402
from daemon_manager import ensure_bridge_running, ensure_viewer_daemon # noqa: E402


Expand Down Expand Up @@ -269,8 +272,9 @@ def __init__(self) -> None:
# show the model's thinking like OpenClaw does.
self._turn_thinking: str = ""
self._hook_registered = False
self._bridge_keepalive_stop = threading.Event()
self._bridge_keepalive_thread: threading.Thread | None = None
# Set while `shutdown()` runs so concurrent reconnect attempts
# don't respawn a bridge we are in the middle of releasing.
self._closing = threading.Event()
# Hermes runs background memory/skill reviewers by forking an agent and
# appending a synthetic user turn. That turn is instruction plumbing,
# not a human utterance, so it must not become a MemOS trace.
Expand Down Expand Up @@ -309,6 +313,7 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
self._hermes_home = str(kwargs.get("hermes_home") or "")
self._platform = str(kwargs.get("platform") or "cli")
self._agent_identity = str(kwargs.get("agent_identity") or "hermes")
self._closing.clear()
try:
ensure_bridge_running()
except Exception as err:
Expand All @@ -318,37 +323,42 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
ensure_viewer_daemon()
except Exception as err:
logger.warning("MemOS: viewer daemon check failed — %s", err)
new_bridge: MemosBridgeClient | None = None
# The bridge client is a process-wide shared resource owned by the
# supervisor: hosts may construct any number of provider instances
# (and may never call shutdown() on some of them), but only one
# Node subprocess exists per host process. See bridge_supervisor.
client: MemosBridgeClient | None = None
created = False
try:
new_bridge = MemosBridgeClient()
client, created = bridge_supervisor.supervisor.acquire(
self, lambda: MemosBridgeClient()
)
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
new_bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._bridge = new_bridge
self._bind_host_handlers(client)
self._bridge = client
self._open_session(session_id)
logger.info(
"MemOS: bridge ready session=%s platform=%s (episode deferred)",
"MemOS: bridge ready session=%s platform=%s shared_pid=%s (episode deferred)",
self._session_id,
self._platform,
getattr(client, "pid", "?"),
)
except Exception as err:
logger.warning("MemOS: bridge init failed — %s", err)
if new_bridge is not None:
with contextlib.suppress(Exception):
new_bridge.close()
if created:
bridge_supervisor.supervisor.discard(self, client)
else:
bridge_supervisor.supervisor.release(self)
self._bridge = None
# Register a Hermes plugin hook to capture tool calls as they
# happen. The `post_tool_call` hook fires after every tool
# dispatch (write_file, terminal, search_files, etc.) with the
# tool name, arguments, and result. We accumulate them and
# flush in `sync_turn`.
self._register_tool_call_hook()
self._start_bridge_keepalive()

def system_prompt_block(self) -> str: # type: ignore[override]
return (
Expand Down Expand Up @@ -421,25 +431,40 @@ def _record_namespace(self) -> dict[str, Any]:
return ns

def _register_tool_call_hook(self) -> None:
# Hook this provider into the Hermes host's tool/LLM callbacks, at most
# once per instance: `self._hook_registered` short-circuits repeat calls.
# Any exception raised inside the `try` is logged at debug level and not
# re-raised.
# NOTE(review): SOURCE is a rendered diff with indentation stripped. The
# `get_plugin_manager` / `mgr._hooks` statements and the `bridge_supervisor`
# statements look like the removed and added sides of one change — confirm
# which set is live (and how the lines after the `if` nest) before editing.
if self._hook_registered:
return
try:
from hermes_cli.plugins import (
get_plugin_manager, # pyright: ignore[reportMissingImports]
)

# Appends this instance's bound methods directly onto the plugin manager's
# private `_hooks` lists for three hook names.
mgr = get_plugin_manager()
mgr._hooks.setdefault("post_tool_call", []).append(self._on_post_tool_call)
mgr._hooks.setdefault("post_llm_call", []).append(self._on_post_llm_call)
mgr._hooks.setdefault("transform_tool_result", []).append(
self._on_transform_tool_result
)
# The host's plugin manager is process-global; appending bound
# methods per instance both duplicated hook work and strongly
# pinned abandoned provider instances (and their bridge
# subprocesses) forever. Dispatchers are installed once per
# process; this instance is reachable only through a weakref
# registry, so dropping the provider really frees it.
bridge_supervisor.register_provider(self)
# The flag is set only when `install_host_hooks()` returns a truthy value.
if bridge_supervisor.install_host_hooks():
self._hook_registered = True
logger.debug(
"MemOS: registered post_tool_call + post_llm_call + transform_tool_result hooks"
)
except Exception as err:
logger.debug("MemOS: could not register tool hook — %s", err)

def _bind_host_handlers(self, client: Any) -> None:
    """Attach the ``host.llm.complete`` reverse-RPC handler to ``client``.

    Only a weak reference to this provider's handler is captured, so the
    closure stored on the client does not keep the provider alive. The
    shared client outlives any single provider instance; a bound method
    in its handler table would otherwise pin an abandoned provider (and
    everything it references) for the client's whole lifetime.

    Args:
        client: Object exposing ``register_host_handler(name, fn)``.
    """
    weak_handler = weakref.WeakMethod(self._handle_host_llm_complete)

    def _weak_host_llm_complete(params: dict[str, Any]) -> dict[str, Any]:
        bound = weak_handler()
        if bound is not None:
            return bound(params)
        # The provider that registered this closure has been dropped by
        # the host. The handler is session-agnostic (it resolves the
        # host's main runtime, not provider state), so the first
        # surviving provider serves the fallback rather than failing the
        # process-wide LLM fallback path.
        for survivor in bridge_supervisor.live_providers():
            return survivor._handle_host_llm_complete(params)
        raise RuntimeError("memory provider has been released")

    client.register_host_handler("host.llm.complete", _weak_host_llm_complete)

def _on_transform_tool_result(
self,
Expand Down Expand Up @@ -1510,20 +1535,17 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor
self._bridge.request("session.close", {"sessionId": self._session_id})

def shutdown(self) -> None: # type: ignore[override]
# Stop this provider's background work and let go of its bridge client.
# NOTE(review): SOURCE is a rendered diff with indentation stripped. The
# keepalive stop/join and `self._bridge.close()` statements look like the
# removed side, and the `_closing` / `bridge_supervisor` statements the
# added side, of one change — confirm before relying on statement order.
self._bridge_keepalive_stop.set()
if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive():
self._bridge_keepalive_thread.join(
timeout=12.0
) # Increased to cover health check timeout (10s) + margin
# `_reconnect_bridge` checks this event and returns early while it is set.
self._closing.set()
# Wait up to 5 seconds for a still-running prefetch thread.
if self._prefetch_thread and self._prefetch_thread.is_alive():
self._prefetch_thread.join(timeout=5.0)
if self._bridge:
pid = getattr(self._bridge, "pid", "?")
logger.info("MemOS: shutting down bridge (pid=%s)", pid)
# Errors from close() are deliberately suppressed.
with contextlib.suppress(Exception):
self._bridge.close()
self._bridge = None
logger.info("MemOS: bridge shutdown complete (pid=%s)", pid)
bridge_supervisor.unregister_provider(self)
# Capture the pid for logging before the reference is dropped; "-" is
# logged when no bridge was bound.
pid = getattr(self._bridge, "pid", "?") if self._bridge else "-"
self._bridge = None
# The supervisor closes the shared client (and stops the
# process-wide keepalive) only when the last holder releases.
logger.info("MemOS: releasing shared bridge (pid=%s)", pid)
bridge_supervisor.supervisor.release(self)
logger.info("MemOS: bridge release complete (pid=%s)", pid)

# ─── Host LLM bridge (fallback for plugin-side model failures) ────────

Expand Down Expand Up @@ -1719,58 +1741,58 @@ def _open_session(self, session_id: str = "", *, timeout: float = 30.0) -> None:
self._session_id = resp.get("sessionId") or requested_session

def _is_transport_closed(self, err: Exception) -> bool:
# Report whether `err` indicates that the bridge transport has gone away.
# NOTE(review): SOURCE is a rendered diff with indentation stripped. The
# `BridgeError` / message-substring check and the
# `bridge_supervisor.transport_closed(err)` delegation look like the old and
# new bodies of this method; read top to bottom, the final `return` would
# never run — confirm which body is live.
if isinstance(err, BridgeError) and err.code == "transport_closed":
return True
# Fallback: case-insensitive substring match on the error text.
msg = str(err).lower()
return "broken pipe" in msg or "bridge closed" in msg or "transport_closed" in msg
return bridge_supervisor.transport_closed(err)

def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> None:
# Replace this provider's bridge client and re-open its session on the new
# one. Returns without doing anything while a shutdown is in progress; on
# failure after the client swap, cleans up and re-raises.
#
# session_id: forwarded to `_open_session`.
# timeout: forwarded to `_open_session` (keyword-only).
#
# NOTE(review): SOURCE is a rendered diff with indentation stripped. The
# `_bridge_keepalive_stop` / `old_bridge` / `new_bridge` statements and the
# `_closing` / `stale` / `client, created` statements look like the removed
# and added sides of one change — confirm which set is live before editing.
# Don't reconnect if we're shutting down
if self._bridge_keepalive_stop.is_set():
if self._closing.is_set():
logger.debug("MemOS: skipping reconnect during shutdown")
return

with self._reconnect_lock:
# Double-check after acquiring lock
if self._bridge_keepalive_stop.is_set():
if self._closing.is_set():
logger.debug("MemOS: skipping reconnect during shutdown (after lock)")
return

old_bridge = self._bridge
old_pid = getattr(old_bridge, "pid", None) if old_bridge else None

if old_bridge:
logger.info("MemOS: closing old bridge (pid=%s)", old_pid)
# Errors from closing the old client are deliberately suppressed.
with contextlib.suppress(Exception):
old_bridge.close()
logger.info("MemOS: old bridge closed (pid=%s)", old_pid)

# Remember the client we currently hold so it can be passed to
# `supervisor.replace` below.
stale = self._bridge
ensure_bridge_running()
# A viewer-daemon failure is logged as a warning and does not abort the
# reconnect.
try:
ensure_viewer_daemon()
except Exception as err:
logger.warning("MemOS: viewer daemon check failed during reconnect — %s", err)
new_bridge: MemosBridgeClient | None = None
# Generation-aware replacement: if another provider already
# replaced the dead shared client, adopt its replacement
# instead of spawning yet another process.
client, created = bridge_supervisor.supervisor.replace(
self, stale, lambda: MemosBridgeClient()
)
try:
new_bridge = MemosBridgeClient()
logger.info("MemOS: new bridge created (pid=%s)", getattr(new_bridge, "pid", "?"))

new_bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._bridge = new_bridge
self._bind_host_handlers(client)
self._bridge = client
self._open_session(session_id, timeout=timeout)
# The episode we were tracking lived in the replaced
# binding; clear it so the next turn.start (or the
# sync_turn retry) re-establishes it on this client.
self._episode_id = ""
except Exception:
if new_bridge is not None:
with contextlib.suppress(Exception):
new_bridge.close()
if self._bridge is new_bridge:
# `created` is the second value returned by `supervisor.replace`:
# truthy routes cleanup to `discard`, otherwise to `release`.
if created:
bridge_supervisor.supervisor.discard(self, client)
else:
bridge_supervisor.supervisor.release(self)
# Only clear the binding if it still points at the client we just tried.
if self._bridge is client:
self._bridge = None
raise

def _ensure_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> bool:
if self._bridge:
# A binding goes stale when the supervisor holds a *newer* shared
# client — e.g. the keepalive (or another provider) respawned it
# after a crash. Rebind eagerly then, re-opening our session.
# When the supervisor has no current client, keep trusting the
# binding we hold: request failures will route through the
# reactive transport-closed reconnect in `sync_turn`.
current = bridge_supervisor.supervisor.peek()
if self._bridge is not None and (current is None or self._bridge is current):
return True
try:
self._reconnect_bridge(session_id or self._session_id, timeout=timeout)
Expand All @@ -1785,33 +1807,6 @@ def _ensure_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> bool
self._bridge = None
return False

def _start_bridge_keepalive(self) -> None:
    """Start the background bridge health-check thread if none is running.

    The worker wakes every 5 seconds until ``_bridge_keepalive_stop`` is
    set. Each cycle it makes sure a bridge is bound, sends a
    ``core.health`` request, and reconnects when the failure is a
    transport close. Other failures are only logged at debug level.
    """
    if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive():
        return
    self._bridge_keepalive_stop.clear()

    def _probe_once() -> None:
        # One keepalive cycle: skip the health probe when no bridge
        # could be established this round.
        if not self._ensure_bridge(self._session_id, timeout=10.0):
            return
        try:
            assert self._bridge is not None
            self._bridge.request("core.health", {}, timeout=10.0)
        except Exception as err:
            if not self._is_transport_closed(err):
                logger.debug("MemOS: bridge keepalive failed — %s", err)
                return
            logger.info("MemOS: bridge keepalive reconnecting after transport close")
            # Best effort: a failed reconnect is retried on the next cycle.
            with contextlib.suppress(Exception):
                self._reconnect_bridge(self._session_id, timeout=10.0)

    def _run() -> None:
        # `wait` returns True once the stop event is set, ending the loop.
        while not self._bridge_keepalive_stop.wait(5.0):
            _probe_once()

    worker = threading.Thread(
        target=_run,
        daemon=True,
        name="memos-bridge-keepalive",
    )
    self._bridge_keepalive_thread = worker
    worker.start()

def _turn_start(self, query: str, *, session_id: str = "") -> str:
assert self._bridge is not None
host_runtime = self._host_runtime_context()
Expand Down Expand Up @@ -1876,11 +1871,15 @@ def _turn_end(
if agent_thinking:
payload["agentThinking"] = agent_thinking
result = self._bridge.request("turn.end", payload)
# Capture the trace ID for feedback submission
# Capture the trace ID for feedback submission. The core contract
# (memory-core.ts onTurnEnd) returns a singular `traceId`; accept
# the plural form too for forward compatibility.
if result and isinstance(result, dict):
trace_ids = result.get("traceIds", [])
if trace_ids and len(trace_ids) > 0:
trace_id = trace_ids[-1] # Last trace is the current turn
trace_id = result.get("traceId") or ""
if not trace_id:
trace_ids = result.get("traceIds") or []
trace_id = trace_ids[-1] if trace_ids else ""
if trace_id:
self._last_trace_id = trace_id
return trace_id
return ""
Expand Down Expand Up @@ -1950,5 +1949,10 @@ def register(ctx: Any) -> None:
ctx.register_memory_provider(MemTensorProvider())


def _reset_bridge_runtime_for_tests() -> None:
    """Reset the process-wide bridge supervisor state.

    Intended for test code only; delegates to
    ``bridge_supervisor.reset_for_tests()``.
    """
    bridge_supervisor.reset_for_tests()


# Pattern 2: exported class — fallback via `issubclass(MemoryProvider)`.
__all__ = ["PLUGIN_ID", "PLUGIN_VERSION", "MemTensorProvider", "register"]
Loading
Loading