diff --git a/src/lib/orchestrator.ts b/src/lib/orchestrator.ts index ae1042d..7978d1b 100644 --- a/src/lib/orchestrator.ts +++ b/src/lib/orchestrator.ts @@ -1,7 +1,7 @@ import { randomUUID } from 'crypto'; import type { MCConfig } from './config'; import type { PlanSpec, JobSpec, PlanStatus, CheckpointType, CheckpointContext } from './plan-types'; -import { loadPlan, savePlan, updatePlanJob, clearPlan, validateGhAuth } from './plan-state'; +import { loadPlan, savePlan, updatePlanJob, updatePlanFields, clearPlan, validateGhAuth } from './plan-state'; import { getDefaultBranch } from './git'; import { createIntegrationBranch, deleteIntegrationBranch } from './integration'; import { MergeTrain, checkMergeability, type MergeTestReport, validateTouchSet } from './merge-train'; @@ -291,7 +291,6 @@ export class Orchestrator { const plan = await loadPlan(); if (plan && plan.status === 'paused') { - // Track jobs approved for merge so reconciler doesn't re-checkpoint them if (wasPreMerge) { for (const job of plan.jobs) { if (job.status === 'ready_to_merge') { @@ -300,10 +299,11 @@ export class Orchestrator { } } - plan.status = 'running'; - plan.checkpoint = null; - plan.checkpointContext = null; - await savePlan(plan); + await updatePlanFields(plan.id, { + status: 'running', + checkpoint: null, + checkpointContext: null, + }); this.showToast('Mission Control', 'Checkpoint cleared, resuming execution.', 'info'); if (!this.isRunning) { @@ -391,10 +391,11 @@ export class Orchestrator { private async setCheckpoint(type: CheckpointType, plan: PlanSpec, context?: CheckpointContext): Promise { this.checkpoint = type; - plan.status = 'paused'; - plan.checkpoint = type; - plan.checkpointContext = context ?? null; - await savePlan(plan); + await updatePlanFields(plan.id, { + status: 'paused', + checkpoint: type, + checkpointContext: context ?? null, + }); this.stopReconciler(); this.showToast( 'Mission Control', @@ -447,6 +448,32 @@ export class Orchestrator { const runningJobs = (await getRunningJobs()).filter((job) => job.planId === plan.id); let runningCount = runningJobs.length; + // Safety net: detect plan jobs stuck as 'running' when jobs.json already + // shows them as completed/failed (can happen if a prior savePlan race + // overwrote their status, or if the 'complete' event was missed). + const jobState = await loadJobState(); + const runningJobNames = new Set(runningJobs.map((j) => j.name)); + for (const planJob of plan.jobs) { + if (planJob.status !== 'running') continue; + if (runningJobNames.has(planJob.name)) continue; + + const stateJob = jobState.jobs.find( + (j) => j.name === planJob.name && j.planId === plan.id, + ); + if (!stateJob) continue; + + if (stateJob.status === 'completed') { + await updatePlanJob(plan.id, planJob.name, { status: 'completed' }); + planJob.status = 'completed'; + } else if (stateJob.status === 'failed' || stateJob.status === 'stopped') { + await updatePlanJob(plan.id, planJob.name, { + status: 'failed', + error: 'recovered from missed completion event', + }); + planJob.status = 'failed'; + } + } + const mergeOrder = [...plan.jobs].sort( (a, b) => (a.mergeOrder ?? Number.MAX_SAFE_INTEGER) - (b.mergeOrder ?? Number.MAX_SAFE_INTEGER), ); @@ -654,23 +681,26 @@ export class Orchestrator { return; } - latestPlan.status = 'creating_pr'; - await savePlan(latestPlan); + await updatePlanFields(latestPlan.id, { status: 'creating_pr' }); try { const prUrl = await this.createPR(); - latestPlan.prUrl = prUrl; - latestPlan.status = 'completed'; - latestPlan.completedAt = new Date().toISOString(); - await savePlan(latestPlan); + const completedAt = new Date().toISOString(); + await updatePlanFields(latestPlan.id, { + status: 'completed', + prUrl, + completedAt, + }); this.stopReconciler(); this.unsubscribeFromMonitorEvents(); this.showToast('Mission Control', `Plan completed! PR: ${prUrl}`, 'success'); this.notify(`🎉 Plan "${latestPlan.name}" completed! PR created: ${prUrl}`); } catch (prError) { - latestPlan.status = 'failed'; - latestPlan.completedAt = new Date().toISOString(); - await savePlan(latestPlan); + const completedAt = new Date().toISOString(); + await updatePlanFields(latestPlan.id, { + status: 'failed', + completedAt, + }); this.stopReconciler(); this.unsubscribeFromMonitorEvents(); const errMsg = prError instanceof Error ? prError.message : String(prError); @@ -681,15 +711,17 @@ export class Orchestrator { } if (latestPlan.status !== plan.status) { - latestPlan.status = plan.status; + const updates: { status: typeof plan.status; completedAt?: string } = { + status: plan.status, + }; if (plan.status === 'failed') { - latestPlan.completedAt = new Date().toISOString(); + updates.completedAt = new Date().toISOString(); this.stopReconciler(); this.unsubscribeFromMonitorEvents(); this.showToast('Mission Control', `Plan "${latestPlan.name}" failed.`, 'error'); this.notify(`❌ Plan "${latestPlan.name}" failed.`); } - await savePlan(latestPlan); + await updatePlanFields(latestPlan.id, updates); } } @@ -1154,10 +1186,11 @@ If your work needs human review before it can proceed: mc_report(status: "needs_ } if (plan.status === 'paused') { - plan.status = 'running'; - plan.checkpoint = null; - plan.checkpointContext = null; - await savePlan(plan); + await updatePlanFields(plan.id, { + status: 'running', + checkpoint: null, + checkpointContext: null, + }); } this.checkpoint = null; @@ -1196,12 +1229,10 @@ If your work needs human review before it can proceed: mc_report(status: "needs_ } if (hasDeadRunningJob) { - const currentPlan = await loadPlan(); - if (currentPlan) { - currentPlan.status = 'failed'; - currentPlan.completedAt = new Date().toISOString(); - await savePlan(currentPlan); - } + await updatePlanFields(plan.id, { + status: 'failed', + completedAt: new Date().toISOString(), + }); return; } diff --git a/src/lib/plan-state.ts b/src/lib/plan-state.ts index a4921cc..f68661e 100644 --- a/src/lib/plan-state.ts +++ b/src/lib/plan-state.ts @@ -2,7 +2,7 @@ import { join } from 'path'; import { z } from 'zod'; import { getDataDir } from './paths'; import { GitMutex } from './git-mutex'; -import type { PlanSpec, JobSpec } from './plan-types'; +import type { PlanSpec, JobSpec, CheckpointContext } from './plan-types'; import { isValidPlanTransition, isValidJobTransition } from './plan-types'; import { PlanSpecSchema } from './schemas'; import { atomicWrite } from './utils'; @@ -114,6 +114,71 @@ export async function updatePlanJob( }); } +export interface PlanFieldUpdates { + status?: PlanSpec['status']; + checkpoint?: PlanSpec['checkpoint']; + checkpointContext?: CheckpointContext | null; + completedAt?: string; + prUrl?: string; +} + +/** + * Atomically update plan-level fields without overwriting job states. + * + * Unlike savePlan(), this reads the current plan inside the mutex and merges + * only the specified fields. This prevents a stale plan snapshot from clobbering + * concurrent updatePlanJob() writes — the root cause of completed jobs appearing + * as "running" after a sibling job failed (see #63). + */ +export async function updatePlanFields( + planId: string, + updates: PlanFieldUpdates, +): Promise { + await planMutex.withLock(async () => { + const plan = await loadPlan(); + + if (!plan) { + throw new Error('No active plan exists'); + } + + if (plan.id !== planId) { + throw new Error( + `Plan ID mismatch: expected ${planId}, got ${plan.id}`, + ); + } + + if (updates.status !== undefined && updates.status !== plan.status) { + if (!isValidPlanTransition(plan.status, updates.status)) { + console.warn(`[MC] Invalid plan transition: ${plan.status} → ${updates.status} (plan: ${plan.name})`); + } + plan.status = updates.status; + } + if (updates.checkpoint !== undefined) { + plan.checkpoint = updates.checkpoint; + } + if (updates.checkpointContext !== undefined) { + plan.checkpointContext = updates.checkpointContext; + } + if (updates.completedAt !== undefined) { + plan.completedAt = updates.completedAt; + } + if (updates.prUrl !== undefined) { + plan.prUrl = updates.prUrl; + } + + const ghAuthenticated = await validateGhAuth(); + const planToSave = { ...plan, ghAuthenticated }; + + const filePath = await getPlanFilePath(); + try { + const data = JSON.stringify(planToSave, null, 2); + await atomicWrite(filePath, data); + } catch (error) { + throw new Error(`Failed to save plan state to ${filePath}: ${error}`); + } + }); +} + export async function clearPlan(): Promise { await planMutex.withLock(async () => { const filePath = await getPlanFilePath(); diff --git a/tests/lib/orchestrator-modes.test.ts b/tests/lib/orchestrator-modes.test.ts index 3d2675b..2e90cb8 100644 --- a/tests/lib/orchestrator-modes.test.ts +++ b/tests/lib/orchestrator-modes.test.ts @@ -80,6 +80,16 @@ describe('orchestrator modes', () => { ); }, ); + spyOn(planStateMod, 'updatePlanFields').mockImplementation( + async (planId: string, updates: Partial) => { + if (!planState || planState.id !== planId) return; + if (updates.status !== undefined) planState.status = updates.status; + if (updates.checkpoint !== undefined) planState.checkpoint = updates.checkpoint; + if (updates.checkpointContext !== undefined) planState.checkpointContext = updates.checkpointContext; + if (updates.completedAt !== undefined) planState.completedAt = updates.completedAt; + if (updates.prUrl !== undefined) planState.prUrl = updates.prUrl; + }, + ); spyOn(planStateMod, 'clearPlan').mockImplementation(async () => { planState = null; }); diff --git a/tests/lib/orchestrator.test.ts b/tests/lib/orchestrator.test.ts index d8c17a6..f69b5af 100644 --- a/tests/lib/orchestrator.test.ts +++ b/tests/lib/orchestrator.test.ts @@ -68,6 +68,18 @@ describe('orchestrator', () => { ); }, ); + spyOn(planStateMod, 'updatePlanFields').mockImplementation( + async (planId: string, updates: Partial) => { + if (!planState || planState.id !== planId) { + return; + } + if (updates.status !== undefined) planState.status = updates.status; + if (updates.checkpoint !== undefined) planState.checkpoint = updates.checkpoint; + if (updates.checkpointContext !== undefined) planState.checkpointContext = updates.checkpointContext; + if (updates.completedAt !== undefined) planState.completedAt = updates.completedAt; + if (updates.prUrl !== undefined) planState.prUrl = updates.prUrl; + }, + ); spyOn(planStateMod, 'clearPlan').mockImplementation(async () => { planState = null; }); @@ -792,6 +804,156 @@ describe('orchestrator', () => { expect(noDepCall[0].startPoint).toBe('def456'); expect(withDepCall[0].startPoint).toBe('mc/integration-plan-1'); }); + + it('failed job event preserves sibling completed job states (race condition fix)', async () => { + // Simulate: 3-job plan where job-b and job-c complete, then job-a fails. + // handleJobFailed calls loadPlan() then setCheckpoint(). + // Before the fix, setCheckpoint would savePlan(staleSnapshot), overwriting + // the completed statuses of job-b and job-c. After the fix, setCheckpoint + // uses updatePlanFields which only updates plan-level fields. + const orchestrator = new Orchestrator(monitor as any, { + defaultPlacement: 'session', + pollInterval: 10000, + idleThreshold: 300000, + worktreeBasePath: '/tmp', + omo: { enabled: false, defaultMode: 'vanilla' }, + } as any); + spyOn(orchestrator as any, 'startReconciler').mockImplementation(() => {}); + + await orchestrator.startPlan( + makePlan({ + status: 'pending', + jobs: [ + makeJob('job-a', { status: 'queued' }), + makeJob('job-b', { status: 'queued' }), + makeJob('job-c', { status: 'queued' }), + ], + }), + ); + + // Simulate job-b and job-c completing via updatePlanJob (as handleJobComplete does) + await planStateMod.updatePlanJob('plan-1', 'job-b', { status: 'completed' }); + await planStateMod.updatePlanJob('plan-1', 'job-c', { status: 'completed' }); + + // Verify they are completed before the failed event + expect(planState?.jobs.find((j) => j.name === 'job-b')?.status).toBe('completed'); + expect(planState?.jobs.find((j) => j.name === 'job-c')?.status).toBe('completed'); + + // Now job-a fails — this triggers handleJobFailed which calls loadPlan() then setCheckpoint() + monitor.emit('failed', { + id: 'j1', + name: 'job-a', + planId: 'plan-1', + } as Job); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + // Plan should be paused with on_error checkpoint + expect(planState?.status).toBe('paused'); + expect(planState?.checkpoint).toBe('on_error'); + + // CRITICAL: sibling job states must NOT be overwritten back to 'queued' + expect(planState?.jobs.find((j) => j.name === 'job-b')?.status).toBe('completed'); + expect(planState?.jobs.find((j) => j.name === 'job-c')?.status).toBe('completed'); + expect(planState?.jobs.find((j) => j.name === 'job-a')?.status).toBe('failed'); + }); + + it('reconciliation safety net detects stuck running jobs from jobs.json', async () => { + // Simulate: plan has a job marked 'running' but jobs.json shows it as 'completed'. + // This can happen if a prior race condition overwrote the plan job status. + planState = makePlan({ + status: 'running', + jobs: [ + makeJob('stuck-job', { status: 'running', mergeOrder: 0 }), + makeJob('other-job', { status: 'queued', mergeOrder: 1 }), + ], + }); + + // jobs.json has no running jobs for this plan (stuck-job already completed) + runningJobs = []; + + // loadJobState returns the job as completed + spyOn(jobStateMod, 'loadJobState').mockImplementation(async () => ({ + version: 2, + jobs: [ + { + id: 'stuck-job-id', + name: 'stuck-job', + planId: 'plan-1', + status: 'completed', + worktreePath: '/tmp/w1', + branch: 'mc/stuck-job', + tmuxTarget: 'mc-stuck-job', + placement: 'session' as const, + prompt: 'do stuck-job', + mode: 'vanilla' as const, + createdAt: '2026-01-01T00:00:00.000Z', + }, + ], + updatedAt: new Date().toISOString(), + })); + + const orchestrator = new Orchestrator(monitor as any, { + defaultPlacement: 'session', + pollInterval: 10000, + idleThreshold: 300000, + worktreeBasePath: '/tmp', + omo: { enabled: false, defaultMode: 'vanilla' }, + } as any); + + await (orchestrator as any).reconcile(); + + // Safety net should have corrected the stuck job status + expect(planStateMod.updatePlanJob).toHaveBeenCalledWith('plan-1', 'stuck-job', { status: 'completed' }); + }); + + it('reconciliation safety net recovers stopped jobs as failed', async () => { + planState = makePlan({ + status: 'running', + jobs: [ + makeJob('killed-job', { status: 'running', mergeOrder: 0 }), + makeJob('merged-job', { status: 'merged', mergeOrder: 1 }), + ], + }); + + runningJobs = []; + + spyOn(jobStateMod, 'loadJobState').mockImplementation(async () => ({ + version: 2, + jobs: [ + { + id: 'killed-job-id', + name: 'killed-job', + planId: 'plan-1', + status: 'stopped' as const, + worktreePath: '/tmp/w1', + branch: 'mc/killed-job', + tmuxTarget: 'mc-killed-job', + placement: 'session' as const, + prompt: 'do killed-job', + mode: 'vanilla' as const, + createdAt: '2026-01-01T00:00:00.000Z', + completedAt: '2026-01-01T00:01:00.000Z', + }, + ], + updatedAt: new Date().toISOString(), + })); + + const orchestrator = new Orchestrator(monitor as any, { + defaultPlacement: 'session', + pollInterval: 10000, + idleThreshold: 300000, + worktreeBasePath: '/tmp', + omo: { enabled: false, defaultMode: 'vanilla' }, + } as any); + + await (orchestrator as any).reconcile(); + + expect(planStateMod.updatePlanJob).toHaveBeenCalledWith('plan-1', 'killed-job', { + status: 'failed', + error: 'recovered from missed completion event', + }); + }); }); describe('orchestrator reconcile pending (dirty re-reconcile)', () => { @@ -818,6 +980,18 @@ describe('orchestrator reconcile pending (dirty re-reconcile)', () => { ); }, ); + spyOn(planStateMod, 'updatePlanFields').mockImplementation( + async (planId: string, updates: Partial) => { + if (!planState || planState.id !== planId) { + return; + } + if (updates.status !== undefined) planState.status = updates.status; + if (updates.checkpoint !== undefined) planState.checkpoint = updates.checkpoint; + if (updates.checkpointContext !== undefined) planState.checkpointContext = updates.checkpointContext; + if (updates.completedAt !== undefined) planState.completedAt = updates.completedAt; + if (updates.prUrl !== undefined) planState.prUrl = updates.prUrl; + }, + ); spyOn(planStateMod, 'clearPlan').mockImplementation(async () => { planState = null; }); @@ -1002,6 +1176,18 @@ describe('plan-scoped branch naming', () => { ); }, ); + spyOn(planStateMod, 'updatePlanFields').mockImplementation( + async (planId: string, updates: Partial) => { + if (!planState || planState.id !== planId) { + return; + } + if (updates.status !== undefined) planState.status = updates.status; + if (updates.checkpoint !== undefined) planState.checkpoint = updates.checkpoint; + if (updates.checkpointContext !== undefined) planState.checkpointContext = updates.checkpointContext; + if (updates.completedAt !== undefined) planState.completedAt = updates.completedAt; + if (updates.prUrl !== undefined) planState.prUrl = updates.prUrl; + }, + ); spyOn(planStateMod, 'clearPlan').mockImplementation(async () => { planState = null; }); diff --git a/tests/lib/plan-state.test.ts b/tests/lib/plan-state.test.ts index a4e04c9..61ba79d 100644 --- a/tests/lib/plan-state.test.ts +++ b/tests/lib/plan-state.test.ts @@ -13,6 +13,7 @@ const { savePlan, getActivePlan, updatePlanJob, + updatePlanFields, clearPlan, validateGhAuth, } = await import('../../src/lib/plan-state'); @@ -227,6 +228,101 @@ describe('plan-state', () => { }); }); + describe('updatePlanFields', () => { + it('updates plan-level fields without overwriting job states', async () => { + vi.spyOn( + await import('../../src/lib/plan-state'), + 'validateGhAuth', + ).mockResolvedValue(true); + + const plan = makePlan({ + status: 'running', + jobs: [ + makeJob({ name: 'job-a', status: 'completed' }), + makeJob({ id: 'job-2', name: 'job-b', status: 'completed' }), + ], + }); + await savePlan(plan); + + await updatePlanFields('plan-1', { status: 'paused', checkpoint: 'on_error' }); + + const loaded = await loadPlan(); + expect(loaded!.status).toBe('paused'); + expect(loaded!.checkpoint).toBe('on_error'); + expect(loaded!.jobs[0].status).toBe('completed'); + expect(loaded!.jobs[1].status).toBe('completed'); + }); + + it('preserves job updates made concurrently', async () => { + vi.spyOn( + await import('../../src/lib/plan-state'), + 'validateGhAuth', + ).mockResolvedValue(true); + + const plan = makePlan({ + status: 'running', + jobs: [ + makeJob({ name: 'job-a', status: 'running' }), + makeJob({ id: 'job-2', name: 'job-b', status: 'running' }), + makeJob({ id: 'job-3', name: 'job-c', status: 'running' }), + ], + }); + await savePlan(plan); + + await Promise.all([ + updatePlanJob('plan-1', 'job-b', { status: 'completed' }), + updatePlanJob('plan-1', 'job-c', { status: 'completed' }), + updatePlanFields('plan-1', { status: 'paused', checkpoint: 'on_error' }), + ]); + + const loaded = await loadPlan(); + expect(loaded!.status).toBe('paused'); + expect(loaded!.jobs[0].status).toBe('running'); + expect(loaded!.jobs[1].status).toBe('completed'); + expect(loaded!.jobs[2].status).toBe('completed'); + }); + + it('throws when plan ID does not match', async () => { + vi.spyOn( + await import('../../src/lib/plan-state'), + 'validateGhAuth', + ).mockResolvedValue(true); + + await savePlan(makePlan({ id: 'plan-1' })); + + await expect( + updatePlanFields('plan-wrong', { status: 'paused' }), + ).rejects.toThrow('Plan ID mismatch'); + }); + + it('throws when no active plan exists', async () => { + await expect( + updatePlanFields('plan-1', { status: 'paused' }), + ).rejects.toThrow('No active plan exists'); + }); + + it('updates checkpointContext field', async () => { + vi.spyOn( + await import('../../src/lib/plan-state'), + 'validateGhAuth', + ).mockResolvedValue(true); + + await savePlan(makePlan({ status: 'running' })); + + await updatePlanFields('plan-1', { + status: 'paused', + checkpoint: 'on_error', + checkpointContext: { jobName: 'bad-job', failureKind: 'job_failed' }, + }); + + const loaded = await loadPlan(); + expect(loaded!.checkpointContext).toEqual({ + jobName: 'bad-job', + failureKind: 'job_failed', + }); + }); + }); + describe('clearPlan', () => { it('removes plan.json', async () => { vi.spyOn(