
feat(events): pub/sub event system #3179

Closed
giovaborgogno wants to merge 65 commits into triggerdotdev:main from giovaborgogno:feat/pubsub-event-system

Conversation

@giovaborgogno

Summary

Adds a complete pub/sub event system to Trigger.dev, enabling event-driven task orchestration:

  • Event definitions — createEvent() SDK function to define typed events with optional JSON Schema validation
  • Event subscriptions — on: { event } option on tasks to subscribe with filters, patterns, and consumer groups
  • Publish & fan-out — events.publish() triggers all matching subscribers with content-based filtering, pattern matching, and idempotency
  • Publish-and-wait — events.publishAndWait() blocks the parent run until all subscriber runs complete
  • Ordering keys — Per-key serialized execution via concurrency keys
  • Rate limiting — Event-level and per-subscriber rate limits (Redis-backed with in-memory fallback)
  • Consumer groups — FNV-1a hash-based deterministic routing within groups
  • Dead letter queue — Failed event deliveries tracked, with retry/discard management via API
  • Schema registry — Versioned payload schemas with validation on publish
  • Event log — ClickHouse-backed event history with stats/metrics endpoints
  • Replay — Re-publish historical events by time range via API
  • CLI commands — trigger events history, trigger events replay, trigger events dlq list/retry
  • Stale subscription cleanup — Automated daily cleanup of disabled subscriptions for removed tasks
  • Documentation — SDK rules (rules/4.4.0/events.md), Claude Code skill, reference project
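The consumer-group routing described above can be sketched as follows. This is an illustrative reconstruction, not the PR's actual code: fnv1a and pickConsumer are assumed names, and the real routing key and member ordering may differ.

```typescript
// 32-bit FNV-1a hash: deterministic, fast, and well-distributed for short keys.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // FNV prime, keep 32-bit unsigned
  }
  return hash >>> 0;
}

// Route an event to exactly one member of a consumer group. Because the
// hash is deterministic, the same routing key always lands on the same
// member (as long as group membership is stable).
function pickConsumer(routingKey: string, members: string[]): string {
  return members[fnv1a(routingKey) % members.length];
}
```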

Database changes

  • New models: EventDefinition, EventSubscription, DeadLetterEntry
  • New migration to drop redundant composite index (CONCURRENTLY)
  • ClickHouse: raw_event_log_v1 table with materialized view for event metrics

Test plan

  • publishEvent.test.ts — 26 tests (fan-out, filters, patterns, consumer groups, ordering, idempotency, rate limits, payload size limit, publish-and-wait)
  • deadLetterManagement.test.ts — 8 tests (list, discard, retry, retry nonexistent/discarded, retryAll empty)
  • schemaRegistryDb.test.ts — 6 tests (register, upsert, getSchema latest/versioned/nonexistent, listSchemas with counts)
  • eventRateLimiter.test.ts — 12 tests (in-memory + Redis sliding window)
  • replayEvents.test.ts — replay service tests
  • All builds pass (core, sdk, webapp)
  • Typecheck passes (42/42)

🤖 Generated with Claude Code

giovaborgogno and others added 30 commits February 27, 2026 00:12
…ensions

- Create event() function in SDK (packages/trigger-sdk/src/v3/events.ts)
  with EventDefinition interface, type-safe schema inference, publish/batchPublish stubs
- Add EventManifest schema to core (id, version, description)
- Add onEvent field to TaskMetadata for event subscriptions
- Add events array to WorkerManifest
- Extend ResourceCatalog interface with registerEventMetadata, getEvent,
  listEventManifests, getTasksForEvent
- Implement in StandardResourceCatalog with event→tasks reverse index
- Add noop implementations in NoopResourceCatalog
- Wire up ResourceCatalogAPI proxy methods
- Export event, EventDefinition, and related types from SDK

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add EventSource interface to core types (minimal event reference)
- Add TaskOptionsWithEvent type for tasks subscribing via `on: event`
- Add createTask overload for event-subscribed tasks
- Pass onEvent to registerTaskMetadata when task has `on` property
- Export EventSource, TaskOptionsWithEvent from SDK

Usage:
  const orderCreated = event({ id: "order.created", schema: z.object({...}) });
  const sendEmail = task({
    id: "send-email",
    on: orderCreated,
    run: async (payload) => { /* payload is typed from event schema */ },
  });

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e models

Add Prisma models for EventDefinition and EventSubscription, plus
onEventSlug column on BackgroundWorkerTask. Includes indexes and
unique constraints for efficient event routing lookups.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ploy

Wire the event pipeline end-to-end from index workers through to the
database:
- Add onEvent to TaskResource, events to BackgroundWorkerMetadata
- Include listEventManifests() in both dev and managed index workers
- Pass events through devSupervisor and managed-index-controller
- Upsert EventDefinition and EventSubscription during worker creation
- Set onEventSlug on BackgroundWorkerTask records
- Disable stale subscriptions from previous deploys
- Fix EventManifest.version to required string (avoid Zod input/output
  type mismatch in tshy composite builds)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…vice

Add the event publishing pipeline:
- PublishEventRequestBody/Response schemas in core/schemas/api.ts
- BatchPublishEventRequestBody/Response schemas for batch publishing
- PublishEventService: fan-out engine that triggers subscribed tasks
  with per-consumer idempotency keys and partial failure handling
- POST /api/v1/events/:eventId/publish endpoint
- POST /api/v1/events/:eventId/batchPublish endpoint

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add publishEvent() and batchPublishEvent() methods to ApiClient.
Wire up EventDefinition.publish() and .batchPublish() in the SDK
to call the backend API endpoints, with schema validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 6 containerTest tests covering:
- publish with no subscribers → 0 runs
- publish with 3 subscribers → 3 runs
- publish nonexistent event → 404 error
- disabled subscription is skipped
- partial trigger failure does not affect other subscribers
- idempotency key prevents duplicate fan-out

Refactor PublishEventService to accept injectable TriggerFn
for testability while keeping the default behavior unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add compatibleVersions, deprecatedAt, deprecatedMessage fields to
EventDefinition model. Add schema field to EventManifest. Create
SchemaRegistryService with registerSchema, getSchema, listSchemas,
validatePayload (using ajv), and checkCompatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add GET /api/v1/events (list), GET /api/v1/events/:eventId (detail),
and GET /api/v1/events/:eventId/schema (JSON schema) endpoints.
Add corresponding response schemas and API client methods in core.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…at publish

- SDK event() stores raw schema in resource catalog
- CLI indexers convert event schemas to JSON Schema via schemaToJsonSchema
- syncWorkerEvents stores JSON schema in EventDefinition.schema field
- PublishEventService validates payloads against stored schemas using ajv
- Extends ResourceCatalog with getEventSchema() method

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add unit tests for SchemaRegistryService (validation, compatibility).
Add integration tests for publish with schema validation (reject
invalid, accept valid, skip when no schema). Add changeset for
affected public packages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- compileFilter/evaluateFilter wraps existing eventFilterMatches with caching
- compilePattern/matchesPattern for wildcard patterns (*, #)
- 28 filter evaluator tests + 30 pattern matcher tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
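The wildcard semantics referenced above (* and #) follow the common topic-matching convention: * matches exactly one dot-separated segment, # matches zero or more. The sketch below is an assumed implementation for illustration; the actual compilePattern/matchesPattern in core may cache compiled patterns and handle edge cases differently.

```typescript
// Segment-wise topic matcher: "*" = one segment, "#" = zero or more segments.
function matchesPattern(pattern: string, eventId: string): boolean {
  const ps = pattern.split(".");
  const es = eventId.split(".");
  const match = (pi: number, ei: number): boolean => {
    if (pi === ps.length) return ei === es.length; // pattern exhausted
    if (ps[pi] === "#") {
      // "#" may absorb any number of remaining segments, including none
      for (let skip = ei; skip <= es.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ei === es.length) return false; // event exhausted but pattern isn't
    if (ps[pi] === "*" || ps[pi] === es[ei]) return match(pi + 1, ei + 1);
    return false;
  };
  return match(0, 0);
}
```

Under these semantics, "order.*" matches "order.created" but not "order.created.v2", while "order.#" matches both.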
- Add onEventFilter to TaskMetadata schema
- Add filter option to TaskOptionsWithEvent
- Extract filter in shared.ts and pass through metadata
- Store filter in EventSubscription during syncWorkerEvents

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Evaluate subscription.filter against payload before triggering
- Non-matching subscribers are skipped (no run created)
- Malformed filters err on side of delivery (log warning, don't block)
- 3 new integration tests: filter skips, filter allows, complex multi-field

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
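To make the content-based filtering concrete, here is a minimal sketch of the matching idea: a filter maps payload fields to sets of allowed values, and a subscriber is triggered only when every field matches. This is a simplified model; the real eventFilterMatches in core supports a richer filter language.

```typescript
// A filter maps a payload field to the values that should trigger delivery.
type Filter = Record<string, unknown[]>;

// Every filter key must be present in the payload with an allowed value.
function filterMatches(filter: Filter, payload: Record<string, unknown>): boolean {
  return Object.entries(filter).every(([key, allowed]) => allowed.includes(payload[key]));
}
```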
- Add events.match() helper in SDK for pattern subscriptions (*, #)
- Add onEventPattern to TaskMetadata schema
- Store pattern in EventSubscription during deploy
- PublishEventService evaluates pattern subscriptions during fan-out
- Pattern + filter combination supported
- 4 new integration tests: *.matches, *.rejects, #.multi-level, pattern+filter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add orderingKey to publish options. When an ordering key is present,
events with the same key are processed sequentially per consumer by
mapping the ordering key to a concurrencyKey on the triggered run.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
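The mapping described above can be sketched as a tiny pure function: when an ordering key is present, it becomes the concurrency key of the triggered run, so runs sharing a key execute one at a time. Option names here are illustrative, not the exact SDK types.

```typescript
interface PublishOptions { orderingKey?: string }
interface TriggerOptions { concurrencyKey?: string }

// Runs that share a concurrencyKey are serialized per key by the run queue,
// which is what gives orderingKey its per-key sequential semantics.
function toTriggerOptions(opts: PublishOptions): TriggerOptions {
  return opts.orderingKey ? { concurrencyKey: opts.orderingKey } : {};
}
```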
Add consumerGroup option to task event subscriptions. Within a consumer
group, only one task receives each event (round-robin selection).
Tasks without a consumer group continue to receive all events normally.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…roups

3 new tests:
- ordering key sets concurrencyKey on triggered runs
- consumer group: only one task in group receives each event
- consumer group: multiple groups + ungrouped tasks all work

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sumer groups

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add waitForEvent to RuntimeManager that waits for all event subscriber
runs to complete and returns aggregated results keyed by task slug.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
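The "aggregated results keyed by task slug" shape can be illustrated with a small helper. Names and the result type are assumptions for illustration; the real waitForEvent also blocks until every subscriber run settles.

```typescript
interface SubscriberRunResult { taskSlug: string; ok: boolean; output?: unknown }

// Collapse a list of settled subscriber runs into a slug-keyed record,
// which is the shape a publishAndWait caller would consume.
function aggregateBySlug(results: SubscriberRunResult[]): Record<string, SubscriberRunResult> {
  return Object.fromEntries(results.map((r) => [r.taskSlug, r]));
}
```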
giovaborgogno and others added 25 commits February 28, 2026 17:49
…mentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add rules/4.4.0/events.md covering: event definition, publish/subscribe,
content-based filters, wildcard patterns, publishAndWait, ordering keys,
consumer groups, batch publish, validation, DLQ, and replay.

Update manifest.json to v4.4.0 with events option.
Update SKILL.md with events section and reference.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Demo project showcasing all event system features:
- events.ts: event definitions with schemas, rate limits
- basic-subscribers.ts: fan-out to multiple tasks
- filtered-subscribers.ts: content-based filtering
- pattern-subscribers.ts: wildcard patterns (*, #)
- publish-and-wait.ts: scatter-gather orchestration
- consumer-groups.ts: load-balanced event handling
- ordering.ts: sequential processing per entity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add `trigger events list` and `trigger events publish` commands:
- events/index.ts: parent command registration
- events/list.ts: list event definitions with subscriber count
- events/publish.ts: publish event with JSON payload
- apiClient.ts: listEvents() and publishEvent() methods

Usage:
  trigger events list
  trigger events publish order.created --payload '{"orderId":"123"}'

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… bug, ClickHouse in dev

Found during live E2E testing:
- 9.5: orderingKey doesn't guarantee strict ordering (Trigger.dev concurrencyKey limitation)
- 9.6: payloads >512KB cause silent fan-out failure (0 runs, HTTP 200)
- 9.7: ClickHouse tables not created in dev (stats/history/replay return 500)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ur bug

Root cause: TriggerTaskService tries S3 offload for >512KB payloads.
Without object store credentials (local dev), all subscriber triggers
fail silently. Same behavior as regular tasks.trigger() without object store.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rency limit

Run engine changes:
- Add globalConcurrencyLimit (gcl) and globalCurrentConcurrency (gcc) Redis keys
- Modify dequeueMessagesFromQueue Lua: check global limit when gcl exists
- Modify releaseConcurrency Lua: SREM from gcc set
- Modify enqueueMessage/enqueueMessageWithTtl: SREM from gcc on re-enqueue
- No impact on existing queues (gcl check only runs when key exists)

Event system changes:
- PublishEventService overrides queue to `evt-order:{eventSlug}` when orderingKey present
- Deploy creates ordering queue with concurrencyLimit:1 (per-key) + global limit
- SDK event() accepts `ordering: { concurrencyLimit: N }` config
- EventManifest/EventMetadata include ordering field

Behavior: orderingKey guarantees strict per-key ordering (1 at a time per key)
while concurrencyLimit controls total parallel runs across all keys.

Run-engine tests: 236 pass, 2 fail (pre-existing flaky, not caused by this change)
Event integration tests: 24/24 pass

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ency release

Previous commit only updated dequeue/release/enqueue Lua scripts but missed:
- acknowledgeMessage: runs completing weren't releasing global concurrency
- nackMessage: nacked runs weren't releasing global concurrency
- moveToDeadLetterQueue: DLQ'd runs weren't releasing
- clearMessageFromConcurrencySets: cleanup wasn't releasing

Also: set globalConcurrencyLimit on subscriber task's queue (not dedicated queue)
since the dev worker only monitors task queues, not custom queues.

Removed queue override from PublishEventService — runs stay in the task's
own queue and ordering is enforced by concurrencyKey + concurrencyLimit:1
+ globalConcurrencyLimit:N.

E2E verified: globalConcurrencyLimit=2 with 3 keys correctly limits to max
2 concurrent runs while maintaining per-key ordering.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- 10.1: Fix expireTtlRuns Lua global concurrency slot leak (CRITICAL)
  Add SREM for globalCurrentConcurrency in TTL expiration script
- 10.2: Fix clearMessageFromConcurrencySets bare queue name (HIGH)
  Add queueGlobalCurrentConcurrencyKey(env, queue) to build correct key
- 10.3: Add .max(100) to batch publish items array (HIGH)
- 10.4: Fix publishAndWait schema — move parentRunId to top-level (HIGH)
- 10.5: ClickHouse interval already safe (whitelist map, not interpolation)
- 10.6: Add @@index([projectId, environmentId, enabled]) to EventSubscription
- 10.7: Fix batch publish partial failure — per-item error handling with 207

Also tightens Zod schemas: z.any() → z.unknown(), idempotencyKey .max(256),
metadata → z.record(z.unknown())

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- 11.1: Fix N+1 in DLQ retryAll — inline retry logic, share TriggerTaskService
- 11.2: Add 512KB payload size check before fan-out (returns 413)
- 11.3: Add try/catch with ServiceValidationError handling to events routes
- 11.4: Add --delay, --tags, --idempotency-key, --ordering-key to CLI publish

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- 13.1: Add LRU eviction to validatorCache, filterCache, patternCache (max 1000)
- 13.2: Zod schema tightening done in phase 10 commit

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
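A bounded cache like the one added here is commonly built on Map's insertion order. The sketch below shows the general technique; the actual eviction strategy used for the validator/filter/pattern caches in this PR may differ in detail.

```typescript
// Minimal LRU cache: Map preserves insertion order, so the first key is
// always the least recently used once we re-insert on every access.
class LruCache<K, V> {
  private map = new Map<K, V>();
  constructor(private max: number) {}

  get(key: K): V | undefined {
    if (!this.map.has(key)) return undefined;
    const value = this.map.get(key)!;
    this.map.delete(key); // re-insert to mark as most recently used
    this.map.set(key, value);
    return value;
  }

  set(key: K, value: V): void {
    if (this.map.has(key)) this.map.delete(key);
    this.map.set(key, value);
    if (this.map.size > this.max) {
      // evict the least recently used entry (first in insertion order)
      this.map.delete(this.map.keys().next().value as K);
    }
  }
}
```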
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add testcontainer-based tests for DeadLetterService and
DeadLetterManagementService covering DLQ entry creation,
field correctness, pagination, filtering, and discard flow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…askResource schema

- Fix resource callback signature in history/stats loader routes (first arg is resource, not params)
- Add missing onEventFilter and onEventPattern to TaskResource schema in resources.ts
- Fix JSON.parse return type narrowing in DeadLetterService.extractPayload

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add `dlq` option to `event()` allowing per-event DLQ configuration.
When `dlq.enabled` is false, failed event-triggered runs are silently
discarded instead of being stored in the dead letter queue.

- Add EventDLQConfig type to SDK and EventDLQManifest schema to core
- Add dlqConfig JSON column to EventDefinition model
- Wire config through deploy (createBackgroundWorker) and resource catalog
- DeadLetterService checks config before creating DLQ entries

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rker

Add CleanupStaleSubscriptionsService that finds disabled EventSubscriptions
whose associated task no longer exists in any active worker, and deletes them.
Runs daily at 3 AM UTC via the admin worker cron.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add remaining event CLI commands:
- `trigger events history <eventId>` — paginated event publish history
- `trigger events replay <eventId>` — replay historical events to subscribers
- `trigger events dlq list` — list dead letter queue entries
- `trigger events dlq retry <id>` — retry a specific DLQ entry

Adds corresponding CliApiClient methods: getEventHistory, replayEvents,
listDeadLetterEvents, retryDeadLetterEvent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 3 redisTest integration tests for RedisEventRateLimitChecker:
- allows requests within limit
- blocks requests exceeding limit
- isolates keys from each other

Uses @internal/testcontainers redisTest fixture with ioredis adapter.
Total: 14 tests (11 existing + 3 new Redis tests), all passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
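For context, a sliding-window limiter like the in-memory fallback can be sketched as below. This is an assumed simplification (timestamps kept per key, pruned on each check); the real checker's Redis variant would do the equivalent with sorted sets.

```typescript
// Sliding-window limiter: a key may make `limit` requests per `windowMs`.
class InMemoryRateLimiter {
  private hits = new Map<string, number[]>();
  constructor(private limit: number, private windowMs: number) {}

  allow(key: string, now = Date.now()): boolean {
    const cutoff = now - this.windowMs;
    // Drop timestamps that have slid out of the window before counting.
    const recent = (this.hits.get(key) ?? []).filter((t) => t > cutoff);
    if (recent.length >= this.limit) {
      this.hits.set(key, recent);
      return false;
    }
    recent.push(now);
    this.hits.set(key, recent);
    return true;
  }
}
```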
Consumer-side rate limiting:
- Add `rateLimit` JSON field to EventSubscription model
- Add `consumerRateLimit` option to TaskOptionsWithEvent in SDK
- Propagate through manifest pipeline → createBackgroundWorker
- PublishEventService checks per-subscriber rate limits during fan-out
- Rate-limited subscribers are skipped with warning log

Metrics endpoint:
- Add GET /api/v1/events/:eventId/metrics for backpressure monitoring
- Returns subscriber health (active/disabled/filters/rate limits)
- Returns DLQ depth (pending/retried/discarded counts)
- Returns event-level rate limit configuration
- Add GetEventMetricsResponseBody schema

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…vice tests

Fix ClickHouse DateTime64(3) parameter parsing — strip trailing 'Z' from
ISO strings since DateTime64 without timezone specifier rejects it. Applied
to ReplayEventsService and history endpoint.

Add 6 integration tests for ReplayEventsService using containerTest
(Postgres + Redis + ClickHouse testcontainers):
- No events in range returns 0
- Dry run returns count without publishing
- Replays events and triggers subscriber runs
- EventFilter narrows replayed events
- Malformed payloads are skipped gracefully
- Tags from original events are preserved

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
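The DateTime64 fix boils down to a one-line transform before binding the parameter; a sketch (function name assumed):

```typescript
// ClickHouse DateTime64(3) without an explicit timezone rejects the
// trailing 'Z' of ISO-8601 strings, so strip it before binding.
function toClickHouseDateTime64(iso: string): string {
  return iso.endsWith("Z") ? iso.slice(0, -1) : iso;
}
```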
…ests, docs

HIGH fixes:
- Change changeset bump levels from patch to minor (new feature)
- EventLogWriter: use logger.error instead of logger.warn for ClickHouse failures
- Log warning when InMemory rate limiter used in production (no RATE_LIMIT_REDIS_HOST)
- Fix sendEmail naming collision in SKILL.md docs

MEDIUM fixes:
- Document consumerRateLimit, metrics endpoint, and DLQ config in events.md
- Add z.enum() validation on DLQ status query param (was unvalidated cast)
- Add limit param validation (Math.max/min) on DLQ list route
- Add fail-open try/catch around rate limiter checks (Redis down → allow publish)
- Add defensive safety comment on ClickHouse interval interpolation in stats route

Tests added:
- DLQ retry: nonexistent ID, already-discarded, retryAll empty
- SchemaRegistry DB: registerSchema, upsert, getSchema latest/versioned/null, listSchemas
- PublishEvent: payload size limit (512KB → 413), per-subscriber rate limit skipping

LOW fixes:
- Remove dead EventPatternSource type from core
- Make CleanupStaleSubscriptionsService extend BaseService
- Drop redundant 2-col EventSubscription index (3-col supersedes it)
- Parallelize fan-out with Promise.allSettled for better throughput

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@changeset-bot

changeset-bot bot commented Mar 5, 2026

🦋 Changeset detected

Latest commit: 7493b86

The changes in this PR will be included in the next version bump.


@github-actions
Contributor

github-actions bot commented Mar 5, 2026

Hi @giovaborgogno, thanks for your interest in contributing!

This project requires that pull request authors are vouched, and you are not in the list of vouched users.

This PR will be closed automatically. See https://github.com/triggerdotdev/trigger.dev/blob/main/CONTRIBUTING.md for more details.

@github-actions github-actions bot closed this Mar 5, 2026
@coderabbitai
Contributor

coderabbitai bot commented Mar 5, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: cd5e2460-24ed-4f52-acc5-d183433281c7

📥 Commits

Reviewing files that changed from the base of the PR and between c013322 and 7493b86.

⛔ Files ignored due to path filters (14)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
  • references/event-system/.gitignore is excluded by !references/**
  • references/event-system/package.json is excluded by !references/**
  • references/event-system/src/index.ts is excluded by !references/**
  • references/event-system/src/trigger/basic-subscribers.ts is excluded by !references/**
  • references/event-system/src/trigger/consumer-groups.ts is excluded by !references/**
  • references/event-system/src/trigger/events.ts is excluded by !references/**
  • references/event-system/src/trigger/filtered-subscribers.ts is excluded by !references/**
  • references/event-system/src/trigger/ordering.ts is excluded by !references/**
  • references/event-system/src/trigger/pattern-subscribers.ts is excluded by !references/**
  • references/event-system/src/trigger/publish-and-wait.ts is excluded by !references/**
  • references/event-system/trigger.config.ts is excluded by !references/**
  • references/event-system/tsconfig.json is excluded by !references/**
  • references/hello-world/src/trigger/events-test.ts is excluded by !references/**
📒 Files selected for processing (107)
  • .changeset/event-dead-letter-queue.md
  • .changeset/event-docs-cli-dx.md
  • .changeset/event-observability-dx.md
  • .changeset/event-ordering-consumer-groups.md
  • .changeset/event-persistence-replay.md
  • .changeset/event-publish-and-wait.md
  • .changeset/event-rate-limiting.md
  • .changeset/event-schema-registry.md
  • .changeset/event-smart-routing.md
  • .claude/projects/-Users-terac-repos-trigger-dev/memory/MEMORY.md
  • .claude/projects/-Users-terac-repos-trigger-dev/memory/pubsub-audit-fixes.md
  • .claude/projects/-Users-terac-repos-trigger-dev/memory/pubsub-pending.md
  • .claude/projects/-Users-terac-repos-trigger-dev/memory/pubsub-progress.md
  • .claude/projects/-Users-terac-repos-trigger-dev/memory/pubsub-roadmap.md
  • .claude/projects/-Users-terac-repos-trigger-dev/memory/repo-conventions.md
  • .claude/skills/trigger-dev-tasks/SKILL.md
  • apps/webapp/app/routes/api.v1.events.$eventId.batchPublish.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.history.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.metrics.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.publish.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.publishAndWait.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.replay.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.schema.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.stats.ts
  • apps/webapp/app/routes/api.v1.events.$eventId.ts
  • apps/webapp/app/routes/api.v1.events.dlq.$id.discard.ts
  • apps/webapp/app/routes/api.v1.events.dlq.$id.retry.ts
  • apps/webapp/app/routes/api.v1.events.dlq.retry-all.ts
  • apps/webapp/app/routes/api.v1.events.dlq.ts
  • apps/webapp/app/routes/api.v1.events.ts
  • apps/webapp/app/v3/runQueue.server.ts
  • apps/webapp/app/v3/services/adminWorker.server.ts
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts
  • apps/webapp/app/v3/services/events/cleanupStaleSubscriptions.server.ts
  • apps/webapp/app/v3/services/events/deadLetterManagement.server.ts
  • apps/webapp/app/v3/services/events/deadLetterService.server.ts
  • apps/webapp/app/v3/services/events/eventLogWriter.server.ts
  • apps/webapp/app/v3/services/events/eventRateLimiter.server.ts
  • apps/webapp/app/v3/services/events/eventRateLimiterGlobal.server.ts
  • apps/webapp/app/v3/services/events/publishEvent.server.ts
  • apps/webapp/app/v3/services/events/replayEvents.server.ts
  • apps/webapp/app/v3/services/events/schemaRegistry.server.ts
  • apps/webapp/app/v3/services/finalizeTaskRun.server.ts
  • apps/webapp/package.json
  • apps/webapp/test/engine/deadLetterManagement.test.ts
  • apps/webapp/test/engine/deadLetterService.test.ts
  • apps/webapp/test/engine/eventRateLimiter.test.ts
  • apps/webapp/test/engine/publishEvent.test.ts
  • apps/webapp/test/engine/replayEvents.test.ts
  • apps/webapp/test/engine/schemaRegistryDb.test.ts
  • apps/webapp/test/services/schemaRegistry.test.ts
  • internal-packages/clickhouse/schema/021_event_log_v1.sql
  • internal-packages/clickhouse/schema/022_event_counts_mv_v1.sql
  • internal-packages/clickhouse/src/eventCounts.ts
  • internal-packages/clickhouse/src/eventLog.ts
  • internal-packages/clickhouse/src/index.ts
  • internal-packages/database/prisma/migrations/20260227081612_add_event_definition_and_subscription/migration.sql
  • internal-packages/database/prisma/migrations/20260228054059_add_event_schema_versioning/migration.sql
  • internal-packages/database/prisma/migrations/20260228065743_add_dead_letter_event/migration.sql
  • internal-packages/database/prisma/migrations/20260228074544_add_rate_limit_to_event_definition/migration.sql
  • internal-packages/database/prisma/migrations/20260303061123_add_event_subscription_pattern_idx/migration.sql
  • internal-packages/database/prisma/migrations/20260304064319_add_dlq_config_to_event_definition/migration.sql
  • internal-packages/database/prisma/migrations/20260304065828_add_rate_limit_to_event_subscription/migration.sql
  • internal-packages/database/prisma/migrations/20260305000000_drop_redundant_event_subscription_idx/migration.sql
  • internal-packages/database/prisma/schema.prisma
  • internal-packages/run-engine/src/run-queue/index.ts
  • internal-packages/run-engine/src/run-queue/keyProducer.ts
  • internal-packages/run-engine/src/run-queue/types.ts
  • packages/cli-v3/src/apiClient.ts
  • packages/cli-v3/src/cli/index.ts
  • packages/cli-v3/src/commands/events/dlq.ts
  • packages/cli-v3/src/commands/events/history.ts
  • packages/cli-v3/src/commands/events/index.ts
  • packages/cli-v3/src/commands/events/list.ts
  • packages/cli-v3/src/commands/events/publish.ts
  • packages/cli-v3/src/commands/events/replay.ts
  • packages/cli-v3/src/dev/devSupervisor.ts
  • packages/cli-v3/src/entryPoints/dev-index-worker.ts
  • packages/cli-v3/src/entryPoints/managed-index-controller.ts
  • packages/cli-v3/src/entryPoints/managed-index-worker.ts
  • packages/core/src/v3/apiClient/index.ts
  • packages/core/src/v3/events/filterEvaluator.ts
  • packages/core/src/v3/events/index.ts
  • packages/core/src/v3/events/patternMatcher.ts
  • packages/core/src/v3/index.ts
  • packages/core/src/v3/resource-catalog/catalog.ts
  • packages/core/src/v3/resource-catalog/index.ts
  • packages/core/src/v3/resource-catalog/noopResourceCatalog.ts
  • packages/core/src/v3/resource-catalog/standardResourceCatalog.ts
  • packages/core/src/v3/runtime/index.ts
  • packages/core/src/v3/runtime/manager.ts
  • packages/core/src/v3/runtime/noopRuntimeManager.ts
  • packages/core/src/v3/runtime/sharedRuntimeManager.ts
  • packages/core/src/v3/schemas/api.ts
  • packages/core/src/v3/schemas/build.ts
  • packages/core/src/v3/schemas/resources.ts
  • packages/core/src/v3/schemas/schemas.ts
  • packages/core/src/v3/types/tasks.ts
  • packages/core/src/v3/workers/index.ts
  • packages/core/test/v3/events/filterEvaluator.test.ts
  • packages/core/test/v3/events/patternMatcher.test.ts
  • packages/trigger-sdk/src/v3/events.ts
  • packages/trigger-sdk/src/v3/index.ts
  • packages/trigger-sdk/src/v3/shared.ts
  • packages/trigger-sdk/src/v3/tasks.ts
  • rules/4.4.0/events.md
  • rules/manifest.json

Walkthrough

This pull request introduces a comprehensive Pub/Sub event system for Trigger.dev. The implementation spans database schema changes (EventDefinition, EventSubscription, DeadLetterEvent tables), ClickHouse event logging, new API routes for event operations, service layers for publishing/replaying/managing events, rate limiting capabilities (Redis and in-memory), event filtering and pattern matching utilities, schema validation, dead-letter queue handling, and CLI commands. It extends the core SDK with event creation APIs, task event subscriptions, and runtime wait semantics. Multiple changesets document feature releases including rate limiting, ordering keys, consumer groups, DLQ, schema registry, observability, and replay functionality.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Rationale: This diff introduces substantial, intricate functionality across multiple architectural layers with high logic density. The PublishEventService alone contains complex state management, consumer group logic (FNV-1a hashing), rate limiting integration, pattern matching, filter evaluation, and idempotency handling. The system spans new database models with cascading relationships, ClickHouse materialized views, diverse rate-limiting backends (Redis-backed with Upstash, in-memory), service orchestration (replay, dead-letter management, schema validation), and heterogeneous API routes with authorization and error handling. Supporting utilities like pattern matching with wildcard semantics and filter compilation caching add further complexity. The breadth (100+ files across packages/webapp, packages/core, packages/trigger-sdk, packages/cli-v3, internal-packages) combined with semantic interdependencies (event subscriptions, ordering queues, global concurrency tracking) and comprehensive test coverage demand sustained, context-aware review across multiple domains (database design, distributed systems, caching, filtering logic).


@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 5 potential issues.

View 5 additional findings in Devin Review.

Open in Devin Review

Comment on lines +37 to +74

  const results: Array<
    | { ok: true; eventId: string; runs: PublishEventResult["runs"] }
    | { ok: false; error: string }
  > = [];

  for (const item of body.items) {
    try {
      const result = await service.call(
        params.eventId,
        authentication.environment,
        item.payload,
        {
          idempotencyKey: item.options?.idempotencyKey,
          delay: item.options?.delay,
          tags: item.options?.tags,
          metadata: item.options?.metadata,
          context: item.options?.context,
          orderingKey: item.options?.orderingKey,
        }
      );

      results.push({ ok: true, eventId: result.eventId, runs: result.runs });
    } catch (error) {
      if (error instanceof EventPublishRateLimitError) {
        results.push({ ok: false, error: error.message });
      } else if (error instanceof ServiceValidationError) {
        results.push({ ok: false, error: error.message });
      } else {
        results.push({
          ok: false,
          error: error instanceof Error ? error.message : "Unknown error",
        });
      }
    }
  }

  const hasErrors = results.some((r) => !r.ok);
  return json({ results }, { status: hasErrors ? 207 : 200 });

🔴 Batch publish route response shape doesn't match the Zod schema used by API clients

The api.v1.events.$eventId.batchPublish.ts route returns results with shape { ok: true, eventId, runs } | { ok: false, error }, but the BatchPublishEventResponseBody schema at packages/core/src/v3/schemas/api.ts:1654 expects { results: z.array(PublishEventResponseBody) } where PublishEventResponseBody requires { eventId: string, runs: [...] }. When any item in the batch fails, the error result { ok: false, error: '...' } lacks the required eventId and runs fields, causing zodfetch in the API client (packages/core/src/v3/apiClient/index.ts:1488) to throw a Zod validation error. This means any batch publish with partial failures will crash the client instead of returning the partial results. The SDK's batchPublish method at packages/trigger-sdk/src/v3/events.ts:320 also relies on result.results having eventId and runs on every item.

Prompt for agents
The batch publish route at apps/webapp/app/routes/api.v1.events.$eventId.batchPublish.ts (lines 37-74) returns a response with shape { results: Array<{ ok: true, eventId, runs } | { ok: false, error }> }, but the BatchPublishEventResponseBody Zod schema at packages/core/src/v3/schemas/api.ts:1654 expects { results: z.array(PublishEventResponseBody) } where PublishEventResponseBody is { eventId: string, runs: Array<...> }.

Two options to fix:

Option A (recommended): Update BatchPublishEventResponseBody in packages/core/src/v3/schemas/api.ts to match the actual route response shape. Change it to support both success and error results:
  results: z.array(z.union([
    z.object({ ok: z.literal(true), eventId: z.string(), runs: z.array(...) }),
    z.object({ ok: z.literal(false), error: z.string() })
  ]))
Then update the SDK batchPublish method in packages/trigger-sdk/src/v3/events.ts to handle error items.

Option B: Change the route to match the existing schema by validating all items upfront and either succeeding entirely or failing entirely, so the response always contains valid PublishEventResponseBody items.
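A plain-TypeScript sketch of the union shape Option A proposes (the real fix would use Zod in packages/core/src/v3/schemas/api.ts; the hand-rolled guard below only mimics what a `discriminatedUnion("ok", [...])` schema would accept, and the run shape is illustrative):

```typescript
// Assumed union shape for batch publish result items; field names mirror
// the route's actual response, the `runs` element shape is simplified.
type BatchItem =
  | { ok: true; eventId: string; runs: Array<{ id: string }> }
  | { ok: false; error: string };

// Narrow an unknown response item to the union, returning null on mismatch —
// the behavior a discriminated-union schema would give the API client.
function parseBatchItem(value: unknown): BatchItem | null {
  if (typeof value !== "object" || value === null) return null;
  const v = value as Record<string, unknown>;
  if (v.ok === true) {
    return typeof v.eventId === "string" && Array.isArray(v.runs)
      ? { ok: true, eventId: v.eventId, runs: v.runs as Array<{ id: string }> }
      : null;
  }
  if (v.ok === false) {
    return typeof v.error === "string" ? { ok: false, error: v.error } : null;
  }
  return null;
}
```

With this shape, a partial-failure batch response parses cleanly on the client, and callers can branch on `ok` per item instead of crashing on the first error entry.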

Comment on lines +27 to +40
...(params.cursor && { createdAt: { lt: new Date(params.cursor) } }),
},
orderBy: { createdAt: "desc" },
take: limit + 1,
});

const hasMore = items.length > limit;
const data = items.slice(0, limit);
const lastItem = data[data.length - 1];

return {
data,
pagination: {
cursor: hasMore && lastItem ? lastItem.createdAt.toISOString() : null,

🟡 DLQ list pagination uses non-unique createdAt as cursor, causing skipped items

The list method in deadLetterManagement.server.ts uses createdAt as the pagination cursor with { lt: new Date(params.cursor) }. Since createdAt has only millisecond precision and multiple DLQ entries can share the same timestamp (e.g., when a fan-out event triggers multiple subscriber failures simultaneously), entries with the same createdAt as the cursor boundary will be skipped on the next page. The same issue exists in the event history endpoint at apps/webapp/app/routes/api.v1.events.$eventId.history.ts:57 which uses published_at as cursor.

Prompt for agents
In apps/webapp/app/v3/services/events/deadLetterManagement.server.ts, the list() method (line 27-40) uses createdAt as the pagination cursor. Since createdAt is not unique, items sharing the same timestamp as the cursor boundary can be skipped.

Fix by switching to cursor-based pagination on the unique id field instead of createdAt alone. Change the cursor filter from { createdAt: { lt: new Date(params.cursor) } } to a compound cursor (createdAt + id), or use the id as the cursor with { id: { lt: params.cursor } } and order by id DESC (the id-only variant preserves chronological order only if ids are monotonically sortable, e.g. CUIDs). The same fix should be applied to the event history endpoint at apps/webapp/app/routes/api.v1.events.$eventId.history.ts:57.
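A hedged sketch of the compound-cursor variant: paginate on (createdAt, id) so rows that share a timestamp are never skipped. Shapes and names below are illustrative, not the service's actual code:

```typescript
// Assumed cursor shape: timestamp plus unique id of the last item returned.
interface Cursor {
  createdAt: string; // ISO timestamp of the last item on the previous page
  id: string;        // unique id of that item, used as tie-breaker
}

// Build a Prisma-style filter: strictly older rows, OR rows with the same
// timestamp but a smaller id (matching createdAt DESC, id DESC ordering).
function compoundCursorFilter(cursor: Cursor) {
  return {
    OR: [
      { createdAt: { lt: new Date(cursor.createdAt) } },
      { createdAt: new Date(cursor.createdAt), id: { lt: cursor.id } },
    ],
  };
}

// The same predicate in-memory, showing which rows fall after the cursor.
function isAfterCursor(row: { createdAt: Date; id: string }, cursor: Cursor): boolean {
  const boundary = new Date(cursor.createdAt).getTime();
  return (
    row.createdAt.getTime() < boundary ||
    (row.createdAt.getTime() === boundary && row.id < cursor.id)
  );
}
```

The tie-breaker clause is what closes the gap: a fan-out burst that writes several DLQ entries in the same millisecond still paginates through every entry exactly once.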

Comment on lines +478 to +491
// 4. Disable subscriptions from previous workers that are no longer active
// (tasks that stopped subscribing to events in this deploy)
await prisma.eventSubscription.updateMany({
where: {
projectId: worker.projectId,
environmentId: environment.id,
id: {
notIn: Array.from(activeSubscriptionIds),
},
},
data: {
enabled: false,
},
});

🚩 syncWorkerEvents disables all non-current subscriptions in the environment

At apps/webapp/app/v3/services/createBackgroundWorker.server.ts:480-491, syncWorkerEvents disables ALL subscriptions for the project+environment that aren't in the current deploy's activeSubscriptionIds. This follows the same "reconcile to desired state" pattern used elsewhere (e.g., schedule syncing). However, during rolling deploys or if multiple workers coexist briefly for the same environment, this could transiently disable subscriptions from a still-active worker. Worth confirming this aligns with the platform's worker lifecycle model (new deploys fully replace old workers).


Comment on lines +143 to +146
// Filter to only the requested tasks (if specified)
const matchingRuns = params.tasks
? result.runs.filter((r) => params.tasks!.includes(r.taskIdentifier))
: result.runs;

🚩 Replay service filters tasks AFTER publishing, not before

In replayEvents.server.ts:143-146, the tasks filter is applied AFTER publishService.call(), meaning all subscribers are triggered for every replayed event, and only the runs matching the requested tasks are included in the response. The actual filtering happens on the response side, not the trigger side. This means replay with a tasks filter still creates runs for ALL subscribers, which may be unexpected. Users might expect tasks to limit which subscribers are triggered, not just which runs are reported.
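If the intent is for `tasks` to limit which subscribers are triggered, the filter would need to run before the publish call rather than on its result. A minimal sketch under assumed shapes (the actual service is replayEvents.server.ts; the subscription type and function names here are hypothetical):

```typescript
// Illustrative subscription shape; the real model is EventSubscription.
interface Subscription {
  id: string;
  taskIdentifier: string;
}

// Restrict the subscriber set up front, so replay only creates runs for the
// requested tasks instead of fanning out to everyone and filtering the report.
function selectSubscriptions(
  subscriptions: Subscription[],
  tasks?: string[]
): Subscription[] {
  // No tasks filter: replay fans out to every subscriber, as today.
  if (!tasks || tasks.length === 0) return subscriptions;
  const wanted = new Set(tasks);
  return subscriptions.filter((s) => wanted.has(s.taskIdentifier));
}
```

Whether to trigger-side filter or merely report-side filter is a product decision; the point of the finding is that the current behavior silently does the latter.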

