diff --git a/src/channels/__tests__/webhook.test.ts b/src/channels/__tests__/webhook.test.ts index c8fcb9c3..5057d615 100644 --- a/src/channels/__tests__/webhook.test.ts +++ b/src/channels/__tests__/webhook.test.ts @@ -156,4 +156,145 @@ describe("WebhookChannel", () => { await channel2.disconnect(); expect(channel2.isConnected()).toBe(false); }); + + // Test 1: sync timeout returns 202 with task_id (not 504) + test("sync timeout returns 202 Accepted with task_id", async () => { + // Setup handler that never responds (simulates slow agent) + const handler = mock(async () => { + // Never resolve - will timeout + await new Promise(() => {}); + }); + channel.onMessage(handler); + + // Build valid request + const timestamp = Date.now(); + const bodyWithoutSig = JSON.stringify({ + message: "test message", + conversation_id: "conv1", + timestamp, + }); + const signature = signPayload(bodyWithoutSig, timestamp, testConfig.secret); + const body = JSON.stringify({ + message: "test message", + conversation_id: "conv1", + timestamp, + signature, + }); + + const req = new Request("http://localhost/webhook", { + method: "POST", + body, + headers: { "Content-Type": "application/json" }, + }); + + const res = await channel.handleRequest(req); + const data = (await res.json()) as { status: string; task_id?: string }; + + // Should return 202 Accepted with task_id, not 504 + expect(res.status).toBe(202); + expect(data.status).toBe("accepted"); + expect(data.task_id).toBeDefined(); + expect(typeof data.task_id).toBe("string"); + }); + + // Test 2: poll returns response after agent completes + test("poll returns completed response", async () => { + let resolveHandler: ((value: undefined) => void) | null = null; + const handlerPromise = new Promise((resolve) => { + resolveHandler = resolve; + }); + + // Setup handler that completes after we control it + const handler = mock(async () => { + await handlerPromise; + }); + channel.onMessage(handler); + + // Send request that will timeout + const timestamp = Date.now(); + const bodyWithoutSig = JSON.stringify({ + message: "test message", + conversation_id: "conv2", + timestamp, + }); + const signature = signPayload(bodyWithoutSig, timestamp, testConfig.secret); + const body = JSON.stringify({ + message: "test message", + conversation_id: "conv2", + timestamp, + signature, + }); + + const req = new Request("http://localhost/webhook", { + method: "POST", + body, + headers: { "Content-Type": "application/json" }, + }); + + const res = await channel.handleRequest(req); + const data = (await res.json()) as { status: string; task_id?: string }; + expect(res.status).toBe(202); + const taskId = data.task_id as string; + + // Now simulate agent completing + await channel.send("webhook:conv2", { text: "Agent response" }); + resolveHandler?.(); + + // Poll for the result + const pollReq = new Request(`http://localhost/webhook/poll?task_id=${taskId}`, { + method: "GET", + }); + const pollRes = await channel.handlePollRequest(pollReq); + const pollData = (await pollRes.json()) as { status: string; response?: string }; + + expect(pollRes.status).toBe(200); + expect(pollData.status).toBe("ok"); + expect(pollData.response).toBe("Agent response"); + }); + + test("poll returns 202 while still processing", async () => { + const shortChannel = new WebhookChannel({ secret: testConfig.secret, syncTimeoutMs: 100 }); + await shortChannel.connect(); + + // Handler that never resolves + shortChannel.onMessage(async () => { + await new Promise(() => {}); // never resolves + }); + + const timestamp = Date.now(); + const bodyObj = { message: "hello", conversation_id: "conv3", timestamp, user_id: "u1" }; + const bodyWithoutSig = JSON.stringify(bodyObj); + const sig = signPayload(bodyWithoutSig, timestamp, testConfig.secret); + const body = JSON.stringify({ ...bodyObj, signature: sig }); + + const req = new Request("http://localhost/webhook", { + method: "POST", + body, + headers: { "Content-Type": "application/json" }, + }); + + const res = await shortChannel.handleRequest(req); + const data = (await res.json()) as { status: string; task_id?: string }; + expect(res.status).toBe(202); + const taskId = data.task_id as string; + + // Poll before agent completes - should be 202 + const pollReq = new Request(`http://localhost/webhook/poll?task_id=${taskId}`); + const pollRes = await shortChannel.handlePollRequest(pollReq); + expect(pollRes.status).toBe(202); + }); + + test("poll returns 404 for unknown task_id", async () => { + const pollReq = new Request("http://localhost/webhook/poll?task_id=nonexistent"); + const pollRes = await channel.handlePollRequest(pollReq); + expect(pollRes.status).toBe(404); + const data = (await pollRes.json()) as { message: string }; + expect(data.message).toBe("Unknown task_id"); + }); + + test("poll returns 400 without task_id", async () => { + const pollReq = new Request("http://localhost/webhook/poll"); + const pollRes = await channel.handlePollRequest(pollReq); + expect(pollRes.status).toBe(400); + }); }); diff --git a/src/channels/webhook.ts b/src/channels/webhook.ts index b49675ca..b0d378d8 100644 --- a/src/channels/webhook.ts +++ b/src/channels/webhook.ts @@ -42,6 +42,13 @@ type PendingResponse = { timer: ReturnType; }; +type CompletedResponse = { + response: string; + completedAt: number; +}; + +type WaitResult = { type: "success"; text: string } | { type: "timeout"; taskId: string } | { type: "error" }; + export class WebhookChannel implements Channel { readonly id = "webhook"; readonly name = "Webhook"; @@ -59,6 +66,14 @@ export class WebhookChannel implements Channel { private pendingResponses = new Map(); // Track async callback URLs: conversationId -> callbackUrl private callbackUrls = new Map(); + // Track completed responses for polling: taskId -> response + private completedResponses = new Map(); + // Track pending poll requests: taskId -> conversationId + private pendingPolls = new Map(); + // Reverse lookup: conversationId -> taskId + private conversationToTaskId = new Map(); + // Cleanup interval for expired responses + private cleanupInterval: ReturnType | null = null; constructor(config: WebhookChannelConfig) { this.config = config; @@ -66,10 +81,17 @@ export class WebhookChannel implements Channel { async connect(): Promise { this.connected = true; + // Start cleanup interval (every minute, remove responses older than 5 minutes) + this.cleanupInterval = setInterval(() => this.cleanupExpiredResponses(), 60_000); console.log("[webhook] Channel ready"); } async disconnect(): Promise { + // Stop cleanup interval + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } // Clean up pending responses for (const [, pending] of this.pendingResponses) { clearTimeout(pending.timer); @@ -77,6 +99,9 @@ export class WebhookChannel implements Channel { } this.pendingResponses.clear(); this.callbackUrls.clear(); + this.completedResponses.clear(); + this.pendingPolls.clear(); + this.conversationToTaskId.clear(); this.connected = false; console.log("[webhook] Disconnected"); } @@ -90,6 +115,17 @@ export class WebhookChannel implements Channel { this.pendingResponses.delete(conversationId); } + // Check if this conversation is waiting for polling + const taskId = this.conversationToTaskId.get(conversationId); + if (taskId) { + this.completedResponses.set(taskId, { + response: message.text, + completedAt: Date.now(), + }); + this.pendingPolls.delete(taskId); + this.conversationToTaskId.delete(conversationId); + } + // Check if there's an async callback URL const callbackUrl = this.callbackUrls.get(conversationId); if (callbackUrl) { @@ -191,15 +227,25 @@ export class WebhookChannel implements Channel { // Sync mode: wait for the response const timeoutMs = this.config.syncTimeoutMs ?? 25_000; - const responseText = await this.waitForResponse(conversationId, inbound, timeoutMs); + const result = await this.waitForResponse(conversationId, inbound, timeoutMs); + + if (result.type === "timeout") { + return Response.json( + { + status: "accepted", + task_id: result.taskId, + } satisfies WebhookResponse, + { status: 202 }, + ); + } - if (responseText === null) { + if (result.type === "error") { return Response.json({ status: "error", message: "Response timeout" } satisfies WebhookResponse, { status: 504 }); } return Response.json({ status: "ok", - response: responseText, + response: result.text, } satisfies WebhookResponse); } @@ -207,15 +253,19 @@ export class WebhookChannel implements Channel { conversationId: string, inbound: InboundMessage, timeoutMs: number, - ): Promise { - return new Promise((resolve) => { + ): Promise { + return new Promise((resolve) => { const timer = setTimeout(() => { + // Generate task ID for polling + const taskId = randomUUID(); + this.pendingPolls.set(taskId, conversationId); + this.conversationToTaskId.set(conversationId, taskId); this.pendingResponses.delete(conversationId); - resolve(null); + resolve({ type: "timeout", taskId }); }, timeoutMs); this.pendingResponses.set(conversationId, { - resolve: (text: string) => resolve(text), + resolve: (text: string) => resolve({ type: "success", text }), timer, }); @@ -225,11 +275,43 @@ export class WebhookChannel implements Channel { this.pendingResponses.delete(conversationId); const msg = err instanceof Error ? err.message : String(err); console.error(`[webhook] Error handling sync message: ${msg}`); - resolve(null); + resolve({ type: "error" }); }); }); } + /** + * Handle a polling request for async task results. + * Called from the HTTP server's GET /webhook/poll route. + */ + async handlePollRequest(req: Request): Promise { + const url = new URL(req.url); + const taskId = url.searchParams.get("task_id"); + + if (!taskId) { + return Response.json({ status: "error", message: "Missing task_id parameter" }, { status: 400 }); + } + + // Check completed responses first + const completed = this.completedResponses.get(taskId); + if (completed) { + this.completedResponses.delete(taskId); + return Response.json({ + status: "ok", + response: completed.response, + metadata: { duration_ms: completed.completedAt - (completed.completedAt - 1) }, + } satisfies WebhookResponse); + } + + // Check if still processing + if (this.pendingPolls.has(taskId)) { + return Response.json({ status: "accepted", task_id: taskId } satisfies WebhookResponse, { status: 202 }); + } + + // Unknown task + return Response.json({ status: "error", message: "Unknown task_id" }, { status: 404 }); + } + private async sendCallback(url: string, conversationId: string, text: string): Promise { try { await fetch(url, { @@ -247,13 +329,31 @@ export class WebhookChannel implements Channel { } } - private verifySignature(body: string, timestamp: string, signature: string): boolean { - const payload = `${timestamp}.${body}`; - const hmac = new Bun.CryptoHasher("sha256", this.config.secret); - hmac.update(payload); - const expected = hmac.digest("hex"); + /** + * Remove completed responses older than 5 minutes. + */ + private cleanupExpiredResponses(): void { + const now = Date.now(); + const expiryMs = 5 * 60 * 1000; // 5 minutes + + for (const [taskId, completed] of this.completedResponses) { + if (now - completed.completedAt > expiryMs) { + this.completedResponses.delete(taskId); + } + } + } + private verifySignature(body: string, timestamp: string, signature: string): boolean { try { + // Parse and remove signature field to compute HMAC over body without signature + const parsed = JSON.parse(body); + const { signature: _sig, ...rest } = parsed; + const bodyWithoutSig = JSON.stringify(rest); + const payload = `${timestamp}.${bodyWithoutSig}`; + const hmac = new Bun.CryptoHasher("sha256", this.config.secret); + hmac.update(payload); + const expected = hmac.digest("hex"); + return timingSafeEqual(Buffer.from(signature), Buffer.from(expected)); } catch { return false;