Open
Conversation
…point Add an optional handle_events hook on the Source protocol for stateless, event-driven processing (webhooks, push payloads, etc.) decoupled from read()'s backfill/polling. Wire it through the engine as pipeline_handle_events on both createEngine and createRemoteEngine, expose it as POST /pipeline_handle_events in the engine HTTP API, and implement it on source-stripe by extracting the existing event-dispatch logic into a shared processEventInput helper now reused by read()'s stdin and live-mode loops. Co-authored-by: Cursor <cursoragent@cursor.com> Committed-By-Agent: cursor
Use connector source_input schemas to expose pipeline_handle_events as typed events grouped by source type, e.g. events.stripe: StripeEvent[]. Co-authored-by: Cursor <cursoragent@cursor.com> Committed-By-Agent: cursor
There was a problem hiding this comment.
Pull request overview
This PR adds a new stateless handle_events path for sources and threads it through the engine so externally delivered events can be processed without going through the regular read() flow. In this repo, that primarily extends the Stripe source and the engine’s HTTP/OpenAPI surface.
Changes:
- Added optional
handle_events()support to the source/engine protocol and implementedpipeline_handle_eventsin the in-process engine, remote engine client, and HTTP API. - Refactored
source-stripeto reuse shared event-dispatch logic for bothread()andhandle_events(). - Added targeted source/API tests and regenerated OpenAPI artifacts for the new endpoint.
Reviewed changes
Copilot reviewed 10 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
packages/source-stripe/src/src-webhook.ts |
Added shared event-input dispatch helper for webhook/raw-event handling. |
packages/source-stripe/src/index.ts |
Switched Stripe event paths to shared dispatch logic and added handle_events() implementation. |
packages/source-stripe/src/index.test.ts |
Added unit tests for Stripe handle_events() behavior. |
packages/protocol/src/protocol.ts |
Extended the source protocol with optional handle_events(). |
apps/engine/src/lib/source-test.ts |
Added test-source passthrough implementation of handle_events(). |
apps/engine/src/lib/remote-engine.ts |
Added remote client support for pipeline_handle_events. |
apps/engine/src/lib/engine.ts |
Added engine-level pipeline_handle_events orchestration. |
apps/engine/src/lib/createSchemas.ts |
Added request schema generation for grouped source events. |
apps/engine/src/api/app.ts |
Added POST /pipeline_handle_events route and request validation. |
apps/engine/src/api/app.test.ts |
Added endpoint tests for /pipeline_handle_events. |
apps/engine/src/__generated__/openapi.json |
Regenerated OpenAPI spec with new endpoint and schemas. |
apps/engine/src/__generated__/openapi.d.ts |
Regenerated typed OpenAPI declarations for the new endpoint. |
Files not reviewed (1)
- apps/engine/src/generated/openapi.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+113
to
+116
| const SourceEvents = | ||
| inputSchemas.length > 0 | ||
| ? configUnion(inputSchemas.map((s) => s.variant)).meta({ id: 'SourceEvents' }) | ||
| : z.record(z.string(), z.array(z.unknown())).meta({ id: 'SourceEvents' }) |
Comment on lines
+425
to
+428
| 'Streams the supplied events into the source connector\'s `handle_events` hook ' + | ||
| 'and returns the derived NDJSON messages (records, logs, traces). Stateless — ' + | ||
| 'no checkpointing or time limits. Fails 400 if the source does not implement ' + | ||
| '`handle_events`.', |
Comment on lines
+158
to
+160
| const eventBatch: unknown[] = [] | ||
| for await (const event of events) eventBatch.push(event) | ||
| const res = await post( |
Comment on lines
+424
to
+429
| let accountId: string | ||
| try { | ||
| accountId = (await resolveAccountMetadata(config, client)).accountId | ||
| } catch (err) { | ||
| yield errorToConnectionStatus(err) | ||
| return |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
handle_eventsmethod to the source protocol for stateless event-driven processing.pipeline_handle_eventsthrough the engine interface, remote engine client, andPOST /pipeline_handle_eventsHTTP endpoint.handle_eventsforsource-stripeby sharing the existing webhook/Stripe-event dispatch path used byread().Test Plan
pnpm --filter @stripe/sync-protocol exec tsc --noEmitpnpm --filter @stripe/sync-source-stripe exec tsc --noEmitpnpm --filter @stripe/sync-engine exec tsc --noEmitpnpm --filter @stripe/sync-service exec tsc --noEmit./scripts/generate-openapi.shNote: Vitest was not run locally because the macOS environment rejects the installed esbuild binary before the runner starts.