Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
18a25bd
fix(kv): absorb leader churn server-side so gRPC stays linearizable
claude Apr 24, 2026
52ec37c
style(adapter): drop trailing blank line to satisfy gci
claude Apr 24, 2026
24b09cc
refactor: address gemini-code-assist review comments
claude Apr 24, 2026
31ca8bd
fix(kv): refresh StartTS and honour ctx on Dispatch retry
claude Apr 24, 2026
fead00e
refactor(kv): extract Dispatch retry helpers to satisfy cyclop
claude Apr 24, 2026
e53e743
fix(kv): classify wire-level not-leader errors + add retry-loop tests
claude Apr 24, 2026
88a08bd
style(kv): appease golangci on coordinator_retry_test
claude Apr 24, 2026
4f10504
fix(kv): enforce retry-budget strictly in Dispatch and LeaderProxy
claude Apr 24, 2026
3b136d5
fix(kv): cap each dispatchOnce attempt by remaining retry budget
claude Apr 24, 2026
afd8e30
fix(kv): never discard a committed dispatch; defensive nil-err + nits
claude Apr 24, 2026
db58eef
refactor(kv): extract retry helpers to satisfy cyclop on Dispatch + f…
claude Apr 24, 2026
7ce64f5
fix(kv): classify leadership-loss/transfer wire strings as transient
claude Apr 24, 2026
e410490
fix(kv): tighten transient-leader phrase match to suffix only
claude Apr 24, 2026
640f009
fix(kv): enforce retry budget strictly across the back-off sleep
claude Apr 24, 2026
846125b
style(kv): drop gosec G115 + cyclop violations from retry back-off
claude Apr 24, 2026
29e1f2e
fix(kv): surface transient leader error on retry-budget expiry
claude Apr 24, 2026
26f086f
docs(kv): sync isTransientLeaderError doc + stabilise LeaderProxy tes…
claude Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 56 additions & 55 deletions adapter/grpc_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package adapter

import (
"bytes"
"context"
"fmt"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -152,60 +150,36 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) {
t.Parallel()
nodes, adders, _ := createNode(t, 3)
c := rawKVClient(t, adders)
defer shutdown(nodes)

// 1000 concurrent clients × 3 RPCs saturates the single raft leader
// hard enough to provoke brief quorum checks to fail on CI, so retry
// transient leader-unavailable errors. The *Eventually helpers are
// intentionally NOT used here: they end in require.NoError, and
// require calls t.FailNow() which must run on the main test goroutine.
// Workers use retryNotLeader + an errors channel instead so all
// require/assert calls happen on the main goroutine after wg.Wait().
ctx := context.Background()
const workers = 1000
errCh := make(chan error, workers)
wg := sync.WaitGroup{}
const workers = 1000
wg.Add(workers)
for i := range workers {
go func(i int) {
defer wg.Done()
key := []byte("test-key-parallel" + strconv.Itoa(i))
want := []byte(strconv.Itoa(i))
put := func() error {
_, err := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
return err
}
if err := retryNotLeader(ctx, put); err != nil {
errCh <- err
_, err := c.RawPut(
context.Background(),
&pb.RawPutRequest{Key: key, Value: want},
)
if !assert.NoError(t, err, "Put RPC failed") {
return
}
if err := retryNotLeader(ctx, put); err != nil {
errCh <- err
_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
if !assert.NoError(t, err, "Put RPC failed") {
return
}
var resp *pb.RawGetResponse
err := retryNotLeader(ctx, func() error {
r, err := c.RawGet(ctx, &pb.RawGetRequest{Key: key})
if err != nil {
return err
}
resp = r
return nil
})
if err != nil {
errCh <- err

resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
if !assert.NoError(t, err, "Get RPC failed") {
return
}
if !bytes.Equal(want, resp.Value) {
errCh <- fmt.Errorf("consistency check failed for key %s: want %q got %q", key, want, resp.Value)
}
assert.Equal(t, want, resp.Value, "consistency check failed")
}(i)
Comment on lines 159 to 180
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The wg.Done() call should be deferred at the beginning of the goroutine to ensure it is always executed, even if a panic occurs or if early returns are added in the future. Additionally, the use of context.TODO() is inconsistent with context.Background() used earlier in the same test.

Suggested change
go func(i int) {
defer wg.Done()
key := []byte("test-key-parallel" + strconv.Itoa(i))
want := []byte(strconv.Itoa(i))
put := func() error {
_, err := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
return err
}
if err := retryNotLeader(ctx, put); err != nil {
errCh <- err
return
}
if err := retryNotLeader(ctx, put); err != nil {
errCh <- err
return
}
var resp *pb.RawGetResponse
err := retryNotLeader(ctx, func() error {
r, err := c.RawGet(ctx, &pb.RawGetRequest{Key: key})
if err != nil {
return err
}
resp = r
return nil
})
if err != nil {
errCh <- err
return
}
if !bytes.Equal(want, resp.Value) {
errCh <- fmt.Errorf("consistency check failed for key %s: want %q got %q", key, want, resp.Value)
}
_, err := c.RawPut(
context.Background(),
&pb.RawPutRequest{Key: key, Value: want},
)
assert.NoError(t, err, "Put RPC failed")
_, err = c.RawPut(context.TODO(), &pb.RawPutRequest{Key: key, Value: want})
assert.NoError(t, err, "Put RPC failed")
resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
assert.NoError(t, err, "Get RPC failed")
assert.Equal(t, want, resp.Value, "consistency check failed")
wg.Done()
}(i)
go func(i int) {
defer wg.Done()
key := []byte("test-key-parallel" + strconv.Itoa(i))
want := []byte(strconv.Itoa(i))
_, err := c.RawPut(
context.Background(),
&pb.RawPutRequest{Key: key, Value: want},
)
assert.NoError(t, err, "Put RPC failed")
_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
assert.NoError(t, err, "Put RPC failed")
resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
assert.NoError(t, err, "Get RPC failed")
assert.Equal(t, want, resp.Value, "consistency check failed")
}(i)

}
wg.Wait()
close(errCh)
for err := range errCh {
assert.NoError(t, err)
}
shutdown(nodes)
}

func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) {
Expand All @@ -216,18 +190,30 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) {

key := []byte("test-key-sequence")

// Use *Eventually helpers because a 9999-iteration loop across three
// t.Parallel adapter tests loads CI enough that the raft leader can
// briefly lose quorum and step down mid-run, surfacing as transient
// "not leader" / "leader not found" RPC errors. The helpers retry
// only those transient errors; any other error still fails the test.
ctx := context.Background()
for i := range 9999 {
want := []byte("sequence" + strconv.Itoa(i))
rawPutEventually(t, ctx, c, &pb.RawPutRequest{Key: key, Value: want})
rawPutEventually(t, ctx, c, &pb.RawPutRequest{Key: key, Value: want})
_, err := c.RawPut(
context.Background(),
&pb.RawPutRequest{Key: key, Value: want},
)
// Stop at the first RPC failure instead of continuing: a
// genuine regression would otherwise cascade into 9998 more
// iterations, each reporting the same broken invariant, and
// drown the real cause in test-output noise.
if !assert.NoError(t, err, "Put RPC failed") {
break
}

_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
if !assert.NoError(t, err, "Put RPC failed") {
break
}

resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
if !assert.NoError(t, err, "Get RPC failed") {
break
}

resp := rawGetEventually(t, ctx, c, &pb.RawGetRequest{Key: key})
assert.Equal(t, want, resp.Value, "consistency check failed")
}
}
Expand All @@ -240,18 +226,33 @@ func Test_grpc_transaction(t *testing.T) {

key := []byte("test-key-sequence")

// See Test_consistency_satisfy_write_after_read_sequence for why the
// *Eventually helpers are necessary here.
ctx := context.Background()
for i := range 9999 {
want := []byte("sequence" + strconv.Itoa(i))
txnPutEventually(t, ctx, c, &pb.PutRequest{Key: key, Value: want})
resp := txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key})
_, err := c.Put(
context.Background(),
&pb.PutRequest{Key: key, Value: want},
)
// See Test_consistency_satisfy_write_after_read_sequence:
// break on first RPC failure so a single broken invariant
// does not amplify into thousands of assertion lines.
if !assert.NoError(t, err, "Put RPC failed") {
break
}
resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key})
if !assert.NoError(t, err, "Get RPC failed") {
break
}
assert.Equal(t, want, resp.Value, "consistency check failed")

txnDeleteEventually(t, ctx, c, &pb.DeleteRequest{Key: key})
_, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key})
if !assert.NoError(t, err, "Delete RPC failed") {
break
}

resp = txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key})
resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key})
if !assert.NoError(t, err, "Get RPC failed") {
break
}
assert.Nil(t, resp.Value, "consistency check failed")
}
}
Expand Down
61 changes: 0 additions & 61 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,64 +606,3 @@ func lpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key s
return rdb.LPush(ctx, key, vals...).Err()
})
}

// rawPutEventually wraps RawKV.RawPut in doEventually so transient leader
// churn (either at startup or in the middle of a long-running loop) does
// not fail the test with "not leader" / "leader not found".
func rawPutEventually(t *testing.T, ctx context.Context, c pb.RawKVClient, req *pb.RawPutRequest) {
t.Helper()
doEventually(t, func() error {
_, err := c.RawPut(ctx, req)
return err
})
}

// rawGetEventually wraps RawKV.RawGet in doEventually and returns the
// response only after a successful (non-"not leader") call.
func rawGetEventually(t *testing.T, ctx context.Context, c pb.RawKVClient, req *pb.RawGetRequest) *pb.RawGetResponse {
t.Helper()
var resp *pb.RawGetResponse
doEventually(t, func() error {
r, err := c.RawGet(ctx, req)
if err != nil {
return err
}
resp = r
return nil
})
return resp
}

// txnPutEventually wraps TransactionalKV.Put in doEventually.
func txnPutEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.PutRequest) {
t.Helper()
doEventually(t, func() error {
_, err := c.Put(ctx, req)
return err
})
}

// txnGetEventually wraps TransactionalKV.Get in doEventually and returns the
// response only after a successful (non-"not leader") call.
func txnGetEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.GetRequest) *pb.GetResponse {
t.Helper()
var resp *pb.GetResponse
doEventually(t, func() error {
r, err := c.Get(ctx, req)
if err != nil {
return err
}
resp = r
return nil
})
return resp
}

// txnDeleteEventually wraps TransactionalKV.Delete in doEventually.
func txnDeleteEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.DeleteRequest) {
t.Helper()
doEventually(t, func() error {
_, err := c.Delete(ctx, req)
return err
})
}
Loading
Loading