diff --git a/docs/pvr_triage_overview.md b/docs/pvr_triage_overview.md new file mode 100644 index 0000000..6103e94 --- /dev/null +++ b/docs/pvr_triage_overview.md @@ -0,0 +1,142 @@ +# PVR Triage Taskflows — Overview + +> 30-minute sync reference. Last updated: 2026-03-03. + +--- + +## The Problem + +OSS maintainers get flooded with low-quality vulnerability reports via GitHub's Private Vulnerability Reporting (PVR). Most are vague, duplicated, or AI-generated. Reviewing each one manually is expensive. + +--- + +## The Solution: 4 Taskflows + +``` +┌─────────────────────────────────────────────────────────────┐ +│ INBOX │ +│ (GHSAs in triage state via GitHub PVR) │ +└───────────────────────┬─────────────────────────────────────┘ + │ + ▼ + ┌─────────────────────────┐ + │ pvr_triage_batch │ "What's in my inbox?" + │ │ + │ • List triage GHSAs │ + │ • Score each by │ + │ severity + quality │ + │ • Show Age (days) │ + │ • Rank: highest first │ + │ (oldest wins ties) │ + └────────────┬────────────┘ + │ ranked queue saved to REPORT_DIR + ▼ + ┌─────────────────────────┐ + │ pvr_triage │ "Is this real?" + │ (one advisory) │ + │ │ + │ Task 1: init │ + │ Task 2: fetch & parse │ + │ Task 3: quality gate ──┼──► fast-close? ──► skip to Task 7 + │ Task 4: verify code │ + │ Task 5: write report │ + │ Task 6: save report │ + │ Task 7: draft response │ + │ Task 8: save + record │ + └────────────┬────────────┘ + │ _triage.md + _response_triage.md saved + ▼ + Maintainer reviews + (edits draft if needed) + │ + ┌────────┴────────┐ + │ │ + ▼ ▼ + ┌──────────────────┐ ┌──────────────────────┐ + │ pvr_respond │ │ pvr_respond_batch │ + │ (one at a time) │ │ (all at once) │ + │ │ │ │ + │ confirm-gated: │ │ • list_pending │ + │ accept (→draft) │ │ • for each: │ + │ reject (→closed)│ │ - confirm-gated │ + │ │ │ state change │ + │ mark as applied │ │ - mark as applied │ + │ post draft │ │ • post drafts │ + │ manually via UI │ │ manually via UI │ + └──────────────────┘ └──────────────────────┘ +``` + +--- + +## The Quality Gate (Task 3) — Key Logic + +``` +Reporter has history? + │ + ├── HIGH TRUST ──────────────────► Always full verification + │ (≥60% confirmed, ≤20% low) + │ + ├── SKEPTICISM ──────────────────► Fast-close if 0 quality signals + │ (≤20% confirmed OR ≥50% low) (no prior report needed) + │ + └── NORMAL / NEW ────────────────► Fast-close only if: + 0 quality signals + AND prior similar report exists +``` + +**Quality signals:** file paths cited · PoC provided · line numbers cited + +**Fast-close effect:** skip code verification → use canned response template requesting specifics + +--- + +## Scoring (batch) + +``` +priority_score = severity_weight + quality_weight + +severity: critical=4 high=3 medium=2 low=1 +quality: +1 per signal (files, PoC, lines) → max +3 + +≥5 Triage Immediately +≥3 Triage Soon + 2 Triage +≤1 Likely Low Quality — Fast Close +``` + +--- + +## Output Files (all in REPORT_DIR) + +| File | Written by | What it is | +|---|---|---| +| `GHSA-xxxx_triage.md` | pvr_triage | Full analysis report | +| `GHSA-xxxx_response_triage.md` | pvr_triage | Draft reply to reporter | +| `GHSA-xxxx_response_sent.md` | pvr_respond / batch | State-transition applied marker (idempotent) | +| `batch_queue__.md` | pvr_triage_batch | Ranked inbox table | + +--- + +## Reporter Reputation (background) + +Every completed triage records **verdict + quality** against the reporter's GitHub login in a local SQLite DB. Score feeds back into the next triage's quality gate automatically. No manual configuration. 
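+
+The tables above compress to a couple of small rules. A minimal Python sketch of the
+batch priority score and the reputation thresholds (illustrative only; the function
+names are ours, and the authoritative logic lives in `SCORING.md` and the
+reporter-reputation MCP server):
+
+```python
+SEVERITY_WEIGHT = {"critical": 4, "high": 3, "medium": 2, "low": 1}
+
+
+def priority_score(severity: str, cites_files: bool, has_poc: bool, cites_lines: bool) -> int:
+    """Severity weight plus +1 per quality signal (max +3)."""
+    quality = int(cites_files) + int(has_poc) + int(cites_lines)
+    # Unknown severities are treated as "low" in this sketch.
+    return SEVERITY_WEIGHT.get(severity.lower(), 1) + quality  # >=5 immediately, <=1 likely fast-close
+
+
+def reporter_recommendation(confirmed_pct: float, low_quality_share: float) -> str:
+    """Thresholds mirrored from the quality-gate diagram above."""
+    if confirmed_pct >= 0.6 and low_quality_share <= 0.2:
+        return "high trust"              # always full verification
+    if confirmed_pct <= 0.2 or low_quality_share >= 0.5:
+        return "treat with skepticism"   # fast-close on zero quality signals
+    return "normal"                      # fast-close only with a prior similar report
+```
+
+For example, a `high`-severity report that cites file paths and line numbers but ships
+no PoC scores 3 + 2 = 5 and lands in the "Triage Immediately" bucket.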
+ +--- + +## One-liner workflow + +```bash +./scripts/run_pvr_triage.sh batch owner/repo # see inbox +./scripts/run_pvr_triage.sh triage owner/repo GHSA-xxx # analyse one +./scripts/run_pvr_triage.sh respond owner/repo GHSA-xxx accept # accept one (triage→draft) +./scripts/run_pvr_triage.sh respond owner/repo GHSA-xxx reject # reject one (triage→closed) +./scripts/run_pvr_triage.sh respond_batch owner/repo reject # bulk state transition +# Then post each *_response_triage.md manually via the advisory URL +``` + +--- + +## Further reading + +- [`taskflows/pvr_triage/README.md`](../src/seclab_taskflows/taskflows/pvr_triage/README.md) — full usage docs for all four taskflows +- [`taskflows/pvr_triage/SCORING.md`](../src/seclab_taskflows/taskflows/pvr_triage/SCORING.md) — authoritative scoring reference and fast-close decision tables diff --git a/pyproject.toml b/pyproject.toml index 02eb813..81e866f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -102,6 +102,7 @@ ignore = [ "RUF015", # Prefer `next(iter())` over single element slice "S607", # Starting a process with a partial executable path "SIM101", # Use a ternary expression instead of if-else-block + "SIM105", # Use contextlib.suppress (false positive: try block contains assignment) "SIM114", # Combine `if` branches using logical `or` operator "SIM117", # Use a single `with` statement with multiple contexts "SIM118", # Use `key in dict` instead of `key in dict.keys()` @@ -117,6 +118,9 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "tests/*" = [ + "PLC0415", # Import not at top of file (deliberate in setUp/test methods for patching) + "PT009", # Use assert instead of unittest-style assertEqual (TestCase subclass) + "PT027", # Use pytest.raises instead of assertRaises (TestCase subclass) "S101", # Use of assert (standard in pytest) "SLF001", # Private member accessed (tests legitimately access module internals) ] diff --git a/scripts/demo_pvr_triage.sh b/scripts/demo_pvr_triage.sh new file mode 100755 index 0000000..ba09354 --- /dev/null +++ b/scripts/demo_pvr_triage.sh @@ -0,0 +1,272 @@ +#!/bin/bash +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT +# +# Live demo of PVR triage taskflows against anticomputer/vulnerable-test-app. +# +# Exercises: advisory listing, dedup detection, security policy fetch, +# code verification, report generation, and batch scoring. +# +# Prerequisites: +# - gh CLI authenticated +# - passage available for AI token +# - seclab-taskflows installed in .venv +# +# Usage: +# ./scripts/demo_pvr_triage.sh [tools|batch|triage|all] +# +# tools - test individual MCP tools against live API (fast, no AI calls) +# batch - run the batch scoring taskflow +# triage - run full single-advisory triage on the high-quality report +# all - run everything in sequence + +set -euo pipefail + +__dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +__root="$(cd "${__dir}/.." && pwd)" + +REPO="anticomputer/vulnerable-test-app" +# Advisory state: "draft" for owner-created test advisories, +# "triage" for real PVR submissions from external reporters. +ADVISORY_STATE="${ADVISORY_STATE:-draft}" + +# --- environment --- + +if [ -d "${__root}/.venv/bin" ]; then + export PATH="${__root}/.venv/bin:${PATH}" +fi + +export GH_TOKEN="${GH_TOKEN:-$(gh auth token 2>/dev/null)}" +if [ -z "${GH_TOKEN}" ]; then + echo "FATAL: gh auth token failed. 
Run: gh auth login" >&2 + exit 1 +fi + +export AI_API_TOKEN="${AI_API_TOKEN:-$(passage show github/capi-token 2>/dev/null)}" +if [ -z "${AI_API_TOKEN}" ]; then + echo "FATAL: AI_API_TOKEN not set and passage unavailable." >&2 + exit 1 +fi + +export AI_API_ENDPOINT="${AI_API_ENDPOINT:-https://api.githubcopilot.com}" +export REPORT_DIR="${REPORT_DIR:-${__root}/reports/demo}" +export LOG_DIR="${LOG_DIR:-${__root}/logs}" +mkdir -p "${REPORT_DIR}" "${LOG_DIR}" + +# --- helpers --- + +sep() { echo; echo "========== $1 =========="; echo; } +ok() { echo "[OK] $1"; } +fail() { echo "[FAIL] $1" >&2; FAILURES=$((FAILURES + 1)); } + +FAILURES=0 + +run_agent() { + python -m seclab_taskflow_agent "$@" +} + +# --- tools: test individual MCP tools against live API --- + +cmd_tools() { + sep "MCP Tool Tests (live API, no AI calls)" + + echo "--- list_pvr_advisories (state=draft) ---" + ADVISORIES=$(python -c " +import seclab_taskflows.mcp_servers.pvr_ghsa as pvr +print(pvr.list_pvr_advisories.fn(owner='anticomputer', repo='vulnerable-test-app', state='draft')) +") + COUNT=$(echo "$ADVISORIES" | python -c "import sys,json; print(len(json.load(sys.stdin)))") + if [ "$COUNT" -ge 1 ]; then + ok "Found $COUNT advisories in draft state" + else + fail "No advisories found. Create test advisories first." + return + fi + echo "$ADVISORIES" | python -c " +import sys, json +for a in json.load(sys.stdin): + print(f\" {a['ghsa_id']} {a['severity']:8s} {a['summary']}\") +" + echo + + echo "--- fetch_pvr_advisory (first advisory) ---" + GHSA=$(echo "$ADVISORIES" | python -c "import sys,json; print(json.load(sys.stdin)[0]['ghsa_id'])") + DETAIL=$(python -c " +import seclab_taskflows.mcp_servers.pvr_ghsa as pvr +print(pvr.fetch_pvr_advisory.fn(owner='anticomputer', repo='vulnerable-test-app', ghsa_id='${GHSA}')) +") + if echo "$DETAIL" | python -c "import sys,json; d=json.load(sys.stdin); assert d['ghsa_id']" 2>/dev/null; then + ok "Fetched ${GHSA}: $(echo "$DETAIL" | python -c "import sys,json; d=json.load(sys.stdin); print(f\"{d['severity']} - CWEs: {d['cwes']}\")")" + else + fail "Failed to fetch ${GHSA}" + fi + echo + + echo "--- fetch_security_policy ---" + POLICY=$(python -c " +import seclab_taskflows.mcp_servers.pvr_ghsa as pvr +print(pvr.fetch_security_policy.fn(owner='anticomputer', repo='vulnerable-test-app')) +") + if [ -n "$POLICY" ]; then + ok "Security policy found ($(echo "$POLICY" | wc -l | tr -d ' ') lines)" + echo "$POLICY" | head -5 | sed 's/^/ /' + echo " ..." 
+ else + fail "No security policy found" + fi + echo + + echo "--- compare_advisories (dedup detection) ---" + DEDUP=$(python -c " +import seclab_taskflows.mcp_servers.pvr_ghsa as pvr +print(pvr.compare_advisories.fn(owner='anticomputer', repo='vulnerable-test-app', state='draft', target_ghsa='')) +") + CLUSTERS=$(echo "$DEDUP" | python -c "import sys,json; print(len(json.load(sys.stdin)['clusters']))") + TOTAL=$(echo "$DEDUP" | python -c "import sys,json; print(json.load(sys.stdin)['total'])") + ok "Compared $TOTAL advisories, found $CLUSTERS duplicate cluster(s)" + echo "$DEDUP" | python -c " +import sys, json +d = json.load(sys.stdin) +for c in d['clusters']: + print(f\" Cluster [{c['match_level']}]: {', '.join(c['advisories'])}\") + for r in c['reasons']: + print(f\" - {r}\") +for s in d['singles']: + print(f\" Single: {s}\") +" + echo + + echo "--- fetch_file_at_ref (main.go lines 25-30) ---" + CODE=$(python -c " +import seclab_taskflows.mcp_servers.pvr_ghsa as pvr +print(pvr.fetch_file_at_ref.fn(owner='anticomputer', repo='vulnerable-test-app', path='main.go', ref='main', start_line=25, length=6)) +") + if echo "$CODE" | grep -q "searchHandler"; then + ok "Fetched vulnerable code at main.go:25" + echo "$CODE" | sed 's/^/ /' + else + fail "Failed to fetch main.go" + fi + echo + + echo "--- resolve_version_ref (0.0.1 -- expected to fail, no tags) ---" + VER=$(python -c " +import seclab_taskflows.mcp_servers.pvr_ghsa as pvr +print(pvr.resolve_version_ref.fn(owner='anticomputer', repo='vulnerable-test-app', version='0.0.1')) +") + if echo "$VER" | grep -q "Could not resolve"; then + ok "Graceful failure: no tags in repo (expected)" + else + ok "Resolved: $VER" + fi + echo + + sep "Tool Tests Complete ($FAILURES failures)" +} + +# --- batch: run batch scoring taskflow --- + +cmd_batch() { + sep "Batch Scoring Taskflow" + echo "Repo: ${REPO}" + echo "Report dir: ${REPORT_DIR}" + echo + + # The test advisories are in draft state (owner-created), so patch the + # taskflow call to use state=draft. The batch taskflow defaults to triage + # state, but we can override via the run_agent globals. 
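+  # For example, to exercise the real PVR inbox instead of the owner-created test
+  # advisories, run with: ADVISORY_STATE=triage ./scripts/demo_pvr_triage.sh batch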
+ run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g "repo=${REPO}" \ + -g "state=${ADVISORY_STATE}" + + echo + BATCH_REPORT=$(ls -t "${REPORT_DIR}"/batch_queue_*.md 2>/dev/null | head -1) + if [ -n "${BATCH_REPORT}" ]; then + ok "Batch report: ${BATCH_REPORT}" + echo + cat "${BATCH_REPORT}" + else + fail "No batch report generated" + fi +} + +# --- triage: run full single-advisory triage --- + +cmd_triage() { + local ghsa="${1:-}" + + if [ -z "$ghsa" ]; then + # Pick the high-quality SQL injection report + ghsa=$(python -c " +import json, seclab_taskflows.mcp_servers.pvr_ghsa as pvr +advs = json.loads(pvr.list_pvr_advisories.fn(owner='anticomputer', repo='vulnerable-test-app', state='draft')) +for a in advs: + if 'SQL' in a['summary'] or 'sql' in a['summary'].lower(): + print(a['ghsa_id']) + break +else: + print(advs[0]['ghsa_id'] if advs else '') +") + fi + + if [ -z "$ghsa" ]; then + fail "No advisories found to triage" + return + fi + + sep "Single Advisory Triage: ${ghsa}" + echo "Repo: ${REPO}" + echo "GHSA: ${ghsa}" + echo "Report dir: ${REPORT_DIR}" + echo + + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g "repo=${REPO}" \ + -g "ghsa=${ghsa}" \ + -g "state=${ADVISORY_STATE}" + + echo + TRIAGE_REPORT="${REPORT_DIR}/${ghsa}_triage.md" + RESPONSE_DRAFT="${REPORT_DIR}/${ghsa}_response_triage.md" + if [ -f "${TRIAGE_REPORT}" ]; then + ok "Triage report: ${TRIAGE_REPORT}" + echo + cat "${TRIAGE_REPORT}" + else + fail "No triage report generated" + fi + echo + if [ -f "${RESPONSE_DRAFT}" ]; then + sep "Response Draft" + cat "${RESPONSE_DRAFT}" + fi +} + +# --- all: run everything --- + +cmd_all() { + cmd_tools + cmd_batch + cmd_triage "${1:-}" + sep "Demo Complete ($FAILURES total failures)" +} + +# --- dispatch --- + +case "${1:-tools}" in + tools) cmd_tools ;; + batch) cmd_batch ;; + triage) shift; cmd_triage "${1:-}" ;; + all) shift; cmd_all "${1:-}" ;; + -h|--help|help) + echo "Usage: $0 [tools|batch|triage [GHSA]|all]" + echo + echo " tools - test MCP tools against live API (no AI calls)" + echo " batch - run batch scoring taskflow" + echo " triage - run full triage (picks SQL injection report by default)" + echo " all - run everything in sequence" + ;; + *) echo "Unknown command: $1" >&2; exit 1 ;; +esac diff --git a/scripts/run_pvr_triage.sh b/scripts/run_pvr_triage.sh new file mode 100755 index 0000000..70481db --- /dev/null +++ b/scripts/run_pvr_triage.sh @@ -0,0 +1,208 @@ +#!/bin/bash +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT +# +# Local test / demo script for the PVR triage taskflows. +# +# Usage: +# ./scripts/run_pvr_triage.sh batch +# ./scripts/run_pvr_triage.sh triage +# ./scripts/run_pvr_triage.sh respond +# ./scripts/run_pvr_triage.sh respond_batch +# ./scripts/run_pvr_triage.sh demo +# +# Environment (any already-set values are respected): +# GH_TOKEN — GitHub token; falls back to: gh auth token +# AI_API_TOKEN — AI API key (required, must be set before running) +# AI_API_ENDPOINT — defaults to https://api.githubcopilot.com +# REPORT_DIR — defaults to ./reports +# LOG_DIR — defaults to ./logs + +set -euo pipefail + +__dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +__root="$(cd "${__dir}/.." 
&& pwd)" + +# --------------------------------------------------------------------------- +# Usage (defined early so --help can fire before env validation) +# --------------------------------------------------------------------------- + +usage() { + cat < [args] + +Commands: + batch + Score unprocessed triage advisories and save a ranked queue table to REPORT_DIR. + Advisories already present in REPORT_DIR are skipped. + + triage + Run full triage on one advisory: verify code, generate report + response draft. + + respond + Apply a state transition to a GitHub advisory. action = accept | reject + Requires pvr_triage to have been run first for the given GHSA. + Post the response draft manually via the advisory URL after running. + + respond_batch + Scan REPORT_DIR and apply state transitions to all pending advisories. + action = accept | reject + + demo + Full pipeline on the given repo (batch → triage on first triage advisory → report preview). + Does not post anything to GitHub. + +Environment: + GH_TOKEN — GitHub token; falls back to: gh auth token + AI_API_TOKEN — AI API key (required, must be set before running) + AI_API_ENDPOINT — defaults to https://api.githubcopilot.com + REPORT_DIR — defaults to ./reports + LOG_DIR — defaults to ./logs +EOF +} + +case "${1:-}" in + -h|--help|help|"") usage; exit 0 ;; +esac + +# --------------------------------------------------------------------------- +# Environment setup +# --------------------------------------------------------------------------- + +# Prepend local venv to PATH if present (resolves 'python' for MCP servers) +if [ -d "${__root}/.venv/bin" ]; then + export PATH="${__root}/.venv/bin:${PATH}" +fi + +# GitHub token +if [ -z "${GH_TOKEN:-}" ]; then + if command -v gh &>/dev/null; then + GH_TOKEN="$(gh auth token 2>/dev/null)" || true + fi + if [ -z "${GH_TOKEN:-}" ]; then + echo "ERROR: GH_TOKEN not set and 'gh auth token' failed." >&2 + exit 1 + fi + export GH_TOKEN +fi + +# AI API token +if [ -z "${AI_API_TOKEN:-}" ]; then + echo "ERROR: AI_API_TOKEN is not set." >&2 + exit 1 +fi + +export AI_API_ENDPOINT="${AI_API_ENDPOINT:-https://api.githubcopilot.com}" + +export REPORT_DIR="${REPORT_DIR:-${__root}/reports}" +mkdir -p "${REPORT_DIR}" + +export LOG_DIR="${LOG_DIR:-${__root}/logs}" +mkdir -p "${LOG_DIR}" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +run_agent() { + python -m seclab_taskflow_agent "$@" +} + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + +cmd_batch() { + local repo="${1:?Usage: $0 batch }" + echo "==> Scoring inbox for ${repo} ..." + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g "repo=${repo}" +} + +cmd_triage() { + local repo="${1:?Usage: $0 triage }" + local ghsa="${2:?Usage: $0 triage }" + echo "==> Triaging ${ghsa} in ${repo} ..." + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g "repo=${repo}" \ + -g "ghsa=${ghsa}" +} + +cmd_respond() { + local repo="${1:?Usage: $0 respond }" + local ghsa="${2:?Usage: $0 respond }" + local action="${3:?Usage: $0 respond }" + case "${action}" in + accept|reject) ;; + *) echo "ERROR: action must be accept or reject" >&2; exit 1 ;; + esac + echo "==> Responding to ${ghsa} in ${repo} (action=${action}) ..." 
+ run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g "repo=${repo}" \ + -g "ghsa=${ghsa}" \ + -g "action=${action}" +} + +cmd_respond_batch() { + local repo="${1:?Usage: $0 respond_batch }" + local action="${2:?Usage: $0 respond_batch }" + case "${action}" in + accept|reject) ;; + *) echo "ERROR: action must be accept or reject" >&2; exit 1 ;; + esac + echo "==> Bulk respond for ${repo} (action=${action}) ..." + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ + -g "repo=${repo}" \ + -g "action=${action}" +} + +cmd_demo() { + local repo="${1:?Usage: $0 demo }" + + # Pick the first triage advisory, or bail if none + local ghsa + ghsa="$(gh api "/repos/${repo}/security-advisories?state=triage&per_page=1" \ + --jq '.[0].ghsa_id // empty' 2>/dev/null)" || true + + if [ -z "${ghsa}" ]; then + echo "No triage advisories found in ${repo}. Create one at:" >&2 + echo " https://github.com/${repo}/security/advisories/new" >&2 + exit 1 + fi + + echo "==> Demo: ${repo} advisory: ${ghsa}" + echo + + echo "--- Step 1: batch inbox score ---" + cmd_batch "${repo}" + echo + + echo "--- Step 2: full triage ---" + cmd_triage "${repo}" "${ghsa}" + echo + + echo "--- Reports written to ${REPORT_DIR} ---" + ls -1 "${REPORT_DIR}"/*.md 2>/dev/null || true + echo + echo "To accept (triage → draft) or reject (triage → closed):" + echo " $0 respond ${repo} ${ghsa} accept" + echo " $0 respond ${repo} ${ghsa} reject" + echo "Then post the response draft manually via the advisory URL." +} + +# --------------------------------------------------------------------------- +# Dispatch +# --------------------------------------------------------------------------- + +case "${1:-}" in + batch) shift; cmd_batch "$@" ;; + triage) shift; cmd_triage "$@" ;; + respond) shift; cmd_respond "$@" ;; + respond_batch) shift; cmd_respond_batch "$@" ;; + demo) shift; cmd_demo "$@" ;; + *) echo "ERROR: unknown command '${1}'" >&2; usage; exit 1 ;; +esac diff --git a/src/seclab_taskflows/configs/model_config_pvr_triage.yaml b/src/seclab_taskflows/configs/model_config_pvr_triage.yaml new file mode 100644 index 0000000..bdd73c5 --- /dev/null +++ b/src/seclab_taskflows/configs/model_config_pvr_triage.yaml @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR triage model configuration. +# AI_API_ENDPOINT defaults to https://api.githubcopilot.com (set in run_pvr_triage.sh). +# Override AI_API_ENDPOINT and AI_API_TOKEN for other providers. + +seclab-taskflow-agent: + version: "1.0" + filetype: model_config + +models: + # Primary model for code analysis and triage reasoning + triage: claude-opus-4.6 + # Lighter model for structured data extraction tasks + extraction: gpt-5-mini + +model_settings: + extraction: + temperature: 1 + triage: + temperature: 1 diff --git a/src/seclab_taskflows/mcp_servers/pvr_ghsa.py b/src/seclab_taskflows/mcp_servers/pvr_ghsa.py new file mode 100644 index 0000000..f46f66d --- /dev/null +++ b/src/seclab_taskflows/mcp_servers/pvr_ghsa.py @@ -0,0 +1,804 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR GHSA MCP Server +# +# Tools for fetching and parsing GitHub Security Advisories +# submitted via Private Vulnerability Reporting (PVR) (triage state). +# Uses the gh CLI for all GitHub API calls. 
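+#
+# For example, fetching a single advisory ultimately shells out to roughly:
+#
+#   gh api --method GET /repos/OWNER/REPO/security-advisories/GHSA-xxxx-xxxx-xxxx
+#
+# and the list/compare tools page through the same endpoint with
+# ?state=triage&per_page=100.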
+ +from __future__ import annotations + +import json +import logging +import os +import re +import subprocess +from collections import defaultdict +from datetime import datetime, timezone +from pathlib import Path + +from fastmcp import FastMCP +from pydantic import Field +from seclab_taskflow_agent.path_utils import log_file_name + +_raw_report_dir = os.getenv("REPORT_DIR") +REPORT_DIR = Path(_raw_report_dir) if _raw_report_dir and _raw_report_dir.strip() else Path("reports") + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + filename=log_file_name("mcp_pvr_ghsa.log"), + filemode="a", +) + +mcp = FastMCP("PVRAdvisories") + + +def _gh_api( + path: str, + method: str = "GET", + body: dict | None = None, +) -> tuple[dict | list | None, str | None]: + """ + Call the GitHub REST API via the gh CLI. + + Returns (data, error). On success data is the parsed JSON response and + error is None. On failure data is None and error is a string. + If body is provided it is passed as JSON via stdin (--input -). + """ + cmd = ["gh", "api", "--method", method, path] + env = os.environ.copy() + stdin_data = None + + if body is not None: + cmd += ["--input", "-"] + stdin_data = json.dumps(body) + + try: + result = subprocess.run( + cmd, + input=stdin_data, + capture_output=True, + text=True, + env=env, + timeout=30, + ) + except subprocess.TimeoutExpired: + return None, "gh api call timed out" + except FileNotFoundError: + return None, "gh CLI not found in PATH" + + if result.returncode != 0: + stderr = result.stderr.strip() + stdout = result.stdout.strip() + msg = stderr or stdout or f"gh exited with code {result.returncode}" + logging.error("gh api error: %s", msg) + return None, msg + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError as e: + return None, f"JSON parse error: {e}" + + return data, None + + +def _parse_advisory(raw: dict) -> dict: + """ + Extract the fields relevant to PVR triage from a raw advisory API response. + Separates description text from structured metadata. 
+ """ + vulns = [] + for v in raw.get("vulnerabilities") or []: + pkg = v.get("package") or {} + vulns.append({ + "ecosystem": pkg.get("ecosystem", ""), + "package": pkg.get("name", ""), + "vulnerable_versions": v.get("vulnerable_version_range", ""), + "patched_versions": v.get("patched_versions", ""), + }) + + cwes = [c.get("cwe_id", "") for c in (raw.get("cwes") or [])] + + credits_ = [ + {"login": c.get("user", {}).get("login", ""), "type": c.get("type", "")} + for c in (raw.get("credits_detailed") or []) + ] + + submission = raw.get("submission") or {} + + return { + "ghsa_id": raw.get("ghsa_id", ""), + "cve_id": raw.get("cve_id"), + "html_url": raw.get("html_url", ""), + "state": raw.get("state", ""), + "severity": raw.get("severity", ""), + "summary": raw.get("summary", ""), + # Full description returned separately so metadata stays compact + "description": raw.get("description", ""), + "vulnerabilities": vulns, + "cwes": cwes, + "credits": credits_, + # submission.accepted=true means this arrived via PVR + "pvr_submission": { + "via_pvr": bool(submission), + "accepted": submission.get("accepted", False), + }, + "created_at": raw.get("created_at", ""), + "updated_at": raw.get("updated_at", ""), + "collaborating_users": [ + u.get("login", "") for u in (raw.get("collaborating_users") or []) + ], + } + + +# --------------------------------------------------------------------------- +# Advisory fingerprinting and comparison +# --------------------------------------------------------------------------- + +# Common file path patterns in advisory descriptions +_FILE_PATH_RE = re.compile( + r"(?:^|[\s`\"'(])([a-zA-Z0-9_.][a-zA-Z0-9_./\\-]*\.[a-zA-Z]{1,10})(?:[\s`\"'),:;]|$)", + re.MULTILINE, +) + +# Line number references like "line 42", "L42", ":42" +_LINE_REF_RE = re.compile(r"(?:line\s+|L|:)(\d{1,6})\b", re.IGNORECASE) + +# Known source file extensions (filter false positives from _FILE_PATH_RE) +_SRC_EXTS = frozenset({ + "py", "js", "ts", "go", "rs", "rb", "java", "c", "cpp", "cc", "h", "hpp", + "cs", "php", "swift", "kt", "scala", "pl", "pm", "sh", "bash", "zsh", + "yaml", "yml", "json", "xml", "toml", "cfg", "ini", "conf", "html", + "jsx", "tsx", "vue", "svelte", "erb", "ejs", "sql", "r", "m", "mm", +}) + + +def _extract_file_paths(text: str) -> list[str]: + """Extract likely source file paths from free-form text.""" + paths = [] + for m in _FILE_PATH_RE.finditer(text): + p = m.group(1) + ext = p.rsplit(".", 1)[-1].lower() if "." in p else "" + if ext in _SRC_EXTS and "/" in p: + paths.append(p) + return sorted(set(paths)) + + +def _fingerprint_advisory(parsed: dict) -> dict: + """ + Build a structural fingerprint from a parsed advisory. 
+ + Returns a dict with normalized, comparable fields: + cwes, packages, versions, file_paths, severity, summary_norm + """ + desc = parsed.get("description", "") + + # Normalized summary: lowercase, strip whitespace/punctuation + summary = parsed.get("summary", "").lower().strip() + summary_norm = re.sub(r"[^a-z0-9 ]", "", summary) + + # Packages: (ecosystem, name) tuples + packages = set() + for v in parsed.get("vulnerabilities", []): + eco = v.get("ecosystem", "").lower().strip() + pkg = v.get("package", "").lower().strip() + if pkg: + packages.add((eco, pkg)) + + # Version ranges as normalized strings + versions = set() + for v in parsed.get("vulnerabilities", []): + vr = v.get("vulnerable_versions", "").strip() + if vr: + versions.add(vr) + + return { + "ghsa_id": parsed.get("ghsa_id", ""), + "cwes": set(parsed.get("cwes", [])), + "packages": packages, + "versions": versions, + "file_paths": set(_extract_file_paths(desc)), + "severity": parsed.get("severity", "").lower(), + "summary_norm": summary_norm, + } + + +def _compare_fingerprints(a: dict, b: dict) -> dict: + """ + Compare two advisory fingerprints and return a similarity result. + + Returns: + match_level: "strong", "moderate", "weak", or "none" + reasons: list of strings explaining why they matched + overlap: dict of shared fields + """ + reasons = [] + overlap = {} + + # CWE overlap + cwe_shared = a["cwes"] & b["cwes"] + if cwe_shared: + reasons.append(f"shared CWE: {', '.join(sorted(cwe_shared))}") + overlap["cwes"] = sorted(cwe_shared) + + # Package overlap + pkg_shared = a["packages"] & b["packages"] + if pkg_shared: + reasons.append(f"same package: {', '.join(f'{e}/{p}' for e, p in sorted(pkg_shared))}") + overlap["packages"] = [f"{e}/{p}" for e, p in sorted(pkg_shared)] + + # Version range overlap + ver_shared = a["versions"] & b["versions"] + if ver_shared: + reasons.append(f"same version range: {', '.join(sorted(ver_shared))}") + overlap["versions"] = sorted(ver_shared) + + # File path overlap + file_shared = a["file_paths"] & b["file_paths"] + if file_shared: + reasons.append(f"shared files: {', '.join(sorted(file_shared))}") + overlap["file_paths"] = sorted(file_shared) + + # Summary similarity (exact match after normalization) + if a["summary_norm"] and a["summary_norm"] == b["summary_norm"]: + reasons.append("identical summary") + overlap["summary"] = True + + # Determine match level + if not reasons: + level = "none" + elif pkg_shared and (cwe_shared or file_shared or ver_shared): + level = "strong" + elif pkg_shared or (cwe_shared and file_shared): + level = "moderate" + else: + level = "weak" + + return {"match_level": level, "reasons": reasons, "overlap": overlap, + "note": "structural comparison only; 'none' means insufficient " + "metadata overlap, not necessarily distinct vulnerabilities"} + + +@mcp.tool() +def fetch_security_policy( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), +) -> str: + """ + Fetch the repository's SECURITY.md security policy. + + Checks the standard locations in order: /SECURITY.md, /.github/SECURITY.md, + /docs/SECURITY.md. Also checks the org-level .github repo as a fallback. + Returns the policy content, or an empty string if no policy is found. 
+ """ + # Standard locations per GitHub docs + candidates = [ + f"/repos/{owner}/{repo}/contents/SECURITY.md", + f"/repos/{owner}/{repo}/contents/.github/SECURITY.md", + f"/repos/{owner}/{repo}/contents/docs/SECURITY.md", + # Org-level fallback (.github repo) + f"/repos/{owner}/.github/contents/SECURITY.md", + ] + + cmd_base = ["gh", "api", "--method", "GET", "-H", "Accept: application/vnd.github.raw+json"] + env = os.environ.copy() + + for api_path in candidates: + try: + result = subprocess.run( + [*cmd_base, api_path], + capture_output=True, text=True, env=env, timeout=15, + ) + except (subprocess.TimeoutExpired, FileNotFoundError): + continue + if result.returncode == 0 and result.stdout.strip(): + logging.info("Security policy found at %s", api_path) + return result.stdout + return "" + + +@mcp.tool() +def fetch_pvr_advisory( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Fetch a single repository security advisory by GHSA ID. + + Returns structured advisory metadata and the full description text. + Works for advisories in triage state (requires repo or security_events scope on GH_TOKEN). + """ + path = f"/repos/{owner}/{repo}/security-advisories/{ghsa_id}" + data, err = _gh_api(path) + if err: + return f"Error fetching advisory {ghsa_id}: {err}" + parsed = _parse_advisory(data) + return json.dumps(parsed, indent=2) + + +@mcp.tool() +def list_pvr_advisories( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + state: str = Field( + default="triage", + description="Advisory state to filter by: triage, draft, published, closed, or withdrawn. Default: triage", + ), +) -> str: + """ + List repository security advisories, defaulting to triage state. + + Returns a JSON summary list (no description text). Each entry includes + ghsa_id, severity, summary, state, pvr_submission, and created_at. + Returns an empty JSON list when no advisories are found. + Paginates automatically through all pages (100 items per page). + """ + base_path = f"/repos/{owner}/{repo}/security-advisories?state={state}&per_page=100" + all_data: list = [] + page = 1 + max_pages = 50 # hard cap: 5000 advisories max + while page <= max_pages: + data, err = _gh_api(f"{base_path}&page={page}") + if err: + return f"Error listing advisories: {err}" + if not isinstance(data, list): + return f"Unexpected response: {data}" + if not data: + break + all_data.extend(data) + if len(data) < 100: + break + page += 1 + + results = [] + for raw in all_data: + submission = raw.get("submission") or {} + results.append({ + "ghsa_id": raw.get("ghsa_id", ""), + "severity": raw.get("severity", ""), + "summary": raw.get("summary", ""), + "state": raw.get("state", ""), + "pvr_submission": { + "via_pvr": bool(submission), + "accepted": submission.get("accepted", False), + }, + "created_at": raw.get("created_at", ""), + }) + + return json.dumps(results, indent=2) + + +@mcp.tool() +def compare_advisories( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + state: str = Field( + default="triage", + description="Advisory state to compare. Default: triage", + ), + target_ghsa: str = Field( + default="", + description="Optional: compare only this GHSA against the others. 
" + "If empty, compares all advisories pairwise.", + ), +) -> str: + """ + Detect duplicate or near-duplicate advisories in a repository's inbox. + + Fetches advisories in the given state, computes structural fingerprints + (CWE, package, version range, file paths, summary), and identifies + pairs or clusters that likely describe the same vulnerability. + + When target_ghsa is set, only comparisons involving that advisory are returned. + + Returns JSON with: + - clusters: list of {advisories: [...ghsa_ids], match_level, reasons} + - singles: list of ghsa_ids with no duplicates detected + - total: total advisory count + """ + base_path = f"/repos/{owner}/{repo}/security-advisories?state={state}&per_page=100" + all_raw: list = [] + page = 1 + while page <= 50: + data, err = _gh_api(f"{base_path}&page={page}") + if err: + return f"Error listing advisories: {err}" + if not isinstance(data, list) or not data: + break + all_raw.extend(data) + if len(data) < 100: + break + page += 1 + + if len(all_raw) < 2: + return json.dumps({ + "clusters": [], + "singles": [_parse_advisory(r).get("ghsa_id", "") for r in all_raw], + "total": len(all_raw), + }, indent=2) + + # Parse and fingerprint all advisories + parsed = [_parse_advisory(r) for r in all_raw] + fps = [_fingerprint_advisory(p) for p in parsed] + + # Pairwise comparison + # Union-find for clustering + id_to_idx = {fp["ghsa_id"]: i for i, fp in enumerate(fps)} + parent = list(range(len(fps))) + + def find(x): + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(x, y): + px, py = find(x), find(y) + if px != py: + parent[px] = py + + matches = [] + for i in range(len(fps)): + for j in range(i + 1, len(fps)): + if target_ghsa and fps[i]["ghsa_id"] != target_ghsa and fps[j]["ghsa_id"] != target_ghsa: + continue + result = _compare_fingerprints(fps[i], fps[j]) + if result["match_level"] != "none": + matches.append({ + "a": fps[i]["ghsa_id"], + "b": fps[j]["ghsa_id"], + "match_level": result["match_level"], + "reasons": result["reasons"], + }) + if result["match_level"] == "strong": + union(i, j) + + # Build clusters from union-find + cluster_map: dict[int, list[str]] = defaultdict(list) + for i, fp in enumerate(fps): + root = find(i) + cluster_map[root].append(fp["ghsa_id"]) + + clusters = [] + singles = [] + for members in cluster_map.values(): + if len(members) > 1: + # Find the match details for this cluster + cluster_matches = [ + m for m in matches + if m["a"] in members and m["b"] in members + ] + best_level = "weak" + all_reasons: list[str] = [] + for cm in cluster_matches: + all_reasons.extend(cm["reasons"]) + if cm["match_level"] == "strong": + best_level = "strong" + elif cm["match_level"] == "moderate" and best_level != "strong": + best_level = "moderate" + clusters.append({ + "advisories": sorted(members), + "match_level": best_level, + "reasons": sorted(set(all_reasons)), + }) + else: + singles.extend(members) + + # Also include weak matches (not clustered) as informational + weak_matches = [m for m in matches if m["match_level"] == "weak"] + + return json.dumps({ + "clusters": clusters, + "weak_matches": weak_matches, + "singles": sorted(singles), + "total": len(fps), + }, indent=2) + + +@mcp.tool() +def resolve_version_ref( + owner: str = Field(description="Repository owner"), + repo: str = Field(description="Repository name"), + version: str = Field( + description="Version string to resolve, e.g. '1.25.4' or 'v1.25.4'. " + "Will try matching git tags directly and with a 'v' prefix." 
+ ), +) -> str: + """ + Resolve a version string to a git commit SHA and tag name. + + Returns the tag name and commit SHA if found. + """ + # Try both bare version and v-prefixed tag + candidates = [version, f"v{version}"] if not version.startswith("v") else [version, version[1:]] + + for tag in candidates: + path = f"/repos/{owner}/{repo}/git/refs/tags/{tag}" + data, err = _gh_api(path) + if err or not data: + continue + # Lightweight tags point directly to a commit; annotated tags point to a tag object + obj = data.get("object", {}) + ref_sha = obj.get("sha", "") + ref_type = obj.get("type", "") + + if ref_type == "tag": + # Annotated tag: dereference to the commit + tag_path = f"/repos/{owner}/{repo}/git/tags/{ref_sha}" + tag_data, tag_err = _gh_api(tag_path) + if not tag_err and tag_data: + commit_sha = tag_data.get("object", {}).get("sha", "") + return json.dumps({"tag": tag, "commit_sha": commit_sha, "type": "annotated"}) + elif ref_type == "commit": + return json.dumps({"tag": tag, "commit_sha": ref_sha, "type": "lightweight"}) + + return f"Could not resolve version '{version}' to a tag in {owner}/{repo}." + + +@mcp.tool() +def fetch_file_at_ref( + owner: str = Field(description="Repository owner"), + repo: str = Field(description="Repository name"), + path: str = Field(description="File path within the repository"), + ref: str = Field(description="Git ref (commit SHA, tag, or branch) to fetch the file at"), + start_line: int = Field(default=1, description="First line to return (1-indexed)"), + length: int = Field(default=100, description="Number of lines to return (max 500)"), +) -> str: + """ + Fetch a range of lines from a file at a specific git ref (commit SHA or tag). + """ + # Use gh api with the ref query parameter + cmd = [ + "gh", "api", + "--method", "GET", + f"/repos/{owner}/{repo}/contents/{path}", + "-f", f"ref={ref}", + "-H", "Accept: application/vnd.github.raw+json", + ] + env = os.environ.copy() + + try: + result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=30) + except subprocess.TimeoutExpired: + return "Error: gh api call timed out" + except FileNotFoundError: + return "Error: gh CLI not found in PATH" + + if result.returncode != 0: + return f"Error fetching {path}@{ref}: {result.stderr.strip() or result.stdout.strip()}" + + lines = result.stdout.splitlines() + if start_line < 1: + start_line = 1 + if length < 1: + length = 50 + length = min(length, 500) # cap to avoid returning enormous files + if start_line > len(lines): + return f"start_line {start_line} exceeds file length ({len(lines)} lines) in {path}@{ref}" + chunk = lines[start_line - 1: start_line - 1 + length] + if not chunk: + return f"No lines in range {start_line}-{start_line + length - 1} in {path}@{ref}" + return "\n".join(f"{start_line + i}: {line}" for i, line in enumerate(chunk)) + + +@mcp.tool() +def save_triage_report( + ghsa_id: str = Field(description="GHSA ID, used as the filename stem, e.g. GHSA-xxxx-xxxx-xxxx"), + report: str = Field(description="Full markdown report content to write to disk"), +) -> str: + """ + Write the triage report to a markdown file in the report output directory. + + The file is written to REPORT_DIR/{ghsa_id}_triage.md. + REPORT_DIR defaults to './reports' and can be overridden via the REPORT_DIR + environment variable. Returns the absolute path of the written file. 
+ """ + REPORT_DIR.mkdir(parents=True, exist_ok=True) + # Sanitize the GHSA ID to prevent path traversal + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + if not safe_name: + return "Error: ghsa_id produced an empty filename after sanitization" + out_path = REPORT_DIR / f"{safe_name}_triage.md" + # The agent sometimes passes the report as a JSON-encoded string + # (with outer quotes and escape sequences). Decode it if so. + content = report + if content.startswith('"') and content.endswith('"'): + try: + content = json.loads(content) + except json.JSONDecodeError: + pass + out_path.write_text(content, encoding="utf-8") + logging.info("Triage report written to %s", out_path) + return str(out_path.resolve()) + + +@mcp.tool() +def reject_pvr_advisory( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Close (reject) a security advisory. + + Sets the advisory state to 'closed' via the GitHub API. Requires a GH_TOKEN + with security_events write scope. + + Note: the GitHub REST API has no comments endpoint for security advisories. + Post the response draft to the reporter manually via the advisory URL. + """ + path = f"/repos/{owner}/{repo}/security-advisories/{ghsa_id}" + _, err = _gh_api(path, method="PATCH", body={"state": "closed"}) + if err: + return f"Error closing advisory {ghsa_id}: {err}" + return f"Advisory {ghsa_id} closed (state: closed)." + + +@mcp.tool() +def accept_pvr_advisory( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Accept a PVR advisory by moving it from triage to draft state. + + Sets the advisory state to 'draft' via the GitHub API (triage → draft transition). + Use this when the vulnerability is confirmed and the maintainer intends to publish + a security advisory. Requires a GH_TOKEN with security_events write scope. + + Note: the GitHub REST API has no comments endpoint for security advisories. + Post the response draft to the reporter manually via the advisory URL. + """ + path = f"/repos/{owner}/{repo}/security-advisories/{ghsa_id}" + _, err = _gh_api(path, method="PATCH", body={"state": "draft"}) + if err: + return f"Error accepting advisory {ghsa_id}: {err}" + return f"Advisory {ghsa_id} accepted (state: draft)." + + +@mcp.tool() +def find_similar_triage_reports( + vuln_type: str = Field(description="Vulnerability class to search for, e.g. 'path traversal', 'XSS'"), + affected_component: str = Field(description="Component, endpoint, or feature to search for"), +) -> str: + """ + Search existing triage reports for similar vulnerability types and affected components. + + Scans REPORT_DIR for *_triage.md files and performs case-insensitive substring + matching across the full file content for vuln_type and/or affected_component. + A report matches if at least one non-empty search term is found anywhere in the file. + Returns an empty list if both terms are empty/whitespace. + Returns a JSON list of matching reports with ghsa_id, verdict, quality, and path. 
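+
+    Example of a single match (illustrative values):
+        {"ghsa_id": "GHSA-xxxx-xxxx-xxxx", "verdict": "CONFIRMED",
+         "quality": "High", "path": "reports/GHSA-xxxx-xxxx-xxxx_triage.md"}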
+ """ + if not REPORT_DIR.exists(): + return json.dumps([]) + + vuln_lower = vuln_type.strip().lower() + component_lower = affected_component.strip().lower() + + # Both terms empty → no meaningful search possible + if not vuln_lower and not component_lower: + return json.dumps([]) + + matches = [] + + for report_path in sorted(REPORT_DIR.glob("*_triage.md")): + # Skip batch queue reports and response drafts — only match individual GHSA triage reports + stem = report_path.stem # e.g. "GHSA-xxxx-xxxx-xxxx_triage" + if stem.startswith("batch_queue_") or stem.endswith("_response_triage"): + continue + try: + content = report_path.read_text(encoding="utf-8") + except OSError: + continue + + content_lower = content.lower() + matched = (vuln_lower and vuln_lower in content_lower) or ( + component_lower and component_lower in content_lower + ) + if not matched: + continue + + # Extract GHSA ID from filename: {ghsa_id}_triage.md + ghsa_id = stem.replace("_triage", "") + + # Extract verdict from report (handles **CONFIRMED** and **[CONFIRMED]**) + verdict = "UNKNOWN" + verdict_match = re.search(r"\*\*\[?\s*(CONFIRMED|UNCONFIRMED|INCONCLUSIVE)\s*\]?\*\*", content) + if verdict_match: + verdict = verdict_match.group(1) + + # Extract quality rating — report format: "Rate overall quality: High / Medium / Low" + quality = "Unknown" + quality_match = re.search(r"Rate overall quality[:\s]*\**\s*(High|Medium|Low)\b", content, re.IGNORECASE) + if quality_match: + quality = quality_match.group(1) + + matches.append({ + "ghsa_id": ghsa_id, + "verdict": verdict, + "quality": quality, + "path": str(report_path), + }) + + return json.dumps(matches, indent=2) + + +@mcp.tool() +def read_triage_report( + ghsa_id: str = Field(description="GHSA ID, used to locate the report file, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Read a previously saved triage report from disk. + + Reads REPORT_DIR/{ghsa_id}_triage.md and returns its content. + Returns an error string if the file does not exist. + """ + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + report_path = REPORT_DIR / f"{safe_name}_triage.md" + if not report_path.exists(): + return f"Report not found: {report_path}" + return report_path.read_text(encoding="utf-8") + + +@mcp.tool() +def list_pending_responses() -> str: + """ + List advisories that have a response draft but have not yet been sent. + + Globs REPORT_DIR for *_response_triage.md files and skips any whose + corresponding *_response_sent.md marker exists. + Returns a JSON list of {ghsa_id, triage_report_exists} objects. + """ + if not REPORT_DIR.exists(): + return json.dumps([]) + + results = [] + for draft_path in sorted(REPORT_DIR.glob("*_response_triage.md")): + # stem is e.g. "GHSA-xxxx-xxxx-xxxx_response_triage" + stem = draft_path.stem + # Extract ghsa_id: remove "_response_triage" suffix + ghsa_id = stem.replace("_response_triage", "") + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + + # Skip if sent marker exists + sent_marker = REPORT_DIR / f"{safe_name}_response_sent.md" + if sent_marker.exists(): + continue + + triage_report = REPORT_DIR / f"{safe_name}_triage.md" + results.append({ + "ghsa_id": ghsa_id, + "triage_report_exists": triage_report.exists(), + }) + + return json.dumps(results, indent=2) + + +@mcp.tool() +def mark_response_sent( + ghsa_id: str = Field(description="GHSA ID of the advisory whose response was sent"), +) -> str: + """ + Create a marker file indicating that the response for this advisory has been sent. 
+ + Writes REPORT_DIR/{ghsa_id}_response_sent.md with an ISO timestamp. + Returns the path of the created marker, or an error string if ghsa_id is empty. + """ + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + if not safe_name: + return "Error: ghsa_id produced an empty filename after sanitization" + REPORT_DIR.mkdir(parents=True, exist_ok=True) + marker_path = REPORT_DIR / f"{safe_name}_response_sent.md" + timestamp = datetime.now(timezone.utc).isoformat() + marker_path.write_text(f"Response sent: {timestamp}\n", encoding="utf-8") + logging.info("Response sent marker written to %s", marker_path) + return str(marker_path.resolve()) + + +if __name__ == "__main__": + mcp.run(show_banner=False) diff --git a/src/seclab_taskflows/mcp_servers/reporter_reputation.py b/src/seclab_taskflows/mcp_servers/reporter_reputation.py new file mode 100644 index 0000000..5f2c299 --- /dev/null +++ b/src/seclab_taskflows/mcp_servers/reporter_reputation.py @@ -0,0 +1,224 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Reporter Reputation MCP Server +# +# Tracks PVR reporter history and computes reputation scores based on +# past triage outcomes. Uses a local SQLite database. + +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime, timezone +from pathlib import Path + +from fastmcp import FastMCP +from pydantic import Field +from seclab_taskflow_agent.path_utils import log_file_name, mcp_data_dir +from sqlalchemy import Text, UniqueConstraint, create_engine +from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column + +REPORTER_DB_DIR = mcp_data_dir("seclab-taskflows", "reporter_reputation", "REPORTER_DB_DIR") + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + filename=log_file_name("mcp_reporter_reputation.log"), + filemode="a", +) + + +class Base(DeclarativeBase): + pass + + +VALID_VERDICTS = frozenset({"CONFIRMED", "UNCONFIRMED", "INCONCLUSIVE"}) +VALID_QUALITIES = frozenset({"High", "Medium", "Low"}) + + +class ReporterRecord(Base): + __tablename__ = "reporter_records" + __table_args__ = (UniqueConstraint("login", "ghsa_id", name="uq_reporter_ghsa"),) + + id: Mapped[int] = mapped_column(primary_key=True) + login: Mapped[str] + ghsa_id: Mapped[str] + repo: Mapped[str] + verdict: Mapped[str] # CONFIRMED / UNCONFIRMED / INCONCLUSIVE + quality: Mapped[str] # High / Medium / Low + timestamp: Mapped[str] = mapped_column(Text) # ISO8601 + + def __repr__(self) -> str: + return ( + f"" + ) + + +class ReporterReputationBackend: + def __init__(self, db_dir: Path | str) -> None: + if str(db_dir) == "sqlite://": + # Explicit in-memory sentinel (used in tests) + connection_string = "sqlite://" + else: + db_path = Path(db_dir) + db_path.mkdir(parents=True, exist_ok=True) + connection_string = f"sqlite:///{db_path}/reporter_reputation.db" + self.engine = create_engine(connection_string, echo=False) + Base.metadata.create_all(self.engine) + + def record_triage_result( + self, login: str, ghsa_id: str, repo: str, verdict: str, quality: str + ) -> str: + """Insert or update a triage result record for a reporter.""" + if verdict not in VALID_VERDICTS: + raise ValueError(f"Invalid verdict {verdict!r}. Must be one of {sorted(VALID_VERDICTS)}") + if quality not in VALID_QUALITIES: + raise ValueError(f"Invalid quality {quality!r}. 
Must be one of {sorted(VALID_QUALITIES)}") + timestamp = datetime.now(timezone.utc).isoformat() + with Session(self.engine) as session: + existing = ( + session.query(ReporterRecord) + .filter_by(login=login, ghsa_id=ghsa_id) + .first() + ) + if existing: + existing.repo = repo + existing.verdict = verdict + existing.quality = quality + existing.timestamp = timestamp + else: + session.add( + ReporterRecord( + login=login, + ghsa_id=ghsa_id, + repo=repo, + verdict=verdict, + quality=quality, + timestamp=timestamp, + ) + ) + session.commit() + return "recorded" + + def get_reporter_history(self, login: str) -> list[dict]: + """Return all triage records for a reporter, newest first.""" + with Session(self.engine) as session: + rows = ( + session.query(ReporterRecord) + .filter_by(login=login) + .order_by(ReporterRecord.timestamp.desc()) + .all() + ) + return [ + { + "login": r.login, + "ghsa_id": r.ghsa_id, + "repo": r.repo, + "verdict": r.verdict, + "quality": r.quality, + "timestamp": r.timestamp, + } + for r in rows + ] + + def get_reporter_score(self, login: str) -> dict: + """Compute and return a reputation summary for a reporter.""" + history = self.get_reporter_history(login) + total = len(history) + if total == 0: + return { + "login": login, + "total_reports": 0, + "confirmed_pct": 0.0, + "quality_breakdown": {"High": 0, "Medium": 0, "Low": 0}, + "recommendation": "no history", + } + + confirmed = sum(1 for r in history if r["verdict"] == "CONFIRMED") + confirmed_pct = confirmed / total + + quality_breakdown: dict[str, int] = {"High": 0, "Medium": 0, "Low": 0} + for r in history: + q = r["quality"] + if q in quality_breakdown: + quality_breakdown[q] += 1 + + low_share = quality_breakdown["Low"] / total + + # Derive recommendation + if confirmed_pct >= 0.6 and low_share <= 0.2: + recommendation = "high trust" + elif confirmed_pct <= 0.2 or low_share >= 0.5: + recommendation = "treat with skepticism" + else: + recommendation = "normal" + + return { + "login": login, + "total_reports": total, + "confirmed_pct": round(confirmed_pct, 4), + "quality_breakdown": quality_breakdown, + "recommendation": recommendation, + } + + +mcp = FastMCP("ReporterReputation") + +backend = ReporterReputationBackend(REPORTER_DB_DIR) + + +@mcp.tool() +def record_triage_result( + login: str = Field(description="GitHub login of the reporter"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), + repo: str = Field(description="Repository in owner/repo format"), + verdict: str = Field(description="Triage verdict: CONFIRMED, UNCONFIRMED, or INCONCLUSIVE"), + quality: str = Field(description="Report quality rating: High, Medium, or Low"), +) -> str: + """ + Record or update a triage result for a PVR reporter. + + Upserts a row keyed by (login, ghsa_id). Re-running triage on the same + GHSA advisory updates the existing record rather than creating a duplicate. + Returns 'recorded' on success, or an error string for invalid inputs. + """ + try: + return backend.record_triage_result(login, ghsa_id, repo, verdict, quality) + except ValueError as e: + return f"Error: {e}" + + +@mcp.tool() +def get_reporter_history( + login: str = Field(description="GitHub login of the reporter"), +) -> str: + """ + Retrieve the full triage history for a reporter. + + Returns a JSON list of all records for this login, newest first. + Returns an empty JSON list if no history is found. 
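+
+    Example entry (illustrative values):
+        {"login": "octocat", "ghsa_id": "GHSA-xxxx-xxxx-xxxx", "repo": "owner/repo",
+         "verdict": "CONFIRMED", "quality": "High",
+         "timestamp": "2026-03-03T12:00:00+00:00"}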
+ """ + history = backend.get_reporter_history(login) + return json.dumps(history, indent=2) + + +@mcp.tool() +def get_reporter_score( + login: str = Field(description="GitHub login of the reporter"), +) -> str: + """ + Compute and return a reputation score for a PVR reporter. + + Returns a JSON summary including total_reports, confirmed_pct, + quality_breakdown, and a plain-language recommendation: + 'high trust', 'normal', or 'treat with skepticism'. + """ + score = backend.get_reporter_score(login) + return json.dumps(score, indent=2) + + +if __name__ == "__main__": + mcp.run(show_banner=False) diff --git a/src/seclab_taskflows/personalities/pvr_analyst.yaml b/src/seclab_taskflows/personalities/pvr_analyst.yaml new file mode 100644 index 0000000..a128a83 --- /dev/null +++ b/src/seclab_taskflows/personalities/pvr_analyst.yaml @@ -0,0 +1,39 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Personality for PVR (Private Vulnerability Report) triage analysis. + +seclab-taskflow-agent: + version: "1.0" + filetype: personality + +personality: | + You are a security vulnerability triage analyst for an open source software maintainer. + + Your job is to verify vulnerability claims made in Private Vulnerability Reports (PVRs), + which arrive as GitHub Security Advisories (GHSAs) in triage state. + + Core principles: + - Base all conclusions on actual code evidence. Do not speculate. + - If you cannot verify a claim, say so explicitly. + - Distinguish between confirmed vulnerabilities and unverified claims. + - Be concise and direct. Maintainers are busy. + - Flag low-quality ("AI slop") reports: vague claims, wrong file paths, non-working PoC, + incorrect function signatures, or descriptions that don't match the actual code. + + Execution rules: + - Execute each task fully and autonomously. Do not ask for permission or confirmation. + - Do not offer to do additional work. Do not say "if you'd like" or "shall I". + - Complete every step described in the task prompt, then stop. + - When storing results in memcache, proceed immediately to the next step. + - Output only findings and results, not suggestions for next steps. + +task: | + Analyze the provided vulnerability report and verify claims against the actual source code. + Produce factual, evidence-based findings. Never guess or assume. + Execute all steps described in the user prompt without asking for direction. + +toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.gh_file_viewer + - seclab_taskflow_agent.toolboxes.memcache diff --git a/src/seclab_taskflows/taskflows/pvr_triage/MANUAL_RESPONSE.md b/src/seclab_taskflows/taskflows/pvr_triage/MANUAL_RESPONSE.md new file mode 100644 index 0000000..b2f82e8 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/MANUAL_RESPONSE.md @@ -0,0 +1,32 @@ +# Posting a Response to a PVR Advisory + +The GitHub REST API has no comments endpoint for repository security advisories +(`/repos/{owner}/{repo}/security-advisories/{ghsa_id}/comments` → 404). The comment +thread visible in the advisory UI is internal to GitHub and not publicly accessible via +the API. + +After `pvr_respond` or `pvr_respond_batch` applies the state transition (accept/reject), +post the generated response draft to the reporter manually: + +## Steps + +1. Open the response draft generated by `pvr_triage`: + ```bash + cat reports/GHSA-xxxx-xxxx-xxxx_response_triage.md + ``` + +2. 
Open the advisory URL — printed in the triage report under `html_url`, or construct + it directly: + ``` + https://github.com/{owner}/{repo}/security/advisories/{GHSA-ID} + ``` + +3. Scroll to the comment box at the bottom of the advisory page, paste the response + draft, edit if needed, and submit. The comment is visible only to the reporter and + collaborators on the advisory (not public). + +## Tracking + +`pvr_respond` creates `REPORT_DIR/{GHSA-ID}_response_sent.md` after the state +transition. This marker prevents re-processing by `pvr_respond_batch` but does **not** +confirm that the comment was posted. Use it as a reminder to complete the manual step. diff --git a/src/seclab_taskflows/taskflows/pvr_triage/README.md b/src/seclab_taskflows/taskflows/pvr_triage/README.md new file mode 100644 index 0000000..67f2210 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/README.md @@ -0,0 +1,432 @@ +# PVR Triage Taskflows + +Tools for triaging GitHub Security Advisories submitted via [Private Vulnerability Reporting (PVR)](https://docs.github.com/en/code-security/security-advisories/guidance-on-reporting-and-writing-information-about-vulnerabilities/privately-reporting-a-security-vulnerability). The taskflows fetch an advisory in triage state, verify the claimed vulnerability against actual source code, detect duplicate reports, score report quality, and generate a structured analysis and a ready-to-send response draft. + +Four taskflows cover the full triage lifecycle: + +| Taskflow | Purpose | +|---|---| +| `pvr_triage` | Deep-analyse one advisory end-to-end | +| `pvr_triage_batch` | Score an entire inbox and produce a ranked queue | +| `pvr_respond` | Post the response for one advisory once you've reviewed the analysis | +| `pvr_respond_batch` | Scan REPORT_DIR and post all pending response drafts in a single session | + +--- + +## Requirements + +- Python ≥ 3.9 (or Docker via `run_seclab_agent.sh`) +- `gh` CLI installed and authenticated +- A GitHub token with **`repo`** and **`security_events`** scopes + - Write-back actions (`pvr_respond`) additionally require **`security_events` write** scope +- AI API credentials (`AI_API_TOKEN`, `AI_API_ENDPOINT`) + +### Environment variables + +| Variable | Required by | Description | +|---|---|---| +| `GH_TOKEN` | all | GitHub personal access token | +| `AI_API_TOKEN` | all | API key for the AI provider | +| `AI_API_ENDPOINT` | all | Model endpoint (defaults to `https://api.githubcopilot.com`) | +| `REPORT_DIR` | all | Directory where triage reports are written. Defaults to `./reports` | +| `LOG_DIR` | all | Directory for MCP server logs. Auto-detected via `platformdirs` if not set | +| `REPORTER_DB_DIR` | `pvr_triage` | Directory for the reporter reputation SQLite database. Auto-detected if not set | +| `PVR_CONTAINER_VALIDATION` | `pvr_triage` | Set to `true` to enable container-based SAST and reachability validation. Requires Docker. | +| `CONTAINER_WORKSPACE` | `pvr_triage` | Host directory mounted to `/workspace` in the SAST container. Optional. | + +A minimal `.env` for local use: + +``` +GH_TOKEN=ghp_... +AI_API_TOKEN=... 
+AI_API_ENDPOINT=https://api.githubcopilot.com +REPORT_DIR=/path/to/reports +LOG_DIR=/path/to/logs +``` + +--- + +## Taskflow 1 — Single advisory triage (`pvr_triage`) + +Runs a full analysis on one GHSA in triage state and produces: + +- A structured triage report saved to `REPORT_DIR/_triage.md` +- A response draft saved to `REPORT_DIR/_response_triage.md` +- A record in the reporter reputation database + +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g repo=owner/repo \ + -g ghsa=GHSA-xxxx-xxxx-xxxx + +# For testing with owner-created advisories (draft state instead of triage): +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g repo=owner/repo \ + -g ghsa=GHSA-xxxx-xxxx-xxxx \ + -g state=draft +``` + +### What it does (9 tasks) + +1. **Initialize** — clears the in-memory cache. +2. **Fetch & parse** — fetches the advisory from the GitHub API and extracts structured metadata: vulnerability type, affected component, file references, PoC quality signals, reporter credits. Also fetches the repository's SECURITY.md security policy (if one exists) for policy compliance evaluation. +3. **Quality gate** — calls `get_reporter_score` for the reporter's history, `find_similar_triage_reports` to find prior reports, and `compare_advisories` to detect duplicates in the current triage inbox. If a security policy was found, evaluates the report against it (scope, required elements, exclusions). Computes `fast_close` using a reputation-gated decision tree: + - **high-trust reporter** → always `fast_close = false` (full verification). + - **skepticism reporter** → `fast_close = true` when all three quality signals are absent (prior similar report not required). + - **normal / no history** → `fast_close = true` only when all three signals are absent *and* a prior similar report exists. + Fast-close skips deep code analysis. Duplicate detection results are surfaced in the report but never trigger automatic fast-close. +4. **Code verification** — resolves the claimed version to a git tag/SHA, fetches the relevant source files, and checks whether the vulnerability pattern is actually present. After verifying at the claimed version, also checks HEAD to determine patch status (`still_vulnerable` / `patched` / `could_not_determine`). Skipped automatically when `fast_close` is true. +5. **Container validation** (optional) — when `PVR_CONTAINER_VALIDATION=true`, clones the repo at the affected version into an isolated SAST container and performs: semgrep scanning on reported files, call graph / reachability analysis on reported functions (pyan3 for Python, cscope for C/C++), best-effort PoC reproduction, and patch diff analysis. Skipped when not enabled or when fast-close is active. +6. **Report generation** — writes a markdown report covering: Verdict, Code Verification, Validation Results (if container validation ran), Severity Assessment, CVSS 3.1 assessment, Duplicate/Prior Reports, Patch Status, Report Quality, Reporter Reputation, and Recommendations. +7. **Save report** — writes the report to `REPORT_DIR/_triage.md` and prints the path. +8. **Response draft** — drafts a plain-text reply to the reporter (≤200 words, no markdown headers) tailored to the verdict: acknowledge + credit for CONFIRMED, cite evidence for UNCONFIRMED, explain missing info for INCONCLUSIVE, or request specific details for fast-close. +9. 
**Update reputation + save response** — records the triage outcome in the reporter reputation database and saves the response draft to `REPORT_DIR/_response_triage.md`. + +### Report structure + +``` +## PVR Triage Analysis: GHSA-xxxx-xxxx-xxxx + +**Repository:** owner/repo +**Claimed Severity:** high +**Vulnerability Type:** path traversal + +### Verdict +**[CONFIRMED / UNCONFIRMED / INCONCLUSIVE]** + +### Code Verification +### Validation Results (only when container validation ran) +### Severity Assessment +### CVSS Assessment +### Duplicate / Prior Reports +### Patch Status +### Security Policy Compliance (only when repo has SECURITY.md) +### Report Quality +### Reporter Reputation +### Recommendations +``` + +--- + +## Taskflow 2 — Batch inbox scoring (`pvr_triage_batch`) + +Lists advisories in triage state for a repository, scores each unprocessed one by priority, and saves a ranked markdown table. Advisories with an existing triage report in `REPORT_DIR` are skipped and their count is noted in the output. + +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g repo=owner/repo + +# For testing with owner-created advisories (draft state instead of triage): +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g repo=owner/repo \ + -g state=draft +``` + +### Output + +Saved to `REPORT_DIR/batch_queue__.md`: + +```markdown +# PVR Batch Triage Queue: owner/repo + +| GHSA | Age (days) | Severity | Vuln Type | Quality Signals | Priority | Duplicates | Status | Suggested Action | +|------|------------|----------|-----------|-----------------|----------|------------|--------|-----------------| +| GHSA-... | 14 | high | SQL injection | PoC, Files | 6 | - | Not triaged | Triage Immediately | +| GHSA-... | 7 | high | SQL injection | PoC | 4 | GHSA-... [strong] | Not triaged | Likely Duplicate -- Triage Best | +| GHSA-... | 3 | medium | XSS | None | 1 | - | Not triaged | Likely Low Quality -- Fast Close | +``` + +Rows are sorted by priority score descending; ties are broken by `created_at` ascending (oldest advisory first). + +### Priority scoring + +Advisories with an existing report in `REPORT_DIR` are skipped entirely. Only unprocessed advisories are scored: + +``` +priority_score = severity_weight + quality_weight + +severity_weight: critical=4 high=3 medium=2 low=1 unknown=1 +quality_weight: has_file_references(+1) + has_poc(+1) + has_line_numbers(+1) +``` + +**Suggested actions:** + +| Score | Action | +|---|---| +| ≥ 5 | Triage Immediately | +| ≥ 3 | Triage Soon | +| 2 | Triage | +| ≤ 1 | Likely Low Quality — Fast Close | + +--- + +## Taskflow 3 — Write-back (`pvr_respond`) + +Loads an existing triage report and applies the chosen state transition to the GitHub advisory. All write-back calls are confirm-gated — the agent will prompt for confirmation before making any change. 
+ +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g repo=owner/repo \ + -g ghsa=GHSA-xxxx-xxxx-xxxx \ + -g action=accept +``` + +### Actions + +| `action` | API call | When to use | +|---|---|---| +| `accept` | Sets advisory state to `draft` (triage → draft) | Vulnerability confirmed — maintainer intends to publish an advisory | +| `reject` | Sets advisory state to `closed` | Report is clearly invalid or low quality | + +> **Note:** `pvr_respond` requires that `pvr_triage` has already been run for the GHSA so that `_triage.md` and `_response_triage.md` exist in `REPORT_DIR`. + +> **Posting the response:** The GitHub REST API has no comments endpoint for security advisories. After running `pvr_respond`, post the response draft manually via the advisory URL. See [`MANUAL_RESPONSE.md`](MANUAL_RESPONSE.md) for instructions and language. + +### Confirm gate + +The toolbox marks `accept_pvr_advisory` and `reject_pvr_advisory` as `confirm`-gated. The agent will print the verdict and summary, then ask for explicit confirmation before making any change to GitHub. + +After a successful state transition, `pvr_respond` calls `mark_response_sent` to create a `_response_sent.md` marker so `pvr_respond_batch` will skip this advisory in future runs. + +--- + +## Taskflow 4 — Bulk respond (`pvr_respond_batch`) + +Scans `REPORT_DIR` for advisories that have a response draft (`*_response_triage.md`) but no applied marker (`*_response_sent.md`), and applies the chosen state transition to each in a single session. + +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ + -g repo=owner/repo \ + -g action=reject + +# or via the helper script: +./scripts/run_pvr_triage.sh respond_batch owner/repo reject +``` + +### How it works + +**Task 1** calls `list_pending_responses` (local read-only, no confirm gate) to find all pending advisories and prints a summary table. If there are none it stops immediately. + +**Task 2** iterates over every pending entry: +1. Reads the triage report from disk. +2. Prints a per-item summary (GHSA, verdict). +3. Executes the chosen action (`accept` / `reject`) via the confirm-gated write-back tool. +4. On success, calls `mark_response_sent` to create a `*_response_sent.md` marker so the advisory is skipped in future runs. + +Prints a final count and a reminder to post each response draft manually. + +### Applied markers + +`pvr_respond` also calls `mark_response_sent` after a successful state transition, keeping single-advisory and bulk runs in sync. Once a marker exists, neither `pvr_respond` nor `pvr_respond_batch` will re-process it. + +--- + +## Duplicate Detection + +Both `pvr_triage` and `pvr_triage_batch` use the `compare_advisories` tool to detect duplicate or near-duplicate advisories in the triage inbox. + +**How it works:** Dedup uses two layers: + +1. **Structural comparison** (`compare_advisories` tool) -- fingerprints each advisory by CWE IDs, package, version range, and file paths from the description. Pairs with overlapping fields are flagged: + +| Level | Meaning | +|---|---| +| strong | Same package AND (same CWE or same files or same version range) | +| moderate | Same package alone, or CWE + files overlap | +| weak | Any single field overlap | + +2. **Semantic analysis** (agent judgment) -- the agent reads all advisory descriptions and identifies groups that describe the same vulnerability even when structural metadata differs. 
Catches duplicates that use different wording, cite different CWEs, or lack structured metadata entirely (common with low-quality reports). A structural match of "none" means insufficient metadata, not necessarily distinct vulnerabilities. + +**In batch mode:** The scored queue table includes a Duplicates column showing cluster membership. Strong structural matches get the "Likely Duplicate -- Triage Best" action. Semantic duplicates identified by the agent are added with match level "semantic". + +**In single-advisory mode:** The quality gate checks for both structural and semantic duplicates and surfaces the info in the report, but never auto-closes. Maintainers always decide. + +See [SCORING.md](SCORING.md) Section 5 for full details. + +--- + +## Container Validation (optional) + +When `PVR_CONTAINER_VALIDATION=true`, `pvr_triage` performs automated validation in an isolated Docker container running the SAST image (`seclab-shell-sast:latest`). + +### What it does + +1. **Clone + checkout** — clones the repo and checks out the affected version. +2. **SAST scan** — runs semgrep on reported file paths. +3. **Reachability analysis** — traces the call graph to determine if the reported function is reachable from public entry points (pyan3 for Python, cscope for C/C++, grep-based for others). +4. **PoC reproduction** — attempts best-effort reproduction of provided PoC steps (safe commands only; no network access or destructive operations). +5. **Patch analysis** — diffs the affected version against HEAD to verify whether a fix exists and addresses the reported vulnerability. + +### Prerequisites + +```bash +# Build the SAST container image +./scripts/build_container_images.sh + +# Enable container validation +export PVR_CONTAINER_VALIDATION=true +``` + +### Effect on triage + +- Unreachable functions → severity downgrade in the assessment +- Semgrep findings → corroborate or contradict reporter claims +- Successful PoC reproduction → strongest confirmation evidence +- Results appear in the **Validation Results** section of the triage report + +See [SCORING.md](SCORING.md) Section 6 for full details. + +--- + +## Typical workflow + +``` +1. Run pvr_triage_batch to see what's in your inbox and prioritise. + +2. For each advisory you want to analyse: + Run pvr_triage. + +3. Review the saved report in REPORT_DIR: + - Check the Verdict and Code Verification sections. + - Edit the response draft (_response_triage.md) if needed. + +4a. Apply a state transition with pvr_respond: + - action=accept → move to draft (triage → draft) + - action=reject → close (triage → closed) + Then post the response draft manually via the advisory URL. + +4b. Or apply state transitions to all pending advisories at once with pvr_respond_batch: + Scans REPORT_DIR for pending entries (no _response_sent.md marker) + and applies the chosen action to all of them in one session. + Then post each response draft manually via the advisory URL. 
+``` + +### Example session + +```bash +# Step 1: score the inbox +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g repo=acme/widget + +# Step 2: triage the highest-priority advisory +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g repo=acme/widget \ + -g ghsa=GHSA-1234-5678-abcd + +# Step 3: review the output +cat reports/GHSA-1234-5678-abcd_triage.md +cat reports/GHSA-1234-5678-abcd_response_triage.md + +# Step 4a: accept (triage → draft) — vulnerability confirmed +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g repo=acme/widget \ + -g ghsa=GHSA-1234-5678-abcd \ + -g action=accept + +# Step 4b: or reject (triage → closed) — invalid or low-quality report +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g repo=acme/widget \ + -g ghsa=GHSA-1234-5678-abcd \ + -g action=reject + +# Step 4c: or apply state transitions to all pending advisories at once +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ + -g repo=acme/widget \ + -g action=reject + +# Step 5: post each response draft manually via the advisory URL +# See taskflows/pvr_triage/MANUAL_RESPONSE.md for instructions +``` + +--- + +## Reporter reputation + +Every completed `pvr_triage` run records the verdict and quality rating against the reporter's GitHub login in a local SQLite database (`REPORTER_DB_DIR/reporter_reputation.db`). + +The quality gate in Task 3 of `pvr_triage` calls `get_reporter_score` automatically before any code analysis. The score summary appears in the report under **Reporter Reputation**. + +**Reputation thresholds:** + +| Condition | Recommendation | +|---|---| +| confirmed_pct ≥ 60% and Low-quality share ≤ 20% | high trust | +| confirmed_pct ≤ 20% or Low-quality share ≥ 50% | treat with skepticism | +| Otherwise | normal | + +Reputation directly gates the fast-close decision. See [SCORING.md](SCORING.md) Section 3 for the full three-path decision table and reputation × fast-close matrix. + +--- + +## Models + +The taskflows use `seclab_taskflows.configs.model_config_pvr_triage`, which defines two model roles: + +| Role | Used for | Default model | +|---|---|---| +| `triage` | Code verification and report generation | `claude-opus-4.6` | +| `extraction` | Fetch/parse, quality gate, save tasks | `gpt-5-mini` | + +Override the model config by setting `AI_API_ENDPOINT` and `AI_API_TOKEN` to point at a compatible provider. + +--- + +## Output files + +All files are written to `REPORT_DIR` (default: `./reports`). + +| File | Written by | Contents | +|---|---|---| +| `_triage.md` | `pvr_triage` task 6 | Full triage analysis report | +| `_response_triage.md` | `pvr_triage` task 8 | Plain-text response draft for the reporter | +| `_response_sent.md` | `pvr_respond` / `pvr_respond_batch` | Marker: state transition applied (contains ISO timestamp); post draft manually | +| `batch_queue__.md` | `pvr_triage_batch` task 3 | Ranked inbox table with Age column | + +--- + +## Globals reference + +| Global | Taskflow | Default | Description | +|---|---|---|---| +| `repo` | all | (required) | Repository in `owner/repo` format | +| `ghsa` | `pvr_triage`, `pvr_respond` | (required) | GHSA ID of the advisory | +| `action` | `pvr_respond`, `pvr_respond_batch` | (required) | `accept` or `reject` | +| `state` | `pvr_triage`, `pvr_triage_batch` | `triage` | Advisory state to filter. 
Use `draft` for testing with owner-created advisories. | + +--- + +## Demo script + +A demo script is included for live testing against a test repository: + +```bash +# Test all MCP tools against live API (no AI calls, fast): +./scripts/demo_pvr_triage.sh tools + +# Run batch scoring: +./scripts/demo_pvr_triage.sh batch + +# Run full triage on a specific advisory: +./scripts/demo_pvr_triage.sh triage GHSA-xxxx-xxxx-xxxx + +# Run everything: +./scripts/demo_pvr_triage.sh all +``` + +The demo script uses `gh auth token` for GitHub API access and +`passage show github/capi-token` for the AI endpoint token. Override +with `GH_TOKEN` and `AI_API_TOKEN` environment variables. + +Set `ADVISORY_STATE=triage` to test against real PVR submissions +(default is `draft` for the test repository). diff --git a/src/seclab_taskflows/taskflows/pvr_triage/SCORING.md b/src/seclab_taskflows/taskflows/pvr_triage/SCORING.md new file mode 100644 index 0000000..d09679a --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/SCORING.md @@ -0,0 +1,244 @@ +# PVR Triage Scoring Reference + +This document describes every scoring decision made by the PVR triage taskflows: batch priority scoring, single-advisory quality signals, fast-close detection, and reporter reputation thresholds. All values are authoritative — they reflect the exact constants in the taskflow YAML and MCP server code. + +--- + +## 1. Batch Priority Score (`pvr_triage_batch`) + +Used to rank unprocessed advisories in triage state before analysis. + +### Severity weight + +| Severity | Weight | +|---|---| +| critical | 4 | +| high | 3 | +| medium | 2 | +| low | 1 | +| unknown | 1 | + +### Quality weight + +Extracted from the advisory description text. Each signal present adds 1 point. + +| Signal | Condition | +|---|---| +| `has_file_references` | Description mentions at least one specific source file path | +| `has_poc` | Description includes reproduction steps or exploit code | +| `has_line_numbers` | Description cites at least one line number | + +### Formula + +``` +priority_score = severity_weight + quality_weight (max: 7) +``` + +### Suggested action thresholds + +| priority_score | Suggested action | +|---|---| +| ≥ 5 | Triage Immediately | +| ≥ 3 | Triage Soon | +| 2 | Triage | +| ≤ 1 | Likely Low Quality — Fast Close | + +### Score reference table + +| Severity | No signals | 1 signal | 2 signals | 3 signals | +|---|---|---|---|---| +| critical | 4 — Triage Soon | 5 — **Triage Immediately** | 6 — **Triage Immediately** | 7 — **Triage Immediately** | +| high | 3 — Triage Soon | 4 — Triage Soon | 5 — **Triage Immediately** | 6 — **Triage Immediately** | +| medium | 2 — Triage | 3 — Triage Soon | 4 — Triage Soon | 5 — **Triage Immediately** | +| low | 1 — Fast Close | 2 — Triage | 3 — Triage Soon | 4 — Triage Soon | + +**Key observations:** +- A bare `critical` with no quality signals scores 4 — Triage Soon, not Triage Immediately. +- `high` needs at least two quality signals to reach Triage Immediately. +- `medium` needs all three quality signals to reach Triage Immediately. +- Any `low` severity report with no quality signals is Fast Close. + +### Already-triaged advisories + +Advisories with an existing `_triage.md` in `REPORT_DIR` are skipped entirely and do not appear in the scored queue. Their count is noted in the batch report summary. + +--- + +## 2. 
Single-Advisory Quality Signals (`pvr_triage`) + +The quality gate in Task 3 extracts the same three signals as the batch scorer, plus two additional ones used for the report quality rating. + +| Signal | Used in | +|---|---| +| `has_file_references` | Fast-close, report quality rating | +| `has_line_numbers` | Fast-close, report quality rating | +| `has_poc` | Fast-close, report quality rating | +| `has_version_info` | Report quality rating only | +| `has_code_snippets` | Report quality rating only | + +### Report quality rating + +Assigned by the analyst in the report generation task. + +| Rating | Criteria | +|---|---| +| High | Specific, accurate claims; verified PoC; correct file paths and line numbers | +| Medium | Partially accurate; some details wrong or missing | +| Low | Vague, speculative, or significantly inaccurate ("AI slop") | + +--- + +## 3. Fast-Close Detection (`pvr_triage`) + +The quality gate evaluates `fast_close` via a three-path decision tree gated on the reporter's reputation. + +### Path A — High-trust reporter + +| Condition | Result | +|---|---| +| `reporter_score.recommendation == "high trust"` | `fast_close = false` unconditionally | + +High-trust reporters always receive full code verification regardless of quality signals. + +### Path B — Skepticism reporter + +| Condition | Result | +|---|---| +| `reporter_score.recommendation == "treat with skepticism"` **and** all three signals absent | `fast_close = true` | +| `reporter_score.recommendation == "treat with skepticism"` **and** any signal present | `fast_close = false` | + +For skepticism reporters, a prior similar report is **not** required — the three absent quality signals alone are sufficient to trigger fast-close. + +### Path C — Normal / no history + +All four conditions must hold simultaneously: + +1. `has_file_references` is false +2. `has_poc` is false +3. `has_line_numbers` is false +4. At least one similar report already exists in `REPORT_DIR` with verdict `UNCONFIRMED` or `CONFIRMED` + +Conditions 1–3 alone are not sufficient — there must also be a prior report on a similar issue. A novel low-quality report for an unseen component proceeds to full verification. + +### Reputation × fast-close summary matrix + +| Reputation | No quality signals, no prior similar | No quality signals, prior similar exists | Any quality signal present | +|---|---|---|---| +| high trust | full verification | full verification | full verification | +| normal / no history | full verification | **fast-close** | full verification | +| treat with skepticism | **fast-close** | **fast-close** | full verification | + +When `fast_close` is true, code verification is skipped entirely. The response draft uses the fast-close template (requests specific file path, line number, and reproduction steps). + +--- + +## 4. Reporter Reputation (`reporter_reputation.py`) + +Accumulated from every completed `pvr_triage` run. Keyed by GitHub login. 
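+
+As a hypothetical illustration using the metrics and thresholds defined below: a reporter with
+5 recorded triages, 3 of them CONFIRMED and 1 rated Low quality, has
+`confirmed_pct = 3/5 = 0.60` and `low_share = 1/5 = 0.20`, which is just enough to qualify as
+**high trust**.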
+ +### Inputs per record + +| Field | Values | +|---|---| +| verdict | CONFIRMED / UNCONFIRMED / INCONCLUSIVE | +| quality | High / Medium / Low | + +### Score metrics + +``` +confirmed_pct = confirmed_count / total_reports +low_share = Low_count / total_reports +``` + +### Recommendation thresholds + +| Condition | Recommendation | +|---|---| +| confirmed_pct ≥ 0.60 **and** low_share ≤ 0.20 | high trust | +| confirmed_pct ≤ 0.20 **or** low_share ≥ 0.50 | treat with skepticism | +| Otherwise | normal | +| No history | no history | + +### Effect on triage + +The reputation score directly influences the fast-close decision (see Section 3): + +- **high trust** — always forces full code verification. +- **treat with skepticism** — lowers the fast-close bar: only three absent quality signals are needed (no prior similar report required). +- **normal / no history** — standard four-condition fast-close applies. + +The score also appears in the triage report under **Reporter Reputation** for maintainer awareness. + +--- + +## 5. Duplicate Detection (`compare_advisories`) + +The `compare_advisories` tool detects duplicate or near-duplicate advisories in a repository's triage inbox before individual triage work begins. + +### Fingerprint fields + +Each advisory is fingerprinted using these structural fields: + +| Field | Source | +|---|---| +| CWE IDs | Advisory `cwes` metadata | +| Package (ecosystem + name) | Advisory `vulnerabilities` metadata | +| Vulnerable version range | Advisory `vulnerabilities` metadata | +| File paths | Extracted from description text via regex | +| Normalized summary | Summary lowercased, non-alphanumeric stripped | + +### Match levels + +| Level | Condition | +|---|---| +| strong | Same package AND (same CWE or same files or same version range) | +| moderate | Same package alone, or same CWE AND same files (no package overlap) | +| weak | Any single field overlap (CWE only, or file paths only, etc.) | +| none | No field overlap | + +### Clustering + +Strong and moderate matches are clustered via union-find. The batch queue output shows each cluster with its member GHSAs and match reasons. + +### Effect on triage + +- Batch scorer: strong-match clusters get "Likely Duplicate -- Triage Best" suggested action +- Single-advisory triage: quality gate surfaces duplicate info but does NOT auto-close. Maintainers decide. +- Triage report: Duplicate/Prior Reports section prominently flags cluster membership + +### Conservative design + +Dedup detection is intentionally conservative: +- Only structural field overlap, no semantic similarity +- Never auto-closes advisories based on dedup alone +- Weak matches are surfaced as informational, not clustered +- Maintainer always makes the final accept/reject decision + +--- + +## 6. Container Validation (`pvr_triage` Task 4b) + +Optional automated validation using the SAST container. Gated by `PVR_CONTAINER_VALIDATION=true`. 
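+
+A minimal sketch of enabling it for a single triage run, assuming Docker is available and
+using the invocation documented in the README:
+
+```bash
+# one-time setup: build the SAST image
+./scripts/build_container_images.sh
+
+# enable container validation for this run, then triage as usual
+export PVR_CONTAINER_VALIDATION=true
+python -m seclab_taskflow_agent \
+    -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \
+    -g repo=owner/repo \
+    -g ghsa=GHSA-xxxx-xxxx-xxxx
+```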
+ +### Validation steps + +| Step | Tool | Purpose | +|---|---|---| +| Clone + checkout | git | Clone repo at affected version into container | +| SAST scan | semgrep | Scan reported files for vulnerability patterns | +| Reachability | pyan3 / cscope / rg | Trace call graph to determine if vuln function is reachable from public entry points | +| PoC reproduction | shell_exec | Best-effort reproduction of provided PoC steps (safe commands only) | +| Patch analysis | git diff | Compare affected version to HEAD to verify patch addresses the reported vulnerability | + +### Effect on triage + +- Reachability results factor into severity assessment (unreachable code = lower impact) +- SAST findings corroborate or contradict the reporter's claims +- PoC reproduction provides strongest evidence for confirmation +- Patch analysis validates whether a fix exists + +### Prerequisites + +- Docker installed and running +- `seclab-shell-sast:latest` image built (`scripts/build_container_images.sh`) +- `PVR_CONTAINER_VALIDATION=true` set in environment diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond.yaml new file mode 100644 index 0000000..c4370cf --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond.yaml @@ -0,0 +1,112 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Respond Taskflow +# +# Loads a previously generated triage report and response draft from disk +# and executes the selected write-back action on the GitHub advisory. +# All write-back API calls are confirm-gated in the pvr_ghsa toolbox. +# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ +# -g repo=owner/repo \ +# -g ghsa=GHSA-xxxx-xxxx-xxxx \ +# -g action=accept|reject +# +# Required environment variables: +# GH_TOKEN - GitHub token with security_events write scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) +# REPORT_DIR - Directory where triage reports are stored + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + # GHSA ID of the advisory to act on + ghsa: + # Action to perform: accept or reject + action: + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: Load triage report and response draft from disk + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Read the triage report for advisory "{{ globals.ghsa }}" using read_triage_report + with ghsa_id="{{ globals.ghsa }}". + + Store the triage report content under memcache key "triage_report". + + Read the response draft using read_triage_report with + ghsa_id="{{ globals.ghsa }}_response". + + Store the response draft content under memcache key "response_draft". + + From the triage report, extract and print: + - Verdict (CONFIRMED / UNCONFIRMED / INCONCLUSIVE) + - Report Quality (High / Medium / Low) + - A 1-2 sentence summary of the findings + + Then print the full response draft. 
+ + If either file is missing (read_triage_report returns "Report not found"), + print a clear error message and stop. + + # ------------------------------------------------------------------------- + # Task 2: Confirm and execute write-back action + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "triage_report" and "response_draft" from memcache. + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + The requested action is: "{{ globals.action }}" + + Execute the action as follows: + + If action is "accept": + Call accept_pvr_advisory with: + - owner: extracted owner + - repo: extracted repo + - ghsa_id: "{{ globals.ghsa }}" + + If action is "reject": + Call reject_pvr_advisory with: + - owner: extracted owner + - repo: extracted repo + - ghsa_id: "{{ globals.ghsa }}" + + If action is anything else: + Print: "Unknown action '{{ globals.action }}'. Valid actions: accept, reject" + and stop. + + Print the result returned by the API call. + + On success, call mark_response_sent with ghsa_id="{{ globals.ghsa }}" to record + that the state transition has been applied. + + Then print: "Response draft saved at REPORT_DIR/{{ globals.ghsa }}_response_triage.md + — post it to the reporter manually via the advisory URL." diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond_batch.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond_batch.yaml new file mode 100644 index 0000000..061d2ce --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond_batch.yaml @@ -0,0 +1,95 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Bulk Respond Taskflow +# +# Scans REPORT_DIR for pending response drafts (advisories with a +# *_response_triage.md but no *_response_sent.md marker) and applies +# the chosen state transition to each in a single session. +# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ +# -g repo=owner/repo \ +# -g action=accept|reject +# +# Required environment variables: +# GH_TOKEN - GitHub token with security_events write scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) +# REPORT_DIR - Directory where triage reports are stored + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + # Action to apply to all pending responses: accept or reject + action: + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: List pending responses + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Call list_pending_responses to find all advisories with a response draft + that has not yet been sent. + + If the result is an empty list, print "No pending responses." and stop. 
+ + Otherwise print a summary table: + + | GHSA | Triage Report Exists | + |------|---------------------| + [one row per pending entry] + + Store the list under memcache key "pending_responses". + + # ------------------------------------------------------------------------- + # Task 2: Send each response + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pending_responses" from memcache. + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + The requested action is: "{{ globals.action }}" + + For each entry in pending_responses: + 1. Call read_triage_report with ghsa_id=entry.ghsa_id to get the triage report. + 2. Print a per-item summary: + GHSA: {entry.ghsa_id} + Verdict: [extracted from triage report] + 3. Execute the action: + If action is "accept": + Call accept_pvr_advisory with owner, repo, ghsa_id=entry.ghsa_id. + If action is "reject": + Call reject_pvr_advisory with owner, repo, ghsa_id=entry.ghsa_id. + If action is anything else: + Print: "Unknown action '{{ globals.action }}'. Skipping {entry.ghsa_id}." + and continue to the next entry. + 4. On success, call mark_response_sent with ghsa_id=entry.ghsa_id. + Print: "Applied: {entry.ghsa_id} — post response draft manually via advisory URL." + + After processing all entries, print: + "Applied N / M state transitions. Post response drafts manually via each advisory URL." diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage.yaml new file mode 100644 index 0000000..f22d679 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage.yaml @@ -0,0 +1,637 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Triage Taskflow +# +# Fetches a GHSA in triage state submitted via Private Vulnerability Reporting, +# verifies the vulnerability claim against actual source code, assesses +# impact and report quality, and generates a structured triage analysis +# for the maintainer. +# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ +# -g repo=owner/repo \ +# -g ghsa=GHSA-xxxx-xxxx-xxxx +# +# Required environment variables: +# GH_TOKEN - GitHub token with repo and security_events scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + # GHSA ID of the advisory to triage + ghsa: + # Advisory state to filter by for dedup comparison (default: triage). + # Use "draft" for testing with owner-created advisories. + state: triage + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: Initialize + # ------------------------------------------------------------------------- + - task: + must_complete: true + headless: true + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Clear the memory cache. 
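+
+  # Later tasks exchange intermediate results through shared memcache keys; clearing the
+  # cache first ensures this run starts from a clean slate.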
+ + # ------------------------------------------------------------------------- + # Task 2: Fetch and parse the GHSA + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Fetch the security advisory {{ globals.ghsa }} for repository {{ globals.repo }}. + + Extract the owner and repo name from "{{ globals.repo }}" (format: owner/repo). + + Store the full raw advisory description text under key "pvr_description". + + Then store a structured summary in memcache under the key "pvr_parsed" as a JSON + object with these fields: + - ghsa_id: the GHSA ID + - repo: "{{ globals.repo }}" + - summary: the advisory one-line summary + - severity_claimed: the severity rating in the advisory (critical/high/medium/low) + - vuln_type: vulnerability class (e.g. "path traversal", "IDOR", "XSS", "SQL injection") + - affected_component: the component, endpoint, or feature described as vulnerable + - affected_files: list of source file paths explicitly mentioned (empty list if none) + - affected_functions: list of function/method names mentioned (empty list if none) + - affected_versions: for version ranges, prefer the structured vulnerabilities[].vulnerable_versions + field from the advisory API response. Fall back to parsing the description only if + the structured field is absent or empty. Empty list if none found. + - poc_provided: true if a proof-of-concept or reproduction steps are described + - poc_summary: brief description of the PoC steps, or null if none provided + - quality_signals: + has_file_references: true if specific source file paths are cited + has_line_numbers: true if specific line numbers are cited + has_poc: true if reproduction steps are provided + has_version_info: true if specific affected versions are mentioned + has_code_snippets: true if actual code is quoted in the report + - credits: the credits list from the advisory API response (list of {login, type} objects) + + Do not perform any code analysis yet. + + Also call fetch_security_policy with owner and repo. If a security policy + is returned (non-empty), store it under memcache key "security_policy". + If no policy is found, store an empty string. + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 3: Quick Quality Gate + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.reporter_reputation + user_prompt: | + Retrieve "pvr_parsed" and "security_policy" from memcache. + + Extract reporter login from pvr_parsed.credits: find the first entry with + type "reporter" and use its login. If credits is empty or no reporter type + is found, use "unknown". + + Call get_reporter_score with that login and store the result as reporter_score. + + If security_policy is non-empty, evaluate the PVR report against the + repository's security policy. Check: + - Scope: does the reported vulnerability type and component fall within + the scope defined by the policy? If the policy defines supported versions, + is the reported version in scope? 
+ - Required elements: does the report contain what the policy asks reporters + to include (e.g. reproduction steps, impact assessment, affected versions)? + - Reporting channel: the report was submitted via PVR which is generally + the correct private channel, but note if the policy specifies a different + preferred channel (e.g. email, HackerOne). + - Out-of-scope: does the policy explicitly exclude certain vulnerability + classes, test environments, or components that match this report? + + Store the evaluation under the key "policy_compliance" as: + { + "policy_found": true, + "in_scope": true/false/null (null if policy doesn't define scope), + "version_in_scope": true/false/null, + "required_elements_present": list of elements met, + "required_elements_missing": list of elements the policy asks for but + the report does not provide, + "out_of_scope_match": true if the report matches an explicit exclusion, + "notes": brief summary of compliance assessment + } + + If security_policy is empty, store: + {"policy_found": false} + + Call find_similar_triage_reports with: + - vuln_type: pvr_parsed.vuln_type + - affected_component: pvr_parsed.affected_component + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + Call compare_advisories with owner, repo, state="{{ globals.state }}", + and target_ghsa="{{ globals.ghsa }}" to check if this advisory is a + structural duplicate of another advisory currently in the triage inbox. + Store the result as dedup_result. + + Note: compare_advisories uses structural field overlap only. A result + of "none" means insufficient metadata to determine, not necessarily + that the advisories are distinct. + + Additionally, if dedup_result contains other advisories (even with + match_level "none"), read the summaries and descriptions of the other + triage-state advisories returned in the comparison. Use your own + judgment to determine if any describe the same underlying vulnerability + as this one, even if the structural metadata differs. Two reports about + the same code path, attack scenario, or root cause are duplicates + regardless of CWE tags or wording. + + Evaluate fast_close based on reporter_score.recommendation: + + If reporter_score.recommendation is "high trust": + Set fast_close = false unconditionally. + Set reason = "High-trust reporter — full verification required." + + Else if reporter_score.recommendation is "treat with skepticism": + Set fast_close = true if ALL THREE quality signals are absent: + - pvr_parsed.quality_signals.has_file_references is false + - pvr_parsed.quality_signals.has_poc is false + - pvr_parsed.quality_signals.has_line_numbers is false + (Prior similar report NOT required for skepticism reporters.) + Set reason accordingly. + + Else (normal / no history): + Set fast_close = true only if ALL FOUR conditions hold: + - pvr_parsed.quality_signals.has_file_references is false + - pvr_parsed.quality_signals.has_poc is false + - pvr_parsed.quality_signals.has_line_numbers is false + - At least one similar report exists with verdict UNCONFIRMED or CONFIRMED + Set reason accordingly. 
+ + Store under memcache key "quality_gate": + { + "fast_close": true or false, + "reason": "brief explanation of why fast_close was triggered or not", + "reporter_login": "the login extracted above", + "reporter_score": {the full object returned by get_reporter_score}, + "similar_reports": [the list returned by find_similar_triage_reports], + "duplicate_advisories": { + "structural_clusters": dedup_result.clusters (only clusters containing this GHSA), + "semantic_duplicates": list of other GHSA IDs you identified as describing + the same vulnerability via semantic analysis (may be empty), + "is_duplicate": true if this GHSA appears in any structural cluster + (strong or moderate) OR you identified semantic duplicates + } + } + + If duplicate_advisories.is_duplicate is true, note this in the reason field + but do NOT auto-set fast_close. Duplicates still require human judgment. + The report will surface the duplicate info for the maintainer to decide. + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 4: Verify vulnerability in source code + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: triage + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.gh_file_viewer + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "pvr_description", and "quality_gate" from memcache. + + If quality_gate.fast_close is true, store under "code_verification": + { + "ref_used": null, + "files_examined": [], + "vulnerability_confirmed": null, + "confirmation_evidence": "Fast-close: quality gate triggered. Reason: {quality_gate.reason}", + "mitigation_found": null, + "mitigation_details": null, + "patch_status": "could_not_determine", + "patch_notes": null, + "notes": "Skipped deep analysis." + } + and stop. Do not fetch any files. + + Otherwise proceed with full code verification: + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + Verify the vulnerability at the affected version, not HEAD. + If affected_versions lists a version (e.g. "<= 1.25.4"), resolve the + upper bound to a git commit SHA using resolve_version_ref, then use + fetch_file_at_ref to fetch code at that SHA. If no version is specified, + fall back to fetch_file_from_gh / get_file_lines_from_gh (HEAD). + + If affected_files or affected_versions are empty, read pvr_description + directly to identify any file paths, function names, or version references + the extraction may have missed. Advisory descriptions vary widely in format + and structure — treat pvr_parsed as a starting point, not a complete picture. + + For each file path identified: + 1. Resolve the version to a SHA (if available). + 2. Fetch the file at that SHA using fetch_file_at_ref. + 3. Locate the affected function(s) at the stated line numbers. + 4. Check whether the vulnerability pattern described in the advisory + is present at that version. + 5. Look for authorization checks, input validation, or other mitigations. + + If no specific files are named, use search_repo_from_gh to locate + the affected function names or code patterns. + + Focus on the specific code path described. Do not perform a broad audit. + + After completing the main verification at the claimed version, re-check the + same code pattern at HEAD using fetch_file_from_gh. 
Add to code_verification: + "patch_status": "still_vulnerable" | "patched" | "could_not_determine" + "patch_notes": brief description of what changed at HEAD (or null) + + Store your findings under memcache key "code_verification" as JSON: + - ref_used: the git SHA or ref used for code fetching (or "HEAD" if none) + - files_examined: list of file paths fetched + - vulnerability_confirmed: true / false / null (null = could not determine) + - confirmation_evidence: precise description of what the code does, + including file path and line numbers + - mitigation_found: true if existing checks prevent exploitation + - mitigation_details: description of mitigating code, or null + - patch_status: "still_vulnerable" | "patched" | "could_not_determine" + - patch_notes: description of HEAD state vs claimed version (or null) + - notes: any additional observations + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 4b: Container-based validation (optional) + # ------------------------------------------------------------------------- + # Gated by PVR_CONTAINER_VALIDATION env var. When enabled, clones the repo + # at the affected version into an isolated SAST container and performs: + # - SAST scanning (semgrep) on reported file paths + # - Call graph / reachability analysis on reported functions + # - PoC reproduction attempt if reproduction steps are provided + # - Patch diff analysis between affected and fixed versions + # + # Requires Docker and the seclab-shell-sast image. + # Set PVR_CONTAINER_VALIDATION=true and optionally CONTAINER_WORKSPACE. + - task: + must_complete: false + model: triage + headless: true + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.container_shell_sast + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + env: + CONTAINER_PERSIST: "true" + CONTAINER_PERSIST_KEY: "pvr-validation-{{ globals.ghsa }}" + CONTAINER_TIMEOUT: "120" + user_prompt: | + Check if container validation is enabled: + If the environment variable PVR_CONTAINER_VALIDATION is not set or not "true", + store under memcache key "container_validation": + {"enabled": false, "reason": "PVR_CONTAINER_VALIDATION not set"} + and stop immediately. Do not execute any shell commands. + + Otherwise, retrieve "pvr_parsed", "pvr_description", "code_verification", + and "quality_gate" from memcache. + + If quality_gate.fast_close is true or code_verification.vulnerability_confirmed + is false, store under "container_validation": + {"enabled": true, "skipped": true, "reason": "fast_close or unconfirmed"} + and stop. + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + Perform container-based validation in the SAST container: + + Step 1: Clone and checkout + shell_exec: git clone --depth=50 https://github.com/{owner}/{repo}.git /workspace/repo + If code_verification.ref_used is not null and not "HEAD": + shell_exec: cd /workspace/repo && git checkout {ref_used} + + Step 2: SAST scan on reported files + For each file in pvr_parsed.affected_files (if any): + shell_exec: semgrep scan --config=auto --json /workspace/repo/{file} + Store findings as sast_results. + + Step 3: Reachability analysis + Determine the language from file extensions in pvr_parsed.affected_files. + For Python projects: + shell_exec: cd /workspace/repo && pyan3 $(fd -e py . 
| tr '\n' ' ') --dot --no-defines 2>/dev/null | grep -i "{function_name}" || echo "function not found in call graph" + For C/C++ projects: + shell_exec: cd /workspace/repo && ctags -R --fields=+ne . && cscope -R -b 2>/dev/null + shell_exec: cd /workspace/repo && cscope -R -L -2 {function_name} 2>/dev/null || echo "no callers found" + For other languages: + shell_exec: cd /workspace/repo && rg -n "def |function |func |fn " --glob "*.{ext}" | grep -i "{function_name}" || echo "function not found" + shell_exec: cd /workspace/repo && rg -n "{function_name}" --glob "*.{ext}" | head -30 + Assess: is the reported function reachable from public API / entry points? + + Step 4: PoC reproduction attempt + If pvr_parsed.poc_provided is true and pvr_parsed.poc_summary describes + executable steps (curl commands, script, input data): + Attempt a best-effort reproduction. If it involves running code: + shell_exec: cd /workspace/repo && {appropriate setup and run commands} + Do NOT execute anything that could be destructive or contact external services. + If the PoC requires network access or external dependencies, note this as + a limitation rather than attempting it. + + Step 5: Patch status verification + If code_verification.patch_status is "patched" or if a patched version exists: + shell_exec: cd /workspace/repo && git log --oneline --all --grep="{cve_id or ghsa_id or vuln keyword}" | head -10 + shell_exec: cd /workspace/repo && git diff {affected_ref}..HEAD -- {affected_files} | head -100 + Assess: does the diff actually address the reported vulnerability? + + Store results under memcache key "container_validation": + { + "enabled": true, + "skipped": false, + "sast_findings": list of semgrep findings relevant to the reported vuln type, + "reachability": { + "function_reachable": true/false/null, + "call_chain": brief description of how the function is reached (or null), + "entry_points": list of public entry points that reach the vulnerable function + }, + "poc_result": { + "attempted": true/false, + "reproduced": true/false/null, + "details": description of what happened + }, + "patch_analysis": { + "patch_found": true/false, + "patch_addresses_vuln": true/false/null, + "details": description of the patch + }, + "notes": any additional observations + } + + # ------------------------------------------------------------------------- + # Task 5: Generate triage report + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: triage + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "pvr_description", "code_verification", "quality_gate", + "policy_compliance", and "container_validation" from memcache. + container_validation and policy_compliance may not exist; treat missing as disabled/not found. + + Generate a triage analysis report in markdown and store it under + memcache key "triage_report". + + The report must follow this structure exactly: + + --- + + ## PVR Triage Analysis: {{ globals.ghsa }} + + **Repository:** {{ globals.repo }} + **Claimed Severity:** [from pvr_parsed] + **Vulnerability Type:** [from pvr_parsed] + + ### Verdict + + **[CONFIRMED / UNCONFIRMED / INCONCLUSIVE]** + + One or two sentences stating the verdict and the primary reason. + + ### Code Verification + + State the git ref (version tag / commit SHA) used for analysis, or note + if HEAD was used and why. 
+ Describe exactly what code was examined and what was found. + Reference specific file paths and line numbers. + If the vulnerability is confirmed, show the vulnerable code pattern. + If unconfirmed, explain what the code actually does and why it is not vulnerable. + If inconclusive, explain what could not be determined and why. + + ### Validation Results + + If container_validation exists and container_validation.enabled is true + and container_validation.skipped is false, include this section. Otherwise omit it. + + Summarize findings from automated validation: + - SAST: note any semgrep findings relevant to the reported vulnerability type + - Reachability: state whether the reported function is reachable from public + entry points, and list the call chain if found. If unreachable, note this + significantly reduces impact. + - PoC Reproduction: if attempted, state whether it succeeded. If not attempted, + note why (e.g. requires network access, no executable steps provided). + - Patch Analysis: if a patch was found, summarize whether it addresses + the reported vulnerability. + + ### Severity Assessment + + State whether the claimed severity is accurate, overstated, or understated. + Base this on the actual exploitability and impact from the code evidence. + If container_validation was performed and reachability data is available, + factor that into the assessment (unreachable code is lower impact). + + ### CVSS Assessment + + Derive a CVSS 3.1 vector for this vulnerability based on the code evidence. + State: Base Score, Vector String, and whether the reporter's claimed severity + (pvr_parsed.severity_claimed) is accurate / overstated / understated. + If vulnerability_confirmed is false or null, note that CVSS is based on + the claimed scenario and may not reflect actual risk. + + ### Duplicate / Prior Reports + + If quality_gate.duplicate_advisories.is_duplicate is true, prominently flag this + advisory as a potential duplicate. List the other GHSAs in the cluster, the match + level, and the reasons for the match (shared CWE, package, version, file paths). + Recommend the maintainer review the cluster and triage the best report. + + If quality_gate.similar_reports is non-empty, list them with their verdict and quality. + Note whether this report adds new evidence vs. restating a known issue. + If no duplicates or similar reports found, state "No duplicates or prior reports found." + + ### Patch Status + + State code_verification.patch_status at HEAD. + If patched: note the triage impact (lower urgency for confirmed vulnerabilities). + If still_vulnerable: note urgency is unchanged. + If could_not_determine: state that HEAD status could not be assessed. + + ### Security Policy Compliance + + If policy_compliance exists and policy_compliance.policy_found is true, + include this section. Otherwise omit it entirely. + + Summarize how the PVR report aligns with the repository's security policy: + - Scope: is the reported vulnerability in scope per the policy? + If a supported versions table exists, is the reported version covered? + - Required elements: list what the policy asks for and whether this report + provides each element (e.g. reproduction steps, impact, affected versions). + - Out-of-scope: note if the report matches any explicit exclusion in the policy. + - Channel: note if the policy prefers a different reporting channel than PVR. + - Conclusion: state whether the report is compliant, partially compliant, + or non-compliant with the security policy. 
This helps the maintainer + decide whether to engage or redirect the reporter. + + ### Report Quality + + Assess the quality of the PVR submission: + - Note which claims were accurate (correct file paths, line numbers, functions) + - Note any inaccuracies (wrong paths, non-existent functions, incorrect PoC) + - Rate overall quality: High / Medium / Low + - High: specific, accurate, verified PoC + - Medium: partially accurate, some details wrong or missing + - Low: vague, speculative, or significantly inaccurate ("AI slop") + + ### Reporter Reputation + + Reporter login: [quality_gate.reporter_login] + Score summary: [quality_gate.reporter_score.recommendation] (confirmed_pct, + total_reports, quality_breakdown from reporter_score) + + ### Recommendations + + Provide 1-3 specific, actionable recommendations for the maintainer. + If confirmed: suggest the fix approach. + If unconfirmed: suggest whether to close, request more info, or monitor. + If low quality: recommend closing with explanation. + + --- + + Be factual. Do not include anything not supported by code evidence. + Keep the report concise. Aim for under 800 words. + + After generating the report, also store a structured summary under memcache + key "triage_outcome": + { + "verdict": "CONFIRMED" | "UNCONFIRMED" | "INCONCLUSIVE", + "quality": "High" | "Medium" | "Low" + } + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 6: Save report to disk and print path + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve the "triage_report" from memcache. + + Call save_triage_report with: + - ghsa_id: "{{ globals.ghsa }}" + - report: the full report content exactly as stored in memcache + + Then print the report content verbatim, followed by a blank line and: + "Report saved to: " + + # ------------------------------------------------------------------------- + # Task 7: Generate Reporter Response Draft + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "code_verification", "quality_gate", "triage_report", + and "triage_outcome" from memcache. + + Use triage_outcome.verdict as the verdict. + + Draft a response comment to the reporter. Tone: direct, factual, not harsh. + Select the template based on triage_outcome.verdict and quality_gate.fast_close: + + fast_close (quality_gate.fast_close=true): + Explain that the report lacks file paths, functions, and reproduction steps + that match the codebase. Invite resubmission with specific details including + the exact file path, line number, and a concrete reproduction scenario. + + CONFIRMED: + Acknowledge the finding. State that a fix is in progress and credit will + be given when the advisory is published. + + UNCONFIRMED: + Cite specific code evidence for why the claim could not be confirmed + (reference the file path and what the code actually does). Ask for more + specific reproduction steps if the reporter wants to follow up. 
+ + INCONCLUSIVE: + Explain what specific information is missing to complete verification + (e.g. exact version, file path, reproduction steps). + + Keep the response under 200 words. No markdown headers. Plain text suitable + for a GitHub comment. + + Store under memcache key "response_draft". + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 8: Update Reporter Reputation + Save Response Draft + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.reporter_reputation + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "code_verification", "quality_gate", "triage_report", + "triage_outcome", and "response_draft" from memcache. + + Use triage_outcome.verdict as the verdict. + Use triage_outcome.quality as the quality rating. + Extract reporter login from quality_gate.reporter_login. + + Call record_triage_result with: + - login: quality_gate.reporter_login + - ghsa_id: "{{ globals.ghsa }}" + - repo: "{{ globals.repo }}" + - verdict: the extracted verdict + - quality: the extracted quality rating + + Call save_triage_report with: + - ghsa_id: "{{ globals.ghsa }}_response" + - report: response_draft + + Print: "Response draft saved." followed by the response_draft text. diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage_batch.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage_batch.yaml new file mode 100644 index 0000000..4462a55 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage_batch.yaml @@ -0,0 +1,214 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Triage Batch Taskflow +# +# Lists PVR advisories in triage state for a repository, scores each unprocessed one by +# priority (based on severity and quality signals), and outputs a ranked +# markdown table to REPORT_DIR for maintainer review. +# Advisories with an existing triage report in REPORT_DIR are skipped. +# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ +# -g repo=owner/repo +# +# Required environment variables: +# GH_TOKEN - GitHub token with repo and security_events scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) +# REPORT_DIR - Directory where triage reports are stored (and batch output is saved) + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + # Advisory state to filter by (default: triage). Use "draft" for testing + # with owner-created advisories. + state: triage + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: List triage advisories + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). 
+ + Call list_pvr_advisories with owner, repo, and state="{{ globals.state }}" to retrieve + all advisories in {{ globals.state }} state. + + Store the full JSON list under memcache key "pvr_queue". + + Print: "Found N {{ globals.state }} advisories for {{ globals.repo }}." where N is the count. + + If no advisories are found, print "No {{ globals.state }} advisories found." and stop. + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 2: Score each advisory and detect duplicates + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_queue" from memcache. + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + First, call compare_advisories with owner, repo, and state="{{ globals.state }}" to + detect duplicate or near-duplicate advisories via structural metadata + comparison. Store the result under memcache key "dedup_result". + + Note: compare_advisories uses structural field overlap (CWE, package, + version, file paths). A match level of "none" does NOT mean two advisories + are distinct -- it means there was insufficient structured metadata to + determine overlap. Low-quality reports often lack these fields entirely. + + Then for each advisory in pvr_queue: + 1. Call fetch_pvr_advisory to get the full advisory details. + 2. Check for existing triage by calling read_triage_report with the ghsa_id. + If the result does not start with "Report not found", mark already_triaged=true + and extract the verdict from the report content. + Otherwise, mark already_triaged=false and verdict=null. + 3. Extract quality signals from the description: + - has_file_references: description mentions specific file paths + - has_poc: description includes reproduction steps or exploit code + - has_line_numbers: description cites line numbers + 4. Compute priority_score using this formula: + severity_weight: critical=4, high=3, medium=2, low=1, unknown=1 + quality_weight: has_file_references(+1) + has_poc(+1) + has_line_numbers(+1) + priority_score = severity_weight + quality_weight + 5. Check if this ghsa_id appears in any dedup_result.clusters entry. + If so, set structural_duplicate to the list of other GHSAs in that cluster + and the match_level. Otherwise set structural_duplicate=null. + 6. Determine suggested_action: + - If already_triaged and verdict is UNCONFIRMED or INCONCLUSIVE: "Review/Close" + - If already_triaged and verdict is CONFIRMED: "Fix/Publish" + - If structural_duplicate is not null and match_level is "strong": "Likely Duplicate -- Triage Best" + - If priority_score >= 5: "Triage Immediately" + - If priority_score >= 3: "Triage Soon" + - If priority_score <= 1: "Likely Low Quality -- Fast Close" + - Otherwise: "Triage" + + After scoring all advisories individually, perform semantic duplicate + analysis across the full set of fetched advisory descriptions: + + Review the summary and description of every advisory you fetched above. + Identify groups that appear to describe the same underlying vulnerability, + even if they use different wording, cite different CWEs, or lack structured + metadata. Consider: same code path or function, same attack scenario, + same root cause, same affected behavior. 
Two reports from different + reporters about the same bug are duplicates regardless of how they word it. + + For any semantic duplicate groups you identify that were NOT already caught + by compare_advisories, add them as additional duplicate_cluster entries on + the relevant scored entries. Set match_level to "semantic" for these. + Update suggested_action to "Likely Duplicate -- Triage Best" for all but + the highest-quality report in each semantic group. + + Build a list of scored entries, each with: + {ghsa_id, severity, summary, vuln_type, quality_signals, + priority_score, already_triaged, verdict, suggested_action, + duplicate_cluster, created_at} + + Sort the list: primary key priority_score descending; ties broken by + created_at ascending (oldest advisory first). + + Split the list: + - scored_queue: entries where already_triaged=false only + - skipped_count: count of entries where already_triaged=true + + Store scored_queue under memcache key "scored_queue". + Store skipped_count under memcache key "skipped_count". + + Execute all steps above, then stop. Do not ask what to do next. + + # ------------------------------------------------------------------------- + # Task 3: Generate and save ranked queue report + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "scored_queue" and "skipped_count" from memcache. + + Generate today's date in YYYY-MM-DD format. + + For each entry in scored_queue compute days_pending: + days_pending = (today - date(created_at)).days (integer, round down) + Parse created_at as an ISO 8601 date string (YYYY-MM-DD prefix is sufficient). + If created_at is missing or unparseable, use "?" for Age. + + Build a report string with this structure: + + # PVR Batch Triage Queue: {{ globals.repo }} + + **Generated:** [today's date] + **Pending triage:** [count of scored_queue entries] + **Skipped (already triaged):** [skipped_count] + + | GHSA | Age (days) | Severity | Vuln Type | Quality Signals | Priority | Duplicates | Status | Suggested Action | + |------|------------|----------|-----------|-----------------|----------|------------|--------|-----------------| + [one row per advisory, sorted by priority_score desc then created_at asc] + + For each row: + - GHSA: the ghsa_id as a plain string + - Age (days): days_pending computed above + - Severity: severity from the advisory + - Vuln Type: vuln_type (truncated to 30 chars if needed) + - Quality Signals: compact representation, e.g. "PoC, Files, Lines" for all three, + or list only the signals present, or "None" if all false + - Priority: priority_score as an integer + - Duplicates: if duplicate_cluster is not null, show the other GHSA IDs and match + level (e.g. "GHSA-xxxx [strong]"). Otherwise "-" + - Status: "Triaged (CONFIRMED)" / "Triaged (UNCONFIRMED)" / "Triaged (INCONCLUSIVE)" / + "Not triaged" + - Suggested Action: from suggested_action field + + If scored_queue is empty, replace the table with: + "No pending advisories." + + After the table, add a section: + + ## Summary + + List any advisories with priority_score >= 5 as "Requires immediate attention." + If any duplicate clusters were found, list them with the shared GHSAs and match reasons. + Recommend triaging the highest-quality report in each cluster and closing the rest. 
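
For reference, the scoring, ageing, and ranking rules spelled out in the prompts above can be expressed as a few lines of plain Python. This is an illustrative sketch, not code the taskflow executes; the field names simply mirror the prompt wording, and the helper functions are assumptions for illustration only.

```python
# Illustrative sketch of the batch scoring rules described in the prompts above.
# Not part of the taskflow; field names mirror the prompt wording.
from __future__ import annotations

from datetime import date

SEVERITY_WEIGHT = {"critical": 4, "high": 3, "medium": 2, "low": 1, "unknown": 1}


def priority_score(severity: str, has_file_references: bool, has_poc: bool,
                   has_line_numbers: bool) -> int:
    """severity_weight plus one point per quality signal (max +3)."""
    quality_weight = sum([has_file_references, has_poc, has_line_numbers])
    return SEVERITY_WEIGHT.get(severity, 1) + quality_weight


def days_pending(created_at: str, today: date | None = None) -> int:
    """Whole days since creation, parsing only the YYYY-MM-DD prefix."""
    today = today or date.today()
    return (today - date.fromisoformat(created_at[:10])).days


def rank(entries: list[dict]) -> list[dict]:
    """Priority score descending; ties broken by created_at ascending (oldest first)."""
    return sorted(
        entries,
        key=lambda e: (-e["priority_score"], date.fromisoformat(e["created_at"][:10])),
    )


# Example: a high-severity report with two quality signals (score 5) outranks
# a critical report with no quality signals (score 4).
queue = [
    {"ghsa_id": "GHSA-aaaa", "created_at": "2026-03-01",
     "priority_score": priority_score("critical", False, False, False)},
    {"ghsa_id": "GHSA-bbbb", "created_at": "2026-02-20",
     "priority_score": priority_score("high", True, True, False)},
]
for entry in rank(queue):
    print(entry["ghsa_id"], entry["priority_score"], days_pending(entry["created_at"]))
```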
+ If skipped_count > 0, note: "[skipped_count] already-triaged advisories skipped." + + Sanitize the repo name for use in a filename: replace "/" and any non-alphanumeric + characters (except "-" and "_") with "_". + + Call save_triage_report with: + - ghsa_id: "batch_queue_[sanitized_repo]_[today's date]" + - report: the full report string + + Print: "Batch queue report saved to: " + Then print the full report. + + Execute all steps above, then stop. Do not ask what to do next. diff --git a/src/seclab_taskflows/toolboxes/pvr_ghsa.yaml b/src/seclab_taskflows/toolboxes/pvr_ghsa.yaml new file mode 100644 index 0000000..be7adde --- /dev/null +++ b/src/seclab_taskflows/toolboxes/pvr_ghsa.yaml @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Toolbox: PVR GHSA advisory fetcher +# +# Provides tools for fetching GitHub Security Advisories in triage state submitted +# via Private Vulnerability Reporting. Uses the gh CLI for API calls. +# +# Requires GH_TOKEN with repo or security_events scope to read advisories in triage state. + +seclab-taskflow-agent: + version: "1.0" + filetype: toolbox + +server_params: + kind: stdio + command: python + args: ["-m", "seclab_taskflows.mcp_servers.pvr_ghsa"] + env: + GH_TOKEN: "{{ env('GH_TOKEN') }}" + LOG_DIR: "{{ env('LOG_DIR') }}" + REPORT_DIR: "{{ env('REPORT_DIR', 'reports') }}" +# Guard write-back tools: user must confirm before execution +confirm: + - accept_pvr_advisory + - reject_pvr_advisory diff --git a/src/seclab_taskflows/toolboxes/reporter_reputation.yaml b/src/seclab_taskflows/toolboxes/reporter_reputation.yaml new file mode 100644 index 0000000..0c799ec --- /dev/null +++ b/src/seclab_taskflows/toolboxes/reporter_reputation.yaml @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Toolbox: Reporter Reputation tracker +# +# Provides tools for recording PVR triage outcomes per reporter and +# querying their reputation score across prior reports. + +seclab-taskflow-agent: + version: "1.0" + filetype: toolbox + +server_params: + kind: stdio + command: python + args: ["-m", "seclab_taskflows.mcp_servers.reporter_reputation"] + env: + GH_TOKEN: "{{ env('GH_TOKEN') }}" + LOG_DIR: "{{ env('LOG_DIR') }}" + REPORTER_DB_DIR: "{{ env('REPORTER_DB_DIR', '') }}" diff --git a/tests/test_pvr_mcp.py b/tests/test_pvr_mcp.py new file mode 100644 index 0000000..8189613 --- /dev/null +++ b/tests/test_pvr_mcp.py @@ -0,0 +1,767 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Unit tests for the PVR MCP server extensions and reporter reputation backend. 
+# +# Run with: pytest tests/test_pvr_mcp.py -v + +import json +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers: patch mcp_data_dir so imports don't fail in CI (no platformdirs dir) +# --------------------------------------------------------------------------- + +def _patch_report_dir(tmp_path): + """Return a context manager that patches REPORT_DIR in pvr_ghsa.""" + import seclab_taskflows.mcp_servers.pvr_ghsa as pvr_mod + return patch.object(pvr_mod, "REPORT_DIR", tmp_path) + + +# --------------------------------------------------------------------------- +# TestPvrGhsaTools +# --------------------------------------------------------------------------- + +class TestPvrGhsaTools(unittest.TestCase): + """Tests for the new write-back and similarity tools in pvr_ghsa.py.""" + + def setUp(self): + import seclab_taskflows.mcp_servers.pvr_ghsa as pvr_mod + self.pvr = pvr_mod + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp = Path(self.tmp_dir.name) + + def tearDown(self): + self.tmp_dir.cleanup() + + # --- accept_pvr_advisory --- + + def test_accept_pvr_advisory_calls_correct_api(self): + """accept_pvr_advisory should PATCH state=draft.""" + calls = [] + + def fake_gh_api(path, method="GET", body=None): + calls.append({"path": path, "method": method, "body": body}) + return {"ghsa_id": "GHSA-1234-5678-abcd", "state": "draft"}, None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result = self.pvr.accept_pvr_advisory.fn( + owner="owner", + repo="repo", + ghsa_id="GHSA-1234-5678-abcd", + ) + + self.assertEqual(calls[0]["method"], "PATCH") + self.assertIn("GHSA-1234-5678-abcd", calls[0]["path"]) + self.assertEqual(calls[0]["body"], {"state": "draft"}) + self.assertIn("draft", result) + + # --- reject_pvr_advisory --- + + def test_reject_pvr_advisory_calls_correct_api(self): + """reject_pvr_advisory should PATCH state=closed.""" + calls = [] + + def fake_gh_api(path, method="GET", body=None): + calls.append({"path": path, "method": method, "body": body}) + return {"ghsa_id": "GHSA-1234-5678-abcd", "state": "closed"}, None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result = self.pvr.reject_pvr_advisory.fn( + owner="owner", + repo="repo", + ghsa_id="GHSA-1234-5678-abcd", + ) + + self.assertEqual(calls[0]["method"], "PATCH") + self.assertIn("GHSA-1234-5678-abcd", calls[0]["path"]) + self.assertEqual(calls[0]["body"], {"state": "closed"}) + self.assertIn("closed", result) + + # --- find_similar_triage_reports --- + + def test_find_similar_reports_matches_vuln_type(self): + """find_similar_triage_reports returns matching reports by vuln_type.""" + report_dir = self.tmp + # Write a fixture report + (report_dir / "GHSA-aaaa-bbbb-cccc_triage.md").write_text( + "## PVR Triage Analysis: GHSA-aaaa-bbbb-cccc\n" + "**Vulnerability Type:** path traversal\n" + "**[UNCONFIRMED]**\n" + "Rate overall quality: Low\n", + encoding="utf-8", + ) + + with _patch_report_dir(report_dir): + result_json = self.pvr.find_similar_triage_reports.fn( + vuln_type="path traversal", + affected_component="upload handler", + ) + + results = json.loads(result_json) + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["ghsa_id"], "GHSA-aaaa-bbbb-cccc") + self.assertEqual(results[0]["verdict"], "UNCONFIRMED") + + def test_find_similar_reports_no_matches(self): + """find_similar_triage_reports returns 
empty list when nothing matches.""" + report_dir = self.tmp + (report_dir / "GHSA-aaaa-bbbb-dddd_triage.md").write_text( + "## PVR Triage Analysis: GHSA-aaaa-bbbb-dddd\n" + "**Vulnerability Type:** SQL injection\n" + "**[CONFIRMED]**\n", + encoding="utf-8", + ) + + with _patch_report_dir(report_dir): + result_json = self.pvr.find_similar_triage_reports.fn( + vuln_type="XSS", + affected_component="login form", + ) + + results = json.loads(result_json) + self.assertEqual(results, []) + + def test_find_similar_reports_empty_dir(self): + """find_similar_triage_reports returns empty list for non-existent REPORT_DIR.""" + empty_dir = self.tmp / "nonexistent" + with _patch_report_dir(empty_dir): + result_json = self.pvr.find_similar_triage_reports.fn( + vuln_type="IDOR", + affected_component="profile", + ) + results = json.loads(result_json) + self.assertEqual(results, []) + + # --- save_triage_report path sanitization --- + + def test_save_triage_report_path_sanitization(self): + """save_triage_report strips path traversal characters from the GHSA ID.""" + with _patch_report_dir(self.tmp): + out_path = self.pvr.save_triage_report.fn( + ghsa_id="../../../etc/passwd", + report="malicious content", + ) + # The file must be inside REPORT_DIR, not outside. + # Resolve both paths to handle macOS /var -> /private/var symlinks. + self.assertTrue(out_path.startswith(str(self.tmp.resolve()))) + # The filename should not contain path separators + saved = Path(out_path) + self.assertFalse(".." in saved.name) + self.assertFalse("/" in saved.name) + + def test_save_triage_report_empty_after_sanitization(self): + """save_triage_report returns an error when ghsa_id is all special chars.""" + with _patch_report_dir(self.tmp): + result = self.pvr.save_triage_report.fn( + ghsa_id="!@#$%^&*()", + report="some content", + ) + self.assertIn("Error", result) + + # --- read_triage_report --- + + def test_read_triage_report_returns_content(self): + """read_triage_report reads back a previously saved report.""" + content = "## PVR Triage Analysis: GHSA-test\n\n**[CONFIRMED]**\n" + (self.tmp / "GHSA-test_triage.md").write_text(content, encoding="utf-8") + + with _patch_report_dir(self.tmp): + result = self.pvr.read_triage_report.fn(ghsa_id="GHSA-test") + + self.assertEqual(result, content) + + def test_read_triage_report_missing_file(self): + """read_triage_report returns an error string for a missing report.""" + with _patch_report_dir(self.tmp): + result = self.pvr.read_triage_report.fn(ghsa_id="GHSA-does-not-exist") + + self.assertIn("not found", result.lower()) + + # --- list_pending_responses --- + + def test_list_pending_responses_empty(self): + """list_pending_responses returns [] when no response drafts exist.""" + with _patch_report_dir(self.tmp): + result_json = self.pvr.list_pending_responses.fn() + results = json.loads(result_json) + self.assertEqual(results, []) + + def test_list_pending_responses_returns_pending(self): + """list_pending_responses includes an entry when a draft exists but no sent marker.""" + (self.tmp / "GHSA-1111-2222-3333_response_triage.md").write_text( + "Response draft.", encoding="utf-8" + ) + with _patch_report_dir(self.tmp): + result_json = self.pvr.list_pending_responses.fn() + results = json.loads(result_json) + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["ghsa_id"], "GHSA-1111-2222-3333") + + def test_list_pending_responses_excludes_sent(self): + """list_pending_responses skips entries where a _response_sent.md marker exists.""" + (self.tmp / 
"GHSA-1111-2222-3333_response_triage.md").write_text( + "Response draft.", encoding="utf-8" + ) + (self.tmp / "GHSA-1111-2222-3333_response_sent.md").write_text( + "Response sent: 2026-03-03T00:00:00+00:00\n", encoding="utf-8" + ) + with _patch_report_dir(self.tmp): + result_json = self.pvr.list_pending_responses.fn() + results = json.loads(result_json) + self.assertEqual(results, []) + + # --- mark_response_sent --- + + def test_mark_response_sent_creates_marker(self): + """mark_response_sent creates a _response_sent.md marker and returns its path.""" + with _patch_report_dir(self.tmp): + result = self.pvr.mark_response_sent.fn(ghsa_id="GHSA-1111-2222-3333") + marker = self.tmp / "GHSA-1111-2222-3333_response_sent.md" + self.assertTrue(marker.exists()) + self.assertTrue(result.startswith(str(self.tmp.resolve()))) + content = marker.read_text(encoding="utf-8") + self.assertIn("Response sent:", content) + + def test_mark_response_sent_empty_ghsa_id(self): + """mark_response_sent returns an error string when ghsa_id sanitizes to empty.""" + with _patch_report_dir(self.tmp): + result = self.pvr.mark_response_sent.fn(ghsa_id="!@#$%") + self.assertIn("Error", result) + + # --- fetch_security_policy --- + + def test_fetch_security_policy_found(self): + """fetch_security_policy returns content when SECURITY.md exists.""" + policy_text = "# Security Policy\n\n## Supported Versions\n| 1.x | yes |" + + def fake_run(cmd, **kwargs): + mock_result = MagicMock() + if "SECURITY.md" in cmd[-1] and ".github" not in cmd[-1]: + mock_result.returncode = 0 + mock_result.stdout = policy_text + else: + mock_result.returncode = 1 + mock_result.stdout = "" + return mock_result + + with patch("subprocess.run", side_effect=fake_run): + result = self.pvr.fetch_security_policy.fn(owner="acme", repo="widget") + + self.assertIn("Security Policy", result) + self.assertIn("Supported Versions", result) + + def test_fetch_security_policy_not_found(self): + """fetch_security_policy returns empty string when no policy exists.""" + def fake_run(cmd, **kwargs): + mock_result = MagicMock() + mock_result.returncode = 1 + mock_result.stdout = "" + return mock_result + + with patch("subprocess.run", side_effect=fake_run): + result = self.pvr.fetch_security_policy.fn(owner="acme", repo="widget") + + self.assertEqual(result, "") + + +# --------------------------------------------------------------------------- +# TestFingerprintAndDedup +# --------------------------------------------------------------------------- + +class TestFingerprintAndDedup(unittest.TestCase): + """Tests for advisory fingerprinting, comparison, and dedup detection.""" + + def setUp(self): + import seclab_taskflows.mcp_servers.pvr_ghsa as pvr_mod + self.pvr = pvr_mod + + # --- _extract_file_paths --- + + def test_extract_file_paths_finds_paths(self): + """_extract_file_paths finds source file paths in free text.""" + text = "The bug is in `src/handlers/upload.py` and also affects lib/auth/check.go" + paths = self.pvr._extract_file_paths(text) + self.assertIn("src/handlers/upload.py", paths) + self.assertIn("lib/auth/check.go", paths) + + def test_extract_file_paths_ignores_non_source(self): + """_extract_file_paths ignores non-source extensions and bare filenames.""" + text = "See README.md and report.pdf for details. 
Also config.txt" + paths = self.pvr._extract_file_paths(text) + # No paths with / separator, so none should match + self.assertEqual(paths, []) + + def test_extract_file_paths_deduplicates(self): + """_extract_file_paths returns unique sorted paths.""" + text = "Bug in src/auth.py and also src/auth.py again" + paths = self.pvr._extract_file_paths(text) + self.assertEqual(len(paths), len(set(paths))) + + # --- _fingerprint_advisory --- + + def test_fingerprint_advisory_extracts_fields(self): + """_fingerprint_advisory extracts CWEs, packages, versions, file paths.""" + parsed = { + "ghsa_id": "GHSA-test-1234-abcd", + "summary": "Path traversal in upload handler", + "severity": "high", + "description": "The file src/upload/handler.py has a path traversal bug at line 42", + "vulnerabilities": [ + { + "ecosystem": "pip", + "package": "myapp", + "vulnerable_versions": "<= 1.5.0", + "patched_versions": "1.5.1", + } + ], + "cwes": ["CWE-22"], + } + fp = self.pvr._fingerprint_advisory(parsed) + self.assertEqual(fp["ghsa_id"], "GHSA-test-1234-abcd") + self.assertIn("CWE-22", fp["cwes"]) + self.assertIn(("pip", "myapp"), fp["packages"]) + self.assertIn("<= 1.5.0", fp["versions"]) + self.assertIn("src/upload/handler.py", fp["file_paths"]) + self.assertEqual(fp["severity"], "high") + + def test_fingerprint_advisory_empty_fields(self): + """_fingerprint_advisory handles empty/missing fields gracefully.""" + parsed = { + "ghsa_id": "GHSA-empty", + "summary": "", + "severity": "", + "description": "", + "vulnerabilities": [], + "cwes": [], + } + fp = self.pvr._fingerprint_advisory(parsed) + self.assertEqual(fp["cwes"], set()) + self.assertEqual(fp["packages"], set()) + self.assertEqual(fp["file_paths"], set()) + + # --- _compare_fingerprints --- + + def test_compare_strong_match(self): + """Two advisories with same CWE + same package → strong match.""" + a = { + "ghsa_id": "A", + "cwes": {"CWE-22"}, + "packages": {("pip", "myapp")}, + "versions": {"<= 1.5.0"}, + "file_paths": set(), + "severity": "high", + "summary_norm": "path traversal in upload", + } + b = { + "ghsa_id": "B", + "cwes": {"CWE-22"}, + "packages": {("pip", "myapp")}, + "versions": {"<= 1.5.0"}, + "file_paths": set(), + "severity": "high", + "summary_norm": "path traversal in upload handler", + } + result = self.pvr._compare_fingerprints(a, b) + self.assertEqual(result["match_level"], "strong") + self.assertTrue(len(result["reasons"]) > 0) + + def test_compare_no_match(self): + """Two unrelated advisories → no match.""" + a = { + "ghsa_id": "A", + "cwes": {"CWE-22"}, + "packages": {("pip", "appA")}, + "versions": {"<= 1.0"}, + "file_paths": {"src/a.py"}, + "severity": "high", + "summary_norm": "path traversal", + } + b = { + "ghsa_id": "B", + "cwes": {"CWE-79"}, + "packages": {("npm", "appB")}, + "versions": {">= 2.0"}, + "file_paths": {"src/b.js"}, + "severity": "medium", + "summary_norm": "xss in login form", + } + result = self.pvr._compare_fingerprints(a, b) + self.assertEqual(result["match_level"], "none") + + def test_compare_moderate_match(self): + """Same package but no CWE/version/file overlap → moderate match.""" + a = { + "ghsa_id": "A", + "cwes": set(), + "packages": {("pip", "myapp")}, + "versions": set(), + "file_paths": set(), + "severity": "high", + "summary_norm": "bug in myapp", + } + b = { + "ghsa_id": "B", + "cwes": set(), + "packages": {("pip", "myapp")}, + "versions": set(), + "file_paths": set(), + "severity": "medium", + "summary_norm": "another bug in myapp", + } + result = self.pvr._compare_fingerprints(a, b) + 
self.assertEqual(result["match_level"], "moderate") + + def test_compare_weak_match(self): + """Only CWE overlap, different packages → weak match.""" + a = { + "ghsa_id": "A", + "cwes": {"CWE-79"}, + "packages": {("pip", "appA")}, + "versions": set(), + "file_paths": set(), + "severity": "medium", + "summary_norm": "xss in appA", + } + b = { + "ghsa_id": "B", + "cwes": {"CWE-79"}, + "packages": {("pip", "appB")}, + "versions": set(), + "file_paths": set(), + "severity": "medium", + "summary_norm": "xss in appB", + } + result = self.pvr._compare_fingerprints(a, b) + self.assertEqual(result["match_level"], "weak") + + # --- compare_advisories (MCP tool, needs API mock) --- + + def test_compare_advisories_no_advisories(self): + """compare_advisories returns empty result when no advisories exist.""" + def fake_gh_api(path, method="GET", body=None): + return [], None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result_json = self.pvr.compare_advisories.fn( + owner="owner", repo="repo", state="triage", target_ghsa="" + ) + + result = json.loads(result_json) + self.assertEqual(result["total"], 0) + self.assertEqual(result["clusters"], []) + + def test_compare_advisories_detects_duplicates(self): + """compare_advisories clusters advisories with matching CWE + package.""" + fake_advisories = [ + { + "ghsa_id": "GHSA-aaaa-1111-aaaa", + "cve_id": None, + "html_url": "https://github.com/x/y/security/advisories/GHSA-aaaa-1111-aaaa", + "state": "triage", + "severity": "high", + "summary": "Path traversal in upload", + "description": "The file src/upload/handler.py allows path traversal", + "vulnerabilities": [{"package": {"ecosystem": "pip", "name": "myapp"}, "vulnerable_version_range": "<= 1.5.0", "patched_versions": ""}], + "cwes": [{"cwe_id": "CWE-22"}], + "credits_detailed": [], + "submission": {}, + "created_at": "2026-04-01", + "updated_at": "2026-04-01", + "collaborating_users": [], + }, + { + "ghsa_id": "GHSA-bbbb-2222-bbbb", + "cve_id": None, + "html_url": "https://github.com/x/y/security/advisories/GHSA-bbbb-2222-bbbb", + "state": "triage", + "severity": "high", + "summary": "Directory traversal in file upload", + "description": "Directory traversal vulnerability in src/upload/handler.py", + "vulnerabilities": [{"package": {"ecosystem": "pip", "name": "myapp"}, "vulnerable_version_range": "<= 1.5.0", "patched_versions": ""}], + "cwes": [{"cwe_id": "CWE-22"}], + "credits_detailed": [], + "submission": {}, + "created_at": "2026-04-02", + "updated_at": "2026-04-02", + "collaborating_users": [], + }, + { + "ghsa_id": "GHSA-cccc-3333-cccc", + "cve_id": None, + "html_url": "https://github.com/x/y/security/advisories/GHSA-cccc-3333-cccc", + "state": "triage", + "severity": "medium", + "summary": "XSS in comment rendering", + "description": "Cross-site scripting in src/comments/render.js", + "vulnerabilities": [{"package": {"ecosystem": "npm", "name": "other-app"}, "vulnerable_version_range": "<= 2.0.0", "patched_versions": ""}], + "cwes": [{"cwe_id": "CWE-79"}], + "credits_detailed": [], + "submission": {}, + "created_at": "2026-04-03", + "updated_at": "2026-04-03", + "collaborating_users": [], + }, + ] + + def fake_gh_api(path, method="GET", body=None): + return fake_advisories, None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result_json = self.pvr.compare_advisories.fn( + owner="owner", repo="repo", state="triage", target_ghsa="" + ) + + result = json.loads(result_json) + self.assertEqual(result["total"], 3) + # The two path traversal advisories 
should cluster together + self.assertEqual(len(result["clusters"]), 1) + cluster = result["clusters"][0] + self.assertIn("GHSA-aaaa-1111-aaaa", cluster["advisories"]) + self.assertIn("GHSA-bbbb-2222-bbbb", cluster["advisories"]) + self.assertNotIn("GHSA-cccc-3333-cccc", cluster["advisories"]) + self.assertIn(cluster["match_level"], ("strong", "moderate")) + # The XSS advisory should be in singles + self.assertIn("GHSA-cccc-3333-cccc", result["singles"]) + + def test_compare_advisories_target_ghsa_filter(self): + """compare_advisories with target_ghsa only returns matches for that GHSA.""" + fake_advisories = [ + { + "ghsa_id": "GHSA-aaaa-1111-aaaa", + "cve_id": None, "html_url": "", "state": "triage", + "severity": "high", "summary": "Bug A", + "description": "desc", + "vulnerabilities": [{"package": {"ecosystem": "pip", "name": "app"}, "vulnerable_version_range": "<= 1.0", "patched_versions": ""}], + "cwes": [{"cwe_id": "CWE-22"}], + "credits_detailed": [], "submission": {}, + "created_at": "2026-04-01", "updated_at": "2026-04-01", + "collaborating_users": [], + }, + { + "ghsa_id": "GHSA-bbbb-2222-bbbb", + "cve_id": None, "html_url": "", "state": "triage", + "severity": "high", "summary": "Bug B", + "description": "desc", + "vulnerabilities": [{"package": {"ecosystem": "pip", "name": "app"}, "vulnerable_version_range": "<= 1.0", "patched_versions": ""}], + "cwes": [{"cwe_id": "CWE-22"}], + "credits_detailed": [], "submission": {}, + "created_at": "2026-04-02", "updated_at": "2026-04-02", + "collaborating_users": [], + }, + ] + + def fake_gh_api(path, method="GET", body=None): + return fake_advisories, None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result_json = self.pvr.compare_advisories.fn( + owner="owner", repo="repo", state="triage", + target_ghsa="GHSA-aaaa-1111-aaaa", + ) + + result = json.loads(result_json) + # Should still find the cluster + self.assertTrue(len(result["clusters"]) >= 1 or len(result.get("weak_matches", [])) >= 0) + + +# --------------------------------------------------------------------------- +# TestReporterReputationBackend +# --------------------------------------------------------------------------- + +class TestReporterReputationBackend(unittest.TestCase): + """Tests for the ReporterReputationBackend class using in-memory SQLite.""" + + def setUp(self): + from seclab_taskflows.mcp_servers.reporter_reputation import ReporterReputationBackend + # Use explicit in-memory sentinel for tests + self.backend = ReporterReputationBackend(db_dir="sqlite://") + + def test_record_and_retrieve(self): + """record_triage_result inserts a record and get_reporter_history retrieves it.""" + self.backend.record_triage_result( + login="alice", + ghsa_id="GHSA-1111-2222-3333", + repo="owner/repo", + verdict="CONFIRMED", + quality="High", + ) + history = self.backend.get_reporter_history("alice") + self.assertEqual(len(history), 1) + self.assertEqual(history[0]["login"], "alice") + self.assertEqual(history[0]["ghsa_id"], "GHSA-1111-2222-3333") + self.assertEqual(history[0]["verdict"], "CONFIRMED") + self.assertEqual(history[0]["quality"], "High") + + def test_upsert_same_ghsa(self): + """record_triage_result updates an existing record when called again for the same GHSA.""" + self.backend.record_triage_result( + login="bob", + ghsa_id="GHSA-aaaa-bbbb-cccc", + repo="owner/repo", + verdict="UNCONFIRMED", + quality="Low", + ) + # Re-triage the same advisory — should update, not duplicate + self.backend.record_triage_result( + login="bob", + 
ghsa_id="GHSA-aaaa-bbbb-cccc", + repo="owner/repo", + verdict="CONFIRMED", + quality="High", + ) + history = self.backend.get_reporter_history("bob") + # Should still be exactly 1 record + self.assertEqual(len(history), 1) + self.assertEqual(history[0]["verdict"], "CONFIRMED") + self.assertEqual(history[0]["quality"], "High") + + def test_get_reporter_score_empty(self): + """get_reporter_score returns zero totals for an unknown login.""" + score = self.backend.get_reporter_score("nobody") + self.assertEqual(score["total_reports"], 0) + self.assertEqual(score["confirmed_pct"], 0.0) + self.assertEqual(score["quality_breakdown"], {"High": 0, "Medium": 0, "Low": 0}) + self.assertEqual(score["recommendation"], "no history") + + def test_get_reporter_score_recommendation_skepticism(self): + """5 Low-quality UNCONFIRMED reports → recommendation is 'treat with skepticism'.""" + for i in range(5): + self.backend.record_triage_result( + login="spammer", + ghsa_id=f"GHSA-{i:04d}-0000-0000", + repo="owner/repo", + verdict="UNCONFIRMED", + quality="Low", + ) + score = self.backend.get_reporter_score("spammer") + self.assertEqual(score["recommendation"], "treat with skepticism") + self.assertEqual(score["quality_breakdown"]["Low"], 5) + self.assertEqual(score["confirmed_pct"], 0.0) + + def test_get_reporter_score_recommendation_trust(self): + """5 High-quality CONFIRMED reports → recommendation is 'high trust'.""" + for i in range(5): + self.backend.record_triage_result( + login="expert", + ghsa_id=f"GHSA-{i:04d}-1111-1111", + repo="owner/repo", + verdict="CONFIRMED", + quality="High", + ) + score = self.backend.get_reporter_score("expert") + self.assertEqual(score["recommendation"], "high trust") + self.assertEqual(score["confirmed_pct"], 1.0) + + def test_get_reporter_history_empty(self): + """get_reporter_history returns empty list for unknown login.""" + history = self.backend.get_reporter_history("ghost") + self.assertEqual(history, []) + + def test_record_invalid_verdict_raises(self): + """record_triage_result rejects unknown verdict strings.""" + with self.assertRaises(ValueError): + self.backend.record_triage_result("alice", "GHSA-x", "r/r", "MAYBE", "High") + + def test_record_invalid_quality_raises(self): + """record_triage_result rejects unknown quality strings.""" + with self.assertRaises(ValueError): + self.backend.record_triage_result("alice", "GHSA-x", "r/r", "CONFIRMED", "Excellent") + + def test_multiple_reporters_isolated(self): + """Records for different reporters are independent.""" + self.backend.record_triage_result("alice", "GHSA-a", "r/r", "CONFIRMED", "High") + self.backend.record_triage_result("bob", "GHSA-b", "r/r", "UNCONFIRMED", "Low") + + alice_history = self.backend.get_reporter_history("alice") + bob_history = self.backend.get_reporter_history("bob") + + self.assertEqual(len(alice_history), 1) + self.assertEqual(len(bob_history), 1) + self.assertEqual(alice_history[0]["ghsa_id"], "GHSA-a") + self.assertEqual(bob_history[0]["ghsa_id"], "GHSA-b") + + +# --------------------------------------------------------------------------- +# TestYamlStructure +# --------------------------------------------------------------------------- + +class TestYamlStructure(unittest.TestCase): + """Tests that the new YAML files parse correctly via AvailableTools.""" + + def setUp(self): + from seclab_taskflow_agent.available_tools import AvailableTools + self.tools = AvailableTools() + + def test_pvr_triage_yaml_parses(self): + """pvr_triage.yaml loads without error and is a taskflow.""" + 
result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_triage") + self.assertIsNotNone(result) + self.assertEqual(result.header.filetype, "taskflow") + + def test_pvr_respond_yaml_parses(self): + """pvr_respond.yaml loads without error and declares required globals.""" + result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_respond") + self.assertIsNotNone(result) + self.assertEqual(result.header.filetype, "taskflow") + globals_keys = result.globals or {} + self.assertIn("repo", globals_keys) + self.assertIn("ghsa", globals_keys) + self.assertIn("action", globals_keys) + + def test_pvr_triage_batch_yaml_parses(self): + """pvr_triage_batch.yaml loads without error and declares repo global.""" + result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch") + self.assertIsNotNone(result) + self.assertEqual(result.header.filetype, "taskflow") + globals_keys = result.globals or {} + self.assertIn("repo", globals_keys) + + def test_reporter_reputation_toolbox_parses(self): + """reporter_reputation.yaml loads without error and is a toolbox.""" + result = self.tools.get_toolbox("seclab_taskflows.toolboxes.reporter_reputation") + self.assertIsNotNone(result) + self.assertEqual(result.header.filetype, "toolbox") + + def test_pvr_ghsa_toolbox_has_confirm(self): + """pvr_ghsa.yaml toolbox declares write-back tools in confirm list.""" + result = self.tools.get_toolbox("seclab_taskflows.toolboxes.pvr_ghsa") + self.assertIsNotNone(result) + confirm = result.confirm or [] + self.assertIn("accept_pvr_advisory", confirm) + self.assertIn("reject_pvr_advisory", confirm) + self.assertNotIn("add_pvr_advisory_comment", confirm) + + def test_pvr_respond_batch_yaml_parses(self): + """pvr_respond_batch.yaml loads without error and declares repo + action globals.""" + result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch") + self.assertIsNotNone(result) + self.assertEqual(result.header.filetype, "taskflow") + globals_keys = result.globals or {} + self.assertIn("repo", globals_keys) + self.assertIn("action", globals_keys) + + def test_pvr_triage_yaml_has_reporter_reputation_toolbox(self): + """pvr_triage.yaml references reporter_reputation toolbox in at least one task.""" + result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_triage") + taskflow = result.taskflow or [] + toolbox_refs = [] + for task_wrapper in taskflow: + task = task_wrapper.task + toolboxes = task.toolboxes or [] + toolbox_refs.extend(toolboxes) + self.assertIn( + "seclab_taskflows.toolboxes.reporter_reputation", + toolbox_refs, + "pvr_triage.yaml must reference the reporter_reputation toolbox", + ) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])
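
To make the backend contract concrete, the same API that the `TestReporterReputationBackend` cases exercise can be driven directly. This is a usage sketch that mirrors the test assertions above (constructor argument, method signatures, allowed verdict/quality values, and score keys); anything beyond those calls is an assumption about the backend.

```python
# Usage sketch mirroring TestReporterReputationBackend above; the sqlite://
# sentinel, method signatures, and return keys are taken from the test assertions.
from seclab_taskflows.mcp_servers.reporter_reputation import ReporterReputationBackend

backend = ReporterReputationBackend(db_dir="sqlite://")  # in-memory DB, as in the tests

# Verdict must be CONFIRMED / UNCONFIRMED / INCONCLUSIVE and quality must be
# High / Medium / Low; anything else raises ValueError.
backend.record_triage_result(
    login="alice",
    ghsa_id="GHSA-1111-2222-3333",
    repo="owner/repo",
    verdict="CONFIRMED",
    quality="High",
)

history = backend.get_reporter_history("alice")  # dicts with login, ghsa_id, verdict, quality
score = backend.get_reporter_score("alice")      # total_reports, confirmed_pct, quality_breakdown, recommendation
print(score["recommendation"])                   # e.g. "high trust", "treat with skepticism", "no history"
```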