From 7a09e2685e10ae38c361e592694b14fa69640a68 Mon Sep 17 00:00:00 2001 From: Yostra Date: Fri, 24 Apr 2026 12:03:13 +0200 Subject: [PATCH] fix destination filter for live event --- .../engine/src/lib/destination-filter.test.ts | 24 +++++++++++++++++ apps/engine/src/lib/destination-filter.ts | 26 +++++++++++++------ apps/engine/src/lib/engine.test.ts | 5 ++-- apps/engine/src/lib/engine.ts | 8 ++++-- 4 files changed, 51 insertions(+), 12 deletions(-) diff --git a/apps/engine/src/lib/destination-filter.test.ts b/apps/engine/src/lib/destination-filter.test.ts index a45b0b391..d72104e68 100644 --- a/apps/engine/src/lib/destination-filter.test.ts +++ b/apps/engine/src/lib/destination-filter.test.ts @@ -137,4 +137,28 @@ describe('excludeTerminalStreams()', () => { expect(filtered.streams.map((stream) => stream.stream.name)).toEqual(['customers', 'charges']) }) + + it('keeps completed streams when keepCompleted is true (for live-event routing)', () => { + const catalog = makeCatalog([ + { name: 'customers' }, + { name: 'charges' }, + { name: 'invoices' }, + { name: 'products' }, + ]) + + const filtered = excludeTerminalStreams( + catalog, + { + streams: { + customers: { status: 'completed', state_count: 0, record_count: 0 }, + charges: { status: 'skipped', state_count: 0, record_count: 0 }, + invoices: { status: 'errored', state_count: 0, record_count: 0 }, + products: { status: 'started', state_count: 0, record_count: 0 }, + }, + }, + { keepCompleted: true } + ) + + expect(filtered.streams.map((stream) => stream.stream.name)).toEqual(['customers', 'products']) + }) }) diff --git a/apps/engine/src/lib/destination-filter.ts b/apps/engine/src/lib/destination-filter.ts index eac266f8a..5871c1ba7 100644 --- a/apps/engine/src/lib/destination-filter.ts +++ b/apps/engine/src/lib/destination-filter.ts @@ -31,19 +31,29 @@ export function applySelection(catalog: ConfiguredCatalog): ConfiguredCatalog { } } -/** Exclude streams that already reached a terminal state in prior run progress. */ +/** + * Exclude streams that already reached a terminal state in prior run progress. + * + * When `keepCompleted` is true, only errored/skipped streams are excluded — + * completed streams stay in the catalog. This is load-bearing for live-event + * sources (webhooks, websocket): completed means "backfill done", not "stop + * routing events", so live events must continue to reach the stream. Source + * backfill implementations short-circuit completed streams via state + * (e.g. `remaining: []`), so there's no cost to keeping them in the catalog. + */ export function excludeTerminalStreams( catalog: ConfiguredCatalog, - progress?: Pick + progress?: Pick, + opts?: { keepCompleted?: boolean } ): ConfiguredCatalog { + const keepCompleted = opts?.keepCompleted ?? false const terminalStreams = new Set( Object.entries(progress?.streams ?? {}) - .filter( - ([, stream]) => - stream.status === 'completed' || - stream.status === 'skipped' || - stream.status === 'errored' - ) + .filter(([, stream]) => { + if (stream.status === 'skipped' || stream.status === 'errored') return true + if (stream.status === 'completed') return !keepCompleted + return false + }) .map(([name]) => name) ) diff --git a/apps/engine/src/lib/engine.test.ts b/apps/engine/src/lib/engine.test.ts index fa489d948..00d0260c1 100644 --- a/apps/engine/src/lib/engine.test.ts +++ b/apps/engine/src/lib/engine.test.ts @@ -922,7 +922,7 @@ describe('engine.pipeline_sync() pipeline', () => { expect(eof.eof.ending_state?.sync_run.progress?.elapsed_ms).toBeGreaterThan(5000) }) - it('skips previously terminal streams on same-run continuation', async () => { + it('skips skipped/errored streams but keeps completed streams on same-run continuation', async () => { let receivedCatalogNames: string[] = [] const source: Source = { async *spec() { @@ -1001,9 +1001,10 @@ describe('engine.pipeline_sync() pipeline', () => { ) const eof = output.find((m) => m.type === 'eof')! - expect(receivedCatalogNames).toEqual(['customers']) + expect(receivedCatalogNames).toEqual(['customers', 'invoices']) expect(eof.eof.request_progress?.streams).toEqual({ customers: expect.objectContaining({ status: 'completed' }), + invoices: expect.objectContaining({ status: 'completed' }), }) expect(eof.eof.ending_state?.sync_run.progress?.streams.charges).toEqual( expect.objectContaining({ status: 'skipped', message: 'not available' }) diff --git a/apps/engine/src/lib/engine.ts b/apps/engine/src/lib/engine.ts index d40d88fb3..ec468ecfd 100644 --- a/apps/engine/src/lib/engine.ts +++ b/apps/engine/src/lib/engine.ts @@ -566,7 +566,9 @@ export async function createEngine(resolver: ConnectorResolver): Promise const isContinuation = opts?.run_id != null && p.state?.sync_run.run_id === opts.run_id const activeFilteredCatalog = isContinuation - ? excludeTerminalStreams(p.filteredCatalog, p.state?.sync_run.progress) + ? excludeTerminalStreams(p.filteredCatalog, p.state?.sync_run.progress, { + keepCompleted: true, + }) : p.filteredCatalog // Run reducer first so time_ceiling is correct for a new run_id. @@ -580,7 +582,9 @@ export async function createEngine(resolver: ConnectorResolver): Promise const catalogWithRanges = withTimeRanges(p.catalog, syncState.sync_run.time_ceiling) const activeCatalog = isContinuation - ? excludeTerminalStreams(catalogWithRanges, p.state?.sync_run.progress) + ? excludeTerminalStreams(catalogWithRanges, p.state?.sync_run.progress, { + keepCompleted: true, + }) : catalogWithRanges // Source → destination pipeline. The destination is the sole consumer,