Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ benchmarks/snapshots/
.memsearch/

app-tauri/src-tauri/target/

# Vendored site-packages tree (pip --target install; not part of the repo)
/deps/
2 changes: 2 additions & 0 deletions mcp_server/core/wiki_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,8 @@ def audit_all_domains(
".venv",
"venv",
"env",
"deps",
"site-packages",
"__pycache__",
".mypy_cache",
".pytest_cache",
Expand Down
11 changes: 10 additions & 1 deletion mcp_server/core/wiki_drift.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,17 @@ def _file_exists_under(source_root: str, cited: str) -> bool:
if os.path.isfile(full):
return True
bn = os.path.basename(cited)
# Prune the same vendored / build dirs as list_source_files. Without this,
# a repo carrying a venv/, node_modules/, deps/, or site-packages/ at its
# root makes this per-cited-path fallback walk tens of thousands of files,
# turning one consolidate cycle into a multi-minute stall. The skip set is
# the single source of truth for "not a source tree".
from mcp_server.core.wiki_coverage import _SKIP_DIRECTORIES

for dirpath, dirnames, filenames in os.walk(source_root):
dirnames[:] = [d for d in dirnames if not d.startswith(".")]
dirnames[:] = [
d for d in dirnames if d not in _SKIP_DIRECTORIES and not d.startswith(".")
]
if bn in filenames:
return True
return False
Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/add_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Any

from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.handlers._tool_meta import IDEMPOTENT_WRITE

# ── Schema ────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -111,7 +111,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
settings = get_memory_settings()
_store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
_store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
return _store


Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import Any

from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.handlers._tool_meta import IDEMPOTENT_WRITE

# ── Schema ────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -76,7 +76,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
settings = get_memory_settings()
_store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
_store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
return _store


Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/assess_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import Any

from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.handlers._tool_meta import READ_ONLY

# ── Schema ────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -76,7 +76,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
settings = get_memory_settings()
_store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
_store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
return _store


Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/backfill_memories.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
slug_to_domain,
)
from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.infrastructure.scanner import read_head_tail
from mcp_server.handlers._tool_meta import NON_IDEMPOTENT_WRITE

Expand Down Expand Up @@ -324,7 +324,7 @@ async def handler(args: dict[str, Any] | None = None) -> dict[str, Any]:
"""Backfill prior conversations into the memory store."""
parsed = _parse_args(args)
settings = get_memory_settings()
store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
ensure_backfill_log(store)

candidates = discover_files(parsed["project_filter"], parsed["max_files"])
Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from mcp_server.core.replay import format_restoration
from mcp_server.handlers._tool_meta import IDEMPOTENT_WRITE
from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -150,7 +150,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
settings = get_memory_settings()
_store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
_store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
return _store


Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/codebase_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)
from mcp_server.handlers.remember import handler as remember_handler
from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.handlers._tool_meta import READ_ONLY

# ── Schema ────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -143,7 +143,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
s = get_memory_settings()
_store = MemoryStore(s.DB_PATH, s.EMBEDDING_DIM)
_store = get_shared_store(s.DB_PATH, s.EMBEDDING_DIM)
return _store


Expand Down
20 changes: 17 additions & 3 deletions mcp_server/handlers/consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
get_embedding_engine,
)
from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.handlers._tool_meta import IDEMPOTENT_WRITE

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -161,7 +161,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
settings = get_memory_settings()
_store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
_store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
return _store


Expand All @@ -183,6 +183,20 @@ def _timed(fn, *args, **kwargs) -> dict[str, Any]:
return result


async def _atimed(fn, *args, **kwargs) -> dict[str, Any]:
"""Async counterpart of :func:`_timed` for awaitable cycle functions."""
t0 = time.monotonic()
try:
result = await fn(*args, **kwargs) or {}
except Exception as exc:
ms = int((time.monotonic() - t0) * 1000)
return {"error": f"{type(exc).__name__}: {exc}", "duration_ms": ms}
ms = int((time.monotonic() - t0) * 1000)
if isinstance(result, dict):
result["duration_ms"] = ms
return result


async def handler(args: dict[str, Any] | None = None) -> dict[str, Any]:
"""Run maintenance cycles on the memory system."""
args = args or {}
Expand Down Expand Up @@ -212,7 +226,7 @@ async def handler(args: dict[str, Any] | None = None) -> dict[str, Any]:
if args.get("wiki", True):
cap_raw = args.get("wiki_max_purges_per_axis", 500)
cap = int(cap_raw) if cap_raw is not None and int(cap_raw) > 0 else None
wiki_stats = _timed(
wiki_stats = await _atimed(
run_wiki_maintenance,
store,
apply_stubs=bool(args.get("wiki_apply_stubs", True)),
Expand Down
118 changes: 67 additions & 51 deletions mcp_server/handlers/consolidation/wiki_maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,30 @@
from __future__ import annotations

import logging
import os
from typing import Any

logger = logging.getLogger(__name__)


def _headless_authoring_enabled() -> bool:
"""Opt-in gate for the ``claude -p`` headless authoring drain.

Default OFF. The drain can spawn up to ~38 ``claude -p`` subprocesses
per cycle (30 anchor + 8 file-doc), each up to 180s, **synchronously
on the consolidate event loop**. Unthrottled, that storm wedges the
machine — and in tests/CI (where ``claude`` may be on PATH on dev
boxes) it also blocks the suite. It stays off until per-cycle load
balancing lands; set ``CORTEX_HEADLESS_AUTHORING=1`` to opt in.
"""
return os.getenv("CORTEX_HEADLESS_AUTHORING", "0").strip().lower() in {
"1",
"true",
"yes",
"on",
}


# Autonomous mode applies the stub + classifier purge axes — these
# remove content that is either placeholder-only or doesn't pass
# admission. **Shallow pages are NEVER auto-deleted** (user direction
Expand All @@ -56,28 +75,25 @@


async def _invoke_wiki_purge(args: dict[str, Any]) -> dict[str, Any]:
"""Call the wiki_purge handler in whichever event-loop context we land in."""
import asyncio

"""Await the wiki_purge handler on the caller's event loop."""
from mcp_server.handlers.wiki_purge import handler as wiki_purge_handler

try:
running_loop = asyncio.get_running_loop()
except RuntimeError:
running_loop = None
if running_loop is None:
return await wiki_purge_handler(args)
# We're already inside an event loop — just await directly. The
# ``async`` def at the top makes that legal.
return await wiki_purge_handler(args)


def _run_purge_axis(
async def _run_purge_axis(
*, axis: str, apply: bool, max_purges: int | None = None
) -> dict[str, Any]:
"""Run wiki_purge with exactly one axis enabled, returning a flat dict."""
import asyncio

"""Run wiki_purge with exactly one axis enabled, returning a flat dict.

Awaited directly on the consolidate handler's event loop. An earlier
revision bridged via ``asyncio.run_coroutine_threadsafe(...).result()``,
which self-deadlocked: it scheduled the coroutine on the *same* loop
whose thread the synchronous caller had already blocked, so the
coroutine could never run and every axis stalled to the 120s timeout
(CI Test job hung ~1h). Awaiting keeps the psycopg async pool on its
owning loop and removes the deadlock entirely.
"""
purge_args: dict[str, Any] = {
"apply": apply,
"purge_stubs": axis == "stub",
Expand All @@ -86,19 +102,10 @@ def _run_purge_axis(
}
if max_purges is not None:
purge_args["max_purges"] = max_purges
try:
running_loop = asyncio.get_running_loop()
except RuntimeError:
running_loop = None
if running_loop is None:
return asyncio.run(_invoke_wiki_purge(purge_args))
future = asyncio.run_coroutine_threadsafe(
_invoke_wiki_purge(purge_args), running_loop
)
return future.result(timeout=120)


def run_wiki_maintenance(
return await _invoke_wiki_purge(purge_args)


async def run_wiki_maintenance(
store: Any,
*,
apply_stubs: bool = _AUTONOMOUS_STUB_APPLY_DEFAULT,
Expand Down Expand Up @@ -143,7 +150,7 @@ def run_wiki_maintenance(

# Stub axis.
try:
r = _run_purge_axis(
r = await _run_purge_axis(
axis="stub", apply=apply_stubs, max_purges=max_purges_per_axis
)
out["stub"]["purged"] = r.get("purged", 0)
Expand All @@ -155,7 +162,7 @@ def run_wiki_maintenance(

# Classifier axis.
try:
r = _run_purge_axis(
r = await _run_purge_axis(
axis="classifier",
apply=apply_classifier_rejects,
max_purges=max_purges_per_axis,
Expand All @@ -172,27 +179,36 @@ def run_wiki_maintenance(
# session. The worker here calls `claude -p` directly so the
# loop closes without human intervention. See
# ``consolidation/headless_authoring.py``.
try:
from mcp_server.handlers.consolidation.headless_authoring import (
run_headless_authoring_cycle,
)

cycle = run_headless_authoring_cycle()
out["headless_authoring"] = {
"pages_with_gaps": cycle.pages_with_gaps,
"drains_attempted": cycle.drains_attempted,
"drains_filled": cycle.drains_filled,
"drains_failed": cycle.drains_failed,
"duration_ms": cycle.duration_ms,
}
except Exception as exc:
logger.debug(
"wiki_maintenance: headless authoring drain failed (non-fatal): %s",
exc,
)
out["headless_authoring"] = {
"status": f"error: {type(exc).__name__}: {exc}",
}
#
# Opt-in only (default OFF): the drain spawns up to ~38 ``claude -p``
# subprocesses synchronously on the event loop. Until per-cycle load
# balancing lands it stays gated behind ``CORTEX_HEADLESS_AUTHORING``
# so consolidate (and the test suite) never blocks on a subprocess
# storm.
if not _headless_authoring_enabled():
out["headless_authoring"] = {"status": "disabled"}
else:
try:
from mcp_server.handlers.consolidation.headless_authoring import (
run_headless_authoring_cycle,
)

cycle = run_headless_authoring_cycle()
out["headless_authoring"] = {
"pages_with_gaps": cycle.pages_with_gaps,
"drains_attempted": cycle.drains_attempted,
"drains_filled": cycle.drains_filled,
"drains_failed": cycle.drains_failed,
"duration_ms": cycle.duration_ms,
}
except Exception as exc:
logger.debug(
"wiki_maintenance: headless authoring drain failed (non-fatal): %s",
exc,
)
out["headless_authoring"] = {
"status": f"error: {type(exc).__name__}: {exc}",
}

# Per-project coverage dashboards (Meadows L6 information surface).
try:
Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/create_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Any

from mcp_server.infrastructure.memory_config import get_memory_settings
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import MemoryStore, get_shared_store
from mcp_server.handlers._tool_meta import IDEMPOTENT_WRITE

# ── Schema ────────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -91,7 +91,7 @@ def _get_store() -> MemoryStore:
global _store
if _store is None:
settings = get_memory_settings()
_store = MemoryStore(settings.DB_PATH, settings.EMBEDDING_DIM)
_store = get_shared_store(settings.DB_PATH, settings.EMBEDDING_DIM)
return _store


Expand Down
4 changes: 2 additions & 2 deletions mcp_server/handlers/curate_wiki.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from mcp_server.core.wiki_drift import audit_wiki_drift
from mcp_server.handlers._tool_meta import READ_ONLY
from mcp_server.infrastructure.config import WIKI_ROOT
from mcp_server.infrastructure.memory_store import MemoryStore
from mcp_server.infrastructure.memory_store import get_shared_store


schema = {
Expand Down Expand Up @@ -247,7 +247,7 @@ async def handler(args: dict[str, Any] | None = None) -> dict[str, Any]:
include_reauthor = bool(args.get("include_reauthor", True))
reauthor_jobs_max = int(args.get("reauthor_jobs_max") or 3)

store = MemoryStore()
store = get_shared_store()
# Draw a memory pool. Recently-accessed memories are higher-signal
# candidates because they reflect what the user actively works on.
if recent_only:
Expand Down
Loading
Loading