From a771d6d3b925b8a53916798e3899707fe23af18d Mon Sep 17 00:00:00 2001 From: "openai-code-agent[bot]" <242516109+Codex@users.noreply.github.com> Date: Sat, 16 May 2026 06:35:00 +0000 Subject: [PATCH 1/2] Initial plan From a9caefcbff84660acec5d628c0a0625cc8323ba6 Mon Sep 17 00:00:00 2001 From: "openai-code-agent[bot]" <242516109+Codex@users.noreply.github.com> Date: Sat, 16 May 2026 06:50:55 +0000 Subject: [PATCH 2/2] issues: redesign issue definitions and unpause detection Co-authored-by: DarinShapiro <23219821+DarinShapiro@users.noreply.github.com> --- .../thread_observability/api/dashboard.html | 16 +- .../src/thread_observability/api/http_api.py | 14 - .../src/thread_observability/api/mcp_tools.py | 22 +- .../pipeline/analyze_node.py | 11 + .../pipeline/device_discovery.py | 45 - .../pipeline/playbooks.json | 51 ++ .../thread_observability/pipeline/reasoner.py | 862 ++++++++---------- .../services/direct_chat.py | 31 +- .../storage/sqlite_store.py | 132 ++- .../app/tests/test_reasoner.py | 448 +++------ documentation/06-mcp-tools-reference.md | 40 +- 11 files changed, 734 insertions(+), 938 deletions(-) diff --git a/addons/thread-observability/app/src/thread_observability/api/dashboard.html b/addons/thread-observability/app/src/thread_observability/api/dashboard.html index 7f0ff91..8d5869a 100644 --- a/addons/thread-observability/app/src/thread_observability/api/dashboard.html +++ b/addons/thread-observability/app/src/thread_observability/api/dashboard.html @@ -973,7 +973,7 @@

let graphRefreshInFlight = false; const chatState = { conversationId: null, messages: [] }; let nodesSort = { key: null, dir: 'asc' }; // null => server "worst first" default -const SEV_RANK = { critical: 0, error: 0, warning: 1, warn: 1, info: 2, notice: 3 }; +const SEV_RANK = { critical: 0, crit: 0, error: 0, warning: 1, warn: 1, info: 2, notice: 3 }; const ASSESSMENT_STATE_CLASS = { probation: 'good', relaxing: 'good', @@ -1698,16 +1698,6 @@

const pillsHost = $('#issues-sev-pills'); issuesBody.innerHTML = ''; pillsHost.innerHTML = ''; - // Issue detection is paused pending redesign (#5). The API returns - // `status: "placeholder"` until new rules ship. Render the note - // verbatim so users (and any AI consumer scraping the dashboard) - // don't read an empty list as "all clear". - if (s.issues && s.issues.status === 'placeholder') { - const note = s.issues.note - || 'Issue detection is paused pending redesign.'; - issuesBody.appendChild(el('div', {class: 'muted', style: 'padding: 6px 0;'}, note)); - return; - } const issues = ((s.issues && s.issues.issues) || []).slice(); if (!issues.length) { issuesBody.appendChild(el('div', {class:'empty'}, 'No active issues.')); @@ -1719,10 +1709,10 @@

const sev = (i.severity || 'info').toLowerCase(); counts[sev] = (counts[sev] || 0) + 1; } - const order = ['critical', 'error', 'warning', 'warn', 'info', 'notice']; + const order = ['critical', 'crit', 'error', 'warning', 'warn', 'info', 'notice']; for (const k of order) { if (!counts[k]) continue; - const cls = (k === 'critical' || k === 'error') ? 'bad' : (k.startsWith('warn') ? 'warn' : 'muted'); + const cls = (k === 'critical' || k === 'crit' || k === 'error') ? 'bad' : (k.startsWith('warn') ? 'warn' : 'muted'); pillsHost.appendChild(el('span', { class: 'pill ' + cls, style: 'margin-left:4px;', diff --git a/addons/thread-observability/app/src/thread_observability/api/http_api.py b/addons/thread-observability/app/src/thread_observability/api/http_api.py index 9e44fd5..a184aa0 100644 --- a/addons/thread-observability/app/src/thread_observability/api/http_api.py +++ b/addons/thread-observability/app/src/thread_observability/api/http_api.py @@ -866,20 +866,6 @@ def health_snapshot() -> dict[str, object]: @app.get("/v1/issues/active") def list_active_issues() -> dict[str, object]: - # Issue detection is paused pending redesign — see tracking - # issue #5 and placeholder issue #4. We deliberately return an - # empty list with an explicit ``status: "placeholder"`` so - # consumers (dashboard, MCP, AI reasoners) don't misread the - # absence of issues as "all clear". - from ..pipeline.reasoner import ISSUES_PAUSED, ISSUES_PAUSED_NOTE - if ISSUES_PAUSED: - return { - "count": 0, - "issues": [], - "status": "placeholder", - "note": ISSUES_PAUSED_NOTE, - "computed_at": _utc_now(), - } try: issues = get_store().list_active_issues() return {"count": len(issues), "issues": issues, "computed_at": _utc_now()} diff --git a/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py b/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py index d7c0a74..18244ca 100644 --- a/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py +++ b/addons/thread-observability/app/src/thread_observability/api/mcp_tools.py @@ -146,11 +146,12 @@ class ToolCallRequest(BaseModel): { "name": "list_active_issues", "description": ( - "Return all currently-open Thread network issues. " - "NOTE: Issue detection is currently paused pending a redesign of the rule set " - "(see tracking issue #5). Until new rules ship, this tool returns an empty list " - "with `status: \"placeholder\"`. Do NOT infer \"all clear\" from the empty list — " - "instead, reason from the raw data (topology, partitions, links, nodes)." + "Return all currently-open Thread network issues computed by deterministic rules. " + "Each issue includes the affected EUI64 (or null for mesh-wide issues), " + "`first_seen_at`, `last_seen_at`, a severity that reflects actionability × freshness, " + "and an evidence payload that includes the EUIs involved and the observation that triggered it. " + "Current rule taxonomy: " + "`real_partition_split`, `dead_link_reference`, `route_to_otbr_unreachable`." ), "inputSchema": {"type": "object", "properties": {}, "required": []}, }, { @@ -1031,17 +1032,6 @@ async def _dispatch_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any] except Exception as exc: # noqa: BLE001 return {"error": str(exc)} if name == "list_active_issues": - # Mirrors /v1/issues/active. Issue detection is paused - # pending redesign (#5); return an explicit placeholder so AI - # consumers don't infer "all clear" from an empty list. - from ..pipeline.reasoner import ISSUES_PAUSED, ISSUES_PAUSED_NOTE - if ISSUES_PAUSED: - return { - "count": 0, - "issues": [], - "status": "placeholder", - "note": ISSUES_PAUSED_NOTE, - } try: issues = get_store().list_active_issues() return {"count": len(issues), "issues": issues} diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/analyze_node.py b/addons/thread-observability/app/src/thread_observability/pipeline/analyze_node.py index 5ed84c6..f9d4d0c 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/analyze_node.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/analyze_node.py @@ -69,6 +69,9 @@ def _evidence_implicates_eui(evidence: Any, eui64: str) -> bool: members = evidence.get("members") if isinstance(members, list) and eui64 in members: return True + involved = evidence.get("involved_eui64s") + if isinstance(involved, list) and eui64 in involved: + return True # Nested partitions[].members (partition_split shape). partitions = evidence.get("partitions") if isinstance(partitions, list): @@ -78,6 +81,14 @@ def _evidence_implicates_eui(evidence: Any, eui64: str) -> bool: part_members = part.get("members") if isinstance(part_members, list) and eui64 in part_members: return True + sample = part.get("members_sample") + if isinstance(sample, list) and eui64 in sample: + return True + recent_changes = evidence.get("recent_partition_changes") + if isinstance(recent_changes, list): + for row in recent_changes: + if isinstance(row, dict) and row.get("eui64") == eui64: + return True return False diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py b/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py index f0b9f5d..157eebc 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/device_discovery.py @@ -1044,24 +1044,6 @@ async def _persist_matter_diagnostics( """ rich = _LAST_MATTER_RICH_INFO if not rich: - # v0.9.43: even with no rich info this cycle (matter-server WS hiccup - # or a single empty poll), we MUST still reconcile the - # ``partition_split`` issue. Otherwise an issue opened on a prior - # cycle becomes immortal — it never sees a non-split observation - # again because the empty-rich early-return below would skip the - # close branch. Latent bug observed live as issue #54 hanging open - # after the partition had long since healed. - try: - active = [ - i for i in s.list_active_issues() - if i.get("kind") == "partition_split" - ] - for issue in active: - s.close_issue(int(issue["id"])) - except Exception as exc: # noqa: BLE001 - log.warning( - "partition_split close-on-empty failed: %s", exc, - ) return { "nodes_with_diagnostics": 0, "links_recorded": 0, @@ -1590,33 +1572,6 @@ async def _persist_matter_diagnostics( for pid, members in sorted(live_partitions.items()) ] - # Open/close partition_split issue (now reasoning over live partitions only). - try: - active = [i for i in s.list_active_issues() if i.get("kind") == "partition_split"] - if split: - distinct_epids = sorted({ - p["extended_pan_id"] for p in partition_summary - if p.get("extended_pan_id") - }) - s.open_issue( - kind="partition_split", - severity="warning", - evidence={ - "partitions": partition_summary, - "partition_count": len(live_partitions), - # If partitions report different extended_pan_ids, - # this is a credentials-mismatch (stale dataset on - # one device) not an RF-fragmentation issue. - "distinct_extended_pan_ids": distinct_epids, - "credentials_mismatch_suspected": len(distinct_epids) > 1, - }, - ) - else: - for issue in active: - s.close_issue(int(issue["id"])) - except Exception as exc: # noqa: BLE001 - log.warning("Failed to update partition_split issue: %s", exc) - log.info( "diagnostics persisted: nodes=%d links=%d partitions=%d split=%s " "changes=%d phantoms_marked=%d phantoms_cleared=%d excluded_partitions=%d " diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/playbooks.json b/addons/thread-observability/app/src/thread_observability/pipeline/playbooks.json index be6f37e..cb51e6d 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/playbooks.json +++ b/addons/thread-observability/app/src/thread_observability/pipeline/playbooks.json @@ -129,6 +129,57 @@ ], "references": [] }, + { + "id": "real_partition_split", + "title": "Real partition split (evidence-backed)", + "applies_to": ["real_partition_split"], + "summary": "Multiple live partitions plus evidence of a device transitioning between partitions. This narrows diagnosis vs. a bare partition-count: it suggests an unstable boundary device or RF gap rather than stale IDs.", + "evidence_to_collect": [ + "Issue evidence.partitions and evidence.recent_partition_changes", + "get_mesh_state to confirm current partition membership", + "diff_topology across the split window (if snapshots exist)" + ], + "remediation_steps": [ + "Identify the device(s) in evidence.recent_partition_changes; check their placement and power stability.", + "Bridge the RF gap between partitions (add/relocate a router) and re-run ingest.", + "If partitions disagree on extended_pan_id, resolve credentials drift (re-commission the minority leader)." + ], + "references": [] + }, + { + "id": "dead_link_reference", + "title": "Dead-link reference (unknown neighbor EUI64)", + "applies_to": ["dead_link_reference"], + "summary": "A router references an EUI64 that is not present in the registry-backed nodes table, persisted across multiple ingestion ticks. This usually indicates a recommissioned device leaving stale neighbor/route cache entries.", + "evidence_to_collect": [ + "Issue evidence.references[] (reporter, unknown neighbor, source, seen_count)", + "get_neighbors for the reporter router to see link_established/allocated state", + "list_all_nodes to confirm the neighbor truly is absent" + ], + "remediation_steps": [ + "If the neighbor is a known physical device, re-commission it so HA registry matches the mesh identity.", + "Restart the reporter router to flush stale neighbor cache entries.", + "If references persist and the neighbor is truly gone, inspect for duplicate identities or a stuck child table." + ], + "references": [] + }, + { + "id": "route_to_otbr_unreachable", + "title": "Route to OTBR unreachable (loop/unknown next hop)", + "applies_to": ["route_to_otbr_unreachable"], + "summary": "Walking the next-hop chain from a router to the OTBR terminates in a loop, unknown next hop, or missing route-table entry. This points at routing-table corruption, partition mismatch, or a missing upstream neighbor relationship.", + "evidence_to_collect": [ + "Issue evidence.route_walk (hops + issues)", + "get_mesh_state to confirm partitions and current links", + "get_neighbors for the router and its first upstream hop" + ], + "remediation_steps": [ + "If the route walk reports different_partition, resolve the partition split first.", + "Restart the affected router to repopulate its route table.", + "If unknown_next_hop persists, inspect the router's neighbor table for missing upstream links and RF quality." + ], + "references": [] + }, { "id": "sed_battery_drain", "title": "Sleepy-end-device unexpected battery drain", diff --git a/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py b/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py index f2785e0..d9dcc6a 100644 --- a/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py +++ b/addons/thread-observability/app/src/thread_observability/pipeline/reasoner.py @@ -1,540 +1,448 @@ -"""Deterministic Thread anomaly reasoner. - -Scans the SQLite event stream and opens/closes issues via the issues -table. All rules are deterministic and side-effect-isolated: the only -mutation is ``open_issue`` / ``close_issue`` calls on the store. - -Rules (v1): - -* ``parent_churn`` (warn) — a node emitted >= 3 ``parent_change`` events - within the last :data:`PARENT_CHURN_WINDOW_MIN` minutes. -* ``attach_failures`` (warn) — a node emitted >= 2 ``attach_failed`` - events within the last :data:`ATTACH_FAIL_WINDOW_MIN` minutes. -* ``offline_node`` (crit) — a node has not been seen for at least - :data:`OFFLINE_THRESHOLD_MIN` minutes despite having been seen before. - -Each run also auto-closes issues whose triggering condition no longer -holds (e.g. churn dropped below threshold, node came back online). +"""Deterministic Thread "issues" reasoner (redesigned rules). + +An issue is a falsifiable claim about state that narrows diagnosis. Each +open issue is continually re-observed on each reasoner run (bumping its +``last_seen_at``) and auto-closed when its predicate no longer holds. + +Rule set (v2 - redesigned): + +* ``real_partition_split`` (warn/crit) — multiple *live* partitions plus + evidence of a device transitioning between partitions recently, with no + router-router neighbor link bridging partitions. +* ``dead_link_reference`` (warn) — a router references an unknown EUI64 in + NeighborTable/RouteTable, persisted across N ingestion ticks. +* ``route_to_otbr_unreachable`` (warn/crit) — walking the forwarding path + from a router to the OTBR terminates in a loop, unknown next hop, or + missing route-table entry. + +Severity is a function of actionability × freshness. Rules compute a base +severity from actionability, then the reasoner demotes long-lived issues so +new anomalies rise to the top. """ from __future__ import annotations -from collections import Counter from datetime import UTC, datetime, timedelta from typing import Any import json from ..storage.sqlite_store import SQLiteStore, get_store +from . import routing as routing_mod + +# Optional emergency switch for ops. The API/MCP endpoints do not return a +# placeholder when paused; they simply surface an empty list. +ISSUES_PAUSED = False + +# --- rule parameters --------------------------------------------------------- + +PARTITION_CHANGE_WINDOW_MIN = 30 + +DEAD_LINK_MIN_TICKS = 3 +DEAD_LINK_OBS_EXPIRY_MIN = 10 -PARENT_CHURN_WINDOW_MIN = 30 -PARENT_CHURN_THRESHOLD = 3 - -ATTACH_FAIL_WINDOW_MIN = 15 -ATTACH_FAIL_THRESHOLD = 2 - -OFFLINE_THRESHOLD_MIN = 30 - -# v0.9.43 — Tier 2 #2. -# A "re-attach storm" is what the Foyer Light case looked like in -# production: the same EUI keeps showing up in NeighborTable rows -# across multiple reporters, with its link frame counter resetting -# each cycle. The single-reporter-noise floor is high (a child -# legitimately re-attaches occasionally), so we only fire when at -# least two distinct reporters witness it — that cross-check is -# what makes the rule specific to the partition-wide identity bug -# rather than a flaky child. -RE_ATTACH_STORM_WINDOW_MIN = 30 -RE_ATTACH_STORM_MIN_EVENTS = 2 -RE_ATTACH_STORM_MIN_REPORTERS = 2 - -# v0.9.43 — Tier 2 #1. ``mesh_disagreement`` compares the latest -# Matter cluster-53 self-counter (``nodes.tx_total_count``) against -# the most-recent OTBR ``MGMT_DIAG_GET`` witness for the same target. -# A %-delta over the threshold is the operator-visible signal that -# the router and the BR disagree about how much traffic the router -# has actually sent. -MESH_DISAGREEMENT_PCT_THRESHOLD = 25.0 -# Snapshots must be observed within this window of each other to be -# considered comparable. Otherwise we're comparing apples to a stale -# orange and the %-delta is meaningless. -MESH_DISAGREEMENT_MAX_AGE_MIN = 30 - -# v0.9.44 (Tier 3): observer-suppression grace. -# When an observer-side disruption (our addon, OTBR, Matter Server) -# closes, downstream issues can still fire for a few cycles before -# stale data clears. We extend the suppression window past -# ``ended_at`` by this grace so those tail-fires get annotated too. -OBSERVER_SUPPRESSION_GRACE_SEC = 90 - - -# Issue detection is paused pending a redesign of the rule set. See -# tracking issue #5 ("Redesign issue definitions") and #4 (placeholder -# implementation). The previous rules largely restated state already -# visible elsewhere and biased AI consumers toward specific diagnostic -# paths that were not always correct. Until new rules ship that pass -# the bar described in #5, ``run_reasoner`` is a no-op: it closes any -# residual open issues on first call so the table doesn't leak stale -# rows, then returns a paused-status summary. -# -# The full rule body below is intentionally retained so the redesign -# can re-enable rules incrementally without re-implementing plumbing. -ISSUES_PAUSED = True -ISSUES_PAUSED_NOTE = ( - "Issue detection is paused pending redesign. See tracking issue #5. " - "No issues will be reported until the new rule set lands." -) +# Freshness demotion thresholds. The intent is "actionability × freshness", +# not "noise": a month-old issue is less urgent than the same anomaly +# appearing in the last ingest. +FRESH_DEMOTE_WARN_AFTER_HOURS = 24 +FRESH_DEMOTE_INFO_AFTER_DAYS = 30 def _iso(dt: datetime) -> str: return dt.isoformat() -def run_reasoner( - *, - now: datetime | None = None, - store: SQLiteStore | None = None, -) -> dict[str, Any]: - """Run all rules once and reconcile open issues. +def _parse_iso(value: Any) -> datetime | None: + if not value: + return None + try: + return datetime.fromisoformat(str(value)) + except Exception: # noqa: BLE001 + return None - Returns a summary dict with the lists of newly-opened, still-open and - auto-closed issue ids per rule. - """ - s = store or get_store() - now_dt = now or datetime.now(tz=UTC) - if ISSUES_PAUSED: - # Close any leftover open issues so the table doesn't leak - # stale rows while the rules are paused. This is idempotent: - # subsequent calls find nothing to close and return immediately. - residual_closed: list[int] = [] - for issue in s.list_active_issues(): - try: - if s.close_issue(int(issue["id"])): - residual_closed.append(int(issue["id"])) - except Exception: # noqa: BLE001 - pass - return { - "status": "paused", - "note": ISSUES_PAUSED_NOTE, - "opened": [], - "closed": residual_closed, - "skipped": [], - "computed_at": now_dt.isoformat(), - } +def _demote_severity(*, base: str, first_seen_at: str | None, now: datetime) -> str: + """Demote long-lived issues so fresh ones sort higher.""" + base_norm = str(base or "").strip().lower() + if base_norm not in {"crit", "warn", "info"}: + base_norm = "warn" - opened: list[int] = [] - closed: list[int] = [] - skipped: list[int] = [] - - # ---- gather raw inputs in one lock ---- - churn_window = _iso(now_dt - timedelta(minutes=PARENT_CHURN_WINDOW_MIN)) - attach_window = _iso(now_dt - timedelta(minutes=ATTACH_FAIL_WINDOW_MIN)) - offline_cutoff = _iso(now_dt - timedelta(minutes=OFFLINE_THRESHOLD_MIN)) - re_attach_window = _iso(now_dt - timedelta(minutes=RE_ATTACH_STORM_WINDOW_MIN)) - mesh_disagree_cutoff = _iso( - now_dt - timedelta(minutes=MESH_DISAGREEMENT_MAX_AGE_MIN) - ) - - with s._lock: # noqa: SLF001 - churn_rows = s._conn.execute( # noqa: SLF001 - "SELECT eui64, COUNT(*) AS c FROM events" - " WHERE type = 'parent_change' AND ts >= ?" - " GROUP BY eui64", - (churn_window,), - ).fetchall() + first_dt = _parse_iso(first_seen_at) + if first_dt is None: + return base_norm - attach_rows = s._conn.execute( # noqa: SLF001 - "SELECT eui64, COUNT(*) AS c FROM events" - " WHERE type = 'attach_failed' AND ts >= ?" - " GROUP BY eui64", - (attach_window,), - ).fetchall() + age = now - first_dt + if age >= timedelta(days=FRESH_DEMOTE_INFO_AFTER_DAYS): + return "info" + if age >= timedelta(hours=FRESH_DEMOTE_WARN_AFTER_HOURS): + return "warn" if base_norm == "crit" else "info" + return base_norm + + +def _issue_key(kind: str, eui64: str | None) -> tuple[str, str | None]: + return (str(kind or "").strip(), (str(eui64).strip().lower() if eui64 else None)) - node_rows = s._conn.execute( # noqa: SLF001 - "SELECT eui64, last_seen FROM nodes WHERE last_seen IS NOT NULL" - ).fetchall() - # v0.9.43 — re_attach_storm raw input. - # We pull each event's payload JSON and let Python parse out - # neighbor + reporter; doing it in SQL would require either - # JSON1 (not guaranteed on all Python sqlite builds) or - # generated columns we don't have. - re_attach_rows = s._conn.execute( # noqa: SLF001 - "SELECT eui64, payload_json FROM events" - " WHERE type = 're_attached_node' AND ts >= ?", - (re_attach_window,), +def _compute_real_partition_split( + *, + store: SQLiteStore, + now: datetime, +) -> dict[str, Any] | None: + cutoff = _iso(now - timedelta(minutes=PARTITION_CHANGE_WINDOW_MIN)) + + with store._lock: # noqa: SLF001 + node_rows = store._conn.execute( # noqa: SLF001 + """ + SELECT eui64, partition_id, routing_role, role, status, extended_pan_id, network_name + FROM nodes + WHERE partition_id IS NOT NULL + AND COALESCE(status, '') != 'phantom' + """ + ).fetchall() + change_rows = store._conn.execute( # noqa: SLF001 + "SELECT ts, eui64, payload_json FROM events" + " WHERE type = 'partition_change' AND ts >= ?" + " ORDER BY ts DESC, id DESC", + (cutoff,), + ).fetchall() + link_rows = store._conn.execute( # noqa: SLF001 + """ + SELECT reporter_eui64, neighbor_eui64 + FROM links + WHERE source = 'neighbor_table' + AND neighbor_known = 1 + """ ).fetchall() - # v0.9.43 — mesh_disagreement raw input. Pull each router's - # self-reported MAC TX counter and its most recent OTBR - # second-witness, joined on EUI. We use a window function so - # this is one round-trip instead of N. Wrapped defensively: - # the table only exists on schemas >= v14, and a cold-start - # SQLite without it should not break the reasoner. - try: - mesh_rows = s._conn.execute( # noqa: SLF001 - """ - WITH latest_otbr AS ( - SELECT target_eui64, mac_tx_total, observed_at, - ROW_NUMBER() OVER ( - PARTITION BY target_eui64 - ORDER BY observed_at DESC, id DESC - ) AS rn - FROM otbr_diagnostics - WHERE observed_at >= ? - ) - SELECT n.eui64, n.tx_total_count, n.diag_updated_at, - o.mac_tx_total, o.observed_at AS otbr_observed_at - FROM nodes n - JOIN latest_otbr o - ON o.target_eui64 = n.eui64 AND o.rn = 1 - WHERE n.tx_total_count IS NOT NULL - AND o.mac_tx_total IS NOT NULL - AND n.diag_updated_at >= ? - """, - (mesh_disagree_cutoff, mesh_disagree_cutoff), - ).fetchall() - except Exception: # noqa: BLE001 - mesh_rows = [] - - churn_counts: Counter[str] = Counter({r["eui64"]: int(r["c"]) for r in churn_rows}) - attach_counts: Counter[str] = Counter({r["eui64"]: int(r["c"]) for r in attach_rows}) - offline_nodes = {r["eui64"]: r["last_seen"] for r in node_rows if r["last_seen"] < offline_cutoff} - - # ---- aggregate re_attach events per (neighbor) with distinct reporters ---- - # ``neighbor_eui64`` is the device that re-attached (the subject of the - # issue); ``reporter_eui64`` is who witnessed the counter reset. The - # cross-reporter check is what filters out a single flapping link - # and keeps the alarm specific to identity churn. - re_attach_by_neighbor: dict[str, dict[str, Any]] = {} - for row in re_attach_rows: + nodes = [dict(r) for r in node_rows] + node_by_eui: dict[str, dict[str, Any]] = { + str(n.get("eui64")).lower(): n for n in nodes if n.get("eui64") + } + + partitions: dict[int, list[str]] = {} + leaders: dict[int, str] = {} + for n in nodes: + pid = n.get("partition_id") + eui = n.get("eui64") + if not isinstance(pid, int) or not isinstance(eui, str) or not eui: + continue + partitions.setdefault(pid, []).append(eui.lower()) + if n.get("routing_role") == "leader": + leaders.setdefault(pid, eui.lower()) + + if len(partitions) <= 1: + return None + + recent_changes: list[dict[str, Any]] = [] + changed_euis: set[str] = set() + for row in change_rows: + eui = str(row["eui64"] or "").lower() + if not eui: + continue try: payload = json.loads(row["payload_json"]) if row["payload_json"] else {} except Exception: # noqa: BLE001 payload = {} if not isinstance(payload, dict): continue - neighbor = payload.get("neighbor_eui64") or row["eui64"] - reporter = payload.get("reporter_eui64") - if not neighbor: + from_pid = payload.get("from") + to_pid = payload.get("to") + if not isinstance(from_pid, int) or not isinstance(to_pid, int): + continue + if from_pid == to_pid: continue - bucket = re_attach_by_neighbor.setdefault( - neighbor, {"count": 0, "reporters": set()} + if from_pid not in partitions or to_pid not in partitions: + continue + changed_euis.add(eui) + recent_changes.append( + { + "ts": row["ts"], + "eui64": eui, + "from_partition_id": from_pid, + "to_partition_id": to_pid, + } ) - bucket["count"] += 1 - if reporter: - bucket["reporters"].add(reporter) - - # ---- aggregate mesh_disagreement per router ---- - # We compute the relative delta against the larger of the two - # counters so a target near zero doesn't divide-by-zero or produce - # absurd percentages. A negative delta (BR sees fewer than the - # router claims) is just as interesting as positive, so we use abs. - mesh_disagreements: dict[str, dict[str, Any]] = {} - for row in mesh_rows: - eui = row["eui64"] - self_count = row["tx_total_count"] - otbr_count = row["mac_tx_total"] - if not isinstance(self_count, int) or not isinstance(otbr_count, int): + if len(recent_changes) >= 10: + break + + if not recent_changes: + return None + + # Guard against false-positive "multiple partitions" caused by stale or + # mismatched partition stamps: if we see any router-router neighbor edge + # crossing partition IDs, treat the partition identifiers as suspect and + # do not fire this issue. + bridging_router_links: list[dict[str, Any]] = [] + for row in link_rows: + a = str(row["reporter_eui64"] or "").lower() + b = str(row["neighbor_eui64"] or "").lower() + if not a or not b: + continue + na = node_by_eui.get(a) + nb = node_by_eui.get(b) + if not na or not nb: + continue + ra = na.get("routing_role") + rb = nb.get("routing_role") + if ra not in {"router", "leader"} or rb not in {"router", "leader"}: continue - denom = max(self_count, otbr_count) - if denom <= 0: + pa = na.get("partition_id") + pb = nb.get("partition_id") + if pa is None or pb is None or pa == pb: continue - pct = abs(self_count - otbr_count) * 100.0 / denom - if pct >= MESH_DISAGREEMENT_PCT_THRESHOLD: - mesh_disagreements[eui] = { - "self_tx_total": self_count, - "otbr_tx_total": otbr_count, - "delta_pct": round(pct, 2), - "self_observed_at": row["diag_updated_at"], - "otbr_observed_at": row["otbr_observed_at"], + bridging_router_links.append({"a": a, "b": b, "a_partition_id": pa, "b_partition_id": pb}) + if len(bridging_router_links) >= 5: + break + + if bridging_router_links: + return None + + partition_summary: list[dict[str, Any]] = [] + for pid, members in sorted(partitions.items()): + sample = members[:50] + # Capture partition identity from any node that reported it. + epid = None + netname = None + for e in sample: + n = node_by_eui.get(e) + if not n: + continue + if epid is None and n.get("extended_pan_id"): + epid = n.get("extended_pan_id") + if netname is None and n.get("network_name"): + netname = n.get("network_name") + partition_summary.append( + { + "partition_id": pid, + "leader_eui64": leaders.get(pid), + "member_count": len(members), + "members_sample": sample, + "network_name": netname, + "extended_pan_id": epid, } + ) - active = s.list_active_issues() - active_by_key: dict[tuple[str, str | None], dict[str, Any]] = { - (i["kind"], i.get("eui64")): i for i in active + evidence = { + "observation": { + "partition_count": len(partitions), + "recent_partition_change_count": len(recent_changes), + "window_minutes": PARTITION_CHANGE_WINDOW_MIN, + "router_bridge_link_count": 0, + }, + "partitions": partition_summary, + "recent_partition_changes": recent_changes, + "involved_eui64s": sorted(set(changed_euis) | {e for e in leaders.values() if e}), + "cleared_when": ( + "partition_count <= 1 OR no partition_change events in the last " + f"{PARTITION_CHANGE_WINDOW_MIN} minutes OR a router-router neighbor link bridges partitions" + ), } + return {"kind": "real_partition_split", "eui64": None, "base_severity": "crit", "evidence": evidence} - def _check_suppression( - since: str | None, until: str | None - ) -> list[dict[str, Any]]: - """Return observer events overlapping [since, until + grace]. - - Tier 3: a candidate issue is annotated (and ``crit`` downgraded - to ``warn``) when an observer-side disruption overlaps its - trigger window. We extend the upper bound by - ``OBSERVER_SUPPRESSION_GRACE_SEC`` so an issue that fires in the - seconds immediately after a restart still picks up the prior - outage as context. - - Returns an empty list (no suppression) when the inputs are not - usable (missing timestamps, lookup errors). - """ - if not since: - return [] - try: - upper_dt = ( - datetime.fromisoformat(until) if until else now_dt - ) + timedelta(seconds=OBSERVER_SUPPRESSION_GRACE_SEC) - return s.list_observer_events_in_window( - since=since, until=upper_dt.isoformat() - ) - except Exception: # noqa: BLE001 - return [] - - def _emit( - kind: str, - severity: str, - eui64: str | None, - evidence: dict[str, Any], - *, - trigger_since: str | None = None, - trigger_until: str | None = None, - ) -> None: - # Tier 3 suppression: annotate + downgrade when an observer-side - # disruption overlaps the trigger window. We never drop the - # issue — that would lose a real outage that coincides with a - # routine restart. Downgrading ``crit`` → ``warn`` is enough - # to keep noise out of pager-grade alerts while preserving the - # record. - suppressors = _check_suppression(trigger_since, trigger_until) - if suppressors: - evidence = { - **evidence, - "suppressed_by": [ - { - "id": ev["id"], - "source": ev["source"], - "kind": ev["kind"], - "started_at": ev["started_at"], - "ended_at": ev.get("ended_at"), - } - for ev in suppressors - ], - } - if severity == "crit": - severity = "warn" - issue_id = s.open_issue(kind=kind, severity=severity, eui64=eui64, evidence=evidence) - if (kind, eui64) in active_by_key: - skipped.append(issue_id) - else: - opened.append(issue_id) - # ---- parent_churn ---- - seen_keys: set[tuple[str, str | None]] = set() - for eui, count in churn_counts.items(): - if count >= PARENT_CHURN_THRESHOLD: - seen_keys.add(("parent_churn", eui)) - _emit( - "parent_churn", - "warn", - eui, - { - "count": count, - "window_minutes": PARENT_CHURN_WINDOW_MIN, - "threshold": PARENT_CHURN_THRESHOLD, - }, - trigger_since=churn_window, - ) - - # ---- attach_failures ---- - for eui, count in attach_counts.items(): - if count >= ATTACH_FAIL_THRESHOLD: - seen_keys.add(("attach_failures", eui)) - _emit( - "attach_failures", - "warn", - eui, - { - "count": count, - "window_minutes": ATTACH_FAIL_WINDOW_MIN, - "threshold": ATTACH_FAIL_THRESHOLD, - }, - trigger_since=attach_window, - ) - - # ---- offline_node ---- - # Trigger window for suppression spans from ``last_seen`` (when we - # last had ground truth for this node) to now. This is the most - # important suppression case: an addon restart between last_seen - # and now is precisely the false-positive we want to annotate. - for eui, last_seen in offline_nodes.items(): - seen_keys.add(("offline_node", eui)) - _emit( - "offline_node", - "crit", - eui, - {"last_seen": last_seen, "threshold_minutes": OFFLINE_THRESHOLD_MIN}, - trigger_since=last_seen, +def _compute_dead_link_reference( + *, + store: SQLiteStore, + now_iso: str, +) -> list[dict[str, Any]]: + stale_links = store.list_stale_links() + mature_by_reporter: dict[str, list[dict[str, Any]]] = {} + for link in stale_links: + reporter = str(link.get("reporter_eui64") or "").lower() + neighbor = str(link.get("neighbor_eui64") or "").lower() + source = str(link.get("source") or "").strip() or "unknown" + if not reporter or not neighbor: + continue + obs_key = f"dead_link_reference|{reporter}|{neighbor}|{source}" + obs = store.bump_issue_observation( + obs_key=obs_key, + kind="dead_link_reference", + subject_eui64=reporter, + payload={ + "reporter_eui64": reporter, + "neighbor_eui64": neighbor, + "source": source, + "partition_id": link.get("partition_id"), + }, + now=now_iso, ) - - # ---- re_attach_storm ---- - # Fires when the same neighbor has re-attached at least N times in - # the window AND been witnessed by at least M distinct reporters. - # The cross-reporter requirement is what makes this a partition- - # wide identity signal rather than a single-link flap. - for neighbor, bucket in re_attach_by_neighbor.items(): - reporters = bucket["reporters"] - if ( - bucket["count"] >= RE_ATTACH_STORM_MIN_EVENTS - and len(reporters) >= RE_ATTACH_STORM_MIN_REPORTERS - ): - seen_keys.add(("re_attach_storm", neighbor)) - _emit( - "re_attach_storm", - "warn", - neighbor, - { - "count": bucket["count"], - "distinct_reporters": sorted(reporters), - "window_minutes": RE_ATTACH_STORM_WINDOW_MIN, - "threshold_events": RE_ATTACH_STORM_MIN_EVENTS, - "threshold_reporters": RE_ATTACH_STORM_MIN_REPORTERS, - }, - trigger_since=re_attach_window, - ) - - # ---- mesh_disagreement ---- - for eui, evidence in mesh_disagreements.items(): - seen_keys.add(("mesh_disagreement", eui)) - # Compare the two observation timestamps to pick the earlier as - # the suppression-window start. - self_ts = evidence.get("self_observed_at") - otbr_ts = evidence.get("otbr_observed_at") - candidates = [t for t in (self_ts, otbr_ts) if t] - trigger = min(candidates) if candidates else None - _emit( - "mesh_disagreement", - "warn", - eui, + if int(obs.get("seen_count") or 0) < DEAD_LINK_MIN_TICKS: + continue + mature_by_reporter.setdefault(reporter, []).append( { - **evidence, - "threshold_pct": MESH_DISAGREEMENT_PCT_THRESHOLD, - "max_age_minutes": MESH_DISAGREEMENT_MAX_AGE_MIN, + "neighbor_eui64": neighbor, + "source": source, + "partition_id": link.get("partition_id"), + "seen_count": int(obs.get("seen_count") or 0), + "first_seen_at": obs.get("first_seen_at"), + "last_seen_at": obs.get("last_seen_at"), + } + ) + + out: list[dict[str, Any]] = [] + for reporter, refs in mature_by_reporter.items(): + refs.sort(key=lambda r: (-int(r.get("seen_count") or 0), str(r.get("neighbor_eui64") or ""))) + evidence = { + "observation": { + "mature_reference_count": len(refs), + "min_ticks": DEAD_LINK_MIN_TICKS, }, - trigger_since=trigger, + "references": refs[:25], + "involved_eui64s": sorted({reporter} | {r["neighbor_eui64"] for r in refs if r.get("neighbor_eui64")}), + "cleared_when": "no unknown-neighbor link references persist for >= min_ticks", + } + out.append( + { + "kind": "dead_link_reference", + "eui64": reporter, + "base_severity": "warn", + "evidence": evidence, + } ) + return out - # ---- wrong_network (v0.9.46) ---- - # If multiple non-phantom nodes report differing extended_pan_ids, - # the minority is on stale Thread credentials — typically a device - # re-commissioned while a stale dataset was still cached on HA. - # This is a credentials problem, not an RF/partition-fragmentation - # problem, so it deserves its own issue kind. - try: - all_nodes = s.list_nodes() - except Exception: # noqa: BLE001 - all_nodes = [] - epid_to_nodes: dict[str, list[dict]] = {} - for n in all_nodes: - if n.get("status") == "phantom": + +def _compute_route_to_otbr_unreachable( + *, + store: SQLiteStore, +) -> list[dict[str, Any]]: + # Only routers/leaders participate in route-to-OTBR next-hop chains. + nodes = [ + n for n in store.list_nodes() + if n.get("eui64") + and n.get("routing_role") in {"router", "leader"} + and n.get("status") != "phantom" + ] + out: list[dict[str, Any]] = [] + for n in nodes: + eui = str(n.get("eui64") or "").lower() + if not eui: continue - epid = n.get("extended_pan_id") - if not epid: + walk = routing_mod.walk_route_to_otbr(eui, store=store) + issues = walk.get("issues") if isinstance(walk.get("issues"), list) else [] + codes = {str(i.get("code") or "") for i in issues if isinstance(i, dict)} + interesting = [ + i for i in issues + if isinstance(i, dict) + and str(i.get("code") or "") in {"loop_detected", "unknown_next_hop", "no_route_to_otbr", "max_hops_exceeded"} + ] + if not interesting: continue - epid_to_nodes.setdefault(epid, []).append(n) - if len(epid_to_nodes) >= 2: - # Modal = the extended_pan_id with the most members. - modal_epid, modal_members = max( - epid_to_nodes.items(), key=lambda kv: len(kv[1]) - ) - modal_name = next( - (n.get("network_name") for n in modal_members if n.get("network_name")), - None, + base = "crit" if ("loop_detected" in codes or "unknown_next_hop" in codes) else "warn" + evidence = { + "observation": { + "issue_codes": sorted({str(i.get("code") or "") for i in interesting}), + "issue_count": len(interesting), + }, + "route_walk": walk, + "involved_eui64s": sorted({eui, str(walk.get("otbr_eui64") or "").lower()} - {""}), + "cleared_when": "walk_route_to_otbr reports no loop/unknown-next-hop/no-route issues", + } + out.append( + { + "kind": "route_to_otbr_unreachable", + "eui64": eui, + "base_severity": base, + "evidence": evidence, + } ) - for epid, members in epid_to_nodes.items(): - if epid == modal_epid: - continue - for n in members: - eui = n.get("eui64") - if not eui: - continue - seen_keys.add(("wrong_network", eui)) - _emit( - "wrong_network", - "warn", - eui, - { - "node_extended_pan_id": epid, - "node_network_name": n.get("network_name"), - "modal_extended_pan_id": modal_epid, - "modal_network_name": modal_name, - "modal_member_count": len(modal_members), - "minority_member_count": len(members), - }, - ) - - # ---- auto-close issues whose trigger no longer holds ---- + return out + + +def run_reasoner( + *, + now: datetime | None = None, + store: SQLiteStore | None = None, +) -> dict[str, Any]: + """Run every rule once and reconcile open issues.""" + s = store or get_store() + now_dt = now or datetime.now(tz=UTC) + now_iso = _iso(now_dt) + + if ISSUES_PAUSED: + return { + "status": "paused", + "opened": [], + "closed": [], + "still_open": [], + "computed_at": now_iso, + } + managed_kinds = { - "parent_churn", - "attach_failures", - "offline_node", - "re_attach_storm", - "mesh_disagreement", - "wrong_network", + "real_partition_split", + "dead_link_reference", + "route_to_otbr_unreachable", } - for (kind, eui), issue in active_by_key.items(): + + # Snapshot existing open issues for managed kinds so severity demotion can + # read ``first_seen_at`` and we can tell whether an id is newly opened. + existing: dict[tuple[str, str | None], dict[str, Any]] = {} + for issue in s.list_active_issues(): + kind = str(issue.get("kind") or "") if kind not in managed_kinds: continue - if (kind, eui) in seen_keys: + existing[_issue_key(kind, issue.get("eui64"))] = issue + + observations: list[dict[str, Any]] = [] + split = _compute_real_partition_split(store=s, now=now_dt) + if split is not None: + observations.append(split) + observations.extend(_compute_dead_link_reference(store=s, now_iso=now_iso)) + observations.extend(_compute_route_to_otbr_unreachable(store=s)) + + opened: list[int] = [] + still_open: list[int] = [] + seen_keys: set[tuple[str, str | None]] = set() + + for obs in observations: + kind = str(obs.get("kind") or "") + eui64 = obs.get("eui64") + base = str(obs.get("base_severity") or "warn") + evidence = obs.get("evidence") if isinstance(obs.get("evidence"), dict) else {} + key = _issue_key(kind, eui64) + prior = existing.get(key) + severity = _demote_severity( + base=base, + first_seen_at=(prior.get("first_seen_at") if isinstance(prior, dict) else None), + now=now_dt, + ) + issue_id = s.open_issue(kind=kind, severity=severity, eui64=eui64, evidence=evidence) + seen_keys.add(key) + if prior is None: + opened.append(issue_id) + else: + still_open.append(issue_id) + + closed: list[int] = [] + for key, issue in existing.items(): + if key in seen_keys: + continue + try: + iid = int(issue.get("id")) + except Exception: # noqa: BLE001 continue - if s.close_issue(int(issue["id"])): - closed.append(int(issue["id"])) - - # ---- v0.9.45: auto-close stale ``partition_split`` issues ---- - # ``partition_split`` is *opened* by the matter_discovery stage (it - # has the full per-router partition evidence). The reasoner runs - # every tick regardless, so it's the right owner of the close-on- - # resolve path: if the current live topology shows only one - # partition (or none), any still-open partition_split issue is - # stale and should close. This makes the reasoner the single - # source of truth for issue *lifecycle* even when discovery - # closing the issue itself silently failed (observed live as a - # partition_split that resolved in topology but stayed open in - # the issues table). + if s.close_issue(iid): + closed.append(iid) + + # Best-effort cleanup of dead_link observation rows. try: - from . import topology as topology_mod # noqa: PLC0415 - - topo = topology_mod.build_topology(store=s) - live_partitions = topo.get("partitions") or [] - if len(live_partitions) <= 1: - for (kind, eui), issue in active_by_key.items(): - if kind != "partition_split": - continue - if s.close_issue(int(issue["id"])): - closed.append(int(issue["id"])) + s.sweep_issue_observations( + kind="dead_link_reference", + last_seen_before=_iso(now_dt - timedelta(minutes=DEAD_LINK_OBS_EXPIRY_MIN)), + ) except Exception: # noqa: BLE001 - # Topology can fail in unit tests that stub the store; the - # close path is best-effort. pass return { - "ran_at": _iso(now_dt), + "status": "ok", "opened": opened, - "still_open": skipped, "closed": closed, + "still_open": still_open, + "computed_at": now_iso, "rules": { - "parent_churn": { - "window_minutes": PARENT_CHURN_WINDOW_MIN, - "threshold": PARENT_CHURN_THRESHOLD, - }, - "attach_failures": { - "window_minutes": ATTACH_FAIL_WINDOW_MIN, - "threshold": ATTACH_FAIL_THRESHOLD, - }, - "offline_node": {"threshold_minutes": OFFLINE_THRESHOLD_MIN}, - "re_attach_storm": { - "window_minutes": RE_ATTACH_STORM_WINDOW_MIN, - "threshold_events": RE_ATTACH_STORM_MIN_EVENTS, - "threshold_reporters": RE_ATTACH_STORM_MIN_REPORTERS, - }, - "mesh_disagreement": { - "threshold_pct": MESH_DISAGREEMENT_PCT_THRESHOLD, - "max_age_minutes": MESH_DISAGREEMENT_MAX_AGE_MIN, - }, - "wrong_network": { - "trigger": "node extended_pan_id differs from modal mesh value", - }, + "real_partition_split": {"window_minutes": PARTITION_CHANGE_WINDOW_MIN}, + "dead_link_reference": {"min_ticks": DEAD_LINK_MIN_TICKS}, + "route_to_otbr_unreachable": {}, }, } + diff --git a/addons/thread-observability/app/src/thread_observability/services/direct_chat.py b/addons/thread-observability/app/src/thread_observability/services/direct_chat.py index 658c100..d9498dd 100644 --- a/addons/thread-observability/app/src/thread_observability/services/direct_chat.py +++ b/addons/thread-observability/app/src/thread_observability/services/direct_chat.py @@ -768,17 +768,17 @@ def _compact_health_snapshot(result: dict[str, Any]) -> dict[str, Any]: "computed_at": data.get("computed_at") if isinstance(data, dict) else None, "status": data.get("status") if isinstance(data, dict) else None, "data_age_seconds": data.get("data_age_seconds") if isinstance(data, dict) else None, - "summary": { - "healthy_nodes": summary.get("healthy_nodes"), - "online_nodes": summary.get("online_nodes"), - "sleeping_nodes": summary.get("sleeping_nodes"), - "stale_nodes": summary.get("stale_nodes"), - "offline_nodes": summary.get("offline_nodes"), - "total_nodes": summary.get("total_nodes"), - "duplicate_physical_device_groups": summary.get("duplicate_physical_device_groups"), - "duplicate_physical_device_rows": summary.get("duplicate_physical_device_rows"), - "distinct_thread_networks": summary.get("distinct_thread_networks"), - }, + # Flatten the load-bearing fields so prompt compaction doesn't + # collapse them behind a max-depth boundary. + "healthy_nodes": summary.get("healthy_nodes"), + "online_nodes": summary.get("online_nodes"), + "sleeping_nodes": summary.get("sleeping_nodes"), + "stale_nodes": summary.get("stale_nodes"), + "offline_nodes": summary.get("offline_nodes"), + "total_nodes": summary.get("total_nodes"), + "duplicate_physical_device_groups": summary.get("duplicate_physical_device_groups"), + "duplicate_physical_device_rows": summary.get("duplicate_physical_device_rows"), + "distinct_thread_networks": summary.get("distinct_thread_networks"), "active_issue_count": active_issues.get("count"), "active_issue_severity_counts": active_issues.get("by_severity"), "as_of": meta.get("as_of"), @@ -1776,7 +1776,14 @@ async def direct_chat_turn( "role": "tool", "tool_call_id": tool_call["id"], "content": _serialize_for_prompt( - _tool_result_for_prompt(tool_call["name"], tool_call["arguments"], result), + ( + { + "data": _tool_result_for_prompt(tool_call["name"], tool_call["arguments"], result), + **({"meta": result.get("meta")} if isinstance(result, dict) and isinstance(result.get("meta"), dict) else {}), + } + if isinstance(result, dict) and isinstance(result.get("data"), dict) + else _tool_result_for_prompt(tool_call["name"], tool_call["arguments"], result) + ), max_chars=_MAX_TOOL_RESULT_MESSAGE_CHARS, ), } diff --git a/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py b/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py index fa2e489..a05a014 100644 --- a/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py +++ b/addons/thread-observability/app/src/thread_observability/storage/sqlite_store.py @@ -678,6 +678,36 @@ CREATE INDEX IF NOT EXISTS idx_link_signal_samples_link_ts ON link_signal_samples(reporter_eui64, neighbor_eui64, source, observed_at DESC); """, + # v28 (issues redesign): track first/last seen timestamps per issue so + # callers can reason about freshness and rules can compute severity as a + # function of actionability × recency. + """ + ALTER TABLE issues ADD COLUMN first_seen_at TEXT; + ALTER TABLE issues ADD COLUMN last_seen_at TEXT; + CREATE INDEX IF NOT EXISTS idx_issues_last_seen + ON issues(closed_at, last_seen_at); + + -- Backfill for existing rows created before the redesign. + UPDATE issues SET first_seen_at = opened_at WHERE first_seen_at IS NULL; + UPDATE issues + SET last_seen_at = COALESCE(closed_at, opened_at) + WHERE last_seen_at IS NULL; + """, + # v29 (issues redesign): observation accumulator for rules that require + # persistence across multiple ingestion ticks before surfacing an issue. + """ + CREATE TABLE IF NOT EXISTS issue_observations ( + obs_key TEXT PRIMARY KEY, + kind TEXT NOT NULL, + subject_eui64 TEXT, + first_seen_at TEXT NOT NULL, + last_seen_at TEXT NOT NULL, + seen_count INTEGER NOT NULL DEFAULT 1, + payload_json TEXT + ); + CREATE INDEX IF NOT EXISTS idx_issue_observations_kind + ON issue_observations(kind, last_seen_at DESC); + """, ] @@ -685,6 +715,20 @@ def _utc_now() -> str: return utc_now_iso() +def _normalize_issue_severity(severity: str) -> str: + """Normalize severity strings to the small stable set the UI expects.""" + s = str(severity or "").strip().lower() + if s in {"warning", "warn"}: + return "warn" + if s in {"critical", "crit", "bad", "error"}: + return "crit" + if s in {"info", "informational"}: + return "info" + # Default to warn so new severities don't silently drop out of + # health/dashboard severity buckets. + return "warn" + + class SQLiteStore: """Thin wrapper around the on-disk SQLite database.""" @@ -2658,6 +2702,7 @@ def open_issue( evidence is merged via REPLACE (last-write-wins). """ now = _utc_now() + severity = _normalize_issue_severity(severity) evidence_json = json.dumps(evidence) if evidence is not None else None with self._tx() as conn: if dedupe: @@ -2671,23 +2716,30 @@ def open_issue( existing_id = int(row[0]) if evidence_json is not None: conn.execute( - "UPDATE issues SET evidence_json = ?, severity = ?" + "UPDATE issues SET evidence_json = ?, severity = ?, last_seen_at = ?" " WHERE id = ?", - (evidence_json, severity, existing_id), + (evidence_json, severity, now, existing_id), + ) + else: + conn.execute( + "UPDATE issues SET severity = ?, last_seen_at = ? WHERE id = ?", + (severity, now, existing_id), ) return existing_id cur = conn.execute( - "INSERT INTO issues(opened_at, severity, kind, eui64, evidence_json)" - " VALUES (?, ?, ?, ?, ?)", - (now, severity, kind, eui64, evidence_json), + "INSERT INTO issues(opened_at, severity, kind, eui64, evidence_json, first_seen_at, last_seen_at)" + " VALUES (?, ?, ?, ?, ?, ?, ?)", + (now, severity, kind, eui64, evidence_json, now, now), ) return int(cur.lastrowid or 0) def close_issue(self, issue_id: int) -> bool: + now = _utc_now() with self._tx() as conn: cur = conn.execute( - "UPDATE issues SET closed_at = ? WHERE id = ? AND closed_at IS NULL", - (_utc_now(), issue_id), + "UPDATE issues SET closed_at = ?, last_seen_at = ?" + " WHERE id = ? AND closed_at IS NULL", + (now, now, issue_id), ) return cur.rowcount > 0 @@ -2847,7 +2899,7 @@ def list_active_issues(self) -> list[dict[str, Any]]: with self._lock: rows = self._conn.execute( "SELECT * FROM issues WHERE closed_at IS NULL" - " ORDER BY opened_at DESC, id DESC" + " ORDER BY COALESCE(last_seen_at, opened_at) DESC, id DESC" ).fetchall() out: list[dict[str, Any]] = [] for r in rows: @@ -2858,9 +2910,72 @@ def list_active_issues(self) -> list[dict[str, Any]]: d["evidence"] = json.loads(evj) except Exception: # noqa: BLE001 d["evidence"] = {"_raw": evj} + d["severity"] = _normalize_issue_severity(str(d.get("severity") or "")) out.append(d) return out + # -- issue observation accumulator --------------------------------- + + def bump_issue_observation( + self, + *, + obs_key: str, + kind: str, + subject_eui64: str | None, + payload: dict[str, Any] | None = None, + now: str | None = None, + ) -> dict[str, Any]: + """Upsert an observation accumulator row and return its state.""" + now_iso = now or _utc_now() + payload_json = json.dumps(payload) if payload is not None else None + with self._tx() as conn: + row = conn.execute( + "SELECT first_seen_at, last_seen_at, seen_count FROM issue_observations" + " WHERE obs_key = ?", + (obs_key,), + ).fetchone() + if row: + first_seen_at = str(row[0]) + new_count = int(row[2] or 0) + 1 + conn.execute( + "UPDATE issue_observations" + " SET last_seen_at = ?, seen_count = ?, payload_json = ?" + " WHERE obs_key = ?", + (now_iso, new_count, payload_json, obs_key), + ) + return { + "obs_key": obs_key, + "kind": kind, + "subject_eui64": subject_eui64, + "first_seen_at": first_seen_at, + "last_seen_at": now_iso, + "seen_count": new_count, + "payload": payload or {}, + } + conn.execute( + "INSERT INTO issue_observations(obs_key, kind, subject_eui64, first_seen_at, last_seen_at, seen_count, payload_json)" + " VALUES (?, ?, ?, ?, ?, 1, ?)", + (obs_key, kind, subject_eui64, now_iso, now_iso, payload_json), + ) + return { + "obs_key": obs_key, + "kind": kind, + "subject_eui64": subject_eui64, + "first_seen_at": now_iso, + "last_seen_at": now_iso, + "seen_count": 1, + "payload": payload or {}, + } + + def sweep_issue_observations(self, *, kind: str, last_seen_before: str) -> int: + """Delete accumulator rows that haven't been observed recently.""" + with self._tx() as conn: + cur = conn.execute( + "DELETE FROM issue_observations WHERE kind = ? AND last_seen_at < ?", + (kind, last_seen_before), + ) + return int(cur.rowcount or 0) + def list_issues_in_window( self, *, @@ -2897,6 +3012,7 @@ def list_issues_in_window( d["evidence"] = json.loads(evj) except Exception: # noqa: BLE001 d["evidence"] = {"_raw": evj} + d["severity"] = _normalize_issue_severity(str(d.get("severity") or "")) out.append(d) return out diff --git a/addons/thread-observability/app/tests/test_reasoner.py b/addons/thread-observability/app/tests/test_reasoner.py index 17f57b7..9aab8d4 100644 --- a/addons/thread-observability/app/tests/test_reasoner.py +++ b/addons/thread-observability/app/tests/test_reasoner.py @@ -1,384 +1,142 @@ -"""Tests for the deterministic anomaly reasoner.""" +"""Tests for the redesigned deterministic issues reasoner.""" from __future__ import annotations from datetime import UTC, datetime, timedelta -import pytest - -from thread_observability.pipeline import reasoner as reasoner_mod from thread_observability.pipeline.reasoner import ( - ATTACH_FAIL_THRESHOLD, - MESH_DISAGREEMENT_PCT_THRESHOLD, - OFFLINE_THRESHOLD_MIN, - PARENT_CHURN_THRESHOLD, - RE_ATTACH_STORM_MIN_EVENTS, - RE_ATTACH_STORM_MIN_REPORTERS, + DEAD_LINK_MIN_TICKS, + PARTITION_CHANGE_WINDOW_MIN, run_reasoner, ) from thread_observability.storage.sqlite_store import SQLiteStore -@pytest.fixture(autouse=True) -def _unpause_reasoner(monkeypatch: pytest.MonkeyPatch) -> None: - """Issue detection is paused globally (see #4/#5) but these tests - cover the rule implementations themselves, so we override the - pause flag for the duration of this file. The rules are preserved - behind the pause so the redesign can re-enable them incrementally. - """ - monkeypatch.setattr(reasoner_mod, "ISSUES_PAUSED", False) - - def _now() -> datetime: return datetime.now(tz=UTC) -def test_reasoner_no_events(store: SQLiteStore) -> None: +def test_reasoner_no_observations(store: SQLiteStore) -> None: out = run_reasoner(store=store) assert out["opened"] == [] assert out["closed"] == [] assert store.list_active_issues() == [] -def test_parent_churn_opens_issue(store: SQLiteStore) -> None: - eui = "11" * 8 - for i in range(PARENT_CHURN_THRESHOLD): - store.insert_event( - eui64=eui, - type="parent_change", - ts=(_now() - timedelta(minutes=i)).isoformat(), - parent_eui64="aa" * 8, - ) - out = run_reasoner(store=store) - assert len(out["opened"]) == 1 - issues = store.list_active_issues() - assert len(issues) == 1 - assert issues[0]["kind"] == "parent_churn" - assert issues[0]["eui64"] == eui - assert issues[0]["evidence"]["count"] == PARENT_CHURN_THRESHOLD - - -def test_parent_churn_dedup(store: SQLiteStore) -> None: - eui = "11" * 8 - for i in range(PARENT_CHURN_THRESHOLD + 1): - store.insert_event(eui64=eui, type="parent_change", - ts=(_now() - timedelta(minutes=i)).isoformat(), - parent_eui64="aa" * 8) - run_reasoner(store=store) - second = run_reasoner(store=store) - assert second["opened"] == [] - assert len(second["still_open"]) == 1 - assert len(store.list_active_issues()) == 1 - - -def test_attach_failures_open_and_close(store: SQLiteStore) -> None: - eui = "22" * 8 - for i in range(ATTACH_FAIL_THRESHOLD): - store.insert_event(eui64=eui, type="attach_failed", - ts=(_now() - timedelta(minutes=i)).isoformat()) - first = run_reasoner(store=store) - assert len(first["opened"]) == 1 - issue_id = first["opened"][0] - - # Advance time so the failure events fall outside the window; - # rerun and confirm the attach_failures issue auto-closes. (The node - # will also flip to offline in that future, which is correct behaviour.) - far_future = _now() + timedelta(hours=2) - closed = run_reasoner(store=store, now=far_future) - assert issue_id in closed["closed"] - still_open = store.list_active_issues() - assert all(i["kind"] != "attach_failures" for i in still_open) - - -def test_offline_node_opens_crit_issue(store: SQLiteStore) -> None: - eui = "33" * 8 - old = (_now() - timedelta(minutes=OFFLINE_THRESHOLD_MIN + 5)).isoformat() - # Registry-first (v9): event ingestion no longer auto-creates node - # rows. Seed the node first so insert_event can UPDATE its last_seen. - store.upsert_node_metadata(eui64=eui) - store.insert_event(eui64=eui, type="attach", ts=old) - out = run_reasoner(store=store) - assert len(out["opened"]) == 1 - issues = store.list_active_issues() - assert issues[0]["kind"] == "offline_node" - assert issues[0]["severity"] == "crit" - - -# --------------------------------------------------------------------------- -# v0.9.43 — Tier 2 rules -# --------------------------------------------------------------------------- - - -def test_re_attach_storm_requires_multiple_reporters(store: SQLiteStore) -> None: - """A single reporter shouldn't trip the storm rule no matter the count. - - The whole point of the cross-reporter requirement is to filter out a - single flaky link and reserve the alarm for a partition-wide - identity problem (the Foyer-Light case). - """ - neighbor = "aa" * 8 - store.upsert_node_metadata(eui64=neighbor) - reporter_a = "bb" * 8 - store.upsert_node_metadata(eui64=reporter_a) - for _ in range(5): - store.insert_event( - eui64=neighbor, - type="re_attached_node", - payload={ - "neighbor_eui64": neighbor, - "reporter_eui64": reporter_a, - "counter": "link_frame_counter", - "old_value": 100, "new_value": 1, - }, - ) - out = run_reasoner(store=store) - assert not any( - i["kind"] == "re_attach_storm" for i in store.list_active_issues() - ), "single-reporter storm should NOT open an issue" - assert out["opened"] == [] - - -def test_re_attach_storm_opens_with_distinct_reporters(store: SQLiteStore) -> None: - neighbor = "aa" * 8 - store.upsert_node_metadata(eui64=neighbor) - reporters = [f"{i:02x}" * 8 for i in range(0xb0, 0xb0 + RE_ATTACH_STORM_MIN_REPORTERS)] - for r in reporters: - store.upsert_node_metadata(eui64=r) - # One event per reporter — meets MIN_EVENTS (2) and MIN_REPORTERS (2). - assert RE_ATTACH_STORM_MIN_EVENTS <= len(reporters) - for r in reporters: - store.insert_event( - eui64=neighbor, - type="re_attached_node", - payload={ - "neighbor_eui64": neighbor, - "reporter_eui64": r, - "counter": "link_frame_counter", - "old_value": 100, "new_value": 1, - }, - ) - out = run_reasoner(store=store) - storms = [i for i in store.list_active_issues() if i["kind"] == "re_attach_storm"] - assert len(storms) == 1 - assert storms[0]["eui64"] == neighbor - assert len(out["opened"]) == 1 - assert sorted(storms[0]["evidence"]["distinct_reporters"]) == sorted(reporters) - - -def test_mesh_disagreement_opens_when_delta_over_threshold(store: SQLiteStore) -> None: - eui = "cc" * 8 - # Seed a node with a self-reported MAC TX counter and a fresh diag ts. - store.upsert_node_metadata(eui64=eui) - # Set tx_total_count + diag_updated_at directly via the diagnostics - # setter used by the discovery pipeline. - store.set_node_diagnostics(eui, tx_total_count=1000) - # OTBR-witnessed value ≫ threshold below. - store.insert_otbr_diagnostic( - target_eui64=eui, - target_rloc16=0x4400, - mac_tx_total=500, # 50% delta vs. 1000 - ) - assert MESH_DISAGREEMENT_PCT_THRESHOLD <= 50.0 - run_reasoner(store=store) - disagreements = [ - i for i in store.list_active_issues() if i["kind"] == "mesh_disagreement" - ] - assert len(disagreements) == 1 - ev = disagreements[0]["evidence"] - assert ev["self_tx_total"] == 1000 - assert ev["otbr_tx_total"] == 500 - assert ev["delta_pct"] >= MESH_DISAGREEMENT_PCT_THRESHOLD - - -def test_mesh_disagreement_skips_when_under_threshold(store: SQLiteStore) -> None: - eui = "dd" * 8 - store.upsert_node_metadata(eui64=eui) - store.set_node_diagnostics(eui, tx_total_count=1000) - # 5% delta — well under default 25% threshold. - store.insert_otbr_diagnostic(target_eui64=eui, mac_tx_total=950) - run_reasoner(store=store) - assert not any( - i["kind"] == "mesh_disagreement" for i in store.list_active_issues() - ) - - -# --------------------------------------------------------------------------- -# v0.9.44 — Tier 3 observer-suppression annotation -# --------------------------------------------------------------------------- - +def test_dead_link_reference_requires_persistence(store: SQLiteStore) -> None: + reporter = "aa" * 8 + unknown = "bb" * 8 + store.upsert_node_metadata(eui64=reporter) -def test_offline_issue_downgrades_when_observer_was_down(store: SQLiteStore) -> None: - """A ``crit`` ``offline_node`` issue downgrades to ``warn`` and - carries ``suppressed_by`` evidence when an observer-side disruption - overlaps the (last_seen → now) trigger window. - """ - eui = "ee" * 8 - old = (_now() - timedelta(minutes=OFFLINE_THRESHOLD_MIN + 5)).isoformat() - store.upsert_node_metadata(eui64=eui) - store.insert_event(eui64=eui, type="attach", ts=old) - - # Observer outage that spans the gap between last_seen and now. - obs_started = (_now() - timedelta(minutes=OFFLINE_THRESHOLD_MIN)).isoformat() - obs_ended = (_now() - timedelta(minutes=OFFLINE_THRESHOLD_MIN - 2)).isoformat() - store.insert_observer_event( - source="addon:core_matter_server", - kind="outage", - started_at=obs_started, - ended_at=obs_ended, + # Unknown neighbor reference appears in the current snapshot. + store.replace_links_for_reporter( + reporter, + "neighbor_table", + [{"neighbor_eui64": unknown, "link_established": True}], + partition_id=1, + observed_at=_now().isoformat(), ) - run_reasoner(store=store) - issues = [i for i in store.list_active_issues() if i["kind"] == "offline_node"] - assert len(issues) == 1 - issue = issues[0] - assert issue["severity"] == "warn", "crit should have been downgraded" - assert "suppressed_by" in issue["evidence"] - suppressors = issue["evidence"]["suppressed_by"] - assert len(suppressors) == 1 - assert suppressors[0]["source"] == "addon:core_matter_server" - assert suppressors[0]["kind"] == "outage" - - -def test_offline_issue_stays_crit_when_no_observer_disruption(store: SQLiteStore) -> None: - """Same scenario but without an overlapping observer event — the - issue remains at full ``crit`` severity with no suppression. - """ - eui = "ff" * 8 - old = (_now() - timedelta(minutes=OFFLINE_THRESHOLD_MIN + 5)).isoformat() - store.upsert_node_metadata(eui64=eui) - store.insert_event(eui64=eui, type="attach", ts=old) + for _ in range(DEAD_LINK_MIN_TICKS - 1): + run_reasoner(store=store) + assert not any(i["kind"] == "dead_link_reference" for i in store.list_active_issues()) run_reasoner(store=store) - issues = [i for i in store.list_active_issues() if i["kind"] == "offline_node"] + issues = [i for i in store.list_active_issues() if i["kind"] == "dead_link_reference"] assert len(issues) == 1 - assert issues[0]["severity"] == "crit" - assert "suppressed_by" not in issues[0]["evidence"] - - -def test_observer_event_strictly_before_trigger_does_not_suppress( - store: SQLiteStore, -) -> None: - """An old observer event from yesterday must not suppress today's - issues. Suppression only applies within the trigger window plus grace. - """ - eui = "ab" * 8 - old = (_now() - timedelta(minutes=OFFLINE_THRESHOLD_MIN + 5)).isoformat() - store.upsert_node_metadata(eui64=eui) - store.insert_event(eui64=eui, type="attach", ts=old) - - # An observer outage from 6 hours ago — well before last_seen. - long_ago_start = (_now() - timedelta(hours=6)).isoformat() - long_ago_end = (_now() - timedelta(hours=6, minutes=-1)).isoformat() - store.insert_observer_event( - source="addon:self", kind="restart", - started_at=long_ago_start, ended_at=long_ago_end, + assert issues[0]["eui64"] == reporter + refs = issues[0]["evidence"]["references"] + assert any(r.get("neighbor_eui64") == unknown for r in refs) + + # Clearing predicate: remove the unknown reference, then re-run. + store.replace_links_for_reporter( + reporter, + "neighbor_table", + [], + partition_id=1, + observed_at=_now().isoformat(), + ) + out = run_reasoner(store=store) + assert out["closed"] + assert not any(i["kind"] == "dead_link_reference" for i in store.list_active_issues()) + + +def test_route_to_otbr_unreachable_opens_on_unknown_next_hop(store: SQLiteStore) -> None: + otbr = "11" * 8 + router = "22" * 8 + store.upsert_node_metadata(eui64=otbr, role="border_router") + store.upsert_node_metadata(eui64=router) + store.set_node_diagnostics(otbr, partition_id=1, routing_role="leader") + store.set_node_diagnostics(router, partition_id=1, routing_role="router") + store.set_node_router_id(otbr, 1) + store.set_node_router_id(router, 2) + + # RouteTable entry points to a router_id that doesn't exist in the + # partition's router index → unknown_next_hop. + store.replace_links_for_reporter( + router, + "route_table", + [ + { + "neighbor_eui64": otbr, + "path_cost": 2, + "next_hop_router_id": 10, + "link_established": True, + } + ], + partition_id=1, + observed_at=_now().isoformat(), ) run_reasoner(store=store) - issues = [i for i in store.list_active_issues() if i["kind"] == "offline_node"] + issues = [i for i in store.list_active_issues() if i["kind"] == "route_to_otbr_unreachable"] assert len(issues) == 1 + assert issues[0]["eui64"] == router assert issues[0]["severity"] == "crit" - assert "suppressed_by" not in issues[0]["evidence"] - -def test_reasoner_closes_stale_partition_split_when_topology_resolved( - store: SQLiteStore, -) -> None: - """v0.9.45: the reasoner owns ``partition_split`` close-on-resolve. - - When live topology shows <= 1 partition, any still-open - partition_split issue is stale and must be closed by the reasoner - even if the discovery stage never ran the close path. - """ - eui = "aa" * 8 - store.upsert_node_metadata(eui64=eui, friendly_name="X", role="router") - issue_id = store.open_issue( - kind="partition_split", - severity="warning", - eui64=None, - evidence={ - "partition_count": 2, - "partitions": [ - {"partition_id": 1, "members": [eui]}, - {"partition_id": 2, "members": ["bb" * 8]}, - ], - }, + # Clear by switching to a direct route (path_cost=1 + link_established). + store.replace_links_for_reporter( + router, + "route_table", + [{"neighbor_eui64": otbr, "path_cost": 1, "link_established": True}], + partition_id=1, + observed_at=_now().isoformat(), ) - assert any(i["id"] == issue_id for i in store.list_active_issues()) - - result = run_reasoner(store=store) - assert issue_id in result["closed"] - assert not any(i["id"] == issue_id for i in store.list_active_issues()) - - -def test_wrong_network_opens_issue_for_minority(store: SQLiteStore) -> None: - """v0.9.46: minority extended_pan_id triggers wrong_network.""" - majority_epid = "aaaaaaaaaaaaaaaa" - minority_epid = "bbbbbbbbbbbbbbbb" - for i, eui in enumerate(("11" * 8, "22" * 8, "33" * 8)): - store.upsert_node_metadata(eui64=eui, friendly_name=f"r{i}") - store.set_node_diagnostics( - eui, network_name="ha-thread-main", extended_pan_id=majority_epid, - ) - minority_eui = "44" * 8 - store.upsert_node_metadata(eui64=minority_eui, friendly_name="stray") - store.set_node_diagnostics( - minority_eui, network_name="ha-thread-old", extended_pan_id=minority_epid, - ) - - out = run_reasoner(store=store) - issues = [i for i in store.list_active_issues() if i["kind"] == "wrong_network"] - assert len(issues) == 1 - issue = issues[0] - assert issue["eui64"] == minority_eui - ev = issue["evidence"] - assert ev["node_extended_pan_id"] == minority_epid - assert ev["modal_extended_pan_id"] == majority_epid - assert ev["modal_member_count"] == 3 - assert ev["minority_member_count"] == 1 - assert issue["id"] in out["opened"] - - -def test_wrong_network_closes_when_resolved(store: SQLiteStore) -> None: - """Once the minority node adopts the modal EPID, the issue auto-closes.""" - eui = "55" * 8 - store.upsert_node_metadata(eui64=eui) - store.set_node_diagnostics(eui, extended_pan_id="bbbbbbbbbbbbbbbb") - for other in ("66" * 8, "77" * 8): - store.upsert_node_metadata(eui64=other) - store.set_node_diagnostics(other, extended_pan_id="aaaaaaaaaaaaaaaa") - run_reasoner(store=store) - active = [i for i in store.list_active_issues() if i["kind"] == "wrong_network"] - assert len(active) == 1 - issue_id = active[0]["id"] - - # Minority node re-attached on the right network. - store.set_node_diagnostics(eui, extended_pan_id="aaaaaaaaaaaaaaaa") out = run_reasoner(store=store) - assert issue_id in out["closed"] - + assert out["closed"] + assert not any(i["kind"] == "route_to_otbr_unreachable" for i in store.list_active_issues()) + + +def test_real_partition_split_requires_recent_partition_change(store: SQLiteStore) -> None: + # Two live partitions exist. + leader_a = "aa" * 8 + leader_b = "bb" * 8 + mover = "cc" * 8 + store.upsert_node_metadata(eui64=leader_a) + store.upsert_node_metadata(eui64=leader_b) + store.upsert_node_metadata(eui64=mover) + store.set_node_diagnostics(leader_a, partition_id=1, routing_role="leader") + store.set_node_diagnostics(leader_b, partition_id=2, routing_role="leader") + store.set_node_diagnostics(mover, partition_id=2, routing_role="router") + + # Evidence: a device transitioned between partitions within the window. + ts = (_now() - timedelta(minutes=1)).isoformat() + store.insert_event( + eui64=mover, + type="partition_change", + ts=ts, + payload={"from": 1, "to": 2}, + ) -def test_wrong_network_no_issue_when_all_match(store: SQLiteStore) -> None: - """Sanity: a healthy mesh on one EPID must not fire wrong_network.""" - for eui in ("11" * 8, "22" * 8, "33" * 8): - store.upsert_node_metadata(eui64=eui) - store.set_node_diagnostics(eui, extended_pan_id="aaaaaaaaaaaaaaaa") run_reasoner(store=store) - assert not [i for i in store.list_active_issues() if i["kind"] == "wrong_network"] + issues = [i for i in store.list_active_issues() if i["kind"] == "real_partition_split"] + assert len(issues) == 1 + assert issues[0]["eui64"] is None + assert any(ch.get("eui64") == mover for ch in issues[0]["evidence"]["recent_partition_changes"]) + # Advance beyond the window; no recent partition_change → auto-close. + future = _now() + timedelta(minutes=PARTITION_CHANGE_WINDOW_MIN + 5) + out = run_reasoner(store=store, now=future) + assert out["closed"] + assert not any(i["kind"] == "real_partition_split" for i in store.list_active_issues()) -def test_run_reasoner_paused_closes_residuals_and_returns_status( - store: SQLiteStore, monkeypatch: pytest.MonkeyPatch -) -> None: - """Issue detection paused (#4): residual open issues are closed - and the summary carries ``status: "paused"`` plus a note. The - rules are not evaluated.""" - # Seed an open issue, then enable pause and re-run. - issue_id = store.open_issue(kind="parent_churn", severity="warn", eui64="aa" * 8) - monkeypatch.setattr(reasoner_mod, "ISSUES_PAUSED", True) - out = run_reasoner(store=store) - assert out["status"] == "paused" - assert out["opened"] == [] - assert issue_id in out["closed"] - assert store.list_active_issues() == [] - assert "note" in out and out["note"] diff --git a/documentation/06-mcp-tools-reference.md b/documentation/06-mcp-tools-reference.md index 60bf1a8..fae93e0 100644 --- a/documentation/06-mcp-tools-reference.md +++ b/documentation/06-mcp-tools-reference.md @@ -1,8 +1,8 @@ # MCP Tools Reference This document is generated from `thread_observability.api.mcp_tools.TOOL_DEFS` and `RESOURCE_DEFS`. -Generated at: `2026-05-14T04:53:32+00:00` -Tool count: `41` +Generated at: `2026-05-16T06:50:27+00:00` +Tool count: `43` Resource count: `1` Shared background resource: [glossary.md](glossary.md) @@ -19,7 +19,7 @@ Shared background resource: [glossary.md](glossary.md) ### `get_mesh_state` -Use when: starting a triage session or answering 'what does the mesh look like right now?'. Returns the live Thread mesh: nodes + links + partition_id, computed deterministically from the SQLite event log and most-recent Matter discovery tick. Phantom nodes are excluded by default. Returns: {nodes:[{eui64, role, partition_id, parent_eui64, last_rssi, last_lqi, status, ...}], links:[...], partition_id, computed_at, node_count, link_count}. Caveats: derived from the latest persisted pipeline state. Check meta.cache_age_s on the response; if stale, call ingest_now to force a refresh. +Use when: starting a triage session or answering 'what does the mesh look like right now?'. Returns the live Thread mesh: nodes + links + partition_id, computed deterministically from the latest retained Thread events and most-recent Matter discovery tick. Phantom nodes are excluded by default. Returns: {nodes:[{eui64, role, partition_id, parent_eui64, last_rssi, last_lqi, status, ...}], links:[...], partition_id, computed_at, node_count, link_count}. Caveats: derived from the latest persisted pipeline state. Check meta.cache_age_s on the response; if stale, call ingest_now to force a refresh. | Argument | Type | Required | Default | Description | | --- | --- | --- | --- | --- | @@ -28,7 +28,7 @@ Use when: starting a triage session or answering 'what does the mesh look like r ### `list_active_issues` -Return all currently-open Thread network issues from the SQLite issues table. NOTE: Issue detection is currently paused pending a redesign of the rule set (see tracking issue #5). Until new rules ship, this tool returns an empty list with `status: "placeholder"`. Do NOT infer "all clear" from the empty list — instead, reason from the raw data (topology, partitions, links, nodes). +Return all currently-open Thread network issues computed by deterministic rules. Each issue includes the affected EUI64 (or null for mesh-wide issues), `first_seen_at`, `last_seen_at`, a severity that reflects actionability × freshness, and an evidence payload that includes the EUIs involved and the observation that triggered it. Current rule taxonomy: `real_partition_split`, `dead_link_reference`, `route_to_otbr_unreachable`. Arguments: none @@ -125,7 +125,7 @@ Arguments: none ### `get_storage_stats` -Return SQLite store stats (schema version, file size, row counts per table, oldest/newest event timestamps) plus the active time-series backend. +Return storage stats for retained network data (schema version, file size, row counts, oldest/newest event timestamps) plus the active time-series backend. Arguments: none @@ -139,7 +139,7 @@ Use when: reviewing dashboard chat usage and grounding behavior without inspecti ### `query_history` -Tier 4 unified timeline. Return a single newest-first stream that merges canonical events, issue open/close lifecycle, and observer (addon/OTBR/Matter Server) outage windows over a time range. Each row is normalized to {ts, source, kind, eui64?, severity?, details, ref_id} so an AI consultant can correlate Thread-side, issue-side and observer-side activity in one round-trip. Filter by eui64, kind list, or source list. +Tier 4 unified timeline. Return a single newest-first stream that merges canonical events, issue open/close lifecycle, and observer (addon/OTBR/Matter Server) outage windows over a time range. Each row is normalized to {ts, source, kind, eui64?, severity?, details, ref_id} so an AI consultant can correlate Thread-side, issue-side and observer-side activity in one round-trip. Filter by eui64, kind list, or source list. Use this for chronology questions like what happened when, not for per-link RSSI/LQI trends or structural topology diffs. Prefer get_signal_series for per-node signal over time, get_node_link_signal_history for adjacent-link quality changes, and diff_topology_history for before/after topology structure. | Argument | Type | Required | Default | Description | | --- | --- | --- | --- | --- | @@ -171,7 +171,7 @@ Tier 4. List topology snapshot summaries (id, captured_at, hash, partition_id, n ### `diff_topology_history` -Tier 4. Return a structured diff between two topology snapshots: added/removed nodes, per-node role/partition/parent transitions, and added/removed links. ``snapshot_id_a`` is the older / baseline, ``snapshot_id_b`` is the newer / candidate. +Tier 4. Return a structured diff between two topology snapshots: added/removed nodes, per-node role/partition/parent transitions, and added/removed links. ``snapshot_id_a`` is the older / baseline, ``snapshot_id_b`` is the newer / candidate. Use this for structural network-change questions, not to claim that signal quality improved or degraded. For RSSI/LQI evidence use get_signal_series or get_node_link_signal_history instead. | Argument | Type | Required | Default | Description | | --- | --- | --- | --- | --- | @@ -212,7 +212,7 @@ Arguments: none ### `get_timeseries_health` -Probe the time-series backend (Influx if configured, else SQLite fallback) and return status. +Probe the active time-series backend and return status. Arguments: none @@ -303,6 +303,30 @@ Use when: a node looks unhealthy and you want to know whether a peer on the same | `until` | `string` | no | `` | ISO-8601 upper bound for both series; default now. | | `resolution` | `string` | no | `raw` | Return raw samples or 5-minute rollups for both nodes. | +### `get_signal_series` + +Use when: you need before/after per-device signal evidence over time, such as whether a node's RSSI or LQI got better or worse across a troubleshooting window. Returns event-backed RSSI/LQI samples for one node over [since, until], plus summary metrics (first, last, delta, min, max, avg). Use this for one node's own observed signal samples, not for peer-by-peer adjacent-link comparison. Caveats: event-driven telemetry only; sparse series mean the backend did not observe signal-bearing events in that window. If the question is which peer or link improved, degraded, appeared, or disappeared over time, prefer get_node_link_signal_history. + +| Argument | Type | Required | Default | Description | +| --- | --- | --- | --- | --- | +| `eui64` | `string` | yes | `` | Target node EUI-64 whose signal series should be returned. | +| `since` | `string` | no | `` | ISO-8601 lower bound; default 24h ago. | +| `until` | `string` | no | `` | ISO-8601 upper bound; default now. | +| `resolution` | `string` | no | `raw` | Return raw event samples or 5-minute averages. | + +### `get_node_link_signal_history` + +Use when: you need retained historical link-by-link signal changes for one node across pipeline observations. Returns adjacent-link history over [since, until], including added/changed/heartbeat/removed samples and per-link RSSI/LQI summaries. Prefer this for network-change questions about which links or peers improved or degraded over time. Use this instead of get_signal_series when the question is about adjacent peers, link appearance/disappearance, or per-link quality change rather than one node's event-backed signal samples. + +| Argument | Type | Required | Default | Description | +| --- | --- | --- | --- | --- | +| `eui64` | `string` | yes | `` | Target node EUI-64 whose adjacent-link history should be returned. | +| `since` | `string` | no | `` | ISO-8601 lower bound; default 24h ago. | +| `until` | `string` | no | `` | ISO-8601 upper bound; default now. | +| `peer_eui64` | `string` | no | `` | Optional peer EUI-64 to limit history to one adjacent link. | +| `source` | `string` | no | `` | Optional source filter. | +| `limit` | `integer` | no | `5000` | Maximum historical samples to scan. | + ### `get_assessment_state` Use when: you need to know whether Background Diagnostics is currently scheduled, when the next assessment will run, and how much of today's call budget has been used. Returns the live scheduler snapshot (state, next_assessment_at, budget). Read-only.