Skip to content

Airbyte protocol v2 compliance (continues #149)#153

Open
mcrauwel wants to merge 6 commits into
mainfrom
mcrauwel/airbyte-v2-protocol
Open

Airbyte protocol v2 compliance (continues #149)#153
mcrauwel wants to merge 6 commits into
mainfrom
mcrauwel/airbyte-v2-protocol

Conversation

@mcrauwel

Copy link
Copy Markdown
Member

Summary

This continues @Smidge's work in #149 to make the connector Airbyte protocol v2 compliant (stream status trace messages + per-stream state). Because #149 originates from an organization-owned fork, GitHub's "allow maintainer edits" does not grant push access, so we could not push directly to that PR. This branch carries Smidge's original commits unchanged (authorship preserved) and adds one commit addressing the outstanding review.

Original PR: #149 — credit to @Smidge (Harry Smaje) for the protocol v2 work.

What's in the original commits (from #149)

  • Add TRACE / STREAM_STATUS message types and emit STARTED / COMPLETE / INCOMPLETE per stream.
  • Emit per-stream STATE (type=STREAM) instead of one legacy global blob.
  • Parse the Airbyte v2 per-stream state array on incremental syncs, with fallback to the legacy format (the fallback is byte-for-byte identical to the previous readState behavior, so existing state files keep working).

New commit — addresses @mhamza15's review on #149

  1. Process all shards on error. The shard loop now continues instead of breaking, so one failing shard no longer prevents the remaining shards in the stream from syncing. (With break, whether the other shards ran at all depended on Go's randomized map-iteration order.)
  2. Checkpoint progress-so-far on error. Database.Read returns a cursor reflecting partial progress alongside the error on a server timeout (planetscale_edge_database.go:283). We now persist that cursor before handling the error, so a retry doesn't re-read already-synced data.
  3. De-duplicated the namespace/state-key logic into a shared streamStateKeyFor helper used by the read loop and both branches of readState.

Two regression tests added — TestRead_ShardErrorStillProcessesOtherShards and TestRead_ProgressCursorPersistedOnError — both verified to fail against the un-fixed code and pass with the fix.

Testing

  • All tests pass for cmd/internal and cmd/airbyte-source.

🤖 Generated with Claude Code

mcrauwel added a commit that referenced this pull request Jun 18, 2026
## Summary

The "Analyze (go)" (CodeQL) check fails on every PR with:

> The actions `actions/checkout@v3`, `actions/setup-go@v3`, and
`github/codeql-action/init@v2` are not allowed in
`planetscale/airbyte-source` because all actions must be pinned to a
full-length commit SHA.

The org now enforces commit-SHA pinning for GitHub Actions, and
`.github/workflows/codeql.yml` still referenced actions by version tag.

## Changes

Pin every action in the CodeQL workflow to its release commit SHA (with
a `# vX` comment for readability). Bumped off the deprecated majors
while here:

- `actions/checkout` `v3` → `v4` (`34e1148`)
- `actions/setup-go` `v3` → `v5` (`40f1582`)
- `github/codeql-action/{init,autobuild,analyze}` `v2` → `v3`
(`dd903d2`)

This unblocks CI on all open PRs (including #153).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Smidge and others added 5 commits June 18, 2026 17:53
Airbyte 2.x requires sources to emit STREAM_STATUS trace messages
(STARTED, COMPLETE, INCOMPLETE) for each stream. Without these,
every sync fails with:

  "streams did not receive a terminal stream status message"

Changes:
- Add TRACE message type and stream status constants to types.go
- Add StreamDescriptor, AirbyteStreamStatus, AirbyteTraceMessage types
- Replace legacy global State() with per-stream StreamState() that
  emits state.type=STREAM (required by Airbyte 2.x, which rejects
  the LEGACY format with IllegalArgumentException)
- Add StreamStatus() method to emit STARTED/COMPLETE/INCOMPLETE traces
- Update AirbyteLogger interface and test mock accordingly
Update the read command to be fully compatible with Airbyte 2.x:

Read loop changes:
- Emit STARTED before reading each stream
- Emit COMPLETE after successful read, INCOMPLETE on error
- Replace os.Exit(1) with break on per-stream errors so remaining
  streams still get status messages
- Emit per-stream STATE (type=STREAM) after each stream completes
  instead of one global state blob at the end

State parsing changes:
- Handle Airbyte v2 per-stream state format on incremental syncs.
  Airbyte 2.x passes state back as a JSON array of per-stream state
  objects, not the legacy global SyncState blob. Without this, the
  second sync always fails because json.Unmarshal fails on the array
  format, causing os.Exit(1) before any streams are processed.
- Fall back to legacy format for backwards compatibility
- Default empty namespace to source database name to prevent state
  key mismatches
Logger tests:
- StreamState emits correct per-stream format with type=STREAM
- Multiple shards included in state output
- No legacy "data" field present (would cause LEGACY rejection)
- StreamStatus emits TRACE messages with correct status values
- JSON round-trip matches exact Airbyte protocol v2 structure

Read protocol tests:
- Read emits per-stream STATE, not legacy global state
- STARTED and COMPLETE emitted for each configured stream
- Correct message ordering: STARTED -> STATE -> COMPLETE
- Multi-shard state contains all shard cursors
- Read errors emit INCOMPLETE and skip state emission
Address review feedback:

1. Always emit StreamState after the shard loop, even on failure.
   Previously, state was only emitted when all shards succeeded. If
   shard A advanced and shard B failed, shard A's cursor was lost and
   the next retry would re-read already-synced data.

2. Return an error from the read command when any stream fails.
   The os.Exit(1) calls were replaced with break to allow other streams
   to emit proper status messages, but the command was silently exiting
   successfully. Now uses RunE so cobra surfaces the error and exits
   non-zero.

Also converts remaining os.Exit(1) calls to return errors for
consistency and testability, and adds a test for multi-shard partial
failure checkpointing.
Address the second round of review feedback on the read loop:

- On a shard error, continue to the remaining shards instead of break.
  A single failing shard no longer prevents the other shards in the
  stream from syncing (the previous break left it up to map-iteration
  order whether they ran at all).

- Persist the cursor returned by Database.Read before handling the
  error. Read returns a progress-so-far cursor alongside the error on
  a server timeout; checking the error first discarded it, causing the
  next attempt to re-read already-synced data.

- Extract the namespace/state-key logic into streamStateKeyFor so the
  read loop and both branches of readState share one implementation
  and cannot drift.

Adds tests proving each fix: TestRead_ShardErrorStillProcessesOtherShards
and TestRead_ProgressCursorPersistedOnError.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mcrauwel mcrauwel force-pushed the mcrauwel/airbyte-v2-protocol branch from 63ba3e2 to cc30081 Compare June 18, 2026 15:53

@mhamza15 mhamza15 left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t

Comment thread cmd/airbyte-source/read.go Outdated
ch.Logger.State(syncState)
// Always emit state to checkpoint whatever progress was made,
// including partial progress when only some shards succeeded.
ch.Logger.StreamState(keyspaceOrDatabase, configuredStream.Stream.Name, syncState.Streams[streamStateKey])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We checkpoint after the full loop is complete, whereas before we checkpointed after each shard. If we crash in the middle, we'll need to re-process the earlier shards. Should we move it back to the end of the loop?

Comment thread cmd/airbyte-source/read.go Outdated

if streamFailed {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
readErr = fmt.Errorf("read failed for stream %v", streamStateKey)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We overwrite previous errors each time we find a new one, meaning only the last error survives. We may want to collect them all instead.

return messages
}

func setupReadCommand(t *testing.T, db *mockDatabase, catalogJSON string) (*bytes.Buffer, *Helper) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: catalogJSON is unused.

Comment thread cmd/airbyte-source/read.go Outdated
return syncState, err
// Try parsing as Airbyte v2 per-stream state array first
var perStreamStates []internal.AirbyteState
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && len(perStreamStates) > 0 && perStreamStates[0].Type == internal.STATE_TYPE_STREAM {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An empty array would fail this check len(perStreamStates) > 0 and fall through to the legacy format, which will fail the unmarshal as well, when instead we should be initializing the shards from scratch.

Comment thread cmd/airbyte-source/read.go Outdated
return syncState, err
// Try parsing as Airbyte v2 per-stream state array first
var perStreamStates []internal.AirbyteState
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && len(perStreamStates) > 0 && perStreamStates[0].Type == internal.STATE_TYPE_STREAM {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex identified one suggestion. it seems legit to me.

Suggested change
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && len(perStreamStates) > 0 && perStreamStates[0].Type == internal.STATE_TYPE_STREAM {
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && (len(perStreamStates) == 0 || perStreamStates[0].Type == internal.STATE_TYPE_STREAM) {

Airbyte v2 state is represented as an array of state messages, and an empty array ([]) is valid when there are no stream checkpoints yet. The current condition only accepts v2 state when len(perStreamStates) > 0, so [] falls through to legacy SyncState parsing and fails because legacy state expects a JSON object.

Proposal: treat a successfully decoded empty v2 array as v2 state, then let the existing stream loop initialize fresh cursors.

- Checkpoint per-shard instead of once after the shard loop, so a crash
  mid-stream doesn't discard the progress of shards that already
  completed (mhamza15).
- Accumulate per-stream read errors with errors.Join instead of letting
  each failing stream overwrite the last, so no failure is lost (mhamza15).
- Treat an empty Airbyte v2 state array ("[]") as valid v2 state instead
  of falling through to the legacy object parser, which errors on an
  array; the stream loop then initializes fresh cursors (maxenglander,
  mhamza15). Adds TestReadState_EmptyV2ArrayInitializesFreshCursors,
  verified to fail without the fix.
- Drop the unused catalogJSON parameter from setupReadCommand (mhamza15).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mcrauwel

Copy link
Copy Markdown
Member Author

Thanks for the reviews @maxenglander @mhamza15 🙏 Addressed all four points in 6e0a2f1:

  1. Checkpoint per-shard (read.go) — moved the StreamState emit inside the shard loop so a crash mid-stream no longer re-reads shards that already completed, instead of checkpointing only once after the loop.
  2. Collect all errors — per-stream failures now accumulate via errors.Join instead of each one overwriting the last.
  3. Empty v2 state array[] is now treated as valid v2 state (initializes fresh cursors) rather than falling through to the legacy object parser, which errored with cannot unmarshal array into Go value of type internal.SyncState. Took @maxenglander's suggested condition. Added TestReadState_EmptyV2ArrayInitializesFreshCursors, confirmed to fail without the fix (with exactly that unmarshal error) and pass with it.
  4. Nit — dropped the unused catalogJSON param from setupReadCommand.

The branch was also rebased onto current main (now includes the CodeQL pin from #154), so CI should be green here now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants