Closed
65 commits
81b8cae
feat(events): phase 0.1+0.3 — event() primitive + ResourceCatalog ext…
giovaborgogno Feb 27, 2026
e165988
feat(events): phase 0.2 — extend TaskResource for event subscriptions
giovaborgogno Feb 27, 2026
7a78e1c
feat(events): phase 0.4 — EventDefinition + EventSubscription databas…
giovaborgogno Feb 27, 2026
243fa91
feat(events): phase 0.5 — register events and subscriptions during de…
giovaborgogno Feb 27, 2026
0c8c24b
feat(events): phase 0.6+0.7 — publish API endpoints + PublishEventSer…
giovaborgogno Feb 27, 2026
d1c87d5
feat(events): phase 0.8 — SDK .publish() wired to API
giovaborgogno Feb 27, 2026
85fb685
feat(events): phase 0.9 — integration tests for PublishEventService
giovaborgogno Feb 27, 2026
e6249e4
feat(events): phase 1.1 — schema versioning DB + SchemaRegistryService
giovaborgogno Feb 28, 2026
49b2903
feat(events): phase 1.2 — schema discovery API endpoints
giovaborgogno Feb 28, 2026
2a06ef6
feat(events): phase 1.3 — store JSON schema during deploy + validate …
giovaborgogno Feb 28, 2026
cfa67d0
feat(events): phase 1.4 — tests + changeset for schema registry
giovaborgogno Feb 28, 2026
cd426b3
feat(events): phase 2.1 — filter evaluator + pattern matcher with tests
giovaborgogno Feb 28, 2026
be7ca08
feat(events): phase 2.2 — filters in SDK + stored during deploy
giovaborgogno Feb 28, 2026
676d37e
feat(events): phase 2.3 — filter evaluation during fan-out
giovaborgogno Feb 28, 2026
846438c
feat(events): phase 2.4 — wildcard pattern subscriptions
giovaborgogno Feb 28, 2026
c2ec92e
feat(events): phase 2 — changeset for smart routing
giovaborgogno Feb 28, 2026
c63c1e7
feat(events): phase 3.1 — event_log_v1 ClickHouse table + insert func…
giovaborgogno Feb 28, 2026
8dfb002
feat(events): phase 3.2 — write to ClickHouse event log on each publish
giovaborgogno Feb 28, 2026
02369b1
feat(events): phase 3.3 — event history API endpoint
giovaborgogno Feb 28, 2026
3d98635
feat(events): phase 3.4 — event replay service + API endpoint
giovaborgogno Feb 28, 2026
d9a5d08
feat(events): phase 3 — changeset + memory updates for event persistence
giovaborgogno Feb 28, 2026
ec41396
feat(events): phase 4.1 — DeadLetterEvent model + enum + migration
giovaborgogno Feb 28, 2026
5ed4864
feat(events): phase 4.2 — store event context on runs + DLQ detection
giovaborgogno Feb 28, 2026
89d0dab
feat(events): phase 4.3 — DLQ management API endpoints
giovaborgogno Feb 28, 2026
2901353
feat(events): phase 4 — changeset + memory updates for dead letter queue
giovaborgogno Feb 28, 2026
dcd3ea3
feat(events): phase 5.1 — ordering keys via concurrencyKey
giovaborgogno Feb 28, 2026
8c033b3
feat(events): phase 5.2 — consumer groups for load-balanced fan-out
giovaborgogno Feb 28, 2026
3b3abf4
feat(events): phase 5.3 — integration tests for ordering + consumer g…
giovaborgogno Feb 28, 2026
e4d17a9
feat(events): phase 5 — changeset + memory updates for ordering + con…
giovaborgogno Feb 28, 2026
a522cb6
feat(events): phase 6.1 — runtime waitForEvent method
giovaborgogno Feb 28, 2026
c4bd534
feat(events): phase 6.2+6.3 — publishAndWait endpoint + SDK method
giovaborgogno Feb 28, 2026
a87bce4
feat(events): phase 6 tests — publishAndWait + event log writer
giovaborgogno Feb 28, 2026
cc4b888
chore: add phase 6 changeset for publishAndWait
giovaborgogno Feb 28, 2026
6454ef3
feat(events): phase 7 — rate limiting + backpressure
giovaborgogno Feb 28, 2026
7c68b2f
chore: add phase 7 changeset for event rate limiting
giovaborgogno Feb 28, 2026
42a3844
feat(events): phase 8 — observability + developer experience
giovaborgogno Feb 28, 2026
05f133b
chore: add phase 8 changeset for observability + DX
giovaborgogno Feb 28, 2026
81c09cd
feat(events): phase 9.1+9.2 — Redis rate limiter + hash-based consume…
giovaborgogno Mar 1, 2026
c7e5877
chore: update pubsub memory files — phases complete + pending items
giovaborgogno Mar 1, 2026
ae94c44
chore: restore full original roadmap with current status annotations
giovaborgogno Mar 1, 2026
9745d29
chore: mark completed tasks with [x] in roadmap based on actual imple…
giovaborgogno Mar 1, 2026
9bd85f6
feat(events): phase 8.6 — event system documentation + skill update
giovaborgogno Mar 1, 2026
c7c9119
feat(events): phase 8.7 — event-system reference project
giovaborgogno Mar 1, 2026
8510dbc
feat(events): phase 8.5 — JSDoc on createEvent and event() with examples
giovaborgogno Mar 1, 2026
c9148ec
feat(events): phase 8.4 — CLI commands for events
giovaborgogno Mar 1, 2026
d661c71
chore: add changeset for phase 8 DX improvements
giovaborgogno Mar 1, 2026
826762d
chore: update roadmap — mark 8.4, 8.5, 8.6, 8.7 as done
giovaborgogno Mar 1, 2026
d5a3299
chore: document E2E test findings — ordering limitation, payload size…
giovaborgogno Mar 1, 2026
3476f76
chore: update 9.6 — large payload issue is object store config, not o…
giovaborgogno Mar 1, 2026
ad83f88
feat(events): ordering key with per-key serialization + global concur…
giovaborgogno Mar 1, 2026
a8a5ce3
chore: mark 9.5 ordering as done in pending items
giovaborgogno Mar 1, 2026
193e26e
fix(events): fix ordering — update all Lua scripts for global concurr…
giovaborgogno Mar 1, 2026
c176496
fix(events): phase 10 — CRITICAL + HIGH audit fixes
giovaborgogno Mar 3, 2026
6d4434e
fix(events): phase 11 — MEDIUM audit fixes
giovaborgogno Mar 3, 2026
c05c326
fix(events): phase 13 — LOW audit fixes (cache bounds)
giovaborgogno Mar 3, 2026
f269a5e
chore: update memory files — audit fixes plan and roadmap status
giovaborgogno Mar 3, 2026
d625a22
test(events): add DLQ service integration tests (Phase 12)
giovaborgogno Mar 3, 2026
86dc70f
fix(events): fix typecheck errors in event routes, DLQ service, and T…
giovaborgogno Mar 4, 2026
63a4bd3
feat(events): phase 4.4 — SDK DLQ config per event
giovaborgogno Mar 4, 2026
1be7852
feat(events): stale subscription cleanup — daily cron job in admin wo…
giovaborgogno Mar 4, 2026
d53ec0b
feat(events): CLI commands — history, replay, dlq list, dlq retry
giovaborgogno Mar 4, 2026
bd74a50
test(events): add Redis-backed rate limiter integration tests
giovaborgogno Mar 4, 2026
ef2ec86
feat(events): consumer-side rate limiting + metrics endpoint
giovaborgogno Mar 4, 2026
6d5533b
fix(events): ClickHouse DateTime64 trailing 'Z' bug + ReplayEventsSer…
giovaborgogno Mar 4, 2026
7493b86
fix(events): address all audit findings — security, prod-readiness, t…
giovaborgogno Mar 5, 2026
6 changes: 6 additions & 0 deletions .changeset/event-dead-letter-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": minor
"trigger.dev": minor
---

Add dead letter queue for failed event-triggered task runs
10 changes: 10 additions & 0 deletions .changeset/event-docs-cli-dx.md
@@ -0,0 +1,10 @@
---
"@trigger.dev/sdk": minor
"trigger.dev": minor
---

Add event system documentation, CLI commands, and developer experience improvements.
New `rules/4.4.0/events.md` documentation covering all event features.
CLI `trigger events list` and `trigger events publish` commands.
JSDoc on `createEvent()` and `event()` with usage examples.
Reference project `references/event-system/` demonstrating all event patterns.
10 changes: 10 additions & 0 deletions .changeset/event-observability-dx.md
@@ -0,0 +1,10 @@
---
"@trigger.dev/core": minor
"@trigger.dev/sdk": minor
"@internal/clickhouse": minor
"apps-webapp": minor
---

Add observability and developer experience improvements to the event system.
New stats API endpoint aggregates event publish counts and fan-out metrics from
ClickHouse. SDK gains a `validate()` method for pre-flight payload validation.
7 changes: 7 additions & 0 deletions .changeset/event-ordering-consumer-groups.md
@@ -0,0 +1,7 @@
---
"@trigger.dev/core": minor
"@trigger.dev/sdk": minor
"trigger.dev": minor
---

Add ordering keys and consumer groups for event subscriptions
6 changes: 6 additions & 0 deletions .changeset/event-persistence-replay.md
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": minor
"trigger.dev": minor
---

Add event persistence in ClickHouse and replay API for pub/sub events
11 changes: 11 additions & 0 deletions .changeset/event-publish-and-wait.md
@@ -0,0 +1,11 @@
---
"@trigger.dev/core": minor
"@trigger.dev/sdk": minor
"apps-webapp": minor
---

Add publishAndWait support to the event system. Events can now be published
with parentRunId to create waitpoints for each subscriber run, enabling
fan-out / fan-in patterns. The SDK exposes `event.publishAndWait()` which
publishes, blocks the parent run, and returns aggregated results from all
subscriber completions.
11 changes: 11 additions & 0 deletions .changeset/event-rate-limiting.md
@@ -0,0 +1,11 @@
---
"@trigger.dev/core": minor
"@trigger.dev/sdk": minor
"@trigger.dev/database": minor
"apps-webapp": minor
---

Add per-event rate limiting to the pub/sub system. Events can now be configured
with a `rateLimit: { limit, window }` option that limits how many times they can
be published within a sliding time window. When exceeded, the API returns 429
with `x-ratelimit-limit`, `x-ratelimit-remaining`, and `retry-after` headers.
7 changes: 7 additions & 0 deletions .changeset/event-schema-registry.md
@@ -0,0 +1,7 @@
---
"@trigger.dev/core": minor
"@trigger.dev/sdk": minor
"trigger.dev": minor
---

Add event schema registry with versioning, validation, and discovery API endpoints
7 changes: 7 additions & 0 deletions .changeset/event-smart-routing.md
@@ -0,0 +1,7 @@
---
"@trigger.dev/core": minor
"@trigger.dev/sdk": minor
"trigger.dev": minor
---

Add smart routing for events: content-based filters and wildcard pattern subscriptions
33 changes: 33 additions & 0 deletions .claude/projects/-Users-terac-repos-trigger-dev/memory/MEMORY.md
@@ -0,0 +1,33 @@
# Memory Index

## Pub/Sub Event System
- [Roadmap & Status](pubsub-roadmap.md) — phases 0-8 complete, pending items identified
- [Detailed Progress](pubsub-progress.md) — per-phase notes, commits, decisions
- [Pending Items](pubsub-pending.md) — Redis rate limiter, consumer groups, dashboard, etc.
- [Audit Fix Plan](pubsub-audit-fixes.md) — Phases 10-13: CRITICAL/HIGH/MEDIUM/LOW fixes + test coverage
- Repo conventions: [repo-conventions.md](repo-conventions.md)
- Branch: `feat/pubsub-event-system`

## Repo Quick Reference

- Build: `pnpm run build --filter <pkg>`, Test: `pnpm run test --filter <pkg>`
- Build order: core → sdk → cli → run-engine → webapp
- Services extend `WithRunEngine`, use `traceWithEnv()`, throw `ServiceValidationError`
- API routes use `createActionApiRoute()` builder
- Tests use testcontainers (never mocks), vitest
- Import `@trigger.dev/core` subpaths only, never root
- Migrations: clean extraneous lines, indexes need CONCURRENTLY in separate files
- Changesets required for `packages/*` changes (default: patch)
- Tags in integration tests: avoid `tags` option in trigger calls — `createTag` uses global prisma mock `{}`

## Rate Limiting Patterns in Codebase
- `apps/webapp/app/services/rateLimiter.server.ts` — Upstash `@upstash/ratelimit` wrapper (sliding window, token bucket, fixed window)
- `apps/webapp/app/v3/GCRARateLimiter.server.ts` — Custom GCRA with Redis Lua scripts
- Both use a dedicated Redis connection (`RATE_LIMIT_REDIS_HOST` env vars)
- Good reference implementations: `mfaRateLimiter.server.ts`, `magicLinkRateLimiter.server.ts`

## User Preferences

- Documentation and roadmap files must be written in English
- Commit frequently (per sub-step)
- Never commit broken code
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# Pub/Sub Event System — Audit Fix Plan

Findings from a 5-agent parallel audit (2026-03-03), organized by priority.
Follow [Implementation Process & Guidelines](pubsub-roadmap.md#implementation-process--guidelines) for each phase.

---

## Phase 10: Audit Fixes — CRITICAL + HIGH

### 10.1 — Fix `expireTtlRuns` Lua: global concurrency slot leak (CRITICAL)

**Found by**: Redis auditor
**Severity**: CRITICAL — permanent slot leak
**File**: `internal-packages/run-engine/src/run-queue/index.ts:2633-2726`

**Problem**: The `expireTtlRuns` Lua script removes from `queueCurrentConcurrency`, `queueCurrentDequeued`, `envCurrentConcurrency`, `envCurrentDequeued` but does NOT remove from `globalCurrentConcurrency`. When a run with an ordering key expires via TTL, the global concurrency slot is permanently leaked, eventually starving the entire queue.

**Fix**:
1. Add `globalCurrentConcurrencyKey` as a new KEYS parameter to the `expireTtlRuns` Lua script
2. Add `redis.call('SREM', globalCurrentConcurrencyKey, messageId)` alongside the existing SREMs
3. Update `numberOfKeys` and the type declaration to include the new key
4. Update the caller that invokes `expireTtlRuns` to compute the key with `queueGlobalCurrentConcurrencyKeyFromQueue` and pass it

**Verify**: Run existing run-engine TTL tests
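
The leak and the fix can be illustrated with a small in-memory model. This is only a sketch: plain `Set`s stand in for the Redis sets, and `newSets`, `expireRun`, and the `releaseGlobal` flag are hypothetical names used for illustration, not the actual Lua script or run-queue API.

```typescript
// In-memory stand-ins for the Redis sets touched by expireTtlRuns.
// Property names follow the keys named in the plan; the rest is hypothetical.
type ConcurrencySets = {
  queueCurrentConcurrency: Set<string>;
  queueCurrentDequeued: Set<string>;
  envCurrentConcurrency: Set<string>;
  envCurrentDequeued: Set<string>;
  globalCurrentConcurrency: Set<string>;
};

// Build a state in which the given run currently holds a slot in every set.
function newSets(messageId: string): ConcurrencySets {
  return {
    queueCurrentConcurrency: new Set([messageId]),
    queueCurrentDequeued: new Set([messageId]),
    envCurrentConcurrency: new Set([messageId]),
    envCurrentDequeued: new Set([messageId]),
    globalCurrentConcurrency: new Set([messageId]),
  };
}

// releaseGlobal = false models the script before the fix,
// releaseGlobal = true models the script with the extra SREM.
function expireRun(sets: ConcurrencySets, messageId: string, releaseGlobal: boolean): void {
  sets.queueCurrentConcurrency.delete(messageId);
  sets.queueCurrentDequeued.delete(messageId);
  sets.envCurrentConcurrency.delete(messageId);
  sets.envCurrentDequeued.delete(messageId);
  if (releaseGlobal) {
    sets.globalCurrentConcurrency.delete(messageId);
  }
}

const buggy = newSets("run_1");
expireRun(buggy, "run_1", false);

const fixed = newSets("run_1");
expireRun(fixed, "run_1", true);

console.log(buggy.globalCurrentConcurrency.size); // 1: the global slot is still held
console.log(fixed.globalCurrentConcurrency.size); // 0: the global slot is released
```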

### 10.2 — Fix `clearMessageFromConcurrencySets` bare queue name (HIGH)

**Found by**: Redis auditor
**Severity**: HIGH — SREM to wrong key, slot never released
**File**: `internal-packages/run-engine/src/engine/index.ts:2240-2243`

**Problem**: `clearMessageFromConcurrencySets` is called with `taskRun.queue` which is a bare queue name (e.g. `"my-task"`), not a full Redis key. `queueGlobalCurrentConcurrencyKeyFromQueue()` expects a full key like `{org:X}:proj:Y:env:Z:queue:my-task` and produces a nonsense key from a bare name.

**Fix**: Trace how other callers of similar methods get the full queue key (likely from the `message.queue` field which includes the full path). Ensure `clearMessageFromConcurrencySets` either:
- Receives the full queue key, or
- Has access to the env/org/project context to construct it

**Verify**: Check whether the same issue affects the existing per-key `queueCurrentConcurrencyKeyFromQueue` call. It probably does, but SREM on a wrong key is a no-op rather than a crash.
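
One way to picture the second option (constructing the key from context) is the sketch below. The key layout is taken from the example above; `QueueContext`, `fullQueueKey`, `isFullQueueKey`, and `resolveQueueKey` are hypothetical helpers for illustration and do not exist in the run engine under these names.

```typescript
// Hypothetical context needed to build a full queue key from a bare name.
type QueueContext = { orgId: string; projectId: string; envId: string };

// Layout follows the example in the plan: {org:X}:proj:Y:env:Z:queue:<name>
function fullQueueKey(ctx: QueueContext, queueName: string): string {
  return `{org:${ctx.orgId}}:proj:${ctx.projectId}:env:${ctx.envId}:queue:${queueName}`;
}

// Detect whether a value already looks like a full key rather than a bare name.
function isFullQueueKey(value: string): boolean {
  return /^\{org:[^}]+\}:proj:[^:]+:env:[^:]+:queue:.+$/.test(value);
}

// Accept either form and always hand a full key to downstream key builders.
function resolveQueueKey(ctx: QueueContext, queue: string): string {
  return isFullQueueKey(queue) ? queue : fullQueueKey(ctx, queue);
}

const ctx: QueueContext = { orgId: "X", projectId: "Y", envId: "Z" };
console.log(resolveQueueKey(ctx, "my-task")); // {org:X}:proj:Y:env:Z:queue:my-task
```

Normalizing at the call site keeps the key builders unchanged, at the cost of requiring the env/org/project context wherever `clearMessageFromConcurrencySets` is invoked.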

### 10.3 — Add `.max()` to batch publish items array (HIGH)

**Found by**: Security auditor
**Severity**: HIGH — potential DoS
**File**: `packages/core/src/v3/schemas/api.ts` — `BatchPublishEventRequestBody`

**Fix**: Add `.max(100)` (or a similar bound) to the `items` array in `BatchPublishEventRequestBody`. This matches the existing batch trigger, which already enforces limits.

### 10.4 — Fix publishAndWait schema: parentRunId required but options optional (HIGH)

**Found by**: API auditor
**Severity**: HIGH — schema mismatch causes runtime 400 instead of Zod validation error
**File**: `packages/core/src/v3/schemas/api.ts:1658-1671`

**Fix**: Either:
- Make `options` required in `PublishAndWaitEventRequestBody`, or
- Move `parentRunId` to be a top-level required field outside of `options`

### 10.5 — Fix ClickHouse interval string interpolation (HIGH)

**Found by**: Security auditor
**Severity**: HIGH — fragile pattern
**File**: `apps/webapp/app/routes/api.v1.events.$eventId.stats.ts:54`

**Fix**: Use a parameterized query, or keep the whitelist validation and map each allowed period to a fixed interval string instead of interpolating user input.
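
A minimal sketch of the mapping approach follows. The period names and the `intervalClause` helper are assumptions made for illustration; the periods the stats route actually accepts are not listed in this plan.

```typescript
// Hypothetical whitelist: each allowed period maps to a fixed interval string.
const PERIOD_TO_INTERVAL = {
  "1h": "1 HOUR",
  "24h": "24 HOUR",
  "7d": "7 DAY",
  "30d": "30 DAY",
} as const;

type Period = keyof typeof PERIOD_TO_INTERVAL;

// Own-property check so inherited keys like "constructor" are rejected.
function isPeriod(value: string): value is Period {
  return Object.prototype.hasOwnProperty.call(PERIOD_TO_INTERVAL, value);
}

// Only strings from the table above ever reach the query text.
function intervalClause(period: string): string {
  if (!isPeriod(period)) {
    throw new Error(`Unsupported period: ${period}`);
  }
  return `INTERVAL ${PERIOD_TO_INTERVAL[period]}`;
}

console.log(intervalClause("7d")); // INTERVAL 7 DAY
```

Because the user-supplied value is used only as a lookup key, nothing the caller sends is concatenated into the SQL.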

### 10.6 — Add missing index for pattern subscription query (HIGH)

**Found by**: DB auditor
**Severity**: HIGH — full table scan on every publish
**File**: `internal-packages/database/prisma/schema.prisma` — `EventSubscription`

**Fix**:
1. Add `@@index([projectId, environmentId, enabled])` to EventSubscription model
2. Create migration with `CREATE INDEX CONCURRENTLY` in its own file
3. Run `pnpm run db:migrate:deploy && pnpm run generate`

### 10.7 — Fix batch publish partial failure semantics (HIGH)

**Found by**: API auditor
**Severity**: HIGH — client can't determine which items succeeded
**File**: `apps/webapp/app/routes/api.v1.events.$eventId.batchPublish.ts:40-57`

**Fix**: Two options:
- **Option A**: Validate ALL items upfront before triggering any (current approach fails mid-batch)
- **Option B**: Return partial results with per-item status (more complex but more resilient)

Recommendation: Option A. Validate schema and rate limits for all items first, then trigger.
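
Option A could look roughly like the sketch below. `BatchItem`, `collectValidationErrors`, and `publishBatch` are hypothetical names, and the `validate` and `trigger` callbacks stand in for the real schema/rate-limit checks and subscriber triggering.

```typescript
type BatchItem = { payload: unknown };
type ValidationResult = { ok: true } | { ok: false; error: string };
type ItemError = { index: number; error: string };

// Phase 1: validate every item and collect all errors, without triggering anything.
function collectValidationErrors(
  items: BatchItem[],
  validate: (item: BatchItem) => ValidationResult
): ItemError[] {
  const errors: ItemError[] = [];
  items.forEach((item, index) => {
    const result = validate(item);
    if (!result.ok) errors.push({ index, error: result.error });
  });
  return errors;
}

// Phase 2: trigger only when the whole batch passed validation.
async function publishBatch<T>(
  items: BatchItem[],
  validate: (item: BatchItem) => ValidationResult,
  trigger: (item: BatchItem) => Promise<T>
): Promise<{ ok: true; results: T[] } | { ok: false; errors: ItemError[] }> {
  const errors = collectValidationErrors(items, validate);
  if (errors.length > 0) return { ok: false, errors };

  const results: T[] = [];
  for (const item of items) {
    results.push(await trigger(item));
  }
  return { ok: true, results };
}
```

This removes validation failures as a source of partial batches. A trigger call can still fail at runtime after earlier items succeeded, which is the case Option B addresses.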

---

## Phase 11: Audit Fixes — MEDIUM

### 11.1 — Fix N+1 in DLQ retryAll

**File**: `apps/webapp/app/v3/services/events/deadLetterManagement.server.ts:126-148`
**Fix**: Remove redundant re-fetch in `retry()` when called from `retryAll()`, or batch the operations.

### 11.2 — Add payload size check before fan-out

**File**: `apps/webapp/app/v3/services/events/publishEvent.server.ts`
**Fix**: Check payload byte size before triggering subscribers. Return 413 if over limit and object store is not configured.
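
A sketch of such a check, assuming the payload is serialized as JSON and measured in UTF-8 bytes. The 512 KiB limit, `payloadByteSize`, and `checkPayloadSize` are placeholders for illustration; the real threshold is not stated in this plan.

```typescript
// Hypothetical limit; the actual threshold is not specified in the plan.
const MAX_INLINE_PAYLOAD_BYTES = 512 * 1024;

// Size of the JSON-serialized payload in UTF-8 bytes (not string length).
function payloadByteSize(payload: unknown): number {
  const json = JSON.stringify(payload) ?? "";
  return new TextEncoder().encode(json).length;
}

type SizeCheck = { ok: true } | { ok: false; status: 413; message: string };

// Reject oversized payloads only when there is no object store to offload them to.
function checkPayloadSize(
  payload: unknown,
  objectStoreConfigured: boolean,
  maxBytes: number = MAX_INLINE_PAYLOAD_BYTES
): SizeCheck {
  const size = payloadByteSize(payload);
  if (size > maxBytes && !objectStoreConfigured) {
    return { ok: false, status: 413, message: `Payload is ${size} bytes, limit is ${maxBytes}` };
  }
  return { ok: true };
}
```

Running this once per publish, before any subscriber is triggered, avoids a fan-out in which every subscriber run fails on the same oversized payload.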

### 11.3 — Fix inconsistent error handling in routes

**Files**: `api.v1.events.dlq.retry-all.ts`, `api.v1.events.ts`
**Fix**: Add try/catch with ServiceValidationError handling, matching other routes.

### 11.4 — Add CLI publish options support

**File**: `packages/cli-v3/src/commands/events/publish.ts`
**Fix**: Add `--delay`, `--tags`, `--idempotency-key`, `--ordering-key` options.

### 11.5 — Fix schema validation silent pass on compilation error

**File**: `apps/webapp/app/v3/services/events/schemaRegistry.server.ts:198-201`
**Fix**: Log a warning when ajv compilation fails, and optionally reject the publish.

### 11.6 — Add stale subscription cleanup

**File**: `apps/webapp/app/v3/services/events/publishEvent.server.ts`
**Fix**: When a subscriber trigger fails consistently, log a warning and optionally disable the subscription after N consecutive failures.
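
The "N consecutive failures" rule might be tracked as sketched below. `SubscriptionFailureTracker` is a hypothetical in-memory helper; with multiple webapp instances the counter would need shared storage, which this sketch does not cover.

```typescript
// Hypothetical in-memory tracker of consecutive trigger failures per subscription.
class SubscriptionFailureTracker {
  private readonly failures = new Map<string, number>();
  private readonly maxConsecutiveFailures: number;

  constructor(maxConsecutiveFailures: number) {
    this.maxConsecutiveFailures = maxConsecutiveFailures;
  }

  // Returns true once the subscription has failed N times in a row.
  recordFailure(subscriptionId: string): boolean {
    const count = (this.failures.get(subscriptionId) ?? 0) + 1;
    this.failures.set(subscriptionId, count);
    return count >= this.maxConsecutiveFailures;
  }

  // Any success resets the streak, so only consecutive failures count.
  recordSuccess(subscriptionId: string): void {
    this.failures.delete(subscriptionId);
  }
}

const tracker = new SubscriptionFailureTracker(3);
tracker.recordFailure("sub_1");
tracker.recordFailure("sub_1");
tracker.recordSuccess("sub_1"); // streak reset
console.log(tracker.recordFailure("sub_1")); // false: only 1 failure since the reset
```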

### 11.7 — Add data cleanup mechanism

**Fix**: Add a periodic cleanup job (or TTL-based approach) for:
- Disabled EventSubscriptions older than 30 days
- Processed DeadLetterEvents (RETRIED/DISCARDED) older than 30 days
- Deprecated EventDefinitions with no active subscriptions

---

## Phase 12: Test Coverage

### 12.1 — Tests for ReplayEventsService

**File**: `apps/webapp/test/engine/replayEvents.test.ts` (new)
**Tests**:
- Replay with date range filter
- Replay with task filter
- Replay dry run (count only)
- Replay with idempotency (no duplicate triggers)
- Replay when ClickHouse is unavailable (graceful error)

Note: These require ClickHouse in testcontainers. Mocking is the fallback, but it goes against the repo convention of using testcontainers rather than mocks.

### 12.2 — Tests for DeadLetterService

**File**: `apps/webapp/test/engine/deadLetterService.test.ts` (new)
**Tests**:
- Failed event-triggered run creates DLQ entry
- Non-event run does NOT create DLQ entry
- DLQ entry has correct eventType, payload, error
- Multiple failures create separate DLQ entries

### 12.3 — Tests for DeadLetterManagementService

**File**: `apps/webapp/test/engine/deadLetterManagement.test.ts` (new)
**Tests**:
- List DLQ entries with pagination
- List with eventType filter
- List with status filter
- Retry creates new run with correct payload
- Retry marks DLQ entry as RETRIED
- Discard marks entry as DISCARDED
- RetryAll processes up to 1000 items
- Retry/discard nonexistent ID returns error

### 12.4 — Tests for RedisEventRateLimitChecker

**File**: `apps/webapp/test/engine/eventRateLimiter.test.ts` (extend)
**Tests**:
- Redis checker allows under limit
- Redis checker blocks over limit
- Redis checker returns correct remaining/retryAfter
- Different configs get separate Ratelimit instances

Note: Requires Redis in testcontainers.

### 12.5 — Tests for SchemaRegistryService.checkCompatibility

**File**: extend existing SchemaRegistryService tests
**Tests**:
- Compatible schema change (add optional field)
- Incompatible change (remove required field)
- Incompatible change (change field type)

---

## Phase 13: LOW Priority Fixes

### 13.1 — Add LRU bounds to caches
- `validatorCache` in SchemaRegistryService: max 1000 entries
- `patternCache`/`filterCache` in core evaluators: max 1000 entries
- `InMemoryEventRateLimitChecker.windows`: evict entries older than 2x window
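
For the first two caches, a bounded LRU can be built on the insertion order of `Map`. The `LruCache` class below is a generic sketch under that assumption, not the implementation used in `SchemaRegistryService` or the core evaluators.

```typescript
// Minimal LRU cache: a Map iterates in insertion order, so the first key is the oldest.
class LruCache<K, V> {
  private readonly entries = new Map<K, V>();
  private readonly maxEntries: number;

  constructor(maxEntries: number) {
    if (maxEntries < 1) throw new Error("maxEntries must be at least 1");
    this.maxEntries = maxEntries;
  }

  get(key: K): V | undefined {
    if (!this.entries.has(key)) return undefined;
    const value = this.entries.get(key) as V;
    // Re-insert to mark the entry as most recently used.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: K, value: V): void {
    if (this.entries.has(key)) {
      this.entries.delete(key);
    } else if (this.entries.size >= this.maxEntries) {
      // Evict the least recently used entry.
      const oldest = this.entries.keys().next().value as K;
      this.entries.delete(oldest);
    }
    this.entries.set(key, value);
  }

  get size(): number {
    return this.entries.size;
  }
}

const cache = new LruCache<string, number>(2);
cache.set("a", 1);
cache.set("b", 2);
cache.get("a"); // "a" becomes most recently used
cache.set("c", 3); // evicts "b"
console.log(cache.get("b")); // undefined
```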

### 13.2 — Tighten Zod schemas
- `payload: z.any()` → `payload: z.unknown()`
- `metadata: z.any()` → `metadata: z.record(z.unknown())`
- Add `.max(256)` to idempotencyKey
- Add DLQ status validation with Zod instead of `as` cast

### 13.3 — Remove dead code
- Unused `compileFilter`/`evaluateFilter` exports from core filterEvaluator

### 13.4 — Fix batchPublish URL naming
- Current: `/api/v1/events/:id/batchPublish` (camelCase)
- Consider: `/api/v1/events/:id/batch-publish` or keep for consistency

---

## Execution Order

```
Phase 10 (CRITICAL+HIGH) → Phase 12 (Tests) → Phase 11 (MEDIUM) → Phase 13 (LOW)
```

Phase 10 first because it contains a CRITICAL bug (permanent slot leak).
Phase 12 second because tests validate the fixes and catch regressions.
Phases 11 and 13 are improvements, not blockers.

## Verification per phase

Same as roadmap guidelines:
1. `pnpm run build --filter @internal/run-engine --filter webapp --filter @trigger.dev/core --filter @trigger.dev/sdk`
2. `cd internal-packages/run-engine && pnpm run test --run` (run-engine: 236+ must pass)
3. `cd apps/webapp && pnpm run test ./test/engine/publishEvent.test.ts --run` (24+ must pass)
4. `cd apps/webapp && pnpm run test ./test/engine/eventRateLimiter.test.ts --run` (11+ must pass)
5. New test files must pass
6. Commit after each sub-step: `feat(events): phase X.Y — <description>`