Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/scheduler/__tests__/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,4 +462,164 @@ describe("Scheduler", () => {
expect(row.next).toBeNull();
scheduler.stop();
});

test("updateJob updates task only, preserves run history", async () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Updatable",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Original task",
});

// Simulate some run history
await scheduler.runJobNow(job.id);
const beforeUpdate = scheduler.getJob(job.id);
expect(beforeUpdate?.runCount).toBe(1);
expect(beforeUpdate?.lastRunAt).toBeTruthy();

const updated = scheduler.updateJob(job.id, { task: "Updated task" });
expect(updated).not.toBeNull();
expect(updated?.task).toBe("Updated task");
expect(updated?.runCount).toBe(1);
expect(updated?.lastRunAt).toBe(beforeUpdate?.lastRunAt);
expect(updated?.lastRunStatus).toBe("ok");
});

test("updateJob updates schedule and recomputes next_run_at", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Schedule Update",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
});

const originalNextRun = job.nextRunAt;
const updated = scheduler.updateJob(job.id, { schedule: { kind: "every", intervalMs: 120_000 } });

expect(updated).not.toBeNull();
expect(updated?.schedule).toEqual({ kind: "every", intervalMs: 120_000 });
expect(updated?.nextRunAt).not.toBe(originalNextRun);
expect(updated?.nextRunAt).toBeTruthy();
});

test("updateJob updates delivery", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Delivery Update",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
});

const updated = scheduler.updateJob(job.id, { delivery: { channel: "slack", target: "C04ABC123" } });
expect(updated).not.toBeNull();
expect(updated?.delivery).toEqual({ channel: "slack", target: "C04ABC123" });
});

test("updateJob updates name", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Old Name",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
});

const updated = scheduler.updateJob(job.id, { name: "New Name" });
expect(updated).not.toBeNull();
expect(updated?.name).toBe("New Name");
});

test("updateJob updates enabled flag", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Toggle",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
enabled: true,
});

const disabled = scheduler.updateJob(job.id, { enabled: false });
expect(disabled).not.toBeNull();
expect(disabled?.enabled).toBe(false);

const enabled = scheduler.updateJob(job.id, { enabled: true });
expect(enabled).not.toBeNull();
expect(enabled?.enabled).toBe(true);
});

test("updateJob returns null for nonexistent job", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const updated = scheduler.updateJob("nonexistent-id", { task: "New task" });
expect(updated).toBeNull();
});

test("updateJob rejects invalid schedule", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Bad Schedule",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
});

const past = new Date(Date.now() - 3_600_000).toISOString();
expect(() => {
scheduler.updateJob(job.id, { schedule: { kind: "at", at: past } });
}).toThrow("invalid schedule");
});

test("updateJob rejects invalid delivery target", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "Bad Delivery",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
});

expect(() => {
scheduler.updateJob(job.id, { delivery: { channel: "slack", target: "invalid" } });
}).toThrow("invalid delivery.target");
});

test("updateJob rejects duplicate name belonging to another job", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
scheduler.createJob({
name: "Existing",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task A",
});
const jobB = scheduler.createJob({
name: "Other",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task B",
});

expect(() => {
scheduler.updateJob(jobB.id, { name: "Existing" });
}).toThrow('job with name "Existing" already exists');
});

test("updateJob allows renaming to current name (idempotent)", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "SameName",
schedule: { kind: "every", intervalMs: 60_000 },
task: "Task",
});

const updated = scheduler.updateJob(job.id, { name: "SameName" });
expect(updated?.name).toBe("SameName");
});

test("updateJob rejects task exceeding MAX_TASK_BYTES", () => {
const scheduler = new Scheduler({ db, runtime: mockRuntime as never });
const job = scheduler.createJob({
name: "BigTask",
schedule: { kind: "every", intervalMs: 60_000 },
task: "small",
});

const huge = "x".repeat(33 * 1024);
expect(() => {
scheduler.updateJob(job.id, { task: huge });
}).toThrow("byte limit");
});
});
103 changes: 100 additions & 3 deletions src/scheduler/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ import type { Database } from "bun:sqlite";
import { randomUUID } from "node:crypto";
import type { AgentRuntime } from "../agent/runtime.ts";
import type { SlackTransport } from "../channels/slack-transport.ts";
import { validateCreateInput } from "./create-validation.ts";
import { MAX_TASK_BYTES, validateCreateInput } from "./create-validation.ts";
import { executeJob } from "./executor.ts";
import { type SchedulerHealthSummary, computeHealthSummary } from "./health.ts";
import { cleanupOldTerminalJobs, staggerMissedJobs } from "./recovery.ts";
import { rowToJob } from "./row-mapper.ts";
import { computeNextRunAt, serializeScheduleValue } from "./schedule.ts";
import type { JobCreateInput, JobRow, ScheduledJob } from "./types.ts";
import { computeNextRunAt, serializeScheduleValue, validateSchedule } from "./schedule.ts";
import {
type JobCreateInput,
type JobRow,
type JobUpdateInput,
type ScheduledJob,
isValidSlackTarget,
} from "./types.ts";

// Upper bound on the setTimeout delay we pass when arming the next wake-up.
// Both Node and Bun use a 32-bit signed integer for the setTimeout delay, so
Expand Down Expand Up @@ -124,6 +130,97 @@ export class Scheduler {
return created;
}

updateJob(id: string, input: JobUpdateInput): ScheduledJob | null {
const job = this.getJob(id);
if (!job) return null;

// Validate schedule if changed
if (input.schedule) {
const scheduleError = validateSchedule(input.schedule);
if (scheduleError) {
throw new Error(`invalid schedule: ${scheduleError}`);
}
}

// Validate delivery if changed
if (input.delivery?.channel === "slack" && input.delivery.target) {
if (!isValidSlackTarget(input.delivery.target)) {
throw new Error(
`invalid delivery.target '${input.delivery.target}': must be "owner", a Slack channel id (C...), or a Slack user id (U...)`,
);
}
}

// Duplicate name detection: skip if unchanged (idempotent rename).
if (input.name !== undefined && input.name.toLowerCase() !== job.name.toLowerCase()) {
const dupe = this.db.query("SELECT id FROM scheduled_jobs WHERE lower(name) = lower(?)").get(input.name) as {
id: string;
} | null;
if (dupe) {
throw new Error(`job with name "${input.name}" already exists (id: ${dupe.id})`);
}
}

// Task size cap (mirrors validateCreateInput).
if (input.task !== undefined) {
const taskBytes = Buffer.byteLength(input.task, "utf8");
if (taskBytes > MAX_TASK_BYTES) {
throw new Error(`task text is ${taskBytes} bytes, exceeds ${MAX_TASK_BYTES} byte limit`);
}
}

// Build dynamic UPDATE statement
const updates: string[] = [];
const values: (string | number | null)[] = [];

if (input.name !== undefined) {
updates.push("name = ?");
values.push(input.name);
}
if (input.description !== undefined) {
updates.push("description = ?");
values.push(input.description);
}
if (input.task !== undefined) {
updates.push("task = ?");
values.push(input.task);
}
if (input.enabled !== undefined) {
updates.push("enabled = ?");
values.push(input.enabled ? 1 : 0);
}
if (input.schedule) {
updates.push("schedule_kind = ?");
updates.push("schedule_value = ?");
values.push(input.schedule.kind);
values.push(serializeScheduleValue(input.schedule));
// Recompute next_run_at when schedule changes
const nextRun = computeNextRunAt(input.schedule);
updates.push("next_run_at = ?");
values.push(nextRun ? nextRun.toISOString() : null);
}
if (input.delivery) {
updates.push("delivery_channel = ?");
updates.push("delivery_target = ?");
values.push(input.delivery.channel);
values.push(input.delivery.target);
}

if (updates.length === 0) {
// No fields to update, return current job
return job;
}

updates.push("updated_at = datetime('now')");
values.push(id);

this.db.run(`UPDATE scheduled_jobs SET ${updates.join(", ")} WHERE id = ?`, values);

this.armTimer();

return this.getJob(id);
}

deleteJob(id: string): boolean {
const result = this.db.run("DELETE FROM scheduled_jobs WHERE id = ?", [id]);
if (result.changes > 0) {
Expand Down
15 changes: 15 additions & 0 deletions src/scheduler/tool-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,18 @@ export const JobCreateInputSchema = z.object({
});

export type JobCreateInputParsed = z.infer<typeof JobCreateInputSchema>;

export const JobUpdateInputSchema = z.object({
name: z.string().min(1).max(200).optional(),
description: z.string().max(1000).optional(),
schedule: ScheduleInputSchema.optional(),
task: z
.string()
.min(1)
.max(32 * 1024)
.optional(),
delivery: JobDeliverySchema.optional(),
enabled: z.boolean().optional(),
});

export type JobUpdateInputParsed = z.infer<typeof JobUpdateInputSchema>;
40 changes: 36 additions & 4 deletions src/scheduler/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ function err(message: string): { content: Array<{ type: "text"; text: string }>;
return { content: [{ type: "text" as const, text: JSON.stringify({ error: message }) }], isError: true };
}

const TOOL_DESCRIPTION = `Create, list, delete, or trigger scheduled tasks. Lets you set up recurring jobs, one-shot reminders, and automated reports.
const TOOL_DESCRIPTION = `Create, list, delete, update, or trigger scheduled tasks. Lets you set up recurring jobs, one-shot reminders, and automated reports.

Actions:
- create: Create a new scheduled task. Returns the job id and next run time. Rejects invalid schedules, past timestamps, duplicate names, task text over 32 KB, and delivery targets that are not "owner", a channel id (C...), or a user id (U...).
- list: List all scheduled tasks with status and next run time. Corrupt rows are logged and skipped.
- delete: Remove a scheduled task by jobId or by name (case insensitive).
- update: Update a scheduled task by jobId or by name. Preserves run history (run_count, last_run_at, last_run_status, consecutive_errors). Only provided fields are updated. If schedule is changed, next_run_at is recomputed.
- run: Trigger a task immediately. Only runs when status is active and no other job is currently executing. Returns the task output.

Schedule types:
Expand Down Expand Up @@ -60,9 +61,9 @@ export function createSchedulerToolServer(scheduler: Scheduler): McpSdkServerCon
TOOL_DESCRIPTION,
{
action: z
.enum(["create", "list", "delete", "run"])
.enum(["create", "list", "delete", "update", "run"])
.describe(
"create: new scheduled task. list: enumerate tasks. delete: remove by jobId or name. run: trigger immediately (only when status=active and scheduler is idle).",
"create: new scheduled task. list: enumerate tasks. delete: remove by jobId or name. update: modify by jobId or name. run: trigger immediately (only when status=active and scheduler is idle).",
),
name: z.string().optional().describe("Job name (required for create)"),
description: z.string().optional().describe("Job description"),
Expand All @@ -72,7 +73,8 @@ export function createSchedulerToolServer(scheduler: Scheduler): McpSdkServerCon
.optional()
.describe("The prompt for the agent when the job fires (required for create, 32 KB max)"),
delivery: JobDeliverySchema.optional().describe("Where to deliver results"),
jobId: z.string().optional().describe("Job ID (for delete or run)"),
enabled: z.boolean().optional().describe("Enable or disable the job (for update)"),
jobId: z.string().optional().describe("Job ID (for delete, update, or run)"),
},
async (input) => {
try {
Expand Down Expand Up @@ -131,6 +133,36 @@ export function createSchedulerToolServer(scheduler: Scheduler): McpSdkServerCon
return ok({ deleted, id: targetId });
}

case "update": {
const targetId = input.jobId ?? scheduler.findJobIdByName(input.name);
if (!targetId) return err("Provide jobId or name to update");

// Build update object with only provided fields.
// If name was used for lookup (no jobId), exclude it from updates
// to avoid updating the name to itself.
const usedNameForLookup = !input.jobId && input.name;
const updateFields: Record<string, unknown> = {};
if (input.name !== undefined && !usedNameForLookup) updateFields.name = input.name;
if (input.description !== undefined) updateFields.description = input.description;
if (input.schedule !== undefined) updateFields.schedule = input.schedule;
if (input.task !== undefined) updateFields.task = input.task;
if (input.delivery !== undefined) updateFields.delivery = input.delivery;
if (input.enabled !== undefined) updateFields.enabled = input.enabled;

const updated = scheduler.updateJob(targetId, updateFields);
if (!updated) return err(`Job not found: ${targetId}`);

return ok({
updated: true,
id: updated.id,
name: updated.name,
schedule: updated.schedule,
nextRunAt: updated.nextRunAt,
delivery: updated.delivery,
enabled: updated.enabled,
});
}

case "run": {
const targetId = input.jobId ?? scheduler.findJobIdByName(input.name);
if (!targetId) return err("Provide jobId or name to run");
Expand Down
9 changes: 9 additions & 0 deletions src/scheduler/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ export type JobCreateInput = {
createdBy?: string;
};

export type JobUpdateInput = {
name?: string;
description?: string;
schedule?: Schedule;
task?: string;
delivery?: JobDelivery;
enabled?: boolean;
};

export type JobRow = {
id: string;
name: string;
Expand Down