From 89305b5ef04dee35a1bba9e369bafe89af9246c8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 22 Dec 2025 21:29:54 +0000 Subject: [PATCH] fix(batch): extract the queue name out of an already nested queue option --- .../app/runEngine/concerns/queues.server.ts | 39 +++- references/hello-world/src/trigger/batches.ts | 174 ++++++++++++++++++ 2 files changed, 209 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 3a853f9b25..0980dc2a75 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -16,6 +16,35 @@ import { env } from "~/env.server"; import { tryCatch } from "@trigger.dev/core/v3"; import { ServiceValidationError } from "~/v3/services/common.server"; +/** + * Extract the queue name from a queue option that may be: + * - An object with a string `name` property: { name: "queue-name" } + * - A double-wrapped object (bug case): { name: { name: "queue-name", ... } } + * + * This handles the case where the SDK accidentally double-wraps the queue + * option when it's already an object with a name property. + */ +function extractQueueName(queue: { name?: unknown } | undefined): string | undefined { + if (!queue?.name) { + return undefined; + } + + // Normal case: queue.name is a string + if (typeof queue.name === "string") { + return queue.name; + } + + // Double-wrapped case: queue.name is an object with its own name property + if (typeof queue.name === "object" && queue.name !== null && "name" in queue.name) { + const innerName = (queue.name as { name: unknown }).name; + if (typeof innerName === "string") { + return innerName; + } + } + + return undefined; +} + export class DefaultQueueManager implements QueueManager { constructor( private readonly prisma: PrismaClientOrTransaction, @@ -32,8 +61,8 @@ export class DefaultQueueManager implements QueueManager { // Determine queue name based on lockToVersion and provided options if (lockedBackgroundWorker) { // Task is locked to a specific worker version - if (request.body.options?.queue?.name) { - const specifiedQueueName = request.body.options.queue.name; + const specifiedQueueName = extractQueueName(request.body.options?.queue); + if (specifiedQueueName) { // A specific queue name is provided const specifiedQueue = await this.prisma.taskQueue.findFirst({ // Validate it exists for the locked worker @@ -126,8 +155,10 @@ export class DefaultQueueManager implements QueueManager { const { taskId, environment, body } = request; const { queue } = body.options ?? {}; - if (queue?.name) { - return queue.name; + // Use extractQueueName to handle double-wrapped queue objects + const queueName = extractQueueName(queue); + if (queueName) { + return queueName; } const defaultQueueName = `task/${taskId}`; diff --git a/references/hello-world/src/trigger/batches.ts b/references/hello-world/src/trigger/batches.ts index b48272962c..1ca134c9d0 100644 --- a/references/hello-world/src/trigger/batches.ts +++ b/references/hello-world/src/trigger/batches.ts @@ -484,6 +484,180 @@ export const simpleTask = task({ }, }); +// ============================================================================ +// Queue Option Tests +// ============================================================================ + +/** + * Task that runs in a specific queue for testing queue option handling. + */ +export const queuedTask = task({ + id: "queued-task", + queue: { + name: "test-queue-for-batch", + }, + run: async (payload: { index: number; testId: string }) => { + logger.info(`Processing queued task ${payload.index}`, { payload }); + await setTimeout(100); + return { + index: payload.index, + testId: payload.testId, + processedAt: Date.now(), + }; + }, +}); + +/** + * Test: Batch trigger with queue option as object + * + * This test verifies that the queue option works correctly when passed + * through batch trigger. The SDK passes queue as { name: "queue-name" } + * which should be handled correctly by the server. + * + * This tests the fix for the double-wrapping bug where queue objects + * like { name: "queue-name", concurrencyLimit: 20 } could get wrapped + * into { name: { name: "queue-name", concurrencyLimit: 20 } }. + */ +export const batchTriggerWithQueueOption = task({ + id: "batch-trigger-with-queue-option", + maxDuration: 120, + run: async (payload: { count: number; useObjectQueue?: boolean }) => { + const count = payload.count || 5; + const testId = `queue-test-${Date.now()}`; + + // If useObjectQueue is true, we bypass the SDK types to send queue as an object + // This simulates what might happen if someone calls the API directly with wrong format + const queueValue = payload.useObjectQueue + ? ({ name: "test-queue-for-batch", concurrencyLimit: 20 } as unknown as string) + : "test-queue-for-batch"; + + // Generate batch items with queue option specified + const items = Array.from({ length: count }, (_, i) => ({ + payload: { index: i, testId }, + options: { + queue: queueValue, + // Also test with lockToVersion since the error showed workers.some.id + // which only appears in the lockedBackgroundWorker code path + }, + })); + + logger.info("Starting batch trigger with queue option", { + count, + testId, + useObjectQueue: payload.useObjectQueue, + queueValue, + }); + + // Trigger the batch with queue option + const result = await queuedTask.batchTrigger(items); + + logger.info("Batch triggered successfully", { + batchId: result.batchId, + runCount: result.runCount, + }); + + // Wait for runs to complete + await setTimeout(5000); + + // Retrieve batch to check results + const batchResult = await batch.retrieve(result.batchId); + + return { + success: true, + batchId: result.batchId, + runCount: result.runCount, + batchStatus: batchResult.status, + testId, + }; + }, +}); + +/** + * Test: Batch triggerAndWait with queue option + * + * Similar to above but waits for all runs to complete. + */ +export const batchTriggerAndWaitWithQueueOption = task({ + id: "batch-trigger-and-wait-with-queue-option", + maxDuration: 120, + run: async (payload: { count: number }) => { + const count = payload.count || 5; + const testId = `queue-wait-test-${Date.now()}`; + + // Generate items with queue option + const items = Array.from({ length: count }, (_, i) => ({ + payload: { index: i, testId }, + options: { + queue: "test-queue-for-batch", + }, + })); + + logger.info("Starting batch triggerAndWait with queue option", { count, testId }); + + // Trigger and wait + const results = await queuedTask.batchTriggerAndWait(items); + + const successCount = results.runs.filter((r) => r.ok).length; + const outputs = results.runs.filter((r) => r.ok).map((r) => (r.ok ? r.output : null)); + + return { + success: successCount === count, + successCount, + totalCount: count, + outputs, + testId, + }; + }, +}); + +/** + * Test: Streaming batch trigger with queue option + * + * Tests that streaming batches also work correctly with queue options. + */ +export const streamingBatchWithQueueOption = task({ + id: "streaming-batch-with-queue-option", + maxDuration: 120, + run: async (payload: { count: number }) => { + const count = payload.count || 10; + const testId = `stream-queue-test-${Date.now()}`; + + // Async generator that yields items with queue option + async function* generateItems() { + for (let i = 0; i < count; i++) { + yield { + payload: { index: i, testId }, + options: { + queue: "test-queue-for-batch", + }, + }; + } + } + + logger.info("Starting streaming batch with queue option", { count, testId }); + + // Trigger using the generator + const result = await queuedTask.batchTrigger(generateItems()); + + logger.info("Streaming batch triggered", { + batchId: result.batchId, + runCount: result.runCount, + }); + + // Wait and check results + await setTimeout(5000); + const batchResult = await batch.retrieve(result.batchId); + + return { + success: true, + batchId: result.batchId, + runCount: result.runCount, + batchStatus: batchResult.status, + testId, + }; + }, +}); + // ============================================================================ // Large Payload Examples (R2 Offloading) // ============================================================================