Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 243 additions & 38 deletions packages/opencode/src/cli/cmd/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import { Filesystem } from "../../util/filesystem"
import { createOpencodeClient, type Message, type OpencodeClient, type ToolPart } from "@opencode-ai/sdk/v2"
import { Server } from "../../server/server"
import { Provider } from "../../provider/provider"
// altimate_change start — verifier-gated router (run cheap, verify, escalate)
import { Router } from "../../router/router"
import { Verifier } from "../../router/verifier"
import { Verdict } from "../../router/verdict"
import { Policy } from "../../router/policy"
import { EquivalenceVerifier } from "../../router/equivalence-verifier"
import { ReferenceResolver } from "../../router/reference"
import * as Dispatcher from "../../altimate/native/dispatcher"
// altimate_change end
import { Agent } from "../../agent/agent"
import { PermissionNext } from "../../permission/next"
import { Tool } from "../../tool/tool"
Expand Down Expand Up @@ -816,46 +825,53 @@ You are speaking to a non-technical business executive. Follow these rules stric
process.exit(1)
})

if (args.command) {
await sdk.session.command({
sessionID,
agent,
model: args.model,
command: args.command,
arguments: message,
variant: args.variant,
})
} else {
const model = args.model ? Provider.parseModel(args.model) : undefined
await sdk.session.prompt({
sessionID,
agent,
model,
variant: args.variant,
parts: [...files, { type: "text", text: message }],
...(audienceSystem ? { system: audienceSystem } : {}),
})
}
// altimate_change start — per-run finally cleanup. The verifier-gated router catches a
// thrown tier (router.ts) and escalates to the next tier within the SAME process; without
// this finally, a tier whose prompt throws would leak its SIGINT/SIGTERM/beforeExit handlers
// and leave its tracer active, accumulating across tiers. Cleanup now always runs.
try {
if (args.command) {
await sdk.session.command({
sessionID,
agent,
model: args.model,
command: args.command,
arguments: message,
variant: args.variant,
})
} else {
const model = args.model ? Provider.parseModel(args.model) : undefined
await sdk.session.prompt({
sessionID,
agent,
model,
variant: args.variant,
parts: [...files, { type: "text", text: message }],
...(audienceSystem ? { system: audienceSystem } : {}),
})
}

// Wait for the event loop to drain (breaks when session reaches idle)
await loopPromise

// Remove crash handlers — trace will be finalized cleanly
process.removeListener("SIGINT", onSigint)
process.removeListener("SIGTERM", onSigterm)
process.removeListener("beforeExit", onBeforeExit)

// Finalize trace and save to disk
if (tracer) {
Tracer.setActive(null)
const tracePath = await tracer.endTrace(error)
if (tracePath) {
emit("trace_saved", { path: tracePath })
if (args.format !== "json" && process.stdout.isTTY) {
UI.println(UI.Style.TEXT_DIM + `Trace saved: ${tracePath}` + UI.Style.TEXT_NORMAL)
// Wait for the event loop to drain (breaks when session reaches idle)
await loopPromise
} finally {
// Remove crash handlers — trace will be finalized cleanly
process.removeListener("SIGINT", onSigint)
process.removeListener("SIGTERM", onSigterm)
process.removeListener("beforeExit", onBeforeExit)

// Finalize trace and save to disk (with `error` if the run failed)
if (tracer) {
Tracer.setActive(null)
const tracePath = await tracer.endTrace(error)
if (tracePath) {
emit("trace_saved", { path: tracePath })
if (args.format !== "json" && process.stdout.isTTY) {
UI.println(UI.Style.TEXT_DIM + `Trace saved: ${tracePath}` + UI.Style.TEXT_NORMAL)
}
}
}
}
// altimate_change end

// Write accumulated text output to file if --output was specified
if (args.output) {
Expand All @@ -864,8 +880,190 @@ You are speaking to a non-technical business executive. Follow these rules stric
await Bun.write(outputPath, content)
process.stderr.write(`\n✓ Output saved to: ${outputPath}\n`)
}

// altimate_change start — expose the session id so the router can reuse one session
// across tiers (escalation continues the same session instead of starting fresh).
return sessionID
// altimate_change end
}

// altimate_change start — verifier-gated router orchestration
// Deterministic-verify the dbt workspace in cwd (`dbt build`, judged by Verifier).
// Only gates real dbt projects; with nothing to prove it returns ok (no escalation).
async function verifyWorkspace(): Promise<Verifier.Verdict> {
const root = process.cwd()
if (!(await Filesystem.exists(path.join(root, "dbt_project.yml"))))
return {
ok: true,
unverifiable: true,
strength: Verifier.Strength.UNVERIFIABLE,
decision: Verifier.Decision.OK,
reason: "no dbt project to verify",
checks: [],
}

// Reference-free gate: `dbt build` in `dir`, judged by Verifier. Used directly (default)
// and as the fallback for the equivalence verifier (greenfield / undecidable).
const buildVerify = async (dir: string): Promise<Verifier.Verdict> => {
try {
const proc = Bun.spawn(["dbt", "build"], { cwd: dir, stdout: "pipe", stderr: "pipe" })
// Hard timeout so a hung dbt (lock, prompt, runaway query) can't stall the run.
let timedOut = false
const timer = setTimeout(() => {
timedOut = true
proc.kill()
}, 300_000)
const out = (await new Response(proc.stdout).text()) + (await new Response(proc.stderr).text())
const code = await proc.exited
clearTimeout(timer)
if (timedOut)
return {
ok: false,
strength: Verifier.Strength.BUILD,
decision: Verifier.Decision.FAILED,
reason: "dbt build timed out after 300s",
checks: [{ name: "dbt build", ok: false, detail: "timed out after 300s" }],
}
return Verifier.fromDbt(out, code)
} catch (e) {
// dbt binary missing / spawn failure → can't verify; mark unverifiable (fail-open, but honest).
return {
ok: true,
unverifiable: true,
strength: Verifier.Strength.UNVERIFIABLE,
decision: Verifier.Decision.OK,
reason: `verify skipped: ${String(e)}`,
checks: [],
}
}
}

// EXPERIMENTAL (flag-gated, default off): equivalence-backed verification in the
// reference-available regime — proven-equivalent vs the model's base version. Always
// falls back to `buildVerify` on greenfield / undecidable / any error, so it can never
// be less safe than the build gate. Value is gated on altimate-core dialect + schema
// coverage (altimate-core-internal #128 / #130); ships dormant until those land.
if (process.env["ALTIMATE_ROUTER_EQUIVALENCE"] === "1") {
try {
const exec: ReferenceResolver.Exec = async (cmd, args, cwd) => {
const p = Bun.spawn([cmd, ...args], { cwd, stdout: "pipe", stderr: "pipe" })
const stdout = await new Response(p.stdout).text()
return { stdout, code: await p.exited }
}
const readCompiled = async (dir: string): Promise<Map<string, string>> => {
const { readdir } = await import("node:fs/promises")
const map = new Map<string, string>()
const baseDir = path.join(dir, "target", "compiled")
if (!(await Filesystem.exists(baseDir))) return map
const walk = async (d: string) => {
for (const e of await readdir(d, { withFileTypes: true })) {
const fp = path.join(d, e.name)
if (e.isDirectory()) await walk(fp)
else if (e.name.endsWith(".sql")) map.set(e.name.replace(/\.sql$/, ""), await Bun.file(fp).text())
}
}
await walk(baseDir)
return map
}
const checkoutBase = async (workdir: string, ref: string) => {
const dir = path.join("/tmp", `altimate-base-${Date.now()}`)
await exec("git", ["worktree", "add", "--detach", dir, ref], workdir)
return {
dir,
cleanup: async () => {
await exec("git", ["worktree", "remove", "--force", dir], workdir)
},
}
}
const deps = ReferenceResolver.gitDbtDeps(exec, {
readCompiled,
// Best-effort: empty schema ⇒ the engine abstains on table refs ⇒ build fallback.
// A warehouse schema resolver lands with the dialect coverage work.
buildSchema: async () => undefined,
checkoutBase,
})
const check: EquivalenceVerifier.CheckEquivalence = async (head, base, schema) => {
const r = await Dispatcher.call("altimate_core.equivalence", {
sql1: head,
sql2: base,
schema_context: schema as Record<string, unknown> | undefined,
})
const d = ((r as { data?: Record<string, unknown> }).data ?? {}) as {
equivalent?: boolean
validation_errors?: string[]
differences?: { severity?: string; description?: string }[]
confidence?: number
}
return {
equivalent: !!d.equivalent,
validation_errors: d.validation_errors ?? [],
differences: d.differences ?? [],
confidence: d.confidence,
}
}
return await EquivalenceVerifier.create(check, ReferenceResolver.create(deps), {
verify: buildVerify,
}).verify(root)
} catch {
return buildVerify(root) // the experimental path must never break the run
}
}

return buildVerify(root)
}

// Run the tier ladder: cheap → verify → escalate with failing-check context, stop at first pass.
// Each tier re-invokes the existing single-run path with that model (and the escalation note
// prepended) in the SAME workspace, so a later tier fixes the prior attempt rather than restarting.
async function runRouted(sdk: OpencodeClient) {
// Only route when the workspace is verifiable. Without a deterministic gate, routing
// would accept the cheapest tier with no way to verify or escalate — silently
// downgrading quality. In a non-dbt project, run once with the user's model instead.
if (!(await Filesystem.exists(path.join(process.cwd(), "dbt_project.yml")))) {
await execute(sdk)
return
}
const baseMessage = message
const originalModel = args.model
const originalSession = args.session
// Reuse ONE session across tiers: tier-1 creates it; escalation tiers continue the
// same session so the stronger model sees the prior attempt + the failing-check note,
// rather than starting cold. Captured from execute()'s returned session id.
let sharedSessionID: string | undefined
const policy = Policy.resolve()
const tiers = await policy.tiers({ prompt: baseMessage })
let result
try {
result = await Router.route({
tiers,
runAgent: async (model, note) => {
args.model = model
message = note ? `${note}\n\n${baseMessage}` : baseMessage
if (sharedSessionID) args.session = sharedSessionID // continue tier-1's session
const sid = await execute(sdk)
if (sid && !sharedSessionID) sharedSessionID = sid // capture tier-1's session
},
verify: verifyWorkspace,
})
} finally {
// Always restore the mutated request state, even if a tier throws — otherwise
// `message`/`args.model`/`args.session` leak the last tier's state to any
// downstream logging/telemetry/retry.
message = baseMessage
args.model = originalModel
args.session = originalSession
}
const envelope = Verdict.build(result, { now: new Date().toISOString() })
if (args.format === "json") {
process.stdout.write(JSON.stringify({ type: "verdict", timestamp: Date.now(), ...envelope }) + EOL)
} else {
const tag = envelope.solved ? `✓ verified by ${envelope.solvedBy}` : "✗ unverified after all tiers"
UI.println(UI.Style.TEXT_INFO_BOLD + `~ router: ${tag} (policy: ${policy.source})`)
}
await Policy.reportOutcome(envelope)
}
// altimate_change end

if (args.attach) {
const headers = (() => {
const password = args.password ?? process.env.OPENCODE_SERVER_PASSWORD
Expand All @@ -875,7 +1073,11 @@ You are speaking to a non-technical business executive. Follow these rules stric
return { Authorization: auth }
})()
const sdk = createOpencodeClient({ baseUrl: args.attach, directory, headers })
return await execute(sdk)
// altimate_change start — route when enabled, else single run
if (Router.enabled()) await runRouted(sdk)
else await execute(sdk) // discard execute()'s returned session id (handler returns void)
return
// altimate_change end
}

await bootstrap(process.cwd(), async () => {
Expand All @@ -884,7 +1086,10 @@ You are speaking to a non-technical business executive. Follow these rules stric
return Server.Default().fetch(request)
}) as typeof globalThis.fetch
const sdk = createOpencodeClient({ baseUrl: "http://altimate-code.internal", fetch: fetchFn })
await execute(sdk)
// altimate_change start — route when enabled, else single run
if (Router.enabled()) await runRouted(sdk)
else await execute(sdk)
// altimate_change end
})
},
})
Loading
Loading