From a173a21e6b84b0f9df4bfccefbb62c492a630a8f Mon Sep 17 00:00:00 2001 From: Arul Sharma <31745423+arul28@users.noreply.github.com> Date: Sun, 31 May 2026 19:43:45 -0400 Subject: [PATCH 1/2] Fix ADE-71 state persistence and kvDb hardening --- .../services/config/projectConfigService.ts | 40 ++- .../main/services/history/operationService.ts | 49 +-- .../main/services/sessions/sessionService.ts | 63 ++-- .../main/services/state/globalState.test.ts | 23 +- .../src/main/services/state/globalState.ts | 32 +- .../src/main/services/state/kvDb.test.ts | 11 + apps/desktop/src/main/services/state/kvDb.ts | 332 ++++++++++-------- 7 files changed, 325 insertions(+), 225 deletions(-) diff --git a/apps/desktop/src/main/services/config/projectConfigService.ts b/apps/desktop/src/main/services/config/projectConfigService.ts index 7a38bdfd9..432f46aba 100644 --- a/apps/desktop/src/main/services/config/projectConfigService.ts +++ b/apps/desktop/src/main/services/config/projectConfigService.ts @@ -3,6 +3,7 @@ import path from "node:path"; import { createHash } from "node:crypto"; import YAML from "yaml"; import cron from "node-cron"; +import { z } from "zod"; import type { AiConfig, AiFeatureKey, @@ -91,6 +92,27 @@ const AUTOMATION_ACTION_TYPE_SET = new Set([ "run-command", "ade-action", ]); +const OPTIONAL_STRING_SCHEMA = z.string().optional().catch(undefined); +const OPTIONAL_NUMBER_SCHEMA = z.number().finite().optional().catch(undefined); +const OPTIONAL_BOOL_SCHEMA = z.boolean().optional().catch(undefined); +const STRING_ARRAY_SCHEMA = z.array(z.unknown()) + .transform((values) => values + .filter((value): value is string => typeof value === "string") + .map((value) => value.trim()) + .filter(Boolean)) + .optional() + .catch(() => undefined); +const STRING_MAP_SCHEMA = z.record(z.string(), z.unknown()) + .transform((value) => { + const out: Record = {}; + for (const [key, entry] of Object.entries(value)) { + if (typeof entry === "string") out[key] = entry; + } + return out; + }) + .optional() + .catch(() => undefined); +const COMPUTE_BACKEND_SCHEMA = z.enum(["local", "vps", "daytona"]).optional().catch(undefined); function isPathWithinProjectRoot(projectRoot: string, candidate: string, opts: { allowMissing?: boolean } = {}): boolean { try { @@ -102,12 +124,11 @@ function isPathWithinProjectRoot(projectRoot: string, candidate: string, opts: { } function asString(value: unknown): string | undefined { - return typeof value === "string" ? value : undefined; + return OPTIONAL_STRING_SCHEMA.parse(value); } function asStringArray(value: unknown): string[] | undefined { - if (!Array.isArray(value)) return undefined; - return value.filter((v): v is string => typeof v === "string").map((v) => v.trim()).filter(Boolean); + return STRING_ARRAY_SCHEMA.parse(value); } function asLaneTypeArray(value: unknown): LaneType[] | undefined { @@ -120,15 +141,15 @@ function asLaneTypeArray(value: unknown): LaneType[] | undefined { } function asNumber(value: unknown): number | undefined { - return typeof value === "number" && Number.isFinite(value) ? value : undefined; + return OPTIONAL_NUMBER_SCHEMA.parse(value); } function asBool(value: unknown): boolean | undefined { - return typeof value === "boolean" ? value : undefined; + return OPTIONAL_BOOL_SCHEMA.parse(value); } function asComputeBackend(value: unknown): "local" | "vps" | "daytona" | undefined { - return value === "local" || value === "vps" || value === "daytona" ? value : undefined; + return COMPUTE_BACKEND_SCHEMA.parse(value); } function coerceOrchestratorHookConfig(value: unknown): { command: string; timeoutMs?: number } | null { @@ -147,12 +168,7 @@ function coerceOrchestratorHookConfig(value: unknown): { command: string; timeou } function asStringMap(value: unknown): Record | undefined { - if (!isRecord(value)) return undefined; - const out: Record = {}; - for (const [k, v] of Object.entries(value)) { - if (typeof v === "string") out[k] = v; - } - return out; + return STRING_MAP_SCHEMA.parse(value); } function normalizeConfigPath(value: string): string { diff --git a/apps/desktop/src/main/services/history/operationService.ts b/apps/desktop/src/main/services/history/operationService.ts index 15d481532..ce99f6e13 100644 --- a/apps/desktop/src/main/services/history/operationService.ts +++ b/apps/desktop/src/main/services/history/operationService.ts @@ -8,6 +8,19 @@ type OperationStatus = "running" | "succeeded" | "failed" | "canceled"; type OperationMetadata = Record; type HeadChangeOperationRecord = OperationRecord & { preHeadSha: string; postHeadSha: string }; +const OPERATION_COLUMNS = ` + o.id as id, + o.lane_id as laneId, + l.name as laneName, + o.kind as kind, + o.started_at as startedAt, + o.ended_at as endedAt, + o.status as status, + o.pre_head_sha as preHeadSha, + o.post_head_sha as postHeadSha, + o.metadata_json as metadataJson +`; + function safeParseMetadata(raw: string | null | undefined): OperationMetadata { const parsed = safeJsonParse(raw, null); return isRecord(parsed) ? parsed : {}; @@ -96,17 +109,7 @@ export function createOperationService({ if (!operationId.trim()) return null; const row = db.get( ` - select - o.id as id, - o.lane_id as laneId, - l.name as laneName, - o.kind as kind, - o.started_at as startedAt, - o.ended_at as endedAt, - o.status as status, - o.pre_head_sha as preHeadSha, - o.post_head_sha as postHeadSha, - o.metadata_json as metadataJson + select ${OPERATION_COLUMNS} from operations o left join lanes l on l.id = o.lane_id where o.project_id = ? and o.id = ? @@ -144,17 +147,7 @@ export function createOperationService({ const limit = typeof args.limit === "number" ? Math.max(1, Math.min(1000, Math.floor(args.limit))) : 100; return db.all( ` - select - o.id as id, - o.lane_id as laneId, - l.name as laneName, - o.kind as kind, - o.started_at as startedAt, - o.ended_at as endedAt, - o.status as status, - o.pre_head_sha as preHeadSha, - o.post_head_sha as postHeadSha, - o.metadata_json as metadataJson + select ${OPERATION_COLUMNS} from operations o left join lanes l on l.id = o.lane_id where o.project_id = ? @@ -197,17 +190,7 @@ export function createOperationService({ const rows = db.all( ` - select - o.id as id, - o.lane_id as laneId, - l.name as laneName, - o.kind as kind, - o.started_at as startedAt, - o.ended_at as endedAt, - o.status as status, - o.pre_head_sha as preHeadSha, - o.post_head_sha as postHeadSha, - o.metadata_json as metadataJson + select ${OPERATION_COLUMNS} from operations o left join lanes l on l.id = o.lane_id where ${where.join(" and ")} diff --git a/apps/desktop/src/main/services/sessions/sessionService.ts b/apps/desktop/src/main/services/sessions/sessionService.ts index 64d19ded5..b07af87f8 100644 --- a/apps/desktop/src/main/services/sessions/sessionService.ts +++ b/apps/desktop/src/main/services/sessions/sessionService.ts @@ -532,9 +532,13 @@ export function createSessionService({ db }: { db: AdeDb }) { .map((toolType) => normalizeToolType(toolType)) .filter((toolType): toolType is TerminalToolType => toolType != null) : []; - const exclusionSql = normalizedExcludedToolTypes.length - ? ` and (tool_type is null or tool_type not in (${normalizedExcludedToolTypes.map(() => "?").join(", ")}))` - : ""; + type SqlClause = { sql: string; params: Array }; + const exclusionClause: SqlClause = normalizedExcludedToolTypes.length + ? { + sql: ` and (tool_type is null or tool_type not in (${normalizedExcludedToolTypes.map(() => "?").join(", ")}))`, + params: normalizedExcludedToolTypes, + } + : { sql: "", params: [] }; const ownerParams = liveOwnerPids ? Array.from(liveOwnerPids) .map((pid) => normalizeOwnerPid(pid)) @@ -577,25 +581,33 @@ export function createSessionService({ db }: { db: AdeDb }) { identity.pid, identity.startedAt, ]); - const ownerGuardSql = (() => { - if (liveOwnerPids === undefined) return " and owner_pid is null"; + const ownerGuardClause = ((): SqlClause => { + if (liveOwnerPids === undefined) return { sql: " and owner_pid is null", params: [] }; const hasKnownOwnerScope = knownOwnerPids !== undefined || knownOwnerIdentities !== undefined; if (!hasKnownOwnerScope) { if (ownerIdentities.length) { - return ` and (owner_pid is null or owner_process_started_at is null or not (${ownerIdentities.map(() => "(owner_pid = ? and owner_process_started_at = ?)").join(" or ")}))`; + return { + sql: ` and (owner_pid is null or owner_process_started_at is null or not (${ownerIdentities.map(() => "(owner_pid = ? and owner_process_started_at = ?)").join(" or ")}))`, + params: ownerIdentityParams, + }; } return ownerParams.length - ? ` and (owner_pid is null or owner_pid not in (${ownerParams.map(() => "?").join(", ")}))` - : ""; + ? { + sql: ` and (owner_pid is null or owner_pid not in (${ownerParams.map(() => "?").join(", ")}))`, + params: ownerParams, + } + : { sql: "", params: [] }; } const staleKnownClauses = ["owner_pid is null"]; + const params: Array = []; if (knownOwnerIdentityRows.length) { const knownIdentitySql = knownOwnerIdentityRows.map(() => "(owner_pid = ? and owner_process_started_at = ?)").join(" or "); const liveIdentitySql = ownerIdentities.length ? ownerIdentities.map(() => "(owner_pid = ? and owner_process_started_at = ?)").join(" or ") : "0"; staleKnownClauses.push(`(owner_process_started_at is not null and (${knownIdentitySql}) and not (${liveIdentitySql}))`); + params.push(...knownOwnerIdentityParams, ...ownerIdentityParams); } if (knownOwnerParams.length) { const knownPidSql = knownOwnerParams.map(() => "?").join(", "); @@ -603,21 +615,9 @@ export function createSessionService({ db }: { db: AdeDb }) { ? ` and owner_pid not in (${ownerParams.map(() => "?").join(", ")})` : ""; staleKnownClauses.push(`(owner_process_started_at is null and owner_pid in (${knownPidSql})${livePidSql})`); + params.push(...knownOwnerParams, ...ownerParams); } - return ` and (${staleKnownClauses.join(" or ")})`; - })(); - const ownerGuardParams = (() => { - if (liveOwnerPids === undefined) return []; - const hasKnownOwnerScope = knownOwnerPids !== undefined || knownOwnerIdentities !== undefined; - if (!hasKnownOwnerScope) { - return ownerIdentities.length ? ownerIdentityParams : ownerParams; - } - return [ - ...knownOwnerIdentityParams, - ...(knownOwnerIdentityRows.length ? ownerIdentityParams : []), - ...knownOwnerParams, - ...(knownOwnerParams.length ? ownerParams : []), - ]; + return { sql: ` and (${staleKnownClauses.join(" or ")})`, params }; })(); const graceMs = typeof freshActivityGraceMs === "number" && Number.isFinite(freshActivityGraceMs) ? Math.max(0, freshActivityGraceMs) @@ -627,16 +627,15 @@ export function createSessionService({ db }: { db: AdeDb }) { const activityCutoff = graceMs > 0 && Number.isFinite(cutoffMs) ? new Date(cutoffMs).toISOString() : null; - const activityGuardSql = activityCutoff - ? " and started_at < ? and (last_output_at is null or last_output_at < ?)" - : ""; - const activityParams = activityCutoff ? [activityCutoff, activityCutoff] : []; - const whereSql = `status = 'running'${exclusionSql}${ownerGuardSql}${activityGuardSql}`; - const params = [ - ...normalizedExcludedToolTypes, - ...ownerGuardParams, - ...activityParams, - ]; + const activityClause: SqlClause = activityCutoff + ? { + sql: " and started_at < ? and (last_output_at is null or last_output_at < ?)", + params: [activityCutoff, activityCutoff], + } + : { sql: "", params: [] }; + const clauses = [exclusionClause, ownerGuardClause, activityClause]; + const whereSql = `status = 'running'${clauses.map((clause) => clause.sql).join("")}`; + const params = clauses.flatMap((clause) => clause.params); const rows = db.all<{ id: string }>( `select id from terminal_sessions where ${whereSql}`, params, diff --git a/apps/desktop/src/main/services/state/globalState.test.ts b/apps/desktop/src/main/services/state/globalState.test.ts index 1a182b88a..2b80933ca 100644 --- a/apps/desktop/src/main/services/state/globalState.test.ts +++ b/apps/desktop/src/main/services/state/globalState.test.ts @@ -1,6 +1,9 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; import { describe, expect, it } from "vitest"; -import { upsertRecentProject, type GlobalState } from "./globalState"; +import { readGlobalState, upsertRecentProject, writeGlobalState, type GlobalState } from "./globalState"; describe("upsertRecentProject", () => { it("keeps an existing project in place when preserving recent order", () => { @@ -62,3 +65,21 @@ describe("upsertRecentProject", () => { expect(next.lastProjectRoot).toBe("/projects/a"); }); }); + +describe("writeGlobalState", () => { + it("persists state through an atomic temp-file rename", () => { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "ade-global-state-")); + const filePath = path.join(dir, "global-state.json"); + const state: GlobalState = { + lastProjectRoot: "/repo/ade", + recentProjects: [ + { rootPath: "/repo/ade", displayName: "ADE", lastOpenedAt: "2026-05-31T00:00:00.000Z" }, + ], + }; + + writeGlobalState(filePath, state); + + expect(readGlobalState(filePath)).toEqual(state); + expect(fs.readdirSync(dir).filter((entry) => entry.endsWith(".tmp"))).toEqual([]); + }); +}); diff --git a/apps/desktop/src/main/services/state/globalState.ts b/apps/desktop/src/main/services/state/globalState.ts index 6a075dd8d..48d1b9cee 100644 --- a/apps/desktop/src/main/services/state/globalState.ts +++ b/apps/desktop/src/main/services/state/globalState.ts @@ -1,5 +1,6 @@ import fs from "node:fs"; import path from "node:path"; +import { randomUUID } from "node:crypto"; import type { OpenProjectBinding, RecentlyInstalledUpdate } from "../../../shared/types"; export type RecentProject = { @@ -36,10 +37,37 @@ export function readGlobalState(filePath: string): GlobalState { } export function writeGlobalState(filePath: string, state: GlobalState): void { + let tempPath: string | null = null; + let fd: number | null = null; try { - fs.mkdirSync(path.dirname(filePath), { recursive: true }); - fs.writeFileSync(filePath, JSON.stringify(state, null, 2), "utf8"); + const dir = path.dirname(filePath); + fs.mkdirSync(dir, { recursive: true }); + tempPath = path.join(dir, `.${path.basename(filePath)}.${process.pid}.${randomUUID()}.tmp`); + const serialized = `${JSON.stringify(state, null, 2)}\n`; + fd = fs.openSync(tempPath, "w"); + fs.writeFileSync(fd, serialized, "utf8"); + fs.fsyncSync(fd); + fs.closeSync(fd); + fd = null; + fs.renameSync(tempPath, filePath); + tempPath = null; + try { + const dirFd = fs.openSync(dir, "r"); + try { + fs.fsyncSync(dirFd); + } finally { + fs.closeSync(dirFd); + } + } catch { + // Directory fsync is best effort across filesystems/platforms. + } } catch { + if (fd != null) { + try { fs.closeSync(fd); } catch {} + } + if (tempPath) { + try { fs.unlinkSync(tempPath); } catch {} + } // Non-fatal; global state is a convenience. } } diff --git a/apps/desktop/src/main/services/state/kvDb.test.ts b/apps/desktop/src/main/services/state/kvDb.test.ts index 3d3115594..16678d79d 100644 --- a/apps/desktop/src/main/services/state/kvDb.test.ts +++ b/apps/desktop/src/main/services/state/kvDb.test.ts @@ -216,6 +216,17 @@ describe("openKvDb SQL binding", () => { db.all("select flag from db_value_test where flag = ?", [{} as any]), ).toThrow(/Unsupported database value at parameter 1: object .*sql=select flag from db_value_test/i); }); + + it("checkpoints pending writes when flushed", async () => { + const projectRoot = makeProjectRoot("ade-kvdb-flush-"); + const dbPath = path.join(projectRoot, ".ade", "ade.db"); + const db = await openKvDb(dbPath, createLogger() as any); + activeDisposers.push(async () => db.close()); + + db.setJson("flush:probe", { ok: true }); + expect(() => db.flushNow()).not.toThrow(); + expect(db.getJson<{ ok: boolean }>("flush:probe")).toEqual({ ok: true }); + }); }); describe("lane_linear_issue_links schema", () => { diff --git a/apps/desktop/src/main/services/state/kvDb.ts b/apps/desktop/src/main/services/state/kvDb.ts index 99c827955..da1ef22dc 100644 --- a/apps/desktop/src/main/services/state/kvDb.ts +++ b/apps/desktop/src/main/services/state/kvDb.ts @@ -212,6 +212,19 @@ function quoteIdentifier(value: string): string { return `"${value.replace(/"/g, "\"\"")}"`; } +function unquoteSqlIdentifier(value: string): string { + const trimmed = value.trim(); + if ( + (trimmed.startsWith("\"") && trimmed.endsWith("\"")) + || (trimmed.startsWith("'") && trimmed.endsWith("'")) + || (trimmed.startsWith("`") && trimmed.endsWith("`")) + || (trimmed.startsWith("[") && trimmed.endsWith("]")) + ) { + return trimmed.slice(1, -1); + } + return trimmed; +} + function rewriteCreateTableName(sql: string, fromName: string, toName: string): string { const pattern = new RegExp( `^(\\s*create\\s+table\\s+(?:if\\s+not\\s+exists\\s+)?)((?:["'\`\\[])?${escapeRegExp(fromName)}(?:["'\`\\]])?)`, @@ -862,7 +875,7 @@ function rebuildCrrTableWithBackfill(db: DatabaseSyncType, tableName: string): v .filter((sql) => sql.length > 0); runStatement(db, "pragma foreign_keys = off"); - runStatement(db, "begin"); + runStatement(db, "BEGIN IMMEDIATE"); try { runStatement( db, @@ -1081,7 +1094,85 @@ type MigrationDb = { function parseAlterTableTarget(sql: string): string | null { const match = sql.match(/^\s*alter\s+table\s+([`"'[\]A-Za-z0-9_]+)\s+add\s+column\s+/i); if (!match?.[1]) return null; - return match[1].replace(/^["'`[]|["'`\]]$/g, ""); + return unquoteSqlIdentifier(match[1]); +} + +function parseAlterTableAddColumn(sql: string): { tableName: string; columnName: string } | null { + const identifier = String.raw`(?:"[^"]+"|'[^']+'|` + "`[^`]+`" + String.raw`|\[[^\]]+\]|[A-Za-z0-9_]+)`; + const match = sql.match(new RegExp(String.raw`^\s*alter\s+table\s+(${identifier})\s+add\s+column\s+(${identifier})\b`, "i")); + if (!match?.[1] || !match[2]) return null; + return { + tableName: unquoteSqlIdentifier(match[1]), + columnName: unquoteSqlIdentifier(match[2]), + }; +} + +function migrationHasColumn(db: MigrationDb, tableName: string, columnName: string): boolean { + return db.all<{ name: string }>(`pragma table_info('${tableName.replace(/'/g, "''")}')`) + .some((column) => column.name === columnName); +} + +function isDuplicateColumnError(error: unknown): boolean { + const message = error instanceof Error ? error.message : String(error); + return /duplicate column name/i.test(message); +} + +function safeAddColumn(db: MigrationDb, sql: string): void { + const parsed = parseAlterTableAddColumn(sql); + if (!parsed) { + db.run(sql); + return; + } + if (migrationHasColumn(db, parsed.tableName, parsed.columnName)) return; + try { + db.run(sql); + } catch (error) { + if (isDuplicateColumnError(error) && migrationHasColumn(db, parsed.tableName, parsed.columnName)) { + return; + } + console.warn("kvDb.migrate.add_column_failed", { + tableName: parsed.tableName, + columnName: parsed.columnName, + sql: sql.replace(/\s+/g, " ").trim(), + error: error instanceof Error ? error.message : String(error), + }); + throw error; + } +} + +function makeCrrAwareDb({ + getDb, + isCrsqliteLoaded, +}: { + getDb: () => DatabaseSyncType; + isCrsqliteLoaded: () => boolean; +}): MigrationDb { + return { + run: (sql: string, params: SqlValue[] = []) => { + const db = getDb(); + const alterTable = parseAlterTableTarget(sql); + if (alterTable && isCrsqliteLoaded() && rawHasTable(db, `${alterTable}__crsql_clock`)) { + getRow(db, "select crsql_begin_alter(?) as ok", [alterTable]); + try { + runStatement(db, sql, params); + } catch (error) { + // Commit the alter even on failure so the CRR state stays consistent, + // then re-throw so callers can handle duplicate-column upgrades. + getRow(db, "select crsql_commit_alter(?) as ok", [alterTable]); + throw error; + } + getRow(db, "select crsql_commit_alter(?) as ok", [alterTable]); + return; + } + runStatement(db, sql, params); + }, + get: = Record>(sql: string, params: SqlValue[] = []) => { + return getRow(getDb(), sql, params); + }, + all: = Record>(sql: string, params: SqlValue[] = []) => { + return allRows(getDb(), sql, params); + }, + }; } function migrate(db: MigrationDb) { @@ -1135,7 +1226,7 @@ function migrate(db: MigrationDb) { foreign key(parent_lane_id) references lanes(id) ) `); - try { db.run("alter table lanes add column runtime_placement text not null default 'local'"); } catch {} + safeAddColumn(db, "alter table lanes add column runtime_placement text not null default 'local'"); db.run("create index if not exists idx_lanes_project_id on lanes(project_id)"); db.run("create index if not exists idx_lanes_project_type on lanes(project_id, lane_type)"); db.run("create index if not exists idx_lanes_project_parent on lanes(project_id, parent_lane_id)"); @@ -1408,20 +1499,20 @@ function migrate(db: MigrationDb) { db.run("create index if not exists idx_terminal_sessions_lane_started_at on terminal_sessions(lane_id, started_at desc)"); // Migration: add resume_command to existing databases that pre-date this column. - try { db.run("alter table terminal_sessions add column resume_command text"); } catch {} - try { db.run("alter table terminal_sessions add column resume_metadata_json text"); } catch {} - try { db.run("alter table terminal_sessions add column manually_named integer not null default 0"); } catch {} - try { db.run("alter table terminal_sessions add column archived_at text"); } catch {} - try { db.run("alter table terminal_sessions add column chat_session_id text"); } catch {} + safeAddColumn(db, "alter table terminal_sessions add column resume_command text"); + safeAddColumn(db, "alter table terminal_sessions add column resume_metadata_json text"); + safeAddColumn(db, "alter table terminal_sessions add column manually_named integer not null default 0"); + safeAddColumn(db, "alter table terminal_sessions add column archived_at text"); + safeAddColumn(db, "alter table terminal_sessions add column chat_session_id text"); try { db.run("create index if not exists idx_terminal_sessions_chat_session_id on terminal_sessions(chat_session_id)"); } catch {} // owner_pid identifies the ADE OS process that owns this row's runtime // (the one with the live PTY or SDK session). Cross-process dispose / // reconcile must check it before sweeping or every concurrent surface // would happily mark each other's live sessions dead. Nullable because // pre-migration rows pre-date ownership tracking. - try { db.run("alter table terminal_sessions add column owner_pid integer"); } catch {} + safeAddColumn(db, "alter table terminal_sessions add column owner_pid integer"); try { db.run("create index if not exists idx_terminal_sessions_owner_pid on terminal_sessions(owner_pid)"); } catch {} - try { db.run("alter table terminal_sessions add column owner_process_started_at text"); } catch {} + safeAddColumn(db, "alter table terminal_sessions add column owner_process_started_at text"); try { db.run("create index if not exists idx_terminal_sessions_owner_process on terminal_sessions(owner_pid, owner_process_started_at)"); } catch {} // Machine-local process liveness registry. Every ADE process (desktop main, @@ -1756,11 +1847,11 @@ function migrate(db: MigrationDb) { `); db.run("create index if not exists idx_pull_requests_lane_id on pull_requests(lane_id)"); db.run("create index if not exists idx_pull_requests_project_id on pull_requests(project_id)"); - try { db.run("alter table pull_requests add column last_polled_at text"); } catch {} - try { db.run("alter table pull_requests add column head_sha text"); } catch {} - try { db.run("alter table pull_requests add column creation_strategy text"); } catch {} - try { db.run("alter table pull_requests add column merge_conflicts integer"); } catch {} - try { db.run("alter table pull_requests add column behind_base_by integer"); } catch {} + safeAddColumn(db, "alter table pull_requests add column last_polled_at text"); + safeAddColumn(db, "alter table pull_requests add column head_sha text"); + safeAddColumn(db, "alter table pull_requests add column creation_strategy text"); + safeAddColumn(db, "alter table pull_requests add column merge_conflicts integer"); + safeAddColumn(db, "alter table pull_requests add column behind_base_by integer"); db.run("drop table if exists github_pr_cache"); @@ -1807,7 +1898,7 @@ function migrate(db: MigrationDb) { ) `); db.run("create index if not exists idx_pull_request_snapshots_updated_at on pull_request_snapshots(updated_at)"); - try { db.run("alter table pull_request_snapshots add column commits_json text"); } catch {} + safeAddColumn(db, "alter table pull_request_snapshots add column commits_json text"); db.run(` create table if not exists files_workspaces ( @@ -2028,17 +2119,17 @@ function migrate(db: MigrationDb) { ) `); db.run("create index if not exists idx_integration_proposals_project on integration_proposals(project_id)"); - try { db.run("alter table integration_proposals add column linked_group_id text"); } catch {} - try { db.run("alter table integration_proposals add column linked_pr_id text"); } catch {} - try { db.run("alter table integration_proposals add column workflow_display_state text not null default 'active'"); } catch {} - try { db.run("alter table integration_proposals add column cleanup_state text not null default 'none'"); } catch {} - try { db.run("alter table integration_proposals add column closed_at text"); } catch {} - try { db.run("alter table integration_proposals add column merged_at text"); } catch {} - try { db.run("alter table integration_proposals add column completed_at text"); } catch {} - try { db.run("alter table integration_proposals add column cleanup_declined_at text"); } catch {} - try { db.run("alter table integration_proposals add column cleanup_completed_at text"); } catch {} - try { db.run("alter table integration_proposals add column preferred_integration_lane_id text"); } catch {} - try { db.run("alter table integration_proposals add column merge_into_head_sha text"); } catch {} + safeAddColumn(db, "alter table integration_proposals add column linked_group_id text"); + safeAddColumn(db, "alter table integration_proposals add column linked_pr_id text"); + safeAddColumn(db, "alter table integration_proposals add column workflow_display_state text not null default 'active'"); + safeAddColumn(db, "alter table integration_proposals add column cleanup_state text not null default 'none'"); + safeAddColumn(db, "alter table integration_proposals add column closed_at text"); + safeAddColumn(db, "alter table integration_proposals add column merged_at text"); + safeAddColumn(db, "alter table integration_proposals add column completed_at text"); + safeAddColumn(db, "alter table integration_proposals add column cleanup_declined_at text"); + safeAddColumn(db, "alter table integration_proposals add column cleanup_completed_at text"); + safeAddColumn(db, "alter table integration_proposals add column preferred_integration_lane_id text"); + safeAddColumn(db, "alter table integration_proposals add column merge_into_head_sha text"); // Queue landing state table (crash recovery for sequential landing) db.run(` @@ -2062,12 +2153,12 @@ function migrate(db: MigrationDb) { ) `); db.run("create index if not exists idx_queue_landing_state_group on queue_landing_state(group_id)"); - try { db.run("alter table queue_landing_state add column config_json text not null default '{}'"); } catch {} - try { db.run("alter table queue_landing_state add column active_pr_id text"); } catch {} - try { db.run("alter table queue_landing_state add column active_resolver_run_id text"); } catch {} - try { db.run("alter table queue_landing_state add column last_error text"); } catch {} - try { db.run("alter table queue_landing_state add column wait_reason text"); } catch {} - try { db.run("alter table queue_landing_state add column updated_at text"); } catch {} + safeAddColumn(db, "alter table queue_landing_state add column config_json text not null default '{}'"); + safeAddColumn(db, "alter table queue_landing_state add column active_pr_id text"); + safeAddColumn(db, "alter table queue_landing_state add column active_resolver_run_id text"); + safeAddColumn(db, "alter table queue_landing_state add column last_error text"); + safeAddColumn(db, "alter table queue_landing_state add column wait_reason text"); + safeAddColumn(db, "alter table queue_landing_state add column updated_at text"); // Rebase dismiss/defer persistence db.run(` @@ -2252,7 +2343,7 @@ function migrate(db: MigrationDb) { deleted_at text ) `); - try { db.run("alter table worker_agents add column linear_identity_json text not null default '{}'"); } catch {} + safeAddColumn(db, "alter table worker_agents add column linear_identity_json text not null default '{}'"); db.run("create index if not exists idx_worker_agents_project on worker_agents(project_id)"); db.run("create index if not exists idx_worker_agents_project_active on worker_agents(project_id, deleted_at)"); @@ -2514,15 +2605,15 @@ function migrate(db: MigrationDb) { updated_at text not null ) `); - try { db.run("alter table linear_workflow_runs add column execution_lane_id text"); } catch {} - try { db.run("alter table linear_workflow_runs add column supervisor_identity_key text"); } catch {} - try { db.run("alter table linear_workflow_runs add column review_ready_reason text"); } catch {} - try { db.run("alter table linear_workflow_runs add column pr_state text"); } catch {} - try { db.run("alter table linear_workflow_runs add column pr_checks_status text"); } catch {} - try { db.run("alter table linear_workflow_runs add column pr_review_status text"); } catch {} - try { db.run("alter table linear_workflow_runs add column latest_review_note text"); } catch {} - try { db.run("alter table linear_workflow_runs add column route_context_json text"); } catch {} - try { db.run("alter table linear_workflow_runs add column execution_context_json text"); } catch {} + safeAddColumn(db, "alter table linear_workflow_runs add column execution_lane_id text"); + safeAddColumn(db, "alter table linear_workflow_runs add column supervisor_identity_key text"); + safeAddColumn(db, "alter table linear_workflow_runs add column review_ready_reason text"); + safeAddColumn(db, "alter table linear_workflow_runs add column pr_state text"); + safeAddColumn(db, "alter table linear_workflow_runs add column pr_checks_status text"); + safeAddColumn(db, "alter table linear_workflow_runs add column pr_review_status text"); + safeAddColumn(db, "alter table linear_workflow_runs add column latest_review_note text"); + safeAddColumn(db, "alter table linear_workflow_runs add column route_context_json text"); + safeAddColumn(db, "alter table linear_workflow_runs add column execution_context_json text"); db.run("create index if not exists idx_linear_workflow_runs_project_status on linear_workflow_runs(project_id, status, updated_at)"); db.run("create index if not exists idx_linear_workflow_runs_issue on linear_workflow_runs(project_id, issue_id, updated_at)"); @@ -2736,11 +2827,11 @@ function migrate(db: MigrationDb) { `); db.run("create index if not exists idx_review_candidate_findings_run on review_candidate_findings(run_id)"); db.run("create index if not exists idx_review_candidate_findings_reviewer on review_candidate_findings(reviewer_run_id)"); - try { db.run("alter table review_findings add column finding_class text"); } catch {} - try { db.run("alter table review_findings add column originating_passes_json text"); } catch {} - try { db.run("alter table review_findings add column adjudication_json text"); } catch {} - try { db.run("alter table review_findings add column diff_context_json text"); } catch {} - try { db.run("alter table review_findings add column suppression_match_json text"); } catch {} + safeAddColumn(db, "alter table review_findings add column finding_class text"); + safeAddColumn(db, "alter table review_findings add column originating_passes_json text"); + safeAddColumn(db, "alter table review_findings add column adjudication_json text"); + safeAddColumn(db, "alter table review_findings add column diff_context_json text"); + safeAddColumn(db, "alter table review_findings add column suppression_match_json text"); // Per-finding feedback — powers the learning loop. db.run(` @@ -2808,17 +2899,17 @@ function migrate(db: MigrationDb) { foreign key(pr_id) references pull_requests(id) on delete cascade ) `); - try { db.run("alter table pr_issue_inventory add column thread_comment_count integer"); } catch {} - try { db.run("alter table pr_issue_inventory add column thread_latest_comment_id text"); } catch {} - try { db.run("alter table pr_issue_inventory add column thread_latest_comment_author text"); } catch {} - try { db.run("alter table pr_issue_inventory add column thread_latest_comment_at text"); } catch {} - try { db.run("alter table pr_issue_inventory add column thread_latest_comment_source text"); } catch {} + safeAddColumn(db, "alter table pr_issue_inventory add column thread_comment_count integer"); + safeAddColumn(db, "alter table pr_issue_inventory add column thread_latest_comment_id text"); + safeAddColumn(db, "alter table pr_issue_inventory add column thread_latest_comment_author text"); + safeAddColumn(db, "alter table pr_issue_inventory add column thread_latest_comment_at text"); + safeAddColumn(db, "alter table pr_issue_inventory add column thread_latest_comment_source text"); db.run("create index if not exists idx_inventory_pr_state on pr_issue_inventory(pr_id, state)"); // PR pipeline settings: per-PR auto-converge / auto-merge configuration. // Newer fields (conflict_strategy, force_finalize_*, early_merge_on_green, - // auto_agent_*) are added via try-catch ALTER below so existing DBs upgrade - // in place. The legacy `on_rebase_needed` column is retained for back-compat. + // auto_agent_*) are added with safeAddColumn so existing DBs upgrade in + // place. The legacy `on_rebase_needed` column is retained for back-compat. db.run(` create table if not exists pr_pipeline_settings ( pr_id text primary key, @@ -2830,20 +2921,20 @@ function migrate(db: MigrationDb) { foreign key(pr_id) references pull_requests(id) on delete cascade ) `); - try { db.run("alter table pr_pipeline_settings add column conflict_strategy text not null default 'pause'"); } catch {} - try { db.run("alter table pr_pipeline_settings add column force_finalize_mode text not null default 'conditional'"); } catch {} - try { db.run("alter table pr_pipeline_settings add column force_finalize_require_no_ci_failures integer not null default 1"); } catch {} - try { db.run("alter table pr_pipeline_settings add column early_merge_on_green integer not null default 1"); } catch {} - try { db.run("alter table pr_pipeline_settings add column auto_agent_provider text"); } catch {} - try { db.run("alter table pr_pipeline_settings add column auto_agent_model text"); } catch {} - try { db.run("alter table pr_pipeline_settings add column auto_agent_reasoning_effort text"); } catch {} - try { db.run("alter table pr_pipeline_settings add column auto_agent_permission_mode text"); } catch {} - try { db.run("alter table pr_pipeline_settings add column auto_agent_confidence_threshold real"); } catch {} - try { db.run("alter table pr_pipeline_settings add column at_cap_policy text default 'ci_retry_once'"); } catch {} - try { db.run("alter table pr_pipeline_settings add column at_cap_wait_minutes integer"); } catch {} - try { db.run("alter table pr_pipeline_settings add column at_cap_ci_retry_max integer"); } catch {} - try { db.run("alter table pr_pipeline_settings add column force_merge_requires_confirmation integer"); } catch {} - try { db.run("alter table pr_pipeline_settings add column ptm_defaults_backfilled_version text"); } catch {} + safeAddColumn(db, "alter table pr_pipeline_settings add column conflict_strategy text not null default 'pause'"); + safeAddColumn(db, "alter table pr_pipeline_settings add column force_finalize_mode text not null default 'conditional'"); + safeAddColumn(db, "alter table pr_pipeline_settings add column force_finalize_require_no_ci_failures integer not null default 1"); + safeAddColumn(db, "alter table pr_pipeline_settings add column early_merge_on_green integer not null default 1"); + safeAddColumn(db, "alter table pr_pipeline_settings add column auto_agent_provider text"); + safeAddColumn(db, "alter table pr_pipeline_settings add column auto_agent_model text"); + safeAddColumn(db, "alter table pr_pipeline_settings add column auto_agent_reasoning_effort text"); + safeAddColumn(db, "alter table pr_pipeline_settings add column auto_agent_permission_mode text"); + safeAddColumn(db, "alter table pr_pipeline_settings add column auto_agent_confidence_threshold real"); + safeAddColumn(db, "alter table pr_pipeline_settings add column at_cap_policy text default 'ci_retry_once'"); + safeAddColumn(db, "alter table pr_pipeline_settings add column at_cap_wait_minutes integer"); + safeAddColumn(db, "alter table pr_pipeline_settings add column at_cap_ci_retry_max integer"); + safeAddColumn(db, "alter table pr_pipeline_settings add column force_merge_requires_confirmation integer"); + safeAddColumn(db, "alter table pr_pipeline_settings add column ptm_defaults_backfilled_version text"); try { db.run(` update pr_pipeline_settings @@ -2900,16 +2991,16 @@ function migrate(db: MigrationDb) { // PtM-specific run args (modelId, reasoning, scope, additionalInstructions) // serialized as JSON. Persisted so resumeFromPersistedState can re-dispatch // the fix agent after a desktop restart instead of pausing on missing modelId. - try { db.run("alter table pr_convergence_state add column ptm_args_json text"); } catch {} - try { db.run("alter table pr_convergence_state add column force_finalize_used integer not null default 0"); } catch {} - try { db.run("alter table pr_convergence_state add column ci_retry_attempts_used integer not null default 0"); } catch {} - try { db.run("alter table pr_convergence_state add column wait_for_ci_started_at text"); } catch {} - try { db.run("alter table pr_convergence_state add column last_dispatch_head_sha text"); } catch {} - try { db.run("alter table pr_convergence_state add column last_bot_ping_head_sha text"); } catch {} - try { db.run("alter table pr_convergence_state add column last_bot_ping_at text"); } catch {} - try { db.run("alter table pr_convergence_state add column merge_wait_kind text"); } catch {} - try { db.run("alter table pr_convergence_state add column pause_repeat_count integer not null default 0"); } catch {} - try { db.run("alter table pr_convergence_state add column last_pause_reason_hash text"); } catch {} + safeAddColumn(db, "alter table pr_convergence_state add column ptm_args_json text"); + safeAddColumn(db, "alter table pr_convergence_state add column force_finalize_used integer not null default 0"); + safeAddColumn(db, "alter table pr_convergence_state add column ci_retry_attempts_used integer not null default 0"); + safeAddColumn(db, "alter table pr_convergence_state add column wait_for_ci_started_at text"); + safeAddColumn(db, "alter table pr_convergence_state add column last_dispatch_head_sha text"); + safeAddColumn(db, "alter table pr_convergence_state add column last_bot_ping_head_sha text"); + safeAddColumn(db, "alter table pr_convergence_state add column last_bot_ping_at text"); + safeAddColumn(db, "alter table pr_convergence_state add column merge_wait_kind text"); + safeAddColumn(db, "alter table pr_convergence_state add column pause_repeat_count integer not null default 0"); + safeAddColumn(db, "alter table pr_convergence_state add column last_pause_reason_hash text"); // Machine-local runtime guard for PR automation. This table intentionally // has no PRIMARY KEY so cr-sqlite does not register it as a CRR table. @@ -3002,6 +3093,10 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { } return crsqliteLoaded; }; + const crrAwareDb = makeCrrAwareDb({ + getDb: () => db, + isCrsqliteLoaded: () => crsqliteLoaded, + }); try { // Existing CRR tables install triggers that call cr-sqlite functions on @@ -3013,38 +3108,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { disableCrrTriggersForUnavailableRuntime(db, logger); } - // Build a CRR-aware run wrapper: when crsqlite is loaded and a table has - // been converted to a CRR, ALTER TABLE statements must be wrapped with - // crsql_begin_alter / crsql_commit_alter so the clock tables stay in sync. - const makeMigrateDb = () => ({ - run: (sql: string, params: SqlValue[] = []) => { - const alterTable = parseAlterTableTarget(sql); - if (alterTable && crsqliteLoaded && rawHasTable(db, `${alterTable}__crsql_clock`)) { - getRow(db, "select crsql_begin_alter(?) as ok", [alterTable]); - try { - runStatement(db, sql, params); - } catch (error) { - // Commit the alter even on failure so the CRR state stays consistent, - // then re-throw so the caller's try/catch can handle it (e.g. column - // already exists on upgrade). - getRow(db, "select crsql_commit_alter(?) as ok", [alterTable]); - throw error; - } - getRow(db, "select crsql_commit_alter(?) as ok", [alterTable]); - return; - } - runStatement(db, sql, params); - }, - get: = Record>(sql: string, params: SqlValue[] = []) => { - return getRow(db, sql, params); - }, - all: = Record>(sql: string, params: SqlValue[] = []) => { - return allRows(db, sql, params); - }, - }); - - const migrateDb = makeMigrateDb(); - migrate(migrateDb); + migrate(crrAwareDb); removeExcludedCrrMetadata(db, logger); // Tear down the legacy `unified_memories` schema (removed in #329) before // any retrofit pass runs — the FTS4 shadow tables cannot be dropped @@ -3073,8 +3137,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { if (hasCrsqlMetadata(db) && !crsqliteLoaded) { disableCrrTriggersForUnavailableRuntime(db, logger); } - const remigrateDb = makeMigrateDb(); - migrate(remigrateDb); + migrate(crrAwareDb); removeExcludedCrrMetadata(db, logger); } @@ -3092,8 +3155,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { if (hasCrsqlMetadata(db) && !crsqliteLoaded) { disableCrrTriggersForUnavailableRuntime(db, logger); } - const remigrateDb = makeMigrateDb(); - migrate(remigrateDb); + migrate(crrAwareDb); removeExcludedCrrMetadata(db, logger); } @@ -3141,31 +3203,9 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { runStatement(db, "insert into kv(key, value) values (?, ?) on conflict(key) do update set value = excluded.value", [key, value]); }; - const run = (sql: string, params: SqlValue[] = []) => { - const alterTable = parseAlterTableTarget(sql); - if (crsqliteLoaded && alterTable && rawHasTable(db, `${alterTable}__crsql_clock`)) { - getRow(db, "select crsql_begin_alter(?) as ok", [alterTable]); - try { - runStatement(db, sql, params); - } catch (error) { - // Commit the alter even on failure so the CRR state stays consistent, - // then re-throw so callers (e.g. safeAlter) can handle duplicate columns. - getRow(db, "select crsql_commit_alter(?) as ok", [alterTable]); - throw error; - } - getRow(db, "select crsql_commit_alter(?) as ok", [alterTable]); - return; - } - runStatement(db, sql, params); - }; - - const all = = Record>(sql: string, params: SqlValue[] = []): T[] => { - return allRows(db, sql, params); - }; - - const get = = Record>(sql: string, params: SqlValue[] = []): T | null => { - return getRow(db, sql, params); - }; + const run = crrAwareDb.run; + const all = crrAwareDb.all; + const get = crrAwareDb.get; const sync: AdeDbSyncApi = { isAvailable: () => crsqliteLoaded, @@ -3239,7 +3279,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { if (!crsqliteLoaded) return { appliedCount: 0, dbVersion: 0, touchedTables: [], rebuiltFts: false }; let appliedCount = 0; const touchedTables = new Set(); - runStatement(db, "begin"); + runStatement(db, "BEGIN IMMEDIATE"); try { for (const rawChange of changes) { if (isLocalOnlyQueueWipeMarkerChange(rawChange)) continue; @@ -3283,10 +3323,10 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { discardUnpublishedChangesForTables: (tableNames: string[]) => { const normalizedTableNames = Array.from(new Set(tableNames.map((tableName) => tableName.trim()).filter(Boolean))); if (!crsqliteLoaded || normalizedTableNames.length === 0) return; - const throughDbVersion = sync.getDbVersion(); - const createdAt = new Date().toISOString(); - runStatement(db, "begin"); + runStatement(db, "BEGIN IMMEDIATE"); try { + const throughDbVersion = sync.getDbVersion(); + const createdAt = new Date().toISOString(); for (const tableName of normalizedTableNames) { runStatement( db, @@ -3319,7 +3359,9 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { all, get, sync, - flushNow: () => {}, + flushNow: () => { + getRow(db, "pragma wal_checkpoint(TRUNCATE)"); + }, close: () => { db.close(); }, From ab42bc6167b4bce37229c32b0bb21bcabf9a0471 Mon Sep 17 00:00:00 2001 From: Arul Sharma <31745423+arul28@users.noreply.github.com> Date: Sun, 31 May 2026 19:58:07 -0400 Subject: [PATCH 2/2] Document kvDb flush checkpoint semantics --- apps/desktop/src/main/services/state/kvDb.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/desktop/src/main/services/state/kvDb.ts b/apps/desktop/src/main/services/state/kvDb.ts index da1ef22dc..c5a0bc485 100644 --- a/apps/desktop/src/main/services/state/kvDb.ts +++ b/apps/desktop/src/main/services/state/kvDb.ts @@ -76,6 +76,12 @@ export type AdeDb = { all: = Record>(sql: string, params?: SqlValue[]) => T[]; sync: AdeDbSyncApi; + + /** + * Force pending WAL frames onto the main database before shutdown. This uses a + * TRUNCATE checkpoint and can wait for active readers, so keep calls on + * final-persistence paths instead of latency-sensitive request handling. + */ flushNow: () => void; close: () => void; };