Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/forty-houses-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": minor
"@trigger.dev/core": minor
---

Add GCRA rate limiting algorithm for task queue management
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ const EnvironmentSchema = z
RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),

/**
* Disable queue rate limiting (useful for development and testing).
* When set to "1", rate limit checks on queues will be bypassed.
*/
TRIGGER_DISABLE_QUEUE_RATE_LIMITS: z.string().default("0"),

RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ export class RunEngineTriggerTaskService {
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
cliVersion: lockedToBackgroundWorker?.cliVersion,
concurrencyKey: body.options?.concurrencyKey,
rateLimitKey: body.options?.rateLimitKey,
queue: queueName,
lockedQueueId,
workerQueue,
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ function createRunEngine() {
scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS,
processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS,
},
disableRateLimits: env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1",
},
runLock: {
redis: {
Expand Down
30 changes: 30 additions & 0 deletions apps/webapp/app/v3/runQueue.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,21 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { marqs } from "./marqs/index.server";
import { engine } from "./runEngine.server";

// Re-export pure utility function from durations.ts (testable without env deps)
export { parseDurationToMs } from "./utils/durations";

//This allows us to update MARQS and the RunQueue

/** Rate limit configuration for a queue */
export type QueueRateLimitConfig = {
/** Maximum number of requests allowed in the period */
limit: number;
/** Time window in milliseconds */
periodMs: number;
/** Optional burst allowance (defaults to limit) */
burst?: number;
};

/** Updates MARQS and the RunQueue limits */
export async function updateEnvConcurrencyLimits(
environment: AuthenticatedEnvironment,
Expand Down Expand Up @@ -42,3 +55,20 @@ export async function removeQueueConcurrencyLimits(
engine.runQueue.removeQueueConcurrencyLimits(environment, queueName),
]);
}

/** Updates the rate limit configuration for a queue in Redis */
export async function updateQueueRateLimitConfig(
environment: AuthenticatedEnvironment,
queueName: string,
config: QueueRateLimitConfig
) {
await engine.runQueue.setQueueRateLimitConfig(environment, queueName, config);
}

/** Removes the rate limit configuration for a queue from Redis */
export async function removeQueueRateLimitConfig(
environment: AuthenticatedEnvironment,
queueName: string
) {
await engine.runQueue.removeQueueRateLimitConfig(environment, queueName);
}
68 changes: 67 additions & 1 deletion apps/webapp/app/v3/services/createBackgroundWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@ import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database";
import cronstrue from "cronstrue";
import { Prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { sanitizeQueueName } from "~/models/taskQueue.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import {
parseDurationToMs,
removeQueueConcurrencyLimits,
removeQueueRateLimitConfig,
updateEnvConcurrencyLimits,
updateQueueConcurrencyLimits,
updateQueueRateLimitConfig,
type QueueRateLimitConfig,
} from "../runQueue.server";
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
import { clampMaxDuration } from "../utils/maxDuration";
Expand Down Expand Up @@ -255,6 +260,7 @@ async function createWorkerTask(
{
name: task.queue?.name ?? `task/${task.id}`,
concurrencyLimit: task.queue?.concurrencyLimit,
rateLimit: task.queue?.rateLimit,
},
task.id,
task.queue?.name ? "NAMED" : "VIRTUAL",
Expand Down Expand Up @@ -364,9 +370,27 @@ async function createWorkerQueue(
? Math.max(Math.min(queue.concurrencyLimit, environment.maximumConcurrencyLimit), 0)
: queue.concurrencyLimit;

let rateLimitConfig: QueueRateLimitConfig | null = null;
if (queue.rateLimit) {
try {
rateLimitConfig = {
limit: queue.rateLimit.limit,
periodMs: parseDurationToMs(queue.rateLimit.period),
burst: queue.rateLimit.burst,
};
} catch (error) {
logger.error("createWorkerQueue: invalid rate limit period format", {
queueName,
rateLimit: queue.rateLimit,
error,
});
}
}

const taskQueue = await upsertWorkerQueueRecord(
queueName,
baseConcurrencyLimit ?? null,
rateLimitConfig,
orderableName,
queueType,
worker,
Expand Down Expand Up @@ -397,8 +421,36 @@ async function createWorkerQueue(
});
await removeQueueConcurrencyLimits(environment, taskQueue.name);
}

// Handle rate limit config sync to Redis
if (env.TRIGGER_DISABLE_QUEUE_RATE_LIMITS === "1") {
// Rate limiting disabled: remove any existing config from Redis
// This ensures clean state when toggling the flag
logger.debug("createWorkerQueue: rate limiting disabled by env flag, removing config", {
workerId: worker.id,
taskQueue: taskQueue.name,
orgId: environment.organizationId,
projectId: environment.projectId,
environmentId: environment.id,
});
await removeQueueRateLimitConfig(environment, taskQueue.name);
} else if (rateLimitConfig) {
// Rate limiting enabled and config exists: sync to Redis
logger.debug("createWorkerQueue: updating rate limit config", {
workerId: worker.id,
taskQueue: taskQueue.name,
orgId: environment.organizationId,
projectId: environment.projectId,
environmentId: environment.id,
rateLimitConfig,
});
await updateQueueRateLimitConfig(environment, taskQueue.name, rateLimitConfig);
} else {
// Rate limiting enabled but no config: remove any stale config
await removeQueueRateLimitConfig(environment, taskQueue.name);
}
} else {
logger.debug("createWorkerQueue: queue is paused, not updating concurrency limit", {
logger.debug("createWorkerQueue: queue is paused, not updating limits", {
workerId: worker.id,
taskQueue,
orgId: environment.organizationId,
Expand All @@ -413,6 +465,7 @@ async function createWorkerQueue(
async function upsertWorkerQueueRecord(
queueName: string,
concurrencyLimit: number | null,
rateLimitConfig: QueueRateLimitConfig | null,
orderableName: string,
queueType: TaskQueueType,
worker: BackgroundWorker,
Expand All @@ -431,6 +484,15 @@ async function upsertWorkerQueueRecord(
},
});

// Serialize rate limit config for storage (or null to clear)
const rateLimitData = rateLimitConfig
? {
limit: rateLimitConfig.limit,
periodMs: rateLimitConfig.periodMs,
burst: rateLimitConfig.burst,
}
: Prisma.JsonNull;

if (!taskQueue) {
taskQueue = await prisma.taskQueue.create({
data: {
Expand All @@ -439,6 +501,7 @@ async function upsertWorkerQueueRecord(
name: queueName,
orderableName,
concurrencyLimit,
rateLimit: rateLimitData,
runtimeEnvironmentId: worker.runtimeEnvironmentId,
projectId: worker.projectId,
type: queueType,
Expand All @@ -463,6 +526,8 @@ async function upsertWorkerQueueRecord(
// If overridden, keep current limit and update base; otherwise update limit normally
concurrencyLimit: hasOverride ? undefined : concurrencyLimit,
concurrencyLimitBase: hasOverride ? concurrencyLimit : undefined,
// Always update rate limit config (not overrideable for now)
rateLimit: rateLimitData,
},
});
}
Expand All @@ -474,6 +539,7 @@ async function upsertWorkerQueueRecord(
return await upsertWorkerQueueRecord(
queueName,
concurrencyLimit,
rateLimitConfig,
orderableName,
queueType,
worker,
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/services/enqueueRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type EnqueueRunOptions = {
env: AuthenticatedEnvironment;
run: TaskRun;
dependentRun?: { queue: string; id: string };
rateLimitKey?: string;
};

export type EnqueueRunResult =
Expand All @@ -22,6 +23,7 @@ export async function enqueueRun({
env,
run,
dependentRun,
rateLimitKey,
}: EnqueueRunOptions): Promise<EnqueueRunResult> {
// If this is a triggerAndWait or batchTriggerAndWait,
// we need to add the parent run to the reserve concurrency set
Expand All @@ -39,6 +41,8 @@ export async function enqueueRun({
projectId: env.projectId,
environmentId: env.id,
environmentType: env.type,
// Include rateLimitKey in message payload for dequeue-time checks
rateLimitKey,
},
run.concurrencyKey ?? undefined,
run.queueTimestamp ?? undefined,
Expand Down
32 changes: 32 additions & 0 deletions apps/webapp/app/v3/utils/durations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Parse a duration string (e.g., "1s", "100ms", "5m", "1h", "1d") to milliseconds.
* @throws Error if the duration string is invalid
*/
export function parseDurationToMs(duration: string): number {
const match = duration.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/);

if (!match) {
throw new Error(
`Invalid duration string: "${duration}". Expected format: number + unit (ms, s, m, h, d)`
);
}

const [, value, unit] = match;
const numValue = parseFloat(value);

switch (unit) {
case "ms":
return Math.round(numValue);
case "s":
return Math.round(numValue * 1000);
case "m":
return Math.round(numValue * 60 * 1000);
case "h":
return Math.round(numValue * 60 * 60 * 1000);
case "d":
return Math.round(numValue * 24 * 60 * 60 * 1000);
default:
throw new Error(`Unknown duration unit: ${unit}`);
}
}

47 changes: 47 additions & 0 deletions apps/webapp/test/parseDurationToMs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { describe, it, expect } from "vitest";
import { parseDurationToMs } from "~/v3/utils/durations";

describe("parseDurationToMs", () => {
it("parses milliseconds", () => {
expect(parseDurationToMs("100ms")).toBe(100);
expect(parseDurationToMs("1500ms")).toBe(1500);
expect(parseDurationToMs("0ms")).toBe(0);
});

it("parses seconds", () => {
expect(parseDurationToMs("1s")).toBe(1000);
expect(parseDurationToMs("30s")).toBe(30000);
expect(parseDurationToMs("1.5s")).toBe(1500);
expect(parseDurationToMs("0.5s")).toBe(500);
});

it("parses minutes", () => {
expect(parseDurationToMs("1m")).toBe(60000);
expect(parseDurationToMs("5m")).toBe(300000);
expect(parseDurationToMs("0.5m")).toBe(30000);
});

it("parses hours", () => {
expect(parseDurationToMs("1h")).toBe(3600000);
expect(parseDurationToMs("24h")).toBe(86400000);
expect(parseDurationToMs("0.5h")).toBe(1800000);
});

it("parses days", () => {
expect(parseDurationToMs("1d")).toBe(86400000);
expect(parseDurationToMs("7d")).toBe(604800000);
});

it("throws on invalid format", () => {
expect(() => parseDurationToMs("invalid")).toThrow();
expect(() => parseDurationToMs("1x")).toThrow();
expect(() => parseDurationToMs("")).toThrow();
expect(() => parseDurationToMs("ms")).toThrow();
expect(() => parseDurationToMs("10")).toThrow();
});

it("throws on negative values (invalid regex)", () => {
expect(() => parseDurationToMs("-1s")).toThrow();
});
});

1 change: 1 addition & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ model TaskRun {
priorityMs Int @default(0)

concurrencyKey String?
rateLimitKey String?

delayUntil DateTime?
queuedAt DateTime?
Expand Down
5 changes: 5 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export class RunEngine {
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
disableRateLimits: options.queue?.disableRateLimits,
meter: options.meter,
});

Expand Down Expand Up @@ -405,6 +406,7 @@ export class RunEngine {
sdkVersion,
cliVersion,
concurrencyKey,
rateLimitKey,
workerQueue,
queue,
lockedQueueId,
Expand Down Expand Up @@ -554,6 +556,7 @@ export class RunEngine {
sdkVersion,
cliVersion,
concurrencyKey,
rateLimitKey,
queue,
lockedQueueId,
workerQueue,
Expand Down Expand Up @@ -668,6 +671,7 @@ export class RunEngine {

if (taskRun.delayUntil) {
// Schedule the run to be enqueued at the delayUntil time
// Note: rateLimitKey is not passed for delayed runs - it will need to be stored on the run if needed
await this.delayedRunSystem.scheduleDelayedRunEnqueuing({
runId: taskRun.id,
delayUntil: taskRun.delayUntil,
Expand Down Expand Up @@ -705,6 +709,7 @@ export class RunEngine {
runnerId,
tx: prisma,
skipRunLock: true,
rateLimitKey,
});
}

Expand Down
Loading