Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 63 additions & 32 deletions src/lib/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { randomUUID } from 'crypto';
import type { MCConfig } from './config';
import type { PlanSpec, JobSpec, PlanStatus, CheckpointType, CheckpointContext } from './plan-types';
import { loadPlan, savePlan, updatePlanJob, clearPlan, validateGhAuth } from './plan-state';
import { loadPlan, savePlan, updatePlanJob, updatePlanFields, clearPlan, validateGhAuth } from './plan-state';
import { getDefaultBranch } from './git';
import { createIntegrationBranch, deleteIntegrationBranch } from './integration';
import { MergeTrain, checkMergeability, type MergeTestReport, validateTouchSet } from './merge-train';
Expand Down Expand Up @@ -291,7 +291,6 @@ export class Orchestrator {

const plan = await loadPlan();
if (plan && plan.status === 'paused') {
// Track jobs approved for merge so reconciler doesn't re-checkpoint them
if (wasPreMerge) {
for (const job of plan.jobs) {
if (job.status === 'ready_to_merge') {
Expand All @@ -300,10 +299,11 @@ export class Orchestrator {
}
}

plan.status = 'running';
plan.checkpoint = null;
plan.checkpointContext = null;
await savePlan(plan);
await updatePlanFields(plan.id, {
status: 'running',
checkpoint: null,
checkpointContext: null,
});
this.showToast('Mission Control', 'Checkpoint cleared, resuming execution.', 'info');

if (!this.isRunning) {
Expand Down Expand Up @@ -391,10 +391,11 @@ export class Orchestrator {

private async setCheckpoint(type: CheckpointType, plan: PlanSpec, context?: CheckpointContext): Promise<void> {
this.checkpoint = type;
plan.status = 'paused';
plan.checkpoint = type;
plan.checkpointContext = context ?? null;
await savePlan(plan);
await updatePlanFields(plan.id, {
status: 'paused',
checkpoint: type,
checkpointContext: context ?? null,
});
this.stopReconciler();
this.showToast(
'Mission Control',
Expand Down Expand Up @@ -447,6 +448,32 @@ export class Orchestrator {
const runningJobs = (await getRunningJobs()).filter((job) => job.planId === plan.id);
let runningCount = runningJobs.length;

// Safety net: detect plan jobs stuck as 'running' when jobs.json already
// shows them as completed/failed (can happen if a prior savePlan race
// overwrote their status, or if the 'complete' event was missed).
const jobState = await loadJobState();
const runningJobNames = new Set(runningJobs.map((j) => j.name));
for (const planJob of plan.jobs) {
if (planJob.status !== 'running') continue;
if (runningJobNames.has(planJob.name)) continue;

const stateJob = jobState.jobs.find(
(j) => j.name === planJob.name && j.planId === plan.id,
);
if (!stateJob) continue;

if (stateJob.status === 'completed') {
await updatePlanJob(plan.id, planJob.name, { status: 'completed' });
planJob.status = 'completed';
} else if (stateJob.status === 'failed' || stateJob.status === 'stopped') {
await updatePlanJob(plan.id, planJob.name, {
status: 'failed',
error: 'recovered from missed completion event',
});
planJob.status = 'failed';
}
}

const mergeOrder = [...plan.jobs].sort(
(a, b) => (a.mergeOrder ?? Number.MAX_SAFE_INTEGER) - (b.mergeOrder ?? Number.MAX_SAFE_INTEGER),
);
Expand Down Expand Up @@ -654,23 +681,26 @@ export class Orchestrator {
return;
}

latestPlan.status = 'creating_pr';
await savePlan(latestPlan);
await updatePlanFields(latestPlan.id, { status: 'creating_pr' });

try {
const prUrl = await this.createPR();
latestPlan.prUrl = prUrl;
latestPlan.status = 'completed';
latestPlan.completedAt = new Date().toISOString();
await savePlan(latestPlan);
const completedAt = new Date().toISOString();
await updatePlanFields(latestPlan.id, {
status: 'completed',
prUrl,
completedAt,
});
this.stopReconciler();
this.unsubscribeFromMonitorEvents();
this.showToast('Mission Control', `Plan completed! PR: ${prUrl}`, 'success');
this.notify(`🎉 Plan "${latestPlan.name}" completed! PR created: ${prUrl}`);
} catch (prError) {
latestPlan.status = 'failed';
latestPlan.completedAt = new Date().toISOString();
await savePlan(latestPlan);
const completedAt = new Date().toISOString();
await updatePlanFields(latestPlan.id, {
status: 'failed',
completedAt,
});
this.stopReconciler();
this.unsubscribeFromMonitorEvents();
const errMsg = prError instanceof Error ? prError.message : String(prError);
Expand All @@ -681,15 +711,17 @@ export class Orchestrator {
}

if (latestPlan.status !== plan.status) {
latestPlan.status = plan.status;
const updates: { status: typeof plan.status; completedAt?: string } = {
status: plan.status,
};
if (plan.status === 'failed') {
latestPlan.completedAt = new Date().toISOString();
updates.completedAt = new Date().toISOString();
this.stopReconciler();
this.unsubscribeFromMonitorEvents();
this.showToast('Mission Control', `Plan "${latestPlan.name}" failed.`, 'error');
this.notify(`❌ Plan "${latestPlan.name}" failed.`);
}
await savePlan(latestPlan);
await updatePlanFields(latestPlan.id, updates);
}
}

Expand Down Expand Up @@ -1154,10 +1186,11 @@ If your work needs human review before it can proceed: mc_report(status: "needs_
}

if (plan.status === 'paused') {
plan.status = 'running';
plan.checkpoint = null;
plan.checkpointContext = null;
await savePlan(plan);
await updatePlanFields(plan.id, {
status: 'running',
checkpoint: null,
checkpointContext: null,
});
}
this.checkpoint = null;

Expand Down Expand Up @@ -1196,12 +1229,10 @@ If your work needs human review before it can proceed: mc_report(status: "needs_
}

if (hasDeadRunningJob) {
const currentPlan = await loadPlan();
if (currentPlan) {
currentPlan.status = 'failed';
currentPlan.completedAt = new Date().toISOString();
await savePlan(currentPlan);
}
await updatePlanFields(plan.id, {
status: 'failed',
completedAt: new Date().toISOString(),
});
return;
}

Expand Down
67 changes: 66 additions & 1 deletion src/lib/plan-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { join } from 'path';
import { z } from 'zod';
import { getDataDir } from './paths';
import { GitMutex } from './git-mutex';
import type { PlanSpec, JobSpec } from './plan-types';
import type { PlanSpec, JobSpec, CheckpointContext } from './plan-types';
import { isValidPlanTransition, isValidJobTransition } from './plan-types';
import { PlanSpecSchema } from './schemas';
import { atomicWrite } from './utils';
Expand Down Expand Up @@ -114,6 +114,71 @@ export async function updatePlanJob(
});
}

export interface PlanFieldUpdates {
status?: PlanSpec['status'];
checkpoint?: PlanSpec['checkpoint'];
checkpointContext?: CheckpointContext | null;
completedAt?: string;
prUrl?: string;
}

/**
* Atomically update plan-level fields without overwriting job states.
*
* Unlike savePlan(), this reads the current plan inside the mutex and merges
* only the specified fields. This prevents a stale plan snapshot from clobbering
* concurrent updatePlanJob() writes — the root cause of completed jobs appearing
* as "running" after a sibling job failed (see #63).
*/
export async function updatePlanFields(
planId: string,
updates: PlanFieldUpdates,
): Promise<void> {
await planMutex.withLock(async () => {
const plan = await loadPlan();

if (!plan) {
throw new Error('No active plan exists');
}

if (plan.id !== planId) {
throw new Error(
`Plan ID mismatch: expected ${planId}, got ${plan.id}`,
);
}

if (updates.status !== undefined && updates.status !== plan.status) {
if (!isValidPlanTransition(plan.status, updates.status)) {
console.warn(`[MC] Invalid plan transition: ${plan.status} → ${updates.status} (plan: ${plan.name})`);
}
plan.status = updates.status;
}
if (updates.checkpoint !== undefined) {
plan.checkpoint = updates.checkpoint;
}
if (updates.checkpointContext !== undefined) {
plan.checkpointContext = updates.checkpointContext;
}
if (updates.completedAt !== undefined) {
plan.completedAt = updates.completedAt;
}
if (updates.prUrl !== undefined) {
plan.prUrl = updates.prUrl;
}

const ghAuthenticated = await validateGhAuth();
const planToSave = { ...plan, ghAuthenticated };

const filePath = await getPlanFilePath();
try {
const data = JSON.stringify(planToSave, null, 2);
await atomicWrite(filePath, data);
} catch (error) {
throw new Error(`Failed to save plan state to ${filePath}: ${error}`);
}
});
}

export async function clearPlan(): Promise<void> {
await planMutex.withLock(async () => {
const filePath = await getPlanFilePath();
Expand Down
10 changes: 10 additions & 0 deletions tests/lib/orchestrator-modes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ describe('orchestrator modes', () => {
);
},
);
spyOn(planStateMod, 'updatePlanFields').mockImplementation(
async (planId: string, updates: Partial<PlanSpec>) => {
if (!planState || planState.id !== planId) return;
if (updates.status !== undefined) planState.status = updates.status;
if (updates.checkpoint !== undefined) planState.checkpoint = updates.checkpoint;
if (updates.checkpointContext !== undefined) planState.checkpointContext = updates.checkpointContext;
if (updates.completedAt !== undefined) planState.completedAt = updates.completedAt;
if (updates.prUrl !== undefined) planState.prUrl = updates.prUrl;
},
);
spyOn(planStateMod, 'clearPlan').mockImplementation(async () => {
planState = null;
});
Expand Down
Loading