Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1880,7 +1880,13 @@ func addFetchErrorForAllPartitions(resp *kmsg.FetchResponse, req *kmsg.FetchRequ
// encodeProduceRequest serializes a produce request with header into a wire frame.
func encodeProduceRequest(header *protocol.RequestHeader, req *kmsg.ProduceRequest) []byte {
formatter := kmsg.NewRequestFormatter(kmsg.FormatterClientID(clientIDStr(header.ClientID)))
return formatter.AppendRequest(nil, req, header.CorrelationID)
b := formatter.AppendRequest(nil, req, header.CorrelationID)
// AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one;
// strip it here so the wire frame is single-prefixed (header+body only).
if len(b) < 4 {
return b
}
return b[4:]
}

// parseProduceResponse deserializes a produce response from wire bytes.
Expand All @@ -1900,7 +1906,13 @@ func parseProduceResponse(data []byte, version int16) (*kmsg.ProduceResponse, er
// encodeFetchRequest serializes a fetch request with header into a wire frame.
func encodeFetchRequest(header *protocol.RequestHeader, req *kmsg.FetchRequest) []byte {
formatter := kmsg.NewRequestFormatter(kmsg.FormatterClientID(clientIDStr(header.ClientID)))
return formatter.AppendRequest(nil, req, header.CorrelationID)
b := formatter.AppendRequest(nil, req, header.CorrelationID)
// AppendRequest emits a 4-byte size prefix, but WriteFrame re-adds one;
// strip it here so the wire frame is single-prefixed (header+body only).
if len(b) < 4 {
return b
}
return b[4:]
}

// parseFetchResponse deserializes a fetch response from wire bytes.
Expand Down
306 changes: 306 additions & 0 deletions cmd/proxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"log/slog"
"net"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1180,3 +1181,308 @@ func TestForwardFetchRetriesOnBackendError(t *testing.T) {
t.Fatalf("expected >=2 backend attempts (error then retry success), got %d", fb.attempts)
}
}

// assertSinglePrefixFrame drives encoded bytes through the exact wire path
// forwardToBackend uses: WriteFrame (which prepends a 4-byte big-endian size) ->
// ReadFrame -> ParseRequest (the broker decode path). It asserts the
// single-length-prefix invariant explicitly:
//
// 1. After WriteFrame, the first 4 bytes equal big-endian(len(rest)), AND
// 2. rest (the frame payload the broker reads back) equals the encode output
// byte-for-byte. Together these prove there is EXACTLY ONE size prefix on the
// wire. If the encode funcs kept AppendRequest's own prefix, the wire frame
// would be [size][size][header][body]; the outer size would then be 4 larger
// than len(encoded) and the round-tripped payload would carry a spurious
// leading 4 bytes, so both asserts catch the regression.
//
// It returns the parsed header and request for the caller to assert on.
func assertSinglePrefixFrame(t *testing.T, encoded []byte) (*protocol.RequestHeader, kmsg.Request) {
t.Helper()

var buf bytes.Buffer
if err := protocol.WriteFrame(&buf, encoded); err != nil {
t.Fatalf("WriteFrame: %v", err)
}
wire := buf.Bytes()
if len(wire) < 4 {
t.Fatalf("framed bytes too short: %d", len(wire))
}

// Assert 1: the on-wire size prefix is exactly len(encoded), not len(encoded)+4.
gotSize := binary.BigEndian.Uint32(wire[:4])
if int(gotSize) != len(encoded) {
t.Fatalf("on-wire size prefix=%d, want %d (len of encode output); a value of %d would mean a double prefix",
gotSize, len(encoded), len(encoded)+4)
}
// Assert 2: the framed payload equals the encode output byte-for-byte. This is
// the strict single-prefix proof: WriteFrame adds exactly one prefix and the
// encode output carries none of its own.
rest := wire[4:]
if !bytes.Equal(rest, encoded) {
t.Fatalf("frame payload (len %d) does not equal encode output (len %d) byte-for-byte; double prefix?",
len(rest), len(encoded))
}

frame, err := protocol.ReadFrame(&buf)
if err != nil {
t.Fatalf("ReadFrame: %v", err)
}
if !bytes.Equal(frame.Payload, encoded) {
t.Fatalf("ReadFrame payload does not match encode output byte-for-byte; double prefix?")
}
gotHeader, parsedReq, err := protocol.ParseRequest(frame.Payload)
if err != nil {
t.Fatalf("broker-path ParseRequest failed (double prefix bug?): %v", err)
}
return gotHeader, parsedReq
}

// makeFetchRequestTyped builds a fetch v12 request (the last version that carries
// the topic NAME on the wire; v13+ switches to TopicID UUIDs) with the given
// topic -> partitions layout, so the name survives a round-trip and makes a
// meaningful single-prefix assertion.
func makeFetchRequestTyped(version int16, topics map[string][]int32) *kmsg.FetchRequest {
req := kmsg.NewPtrFetchRequest()
req.SetVersion(version)
req.ReplicaID = -1
req.MaxWaitMillis = 500
req.MinBytes = 1
req.MaxBytes = 1048576
req.SessionEpoch = -1
// Sort topic names so encode output is deterministic across runs.
names := make([]string, 0, len(topics))
for name := range topics {
names = append(names, name)
}
sort.Strings(names)
for _, name := range names {
ft := kmsg.NewFetchRequestTopic()
ft.Topic = name
for _, p := range topics[name] {
fp := kmsg.NewFetchRequestTopicPartition()
fp.Partition = p
fp.FetchOffset = int64(p) * 10
fp.PartitionMaxBytes = 524288
ft.Partitions = append(ft.Partitions, fp)
}
req.Topics = append(req.Topics, ft)
}
return req
}

// TestEncodeRequestNoDoubleLengthPrefix is a real round-trip regression for the
// double-length-prefix bug on the REQUEST re-marshal path:
// encodeFetchRequest/encodeProduceRequest produce the bytes forwardToBackend
// writes via protocol.WriteFrame, which prepends a 4-byte size. If the encode
// funcs also kept AppendRequest's own size prefix, the on-wire frame would be
// [size][size][header][body]; a broker reading it via protocol.ParseRequest would
// see the inner size's bytes as apiKey/apiVersion (the observed "decode Produce
// v###: not enough data"). Every sub-test pushes the encoded bytes through
// WriteFrame -> ReadFrame -> ParseRequest exactly as the broker would, and asserts
// a SINGLE length prefix (see assertSinglePrefixFrame) plus that the header and
// topics survive intact.
func TestEncodeRequestNoDoubleLengthPrefix(t *testing.T) {
clientID := "proxy-test"
fetchHeader := func(corr int32) *protocol.RequestHeader {
return &protocol.RequestHeader{
APIKey: protocol.APIKeyFetch,
APIVersion: 12,
CorrelationID: corr,
ClientID: &clientID,
}
}

t.Run("fetch_single_partition", func(t *testing.T) {
req := makeFetchRequestTyped(12, map[string][]int32{"events.er1_items": {0}})
header := fetchHeader(1)

gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req))

if gotHeader.APIKey != protocol.APIKeyFetch || gotHeader.APIVersion != 12 || gotHeader.CorrelationID != 1 {
t.Fatalf("header garbled: key=%d ver=%d corr=%d", gotHeader.APIKey, gotHeader.APIVersion, gotHeader.CorrelationID)
}
fr := parsedReq.(*kmsg.FetchRequest)
if len(fr.Topics) != 1 || fr.Topics[0].Topic != "events.er1_items" {
t.Fatalf("topics: %+v", fr.Topics)
}
if len(fr.Topics[0].Partitions) != 1 {
t.Fatalf("partitions: want 1 got %d", len(fr.Topics[0].Partitions))
}
})

t.Run("fetch_multi_partition", func(t *testing.T) {
req := makeFetchRequestTyped(12, map[string][]int32{
"events.er1_items": {0, 1, 2},
"citizen.audit.queries": {0, 1, 2},
})
header := fetchHeader(4242)

gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req))

if gotHeader.APIVersion != 12 {
t.Fatalf("apiVersion: want 12 got %d (double prefix garbles version)", gotHeader.APIVersion)
}
if gotHeader.CorrelationID != 4242 {
t.Fatalf("correlationID: want 4242 got %d", gotHeader.CorrelationID)
}
fr := parsedReq.(*kmsg.FetchRequest)
if len(fr.Topics) != 2 {
t.Fatalf("topics: want 2 got %d", len(fr.Topics))
}
for _, ft := range fr.Topics {
if len(ft.Partitions) != 3 {
t.Fatalf("topic %q partitions: want 3 got %d", ft.Topic, len(ft.Partitions))
}
}
})

t.Run("fetch_empty_partitions", func(t *testing.T) {
// An empty topic / no records still produces a well-formed fetch sub-request
// with zero partitions; it must frame singly too. This is the shape a fetch
// against an empty topic takes after groupFetchPartitionsByBroker filtering.
req := makeFetchRequestTyped(12, map[string][]int32{"events.er1_items": {}})
header := fetchHeader(7)

gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeFetchRequest(header, req))

if gotHeader.APIVersion != 12 {
t.Fatalf("apiVersion: want 12 got %d", gotHeader.APIVersion)
}
fr := parsedReq.(*kmsg.FetchRequest)
if len(fr.Topics) != 1 || len(fr.Topics[0].Partitions) != 0 {
t.Fatalf("want 1 topic with 0 partitions, got %+v", fr.Topics)
}
})

t.Run("fetch_multi_broker_fanout", func(t *testing.T) {
// Reproduce the canUseOriginal == false hot path: groupFetchPartitionsByBroker
// splits a fetch into one sub-request per owning broker, each of which is
// independently encodeFetchRequest'd and WriteFrame'd. With no router every
// partition lands under the round-robin group, so to exercise a genuine
// multi-broker split we build the sub-requests the same way the grouper does
// (one *kmsg.FetchRequest per broker, settings copied from the parent) and
// assert EACH sub-request frames singly and round-trips.
full := makeFetchRequestTyped(12, map[string][]int32{
"events.er1_items": {0, 1, 2, 3},
"citizen.audit.queries": {0, 1},
})
// Owner map mimicking three brokers owning disjoint partitions.
owner := func(topic string, part int32) string {
return []string{"broker-0", "broker-1", "broker-2"}[int(part)%3]
}
subReqs := splitFetchByOwnerForTest(full, owner)
if len(subReqs) != 3 {
t.Fatalf("expected 3 broker sub-requests, got %d", len(subReqs))
}

header := fetchHeader(31337)
encodings := make([][]byte, 0, len(subReqs))
for addr, sub := range subReqs {
enc := encodeFetchRequest(header, sub)
encodings = append(encodings, enc)
gotHeader, parsedReq := assertSinglePrefixFrame(t, enc)
if gotHeader.APIVersion != 12 {
t.Fatalf("broker %s sub-request apiVersion: want 12 got %d", addr, gotHeader.APIVersion)
}
fr, ok := parsedReq.(*kmsg.FetchRequest)
if !ok {
t.Fatalf("broker %s: parsed %T, not *kmsg.FetchRequest", addr, parsedReq)
}
if len(fr.Topics) == 0 {
t.Fatalf("broker %s sub-request has no topics", addr)
}
}

// No buffer aliasing on the fan-out hot path: each encodeFetchRequest calls
// AppendRequest(nil, ...) (a fresh allocation) and returns b[4:] (a sub-slice
// of that fresh buffer), so distinct sub-requests must not share backing
// storage. Distinct lengths or distinct backing arrays both prove this; we
// assert no two sub-request encodings alias the same backing array.
for i := 0; i < len(encodings); i++ {
for j := i + 1; j < len(encodings); j++ {
if len(encodings[i]) > 0 && len(encodings[j]) > 0 &&
&encodings[i][0] == &encodings[j][0] {
t.Fatalf("fan-out buffer aliasing: sub-request %d and %d share backing array", i, j)
}
}
}
})

t.Run("produce", func(t *testing.T) {
header := &protocol.RequestHeader{
APIKey: protocol.APIKeyProduce,
APIVersion: 9,
CorrelationID: 99,
ClientID: &clientID,
}
req := kmsg.NewPtrProduceRequest()
req.SetVersion(9)
req.TimeoutMillis = 1000
for _, topicName := range []string{"events.er1_items", "citizen.audit.queries"} {
pt := kmsg.NewProduceRequestTopic()
pt.Topic = topicName
for p := int32(0); p < 2; p++ {
pp := kmsg.NewProduceRequestTopicPartition()
pp.Partition = p
pp.Records = []byte{0x01, 0x02, 0x03}
pt.Partitions = append(pt.Partitions, pp)
}
req.Topics = append(req.Topics, pt)
}

gotHeader, parsedReq := assertSinglePrefixFrame(t, encodeProduceRequest(header, req))

if gotHeader.APIKey != protocol.APIKeyProduce {
t.Fatalf("apiKey: want %d got %d (double prefix)", protocol.APIKeyProduce, gotHeader.APIKey)
}
if gotHeader.APIVersion != 9 {
t.Fatalf("apiVersion: want 9 got %d (double prefix garbles version)", gotHeader.APIVersion)
}
pr := parsedReq.(*kmsg.ProduceRequest)
if len(pr.Topics) != 2 {
t.Fatalf("topics: want 2 got %d", len(pr.Topics))
}
if pr.Topics[1].Topic != "citizen.audit.queries" {
t.Fatalf("topic[1]: want citizen.audit.queries got %q", pr.Topics[1].Topic)
}
})
}

// splitFetchByOwnerForTest mirrors how groupFetchPartitionsByBroker builds one
// *kmsg.FetchRequest per owning broker (settings copied from the parent, topics
// and partitions distributed by owner). It is owner-map-driven so the test does
// not need a live etcd-backed PartitionRouter to exercise a multi-broker fan-out.
func splitFetchByOwnerForTest(req *kmsg.FetchRequest, owner func(topic string, part int32) string) map[string]*kmsg.FetchRequest {
groups := make(map[string]*kmsg.FetchRequest)
topicIdx := make(map[string]map[string]int)
for _, topic := range req.Topics {
for _, part := range topic.Partitions {
addr := owner(topic.Topic, part.Partition)
sub, ok := groups[addr]
if !ok {
sub = &kmsg.FetchRequest{
Version: req.Version,
ReplicaID: req.ReplicaID,
MaxWaitMillis: req.MaxWaitMillis,
MinBytes: req.MinBytes,
MaxBytes: req.MaxBytes,
IsolationLevel: req.IsolationLevel,
SessionID: req.SessionID,
SessionEpoch: req.SessionEpoch,
}
groups[addr] = sub
topicIdx[addr] = make(map[string]int)
}
idx, ok := topicIdx[addr][topic.Topic]
if !ok {
idx = len(sub.Topics)
st := kmsg.NewFetchRequestTopic()
st.Topic = topic.Topic
sub.Topics = append(sub.Topics, st)
topicIdx[addr][topic.Topic] = idx
}
sub.Topics[idx].Partitions = append(sub.Topics[idx].Partitions, part)
}
}
return groups
}
Loading