Skip to content

fix destination filter for live event#331

Open
Yostra wants to merge 1 commit intomainfrom
destination_filter_for_live
Open

fix destination filter for live event#331
Yostra wants to merge 1 commit intomainfrom
destination_filter_for_live

Conversation

@Yostra
Copy link
Copy Markdown
Collaborator

@Yostra Yostra commented Apr 24, 2026

Summary

Fix a silent live-event regression on same-run continuation. Completed streams were being pruned from the catalog, which caused Stripe webhook / WebSocket / events-API messages for those streams to be dropped at both the source and destination filters. excludeTerminalStreams now takes a keepCompleted option, and pipeline_sync opts in.

Problem

When a pipeline resumes under the same run_id, the engine trims "already done" streams out of the catalog via excludeTerminalStreams. Until now that bucket included completed alongside skipped and errored.

The problem: once a stream's backfill finished, it vanished from the catalog on every subsequent continuation — and two downstream filters quietly threw away every live event for it:

  • Source-side the Stripe source derives streamNames from the catalog and filters every live event (webhook / WebSocket / events-API) through it, so events for completed streams never yielded a record.
  • Destination-side enforceCatalog drops any record whose stream isn't in the catalog, so anything that escaped the source filter was swallowed here.

The pipeline looked healthy (no errors,status: completed), but real-time data stopped flowing after the first continuation.

Fix

Treat completed as "backfill done"

  • excludeTerminalStreams takes a new opts.keepCompleted (default false, backward compact)

  • Both continuation sites in pipeline_sync (activeFilteredCatalog and activeCatalog) pass { keepCompleted: true }.

  • New unit test in apps/engine/src/lib/destination-filter.test.ts: completed streams are kept when keepCompleted: true, skipped/errored are still excluded.

  • Updated engine test skips skipped/errored streams but keeps completed streams on same-run continuation — asserts the source now receives completed streams and their status is re-emitted.


/** Exclude streams that already reached a terminal state in prior run progress. */
/**
* Exclude streams that already reached a terminal state in prior run progress.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should happen because webhook will not have the same ID as backfill

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants