Skip to content

fix(proxy): strip double length-prefix on fetch/produce fan-out re-marshal#157

Open
kamir wants to merge 2 commits into
KafScale:mainfrom
kamir:fix/fetch-double-length-prefix
Open

fix(proxy): strip double length-prefix on fetch/produce fan-out re-marshal#157
kamir wants to merge 2 commits into
KafScale:mainfrom
kamir:fix/fetch-double-length-prefix

Conversation

@kamir

@kamir kamir commented Jun 15, 2026

Copy link
Copy Markdown
Collaborator

Summary

The proxy double-frames forwarded Fetch/Produce requests on the re-marshal path, so the
backend broker reads the inner size prefix as the request's apiKey/apiVersion and rejects
every request with an impossible version. This breaks the consume path whenever a fetch
fans out to more than one broker, or on any retry. One-line cause, one-line fix in two
helpers, plus a strengthened round-trip regression test.

This is the REQUEST re-marshal path (not response assembly). Built on top of #156
(92458c7); see the consume-path boundary below for how this relates to #156 and #149.

Symptom

With a multi-broker cluster, consumers get no data. Broker logs, on essentially every request:

parse request: decode Produce v544: response did not contain enough data to be valid

(Produce maxes near v11, so v544 is impossible.) Proxy logs:

fetch forward failed ... error="read frame size: EOF"
fetch forward failed ... error="decode fetch response v13: response did not contain enough data to be valid"

Root cause

forwardToBackend sends the request via protocol.WriteFrame(conn, payload), which prepends
the 4-byte Kafka message-size prefix. The fetch fan-out has two payload sources:

  • canUseOriginal (payload = originalPayload): originalPayload came from ReadFrame and is
    header+body WITHOUT the size prefix, so WriteFrame adds exactly one prefix. Correct.
  • re-marshal (payload = encodeFetchRequest(header, subReq)): encodeFetchRequest returns
    kmsg.RequestFormatter.AppendRequest(...), which ALREADY includes the 4-byte size prefix.
    WriteFrame then adds a second one, so the wire is [size][size][header][body]. The broker
    reads the outer size, then parses the inner 4-byte size as apiKey(2)+apiVersion(2) and gets
    garbage.

canUseOriginal := originalPayload != nil && len(groups) == 1, so the double-frame fires
whenever the fetch fans out to more than one broker, or on any retry iteration (where
originalPayload is set to nil). A single-broker cluster never hit it because it always used
the original prefix-less payload, which is why this was not seen earlier. encodeProduceRequest
has the identical AppendRequest + WriteFrame pattern and the same latent double-frame.

Fix

Strip the 4-byte prefix that AppendRequest writes, in both encodeFetchRequest and
encodeProduceRequest, since WriteFrame re-adds it. The request body stays framed the
same way it was; we only remove the redundant inner prefix so WriteFrame is the single
source of the size prefix:

b := formatter.AppendRequest(nil, req, header.CorrelationID)
// AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one;
// strip it here so the wire frame is single-prefixed (header+body only).
if len(b) < 4 {
    return b
}
return b[4:]

No buffer aliasing on the fan-out hot path

The fan-out encodes each broker sub-request independently. Each encodeFetchRequest /
encodeProduceRequest call does formatter.AppendRequest(nil, ...), i.e. a fresh
allocation per sub-request, and returns b[4:], a sub-slice of that same fresh buffer.
Concurrent sub-requests therefore never share or mutate one backing buffer. The
strengthened test asserts this directly: across the fan-out sub-requests no two encodings
share a backing array.

Test

TestEncodeRequestNoDoubleLengthPrefix (cmd/proxy/main_test.go) is a real round-trip:
encode -> protocol.WriteFrame -> protocol.ReadFrame -> protocol.ParseRequest, the exact
broker decode path. It was strengthened to prove the single-prefix invariant directly
rather than only checking the decoded header. For each case, after WriteFrame it asserts:

  1. the first 4 bytes equal big-endian(len(rest)), and
  2. rest (the payload the broker reads back) equals the encodeFetchRequest /
    encodeProduceRequest output byte-for-byte,

so there is provably exactly one length prefix on the wire. It then asserts ParseRequest
returns the correct apiKey/apiVersion and that the topics/partitions survive.

Cases covered:

  • single-partition fetch (v12, which still carries the topic name on the wire),
  • multi-partition / multi-topic fetch,
  • a multi-broker FAN-OUT: sub-requests are built the way groupFetchPartitionsByBroker
    builds them (one *kmsg.FetchRequest per owning broker, settings copied from the parent),
    each sub-request is encoded and asserted to frame singly and round-trip, and the no-aliasing
    check above runs across them,
  • an empty-partition fetch (no records / empty topic) still frames correctly,
  • produce (v9).

go build ./..., go vet ./cmd/proxy/..., gofmt, and go test ./cmd/proxy/... are green.
Negative-tested: reverting the b[4:] strip in both encode funcs makes every case fail with
the same decode Produce v### / insufficient bytes garbage from the symptom, then restored.

The byte-level round-trip test is the concrete runnable evidence here. A live multi-broker
capture is pending the lab coming back online; the franz-go recipe below is the manual
reproduction to run against a real cluster once it is up.

Consume-path boundary: how this relates to #156 and #149

These three are orthogonal, not redundant:

So the default-config consume fix is this PR. #156 fixes a different (EOF/restart) failure
on the same forward path, and #149 only changes behaviour once you turn the default
sync-flush off.

franz-go reproduction recipe

Lab is offline, so this is the recipe to run against a real multi-broker cluster rather than
captured output. Produce N records across a multi-partition topic, then consume the same
topic THROUGH the proxy with a franz-go (kgo) client and observe empty before the fix and N
records after, at the same topic and HWM.

// go run against KAFSCALE_PROXY_ADDR (the proxy, not a broker), franz-go v1.20.x.
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	const (
		proxy = "127.0.0.1:9092" // the kafSCALE proxy endpoint
		topic = "events.er1_items"
		n     = 100
	)
	ctx := context.Background()

	// 1) Produce N records across the topic's partitions through the proxy.
	pcl, err := kgo.NewClient(kgo.SeedBrokers(proxy))
	if err != nil {
		panic(err)
	}
	for i := 0; i < n; i++ {
		rec := &kgo.Record{Topic: topic, Value: []byte("v-" + strconv.Itoa(i))}
		if r := pcl.ProduceSync(ctx, rec); r.FirstErr() != nil {
			panic(r.FirstErr())
		}
	}
	pcl.Close()

	// 2) Consume the same topic from the beginning, through the proxy.
	//    Before the fix: PollFetches returns empty / decode errors and got stays 0.
	//    After the fix:  got reaches n.
	ccl, err := kgo.NewClient(
		kgo.SeedBrokers(proxy),
		kgo.ConsumeTopics(topic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
	)
	if err != nil {
		panic(err)
	}
	defer ccl.Close()

	got := 0
	for got < n {
		fs := ccl.PollFetches(ctx)
		if errs := fs.Errors(); len(errs) > 0 {
			fmt.Printf("fetch errors (expected before fix): %v\n", errs)
		}
		fs.EachRecord(func(*kgo.Record) { got++ })
		if fs.Empty() && got == 0 {
			fmt.Println("empty fetch: reproduces the pre-fix consume-through-proxy symptom")
			break
		}
	}
	fmt.Printf("consumed %d / %d records\n", got, n)
}

A multi-broker cluster (so the fetch fans out to more than one broker) is what makes the
re-marshal path fire. Against a single broker the original prefix-less payload is used and the
bug is masked.

kamir and others added 2 commits June 14, 2026 17:02
…re-marshal

encodeFetchRequest/encodeProduceRequest returned kmsg
RequestFormatter.AppendRequest output, which already carries a 4-byte
big-endian size prefix. forwardToBackend writes that payload via
protocol.WriteFrame, which prepends its own size prefix. On any path
that re-marshals (multi-broker fan-out where canUseOriginal is false, or
a retry where originalPayload is nil'd) the on-wire frame became
[size][size][header][body]. The broker reads the outer size, then parses
the inner size bytes as apiKey/apiVersion, producing the observed
"decode Produce v###: not enough data" and a dead consume path on the
3-broker azure-sim.

The canUseOriginal branch was unaffected because originalPayload is the
prefix-less ReadFrame payload.

Fix: strip the leading 4 bytes that AppendRequest writes so WriteFrame is
the single source of the size prefix. Add a real round-trip regression
(encode -> WriteFrame -> ReadFrame -> ParseRequest, the exact broker
decode path) for both fetch (v12, carries topic name) and produce (v9);
it fails on the pre-fix code with the same family of decode garbage.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…prefix regression

Strengthen TestEncodeRequestNoDoubleLengthPrefix so it proves the
single-length-prefix invariant directly instead of only checking the decoded
header. For each case, after protocol.WriteFrame the test now asserts the
first 4 bytes equal big-endian(len(rest)) AND that rest equals the
encodeFetchRequest/encodeProduceRequest output byte-for-byte (exactly one
prefix), then ParseRequest round-trips with the correct apiKey/apiVersion.

Cases added: single-partition fetch, multi-partition fetch, a multi-broker
fan-out (sub-requests built the way groupFetchPartitionsByBroker builds them,
each encoded and asserted to frame singly), and an empty-partition fetch.
The fan-out case also asserts no two sub-request encodings share a backing
array, confirming no buffer aliasing on the hot path (each encode does a
fresh AppendRequest(nil, ...) and returns a sub-slice of that buffer).

go test ./cmd/proxy/... passes; negative-tested by reverting the b[4:] strip
in both encode funcs, which makes every case fail with the same decode
garbage, then restored.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant