Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 162 additions & 180 deletions internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -537,152 +537,101 @@ export class BatchQueue {
}): Promise<void> {
const { batchId, friendlyId, itemIndex, item } = ctx.message.payload;

return this.#startSpan(
"BatchQueue.handleMessage",
async (span) => {
span?.setAttributes({
"batch.id": batchId,
"batch.friendlyId": friendlyId,
"batch.itemIndex": itemIndex,
"batch.task": item.task,
"batch.consumerId": ctx.consumerId,
"batch.attempt": ctx.message.attempt,
});

// Record queue time metric (time from enqueue to processing)
const queueTimeMs = Date.now() - ctx.message.timestamp;
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
span?.setAttribute("batch.queueTimeMs", queueTimeMs);

this.logger.debug("Processing batch item", {
batchId,
friendlyId,
itemIndex,
task: item.task,
consumerId: ctx.consumerId,
attempt: ctx.message.attempt,
queueTimeMs,
});

if (!this.processItemCallback) {
this.logger.error("No process item callback set", { batchId, itemIndex });
// Still complete the message to avoid blocking
await ctx.complete();
return;
}
return this.#startSpan("BatchQueue.handleMessage", async (span) => {
span?.setAttributes({
"batch.id": batchId,
"batch.friendlyId": friendlyId,
"batch.itemIndex": itemIndex,
"batch.task": item.task,
"batch.consumerId": ctx.consumerId,
"batch.attempt": ctx.message.attempt,
});

// Get batch metadata
const meta = await this.#startSpan("BatchQueue.getMeta", async () => {
return this.completionTracker.getMeta(batchId);
});
// Record queue time metric (time from enqueue to processing)
const queueTimeMs = Date.now() - ctx.message.timestamp;
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
span?.setAttribute("batch.queueTimeMs", queueTimeMs);

if (!meta) {
this.logger.error("Batch metadata not found", { batchId, itemIndex });
await ctx.complete();
return;
}
this.logger.debug("Processing batch item", {
batchId,
friendlyId,
itemIndex,
task: item.task,
consumerId: ctx.consumerId,
attempt: ctx.message.attempt,
queueTimeMs,
});

span?.setAttributes({
"batch.runCount": meta.runCount,
"batch.environmentId": meta.environmentId,
});
if (!this.processItemCallback) {
this.logger.error("No process item callback set", { batchId, itemIndex });
// Still complete the message to avoid blocking
await ctx.complete();
return;
}

let processedCount: number;
// Get batch metadata
const meta = await this.#startSpan("BatchQueue.getMeta", async () => {
return this.completionTracker.getMeta(batchId);
});

try {
const result = await this.#startSpan(
"BatchQueue.processItemCallback",
async (innerSpan) => {
innerSpan?.setAttributes({
"batch.id": batchId,
"batch.itemIndex": itemIndex,
"batch.task": item.task,
});
return this.processItemCallback!({
batchId,
friendlyId,
itemIndex,
item,
meta,
});
}
);
if (!meta) {
this.logger.error("Batch metadata not found", { batchId, itemIndex });
await ctx.complete();
return;
}

if (result.success) {
span?.setAttribute("batch.result", "success");
span?.setAttribute("batch.runId", result.runId);
span?.setAttributes({
"batch.runCount": meta.runCount,
"batch.environmentId": meta.environmentId,
});

// Pass itemIndex for idempotency - prevents double-counting on redelivery
processedCount = await this.#startSpan(
"BatchQueue.recordSuccess",
async () => {
return this.completionTracker.recordSuccess(batchId, result.runId, itemIndex);
}
);
let processedCount: number;

this.itemsProcessedCounter?.add(1, { envId: meta.environmentId });
this.logger.debug("Batch item processed successfully", {
try {
const result = await this.#startSpan(
"BatchQueue.processItemCallback",
async (innerSpan) => {
innerSpan?.setAttributes({
"batch.id": batchId,
"batch.itemIndex": itemIndex,
"batch.task": item.task,
});
return this.processItemCallback!({
batchId,
friendlyId,
itemIndex,
runId: result.runId,
processedCount,
expectedCount: meta.runCount,
item,
meta,
});
} else {
span?.setAttribute("batch.result", "failure");
span?.setAttribute("batch.error", result.error);
if (result.errorCode) {
span?.setAttribute("batch.errorCode", result.errorCode);
}
}
);

// For offloaded payloads (payloadType: "application/store"), payload is already an R2 path
// For inline payloads, store the full payload - it's under the offload threshold anyway
const payloadStr = await this.#startSpan(
"BatchQueue.serializePayload",
async (innerSpan) => {
const str =
typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
innerSpan?.setAttribute("batch.payloadSize", str.length);
return str;
}
);

processedCount = await this.#startSpan(
"BatchQueue.recordFailure",
async () => {
return this.completionTracker.recordFailure(batchId, {
index: itemIndex,
taskIdentifier: item.task,
payload: payloadStr,
options: item.options,
error: result.error,
errorCode: result.errorCode,
});
}
);

this.itemsFailedCounter?.add(1, {
envId: meta.environmentId,
errorCode: result.errorCode,
});
if (result.success) {
span?.setAttribute("batch.result", "success");
span?.setAttribute("batch.runId", result.runId);

this.logger.error("Batch item processing failed", {
batchId,
itemIndex,
error: result.error,
processedCount,
expectedCount: meta.runCount,
});
// Pass itemIndex for idempotency - prevents double-counting on redelivery
processedCount = await this.#startSpan("BatchQueue.recordSuccess", async () => {
return this.completionTracker.recordSuccess(batchId, result.runId, itemIndex);
});

this.itemsProcessedCounter?.add(1, { envId: meta.environmentId });
this.logger.debug("Batch item processed successfully", {
batchId,
itemIndex,
runId: result.runId,
processedCount,
expectedCount: meta.runCount,
});
} else {
span?.setAttribute("batch.result", "failure");
span?.setAttribute("batch.error", result.error);
if (result.errorCode) {
span?.setAttribute("batch.errorCode", result.errorCode);
}
} catch (error) {
span?.setAttribute("batch.result", "unexpected_error");
span?.setAttribute(
"batch.error",
error instanceof Error ? error.message : String(error)
);

// Unexpected error during processing
// For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
// For offloaded payloads (payloadType: "application/store"), payload is already an R2 path
// For inline payloads, store the full payload - it's under the offload threshold anyway
const payloadStr = await this.#startSpan(
"BatchQueue.serializePayload",
async (innerSpan) => {
Expand All @@ -693,56 +642,92 @@ export class BatchQueue {
}
);

processedCount = await this.#startSpan(
"BatchQueue.recordFailure",
async () => {
return this.completionTracker.recordFailure(batchId, {
index: itemIndex,
taskIdentifier: item.task,
payload: payloadStr,
options: item.options,
error: error instanceof Error ? error.message : String(error),
errorCode: "UNEXPECTED_ERROR",
});
}
);
processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
return this.completionTracker.recordFailure(batchId, {
index: itemIndex,
taskIdentifier: item.task,
payload: payloadStr,
options: item.options,
error: result.error,
errorCode: result.errorCode,
});
});

this.itemsFailedCounter?.add(1, {
envId: meta.environmentId,
errorCode: "UNEXPECTED_ERROR",
errorCode: result.errorCode,
});
this.logger.error("Unexpected error processing batch item", {

this.logger.error("Batch item processing failed", {
batchId,
itemIndex,
error: error instanceof Error ? error.message : String(error),
error: result.error,
processedCount,
expectedCount: meta.runCount,
});
}
} catch (error) {
span?.setAttribute("batch.result", "unexpected_error");
span?.setAttribute("batch.error", error instanceof Error ? error.message : String(error));

// Unexpected error during processing
// For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
const payloadStr = await this.#startSpan(
"BatchQueue.serializePayload",
async (innerSpan) => {
const str =
typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
innerSpan?.setAttribute("batch.payloadSize", str.length);
return str;
}
);

processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
return this.completionTracker.recordFailure(batchId, {
index: itemIndex,
taskIdentifier: item.task,
payload: payloadStr,
options: item.options,
error: error instanceof Error ? error.message : String(error),
errorCode: "UNEXPECTED_ERROR",
});
});

span?.setAttribute("batch.processedCount", processedCount);

// Complete the FairQueue message (no retry for batch items)
// This must happen after recording success/failure to ensure the counter
// is updated before the message is considered done
await this.#startSpan("BatchQueue.completeMessage", async () => {
return ctx.complete();
this.itemsFailedCounter?.add(1, {
envId: meta.environmentId,
errorCode: "UNEXPECTED_ERROR",
});
this.logger.error("Unexpected error processing batch item", {
batchId,
itemIndex,
error: error instanceof Error ? error.message : String(error),
processedCount,
expectedCount: meta.runCount,
});
}

// Check if all items have been processed using atomic counter
// This is safe even with multiple concurrent consumers because
// the processedCount is atomically incremented and we only trigger
// finalization when we see the exact final count
if (processedCount === meta.runCount) {
this.logger.debug("All items processed, finalizing batch", {
batchId,
processedCount,
expectedCount: meta.runCount,
});
await this.#finalizeBatch(batchId, meta);
}
span?.setAttribute("batch.processedCount", processedCount);

// Complete the FairQueue message (no retry for batch items)
// This must happen after recording success/failure to ensure the counter
// is updated before the message is considered done
await this.#startSpan("BatchQueue.completeMessage", async () => {
return ctx.complete();
});

// Check if all items have been processed using atomic counter
// This is safe even with multiple concurrent consumers because
// the processedCount is atomically incremented and we only trigger
// finalization when we see the exact final count
if (processedCount === meta.runCount) {
this.logger.debug("All items processed, finalizing batch", {
batchId,
processedCount,
expectedCount: meta.runCount,
});
await this.#finalizeBatch(batchId, meta);
}
);
});
}

/**
Expand All @@ -757,19 +742,16 @@ export class BatchQueue {
"batch.environmentId": meta.environmentId,
});

const result = await this.#startSpan(
"BatchQueue.getCompletionResult",
async (innerSpan) => {
const completionResult = await this.completionTracker.getCompletionResult(batchId);
innerSpan?.setAttributes({
"batch.successfulRunCount": completionResult.successfulRunCount,
"batch.failedRunCount": completionResult.failedRunCount,
"batch.runIdsCount": completionResult.runIds.length,
"batch.failuresCount": completionResult.failures.length,
});
return completionResult;
}
);
const result = await this.#startSpan("BatchQueue.getCompletionResult", async (innerSpan) => {
const completionResult = await this.completionTracker.getCompletionResult(batchId);
innerSpan?.setAttributes({
"batch.successfulRunCount": completionResult.successfulRunCount,
"batch.failedRunCount": completionResult.failedRunCount,
"batch.runIdsCount": completionResult.runIds.length,
"batch.failuresCount": completionResult.failures.length,
});
return completionResult;
});

span?.setAttributes({
"batch.successfulRunCount": result.successfulRunCount,
Expand Down
Loading