From 18a25bdb39c8f1e6c5a109e64dc61ee46602b66f Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 11:12:02 +0000 Subject: [PATCH 01/17] fix(kv): absorb leader churn server-side so gRPC stays linearizable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #614 worked around mid-test leader churn by retrying "not leader" / "leader not found" RPC errors at the test client. That weakens the gRPC API contract: a linearizable client expects every call to either commit atomically or fail definitively, not to leak raft-internal re-election windows out as transient errors the caller has to retry. Move the retry into the coordinator instead so the gRPC layer stays linearizable from the client's perspective: - Coordinate.Dispatch wraps the per-attempt path in a bounded retry loop (5s budget, 25ms poll). Transient errors detected via isTransientLeaderError (ErrLeaderNotFound or any of the etcd/raft leadership-loss sentinels via isLeadershipLossError) are absorbed; business-logic errors surface unchanged. StartTS issuance moves inside the per-attempt path so a new leader's HLC mints a fresh timestamp that floors above any committed physical-ceiling lease. - LeaderProxy.forwardWithRetry no longer bails out immediately on ErrLeaderNotFound; it polls leaderProxyRetryBudget while the leader re-publishes, matching the coordinator's behaviour. Revert the test-side workarounds now that the server hides the churn: - Restore the three consistency tests to call gRPC directly with assert.NoError, the way they verify linearizability on the wire. - Drop rawPutEventually / rawGetEventually / txnPutEventually / txnGetEventually / txnDeleteEventually from adapter/test_util.go. The Redis rpushEventually / lpushEventually helpers stay in place (separate scope; the Redis adapter's leader handling is unchanged). Test plan - go vet ./... — clean - go build ./... 
— clean - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 218s - go test -race -count=1 ./kv/ — ok 2.1s - go test -race -count=1 ./adapter/ — ok 282s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- adapter/grpc_test.go | 99 ++++++++++++++++---------------------------- adapter/test_util.go | 60 --------------------------- kv/coordinator.go | 82 ++++++++++++++++++++++++++++++++++++ kv/leader_proxy.go | 54 ++++++++++++++++++------ 4 files changed, 160 insertions(+), 135 deletions(-) diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index d9597472..eb7adade 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -1,9 +1,7 @@ package adapter import ( - "bytes" "context" - "fmt" "strconv" "sync" "testing" @@ -153,58 +151,27 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) { nodes, adders, _ := createNode(t, 3) c := rawKVClient(t, adders) - // 1000 concurrent clients × 3 RPCs saturates the single raft leader - // hard enough to provoke brief quorum checks to fail on CI, so retry - // transient leader-unavailable errors. The *Eventually helpers are - // intentionally NOT used here: they end in require.NoError, and - // require calls t.FailNow() which must run on the main test goroutine. - // Workers use retryNotLeader + an errors channel instead so all - // require/assert calls happen on the main goroutine after wg.Wait(). 
- ctx := context.Background() - const workers = 1000 - errCh := make(chan error, workers) wg := sync.WaitGroup{} - wg.Add(workers) - for i := range workers { + wg.Add(1000) + for i := range 1000 { go func(i int) { - defer wg.Done() key := []byte("test-key-parallel" + strconv.Itoa(i)) want := []byte(strconv.Itoa(i)) - put := func() error { - _, err := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want}) - return err - } - if err := retryNotLeader(ctx, put); err != nil { - errCh <- err - return - } - if err := retryNotLeader(ctx, put); err != nil { - errCh <- err - return - } - var resp *pb.RawGetResponse - err := retryNotLeader(ctx, func() error { - r, err := c.RawGet(ctx, &pb.RawGetRequest{Key: key}) - if err != nil { - return err - } - resp = r - return nil - }) - if err != nil { - errCh <- err - return - } - if !bytes.Equal(want, resp.Value) { - errCh <- fmt.Errorf("consistency check failed for key %s: want %q got %q", key, want, resp.Value) - } + _, err := c.RawPut( + context.Background(), + &pb.RawPutRequest{Key: key, Value: want}, + ) + assert.NoError(t, err, "Put RPC failed") + _, err = c.RawPut(context.TODO(), &pb.RawPutRequest{Key: key, Value: want}) + assert.NoError(t, err, "Put RPC failed") + + resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key}) + assert.NoError(t, err, "Get RPC failed") + assert.Equal(t, want, resp.Value, "consistency check failed") + wg.Done() }(i) } wg.Wait() - close(errCh) - for err := range errCh { - assert.NoError(t, err) - } shutdown(nodes) } @@ -216,18 +183,20 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { key := []byte("test-key-sequence") - // Use *Eventually helpers because a 9999-iteration loop across three - // t.Parallel adapter tests loads CI enough that the raft leader can - // briefly lose quorum and step down mid-run, surfacing as transient - // "not leader" / "leader not found" RPC errors. 
The helpers retry - // only those transient errors; any other error still fails the test. - ctx := context.Background() for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - rawPutEventually(t, ctx, c, &pb.RawPutRequest{Key: key, Value: want}) - rawPutEventually(t, ctx, c, &pb.RawPutRequest{Key: key, Value: want}) + _, err := c.RawPut( + context.Background(), + &pb.RawPutRequest{Key: key, Value: want}, + ) + assert.NoError(t, err, "Put RPC failed") + + _, err = c.RawPut(context.TODO(), &pb.RawPutRequest{Key: key, Value: want}) + assert.NoError(t, err, "Put RPC failed") + + resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key}) + assert.NoError(t, err, "Get RPC failed") - resp := rawGetEventually(t, ctx, c, &pb.RawGetRequest{Key: key}) assert.Equal(t, want, resp.Value, "consistency check failed") } } @@ -240,18 +209,22 @@ func Test_grpc_transaction(t *testing.T) { key := []byte("test-key-sequence") - // See Test_consistency_satisfy_write_after_read_sequence for why the - // *Eventually helpers are necessary here. 
- ctx := context.Background() for i := range 9999 { want := []byte("sequence" + strconv.Itoa(i)) - txnPutEventually(t, ctx, c, &pb.PutRequest{Key: key, Value: want}) - resp := txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key}) + _, err := c.Put( + context.Background(), + &pb.PutRequest{Key: key, Value: want}, + ) + assert.NoError(t, err, "Put RPC failed") + resp, err := c.Get(context.TODO(), &pb.GetRequest{Key: key}) + assert.NoError(t, err, "Get RPC failed") assert.Equal(t, want, resp.Value, "consistency check failed") - txnDeleteEventually(t, ctx, c, &pb.DeleteRequest{Key: key}) + _, err = c.Delete(context.TODO(), &pb.DeleteRequest{Key: key}) + assert.NoError(t, err, "Delete RPC failed") - resp = txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key}) + resp, err = c.Get(context.TODO(), &pb.GetRequest{Key: key}) + assert.NoError(t, err, "Get RPC failed") assert.Nil(t, resp.Value, "consistency check failed") } } diff --git a/adapter/test_util.go b/adapter/test_util.go index 951698d9..df345f68 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -607,63 +607,3 @@ func lpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key s }) } -// rawPutEventually wraps RawKV.RawPut in doEventually so transient leader -// churn (either at startup or in the middle of a long-running loop) does -// not fail the test with "not leader" / "leader not found". -func rawPutEventually(t *testing.T, ctx context.Context, c pb.RawKVClient, req *pb.RawPutRequest) { - t.Helper() - doEventually(t, func() error { - _, err := c.RawPut(ctx, req) - return err - }) -} - -// rawGetEventually wraps RawKV.RawGet in doEventually and returns the -// response only after a successful (non-"not leader") call. 
-func rawGetEventually(t *testing.T, ctx context.Context, c pb.RawKVClient, req *pb.RawGetRequest) *pb.RawGetResponse { - t.Helper() - var resp *pb.RawGetResponse - doEventually(t, func() error { - r, err := c.RawGet(ctx, req) - if err != nil { - return err - } - resp = r - return nil - }) - return resp -} - -// txnPutEventually wraps TransactionalKV.Put in doEventually. -func txnPutEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.PutRequest) { - t.Helper() - doEventually(t, func() error { - _, err := c.Put(ctx, req) - return err - }) -} - -// txnGetEventually wraps TransactionalKV.Get in doEventually and returns the -// response only after a successful (non-"not leader") call. -func txnGetEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.GetRequest) *pb.GetResponse { - t.Helper() - var resp *pb.GetResponse - doEventually(t, func() error { - r, err := c.Get(ctx, req) - if err != nil { - return err - } - resp = r - return nil - }) - return resp -} - -// txnDeleteEventually wraps TransactionalKV.Delete in doEventually. -func txnDeleteEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.DeleteRequest) { - t.Helper() - doEventually(t, func() error { - _, err := c.Delete(ctx, req) - return err - }) -} diff --git a/kv/coordinator.go b/kv/coordinator.go index 4648314e..45659cc7 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -16,6 +16,23 @@ import ( const redirectForwardTimeout = 5 * time.Second +// dispatchLeaderRetryBudget bounds how long Dispatch keeps absorbing +// transient leader-unavailable errors (no leader resolvable yet, local +// node just stepped down, forwarded RPC bounced off a stale leader). +// gRPC callers expect linearizable semantics — i.e. 
an operation either +// commits atomically or fails definitively — so the coordinator hides +// raft-internal leader churn behind a bounded retry instead of leaking +// "not leader" / "leader not found" errors out through the API. +// +// The budget is large enough to cover one or two complete re-elections +// even on a slow runner (etcd/raft randomised election timeout up to +// ~1s), and small enough that a permanent loss of quorum still surfaces +// to the caller in bounded time. +const dispatchLeaderRetryBudget = 5 * time.Second + +// dispatchLeaderRetryInterval is the poll interval between retries. +const dispatchLeaderRetryInterval = 25 * time.Millisecond + // hlcPhysicalWindowMs is the duration in milliseconds that the Raft-agreed // physical ceiling extends ahead of the current wall clock. Modelled after // TiDB's TSO 3-second window: the leader commits ceiling = now + window, and @@ -225,10 +242,48 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C } // Validate the request before any use to avoid panics on malformed input. + // Validation errors are not retryable, so do this once outside the loop. if err := validateOperationGroup(reqs); err != nil { return nil, err } + // Wrap the actual dispatch in a bounded retry loop so that transient + // leader-unavailable errors (no leader resolvable yet, local node just + // stepped down, forwarded RPC bounced off a stale leader) are absorbed + // inside the coordinator instead of leaking out through the gRPC API. + // The gRPC contract is linearizable: a single client call either + // commits atomically or returns a definitive error. "Leader not found" + // during a re-election is neither, so we wait briefly for the cluster + // to re-stabilise. Non-leader errors that exceed the retry budget are + // surfaced unchanged for callers to observe. 
+ deadline := time.Now().Add(dispatchLeaderRetryBudget) + var lastResp *CoordinateResponse + var lastErr error + for { + lastResp, lastErr = c.dispatchOnce(ctx, reqs) + if lastErr == nil || !isTransientLeaderError(lastErr) { + return lastResp, lastErr + } + if !time.Now().Before(deadline) { + return lastResp, lastErr + } + select { + case <-ctx.Done(): + return lastResp, lastErr + case <-time.After(dispatchLeaderRetryInterval): + } + } +} + +// dispatchOnce runs a single Dispatch attempt without retry. It is the +// transactional unit retried by Dispatch on transient leader errors. +// +// StartTS issuance is intentionally inside the per-attempt path: if a +// previous attempt was rejected by a stale leader, the new leader's +// HLC must mint a fresh StartTS so it floors above any committed +// physical-ceiling lease. Re-using the previous StartTS could violate +// monotonicity across the leader transition. +func (c *Coordinate) dispatchOnce(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error) { if !c.IsLeader() { return c.redirect(ctx, reqs) } @@ -268,6 +323,33 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C return resp, err } +// isTransientLeaderError reports whether err is a transient +// leader-unavailable signal worth retrying inside Dispatch. +// +// Two distinct conditions qualify: +// - ErrLeaderNotFound — no leader address resolvable on this node yet +// (election in progress, or the previous leader stepped down and the +// successor has not propagated). Recoverable as soon as a new leader +// publishes its identity. +// - isLeadershipLossError — the etcd/raft engine rejected a Propose +// because it just lost leadership (ErrNotLeader / ErrLeadershipLost +// / ErrLeadershipTransferInProgress). Recoverable by re-routing +// through the new leader (redirect path). +// +// Business-logic failures (write conflict, validation, etc.) 
are NOT +// covered here — those must surface to the caller unchanged so client +// retry logic can distinguish "system was unavailable" from "your write +// was rejected on its merits". +func isTransientLeaderError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, ErrLeaderNotFound) { + return true + } + return isLeadershipLossError(err) +} + // refreshLeaseAfterDispatch extends the lease only when the dispatch // produced a real Raft commit. CommitIndex == 0 means the underlying // transaction manager short-circuited (empty-input Commit, no-op diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index 3e675cd5..877ecdd3 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -13,6 +13,16 @@ import ( const leaderForwardTimeout = 5 * time.Second const maxForwardRetries = 3 +// leaderProxyRetryBudget bounds how long forwardWithRetry keeps polling +// for a leader address while none is published yet. gRPC callers expect +// linearizable semantics, so the proxy hides brief re-election windows +// behind a bounded retry instead of returning ErrLeaderNotFound on the +// very first attempt. +const leaderProxyRetryBudget = 5 * time.Second + +// leaderProxyRetryInterval paces re-resolution of the leader address. +const leaderProxyRetryInterval = 25 * time.Millisecond + // LeaderProxy forwards transactional requests to the current raft leader when // the local node is not the leader. type LeaderProxy struct { @@ -51,27 +61,47 @@ func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error) { return p.tm.Abort(reqs) } -// forwardWithRetry attempts to forward to the leader up to maxForwardRetries -// times, re-fetching the leader address on each failure to handle leadership -// changes between attempts. +// forwardWithRetry attempts to forward to the leader, re-fetching the +// leader address on each failure to handle leadership changes between +// attempts. 
Two retry signals are interleaved: +// +// - Forward-RPC failures are bounded by maxForwardRetries (each attempt +// re-resolves the leader address inside forward()). +// - ErrLeaderNotFound (no leader published yet) is bounded by +// leaderProxyRetryBudget so a brief re-election window does not bubble +// up to gRPC clients as a hard failure. Linearizable callers expect +// the proxy to either commit atomically or fail definitively, not to +// leak transient raft-internal churn. func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse, error) { if len(reqs) == 0 { return &TransactionResponse{}, nil } + deadline := time.Now().Add(leaderProxyRetryBudget) var lastErr error - for attempt := 0; attempt < maxForwardRetries; attempt++ { - resp, err := p.forward(reqs) - if err == nil { - return resp, nil + for { + // Each iteration of the outer loop runs up to maxForwardRetries + // fast retries against whatever leader is currently visible. If + // none is, we sleep one leaderProxyRetryInterval and re-poll + // until leaderProxyRetryBudget elapses. + for attempt := 0; attempt < maxForwardRetries; attempt++ { + resp, err := p.forward(reqs) + if err == nil { + return resp, nil + } + lastErr = err + if errors.Is(err, ErrLeaderNotFound) { + break + } + } + if !errors.Is(lastErr, ErrLeaderNotFound) { + return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries) } - lastErr = err - // If the leader is simply not found, retry won't help immediately. 
- if errors.Is(err, ErrLeaderNotFound) { - return nil, err + if !time.Now().Before(deadline) { + return nil, lastErr } + time.Sleep(leaderProxyRetryInterval) } - return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries) } func (p *LeaderProxy) forward(reqs []*pb.Request) (*TransactionResponse, error) { From 52ec37c39f1b78cf34bc98c28437a641b6596e18 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 11:25:41 +0000 Subject: [PATCH 02/17] style(adapter): drop trailing blank line to satisfy gci golangci-lint (gci) flagged adapter/test_util.go as improperly formatted after the helper-removal commit left a trailing blank line at end of file. Equivalent to running `gci write`; no behaviour change. https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- adapter/test_util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/adapter/test_util.go b/adapter/test_util.go index df345f68..4de2c47a 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -606,4 +606,3 @@ func lpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key s return rdb.LPush(ctx, key, vals...).Err() }) } - From 24b09cc9c3727f071b9461ae64643de3c832b2d8 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 11:38:01 +0000 Subject: [PATCH 03/17] refactor: address gemini-code-assist review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit adapter/grpc_test.go (Test_consistency_satisfy_write_after_read_for_parallel): - Extract the worker count 1000 to a const workers so wg.Add(workers) and for i := range workers read off the same source of truth. - Move wg.Done() to defer at goroutine entry so panics or future early returns can't leave wg.Wait deadlocked. - Replace the remaining context.TODO() calls with context.Background() to match the first RawPut in the same block. 
kv/coordinator.go (Dispatch retry loop): - Reuse a single time.Timer with Reset across iterations instead of allocating a fresh timer per round via time.After. Go 1.23+ timer semantics make Reset on an unfired/expired timer safe without an explicit drain, so no extra bookkeeping is needed. Avoids a short-term allocation per retry under heavy leader churn. Test plan - go vet ./... — clean - go test -race ./kv/ -run 'Coordinate|Coordinator|LeaderProxy' -count=1 — ok - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 193s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- adapter/grpc_test.go | 11 ++++++----- kv/coordinator.go | 12 +++++++++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index eb7adade..8c1a749e 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -152,9 +152,11 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) { c := rawKVClient(t, adders) wg := sync.WaitGroup{} - wg.Add(1000) - for i := range 1000 { + const workers = 1000 + wg.Add(workers) + for i := range workers { go func(i int) { + defer wg.Done() key := []byte("test-key-parallel" + strconv.Itoa(i)) want := []byte(strconv.Itoa(i)) _, err := c.RawPut( @@ -162,13 +164,12 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) { &pb.RawPutRequest{Key: key, Value: want}, ) assert.NoError(t, err, "Put RPC failed") - _, err = c.RawPut(context.TODO(), &pb.RawPutRequest{Key: key, Value: want}) + _, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want}) assert.NoError(t, err, "Put RPC failed") - resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key}) + resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) assert.NoError(t, err, "Get RPC failed") assert.Equal(t, want, resp.Value, "consistency check 
failed") - wg.Done() }(i) } wg.Wait() diff --git a/kv/coordinator.go b/kv/coordinator.go index 45659cc7..ebf38b99 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -257,6 +257,15 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // to re-stabilise. Non-leader errors that exceed the retry budget are // surfaced unchanged for callers to observe. deadline := time.Now().Add(dispatchLeaderRetryBudget) + // Reuse a single Timer across retries. time.After would allocate a + // fresh timer per iteration whose Go runtime entry lingers until the + // interval elapses, producing a short-term leak proportional to the + // retry rate. Under heavy mid-dispatch leader churn this is a hot + // loop, so we Reset the timer in place instead. Go 1.23+ timer + // semantics make Reset on an unfired/expired Timer safe without an + // explicit drain. + timer := time.NewTimer(dispatchLeaderRetryInterval) + defer timer.Stop() var lastResp *CoordinateResponse var lastErr error for { @@ -267,10 +276,11 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C if !time.Now().Before(deadline) { return lastResp, lastErr } + timer.Reset(dispatchLeaderRetryInterval) select { case <-ctx.Done(): return lastResp, lastErr - case <-time.After(dispatchLeaderRetryInterval): + case <-timer.C: } } } From 31ca8bd3f165f3617ab1b5de25b5df0137b40eb9 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 17:08:19 +0000 Subject: [PATCH 04/17] fix(kv): refresh StartTS and honour ctx on Dispatch retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two correctness issues on the retry loop added in 18a25bd, flagged by Codex: P1 — stale StartTS on retry dispatchOnce only mints a fresh timestamp when reqs.StartTS == 0. 
Once the first attempt filled StartTS in place, every retry reused that stale value, and the FSM's OCC guard (LatestCommitTS > startTS in validateConflicts) could reject the retry with a write-conflict error if any other write committed against the same keys during the election window — exactly the case the retry is meant to absorb. Remember whether the caller asked us to assign timestamps (reqs.IsTxn && reqs.StartTS == 0 on entry) and, if so, reset StartTS/CommitTS back to 0 before each retry so dispatchOnce re-mints against the post-churn leader's HLC. P2 — ctx cancellation reported as leader error When the retry sleep was interrupted by ctx.Done the loop returned lastErr (the transient leader error) instead of ctx.Err(). Callers with a shorter deadline than the 5s retry budget need context.Canceled / context.DeadlineExceeded so they can tell "I gave up" from "cluster unavailable". Surface ctx.Err() on the ctx.Done branch. Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 205s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index ebf38b99..4164a163 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -256,6 +256,16 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // during a re-election is neither, so we wait briefly for the cluster // to re-stabilise. Non-leader errors that exceed the retry budget are // surfaced unchanged for callers to observe. + // + // leaderAssignsTS captures whether the caller asked the coordinator to + // mint timestamps (reqs.StartTS == 0 on entry for a txn). 
If so, each + // retry MUST reset StartTS/CommitTS back to zero so dispatchOnce + // re-issues against the post-churn leader's HLC. Re-using a stale + // StartTS from a failed attempt would race the OCC conflict check in + // fsm.validateConflicts (LatestCommitTS > startTS) against writes + // that committed during the election window, surfacing as a + // write-conflict error the retry was meant to absorb. + leaderAssignsTS := reqs.IsTxn && reqs.StartTS == 0 deadline := time.Now().Add(dispatchLeaderRetryBudget) // Reuse a single Timer across retries. time.After would allocate a // fresh timer per iteration whose Go runtime entry lingers until the @@ -276,10 +286,24 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C if !time.Now().Before(deadline) { return lastResp, lastErr } + if leaderAssignsTS { + // Force dispatchOnce to mint a fresh StartTS on the next + // attempt; keep CommitTS tied to StartTS by clearing it too + // (the same invariant dispatchOnce enforces). + reqs.StartTS = 0 + reqs.CommitTS = 0 + } timer.Reset(dispatchLeaderRetryInterval) select { case <-ctx.Done(): - return lastResp, lastErr + // Caller deadline / cancellation takes precedence over the + // transient leader error: gRPC clients rely on + // context.Canceled / context.DeadlineExceeded to distinguish + // "I gave up waiting" from "cluster is unavailable", and a + // short-deadline caller against a leaderless cluster must not + // be told "not leader" when the real cause is their own + // cancellation. + return lastResp, errors.WithStack(ctx.Err()) case <-timer.C: } } From fead00e031e72a07c122614abd3a9003db676bef Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 17:16:20 +0000 Subject: [PATCH 05/17] refactor(kv): extract Dispatch retry helpers to satisfy cyclop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit golangci-lint (cyclop) flagged Dispatch at cyclomatic complexity 11 (limit 10). 
Split the retry loop's decision points into three small helpers, dropping the in-function branch count to 8: - coordinatorAssignsTimestamps(reqs) — does the caller expect the coordinator to mint StartTS/CommitTS? - shouldRetryDispatch(err) — is the error a transient leader-unavailable signal worth retrying? - waitForDispatchRetry(ctx, timer, interval) — sleep on the timer or return ctx.Err() wrapped, whichever comes first. No behaviour change; all existing retry semantics (StartTS refresh, ctx.Err on cancellation, 5s budget, 25ms interval, reused Timer) are preserved. Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 209s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 68 ++++++++++++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index 4164a163..e015c4e5 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -256,16 +256,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // during a re-election is neither, so we wait briefly for the cluster // to re-stabilise. Non-leader errors that exceed the retry budget are // surfaced unchanged for callers to observe. - // - // leaderAssignsTS captures whether the caller asked the coordinator to - // mint timestamps (reqs.StartTS == 0 on entry for a txn). If so, each - // retry MUST reset StartTS/CommitTS back to zero so dispatchOnce - // re-issues against the post-churn leader's HLC. Re-using a stale - // StartTS from a failed attempt would race the OCC conflict check in - // fsm.validateConflicts (LatestCommitTS > startTS) against writes - // that committed during the election window, surfacing as a - // write-conflict error the retry was meant to absorb. 
- leaderAssignsTS := reqs.IsTxn && reqs.StartTS == 0 + leaderAssignsTS := coordinatorAssignsTimestamps(reqs) deadline := time.Now().Add(dispatchLeaderRetryBudget) // Reuse a single Timer across retries. time.After would allocate a // fresh timer per iteration whose Go runtime entry lingers until the @@ -280,7 +271,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C var lastErr error for { lastResp, lastErr = c.dispatchOnce(ctx, reqs) - if lastErr == nil || !isTransientLeaderError(lastErr) { + if !shouldRetryDispatch(lastErr) { return lastResp, lastErr } if !time.Now().Before(deadline) { @@ -289,26 +280,55 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C if leaderAssignsTS { // Force dispatchOnce to mint a fresh StartTS on the next // attempt; keep CommitTS tied to StartTS by clearing it too - // (the same invariant dispatchOnce enforces). + // (the same invariant dispatchOnce enforces). Re-using a + // stale StartTS would race the OCC conflict check in + // fsm.validateConflicts (LatestCommitTS > startTS) against + // writes that committed during the election window. reqs.StartTS = 0 reqs.CommitTS = 0 } - timer.Reset(dispatchLeaderRetryInterval) - select { - case <-ctx.Done(): - // Caller deadline / cancellation takes precedence over the - // transient leader error: gRPC clients rely on - // context.Canceled / context.DeadlineExceeded to distinguish - // "I gave up waiting" from "cluster is unavailable", and a - // short-deadline caller against a leaderless cluster must not - // be told "not leader" when the real cause is their own - // cancellation. - return lastResp, errors.WithStack(ctx.Err()) - case <-timer.C: + if err := waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval); err != nil { + return lastResp, err } } } +// coordinatorAssignsTimestamps reports whether the caller expects the +// coordinator to mint StartTS/CommitTS on this dispatch. 
When true, each +// retry MUST reset the timestamps back to zero so dispatchOnce re-issues +// against the post-churn leader's HLC. +func coordinatorAssignsTimestamps(reqs *OperationGroup[OP]) bool { + if !reqs.IsTxn { + return false + } + return reqs.StartTS == 0 +} + +// shouldRetryDispatch reports whether Dispatch should loop again on the +// error returned by dispatchOnce. Only transient leader-unavailable +// signals qualify; a nil error and every other non-retryable error +// (write conflict, validation, etc.) must surface unchanged. +func shouldRetryDispatch(err error) bool { + if err == nil { + return false + } + return isTransientLeaderError(err) +} + +// waitForDispatchRetry sleeps for interval on timer or until ctx fires, +// whichever comes first. A ctx cancellation returns a wrapped ctx.Err() +// so gRPC clients can distinguish "I gave up waiting" from "cluster is +// unavailable"; timer expiry returns nil and the caller loops again. +func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time.Duration) error { + timer.Reset(interval) + select { + case <-ctx.Done(): + return errors.WithStack(ctx.Err()) + case <-timer.C: + return nil + } +} + // dispatchOnce runs a single Dispatch attempt without retry. It is the // transactional unit retried by Dispatch on transient leader errors. // From e53e7437b67783bbd21c0a782247f0d0415b2db1 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 17:38:47 +0000 Subject: [PATCH 06/17] fix(kv): classify wire-level not-leader errors + add retry-loop tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the outstanding Copilot / Codex review comments on PR #618. Transport-boundary classification (Codex P1, Copilot) When Coordinate.redirect forwards to a stale leader, the destination returns adapter.ErrNotLeader over gRPC as a generic Unknown status carrying only the string "not leader". 
errors.Is cannot traverse that wire boundary, so the existing isTransientLeaderError missed it and the retry loop exited early, leaking transient election churn to gRPC clients. Add a last-resort substring match on a closed list of phrases ("not leader", "leader not found") and route both Coordinate.Dispatch and LeaderProxy.forwardWithRetry through the same classifier so a forwarded stale-leader error triggers the slow-path poll instead of an immediate exit. Test-side nil-dereference guards (Copilot) The three consistency tests asserted NoError on each RPC but still dereferenced resp.Value unconditionally. On a failed assert, resp could be nil and the next line would panic, masking the real failure mode. Gate every resp access behind the assert.NoError return value (return for the parallel worker, continue for the sequence/txn loops) and replace the lingering context.TODO() calls with context.Background() so the dispatch paths don't carry a "placeholder" context all the way down. New unit tests (Copilot) kv/coordinator_retry_test.go: - TestIsTransientLeaderError_Classification pins the sentinel + string-match matrix (includes a simulated gRPC-Unknown wrapper). - TestCoordinateDispatch_RetriesTransientLeaderError drives a scripted transactional that fails twice (once with the typed raftengine sentinel, once with the wire string) and verifies Dispatch absorbs both. - TestCoordinateDispatch_NonTransientErrorSurfacesImmediately confirms a business-logic error exits the loop after one attempt. - TestCoordinateDispatch_RefreshesStartTSOnRetry checks that each retry mints a fresh, strictly greater StartTS when the caller passes 0 (the fix from 31ca8bd). - TestCoordinateDispatch_CtxCancelDuringRetrySurfaces verifies ctx.Err() wins over the transient leader error. kv/leader_proxy_test.go: - TestLeaderProxy_ForwardsAfterLeaderPublishes flips the engine's leader address from empty to a real gRPC Forward server after a brief delay and asserts the proxy polls through the gap. 
- TestLeaderProxy_FailsAfterLeaderBudgetElapses pins the 5s budget by never publishing a leader and asserting ErrLeaderNotFound is surfaced after the budget. Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok 6.7s - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 210s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- adapter/grpc_test.go | 50 +++++--- kv/coordinator.go | 46 +++++++- kv/coordinator_retry_test.go | 215 +++++++++++++++++++++++++++++++++++ kv/leader_proxy.go | 26 +++-- kv/leader_proxy_test.go | 145 +++++++++++++++++++++++ 5 files changed, 456 insertions(+), 26 deletions(-) create mode 100644 kv/coordinator_retry_test.go diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index 8c1a749e..e309728d 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -163,12 +163,18 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) { context.Background(), &pb.RawPutRequest{Key: key, Value: want}, ) - assert.NoError(t, err, "Put RPC failed") + if !assert.NoError(t, err, "Put RPC failed") { + return + } _, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want}) - assert.NoError(t, err, "Put RPC failed") + if !assert.NoError(t, err, "Put RPC failed") { + return + } resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) - assert.NoError(t, err, "Get RPC failed") + if !assert.NoError(t, err, "Get RPC failed") { + return + } assert.Equal(t, want, resp.Value, "consistency check failed") }(i) } @@ -190,13 +196,19 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { context.Background(), &pb.RawPutRequest{Key: key, Value: want}, ) - assert.NoError(t, err, "Put RPC failed") + if !assert.NoError(t, err, "Put RPC failed") { + continue + } - _, err = c.RawPut(context.TODO(), &pb.RawPutRequest{Key: key, Value: want}) - 
assert.NoError(t, err, "Put RPC failed") + _, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want}) + if !assert.NoError(t, err, "Put RPC failed") { + continue + } - resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key}) - assert.NoError(t, err, "Get RPC failed") + resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) + if !assert.NoError(t, err, "Get RPC failed") { + continue + } assert.Equal(t, want, resp.Value, "consistency check failed") } @@ -216,16 +228,24 @@ func Test_grpc_transaction(t *testing.T) { context.Background(), &pb.PutRequest{Key: key, Value: want}, ) - assert.NoError(t, err, "Put RPC failed") - resp, err := c.Get(context.TODO(), &pb.GetRequest{Key: key}) - assert.NoError(t, err, "Get RPC failed") + if !assert.NoError(t, err, "Put RPC failed") { + continue + } + resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key}) + if !assert.NoError(t, err, "Get RPC failed") { + continue + } assert.Equal(t, want, resp.Value, "consistency check failed") - _, err = c.Delete(context.TODO(), &pb.DeleteRequest{Key: key}) - assert.NoError(t, err, "Delete RPC failed") + _, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key}) + if !assert.NoError(t, err, "Delete RPC failed") { + continue + } - resp, err = c.Get(context.TODO(), &pb.GetRequest{Key: key}) - assert.NoError(t, err, "Get RPC failed") + resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key}) + if !assert.NoError(t, err, "Get RPC failed") { + continue + } assert.Nil(t, resp.Value, "consistency check failed") } } diff --git a/kv/coordinator.go b/kv/coordinator.go index e015c4e5..05803ba4 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "log/slog" "reflect" + "strings" "time" "github.com/bootjp/elastickv/internal/monoclock" @@ -380,7 +381,7 @@ func (c *Coordinate) dispatchOnce(ctx context.Context, reqs *OperationGroup[OP]) // isTransientLeaderError reports whether err is a 
transient // leader-unavailable signal worth retrying inside Dispatch. // -// Two distinct conditions qualify: +// Three distinct conditions qualify: // - ErrLeaderNotFound — no leader address resolvable on this node yet // (election in progress, or the previous leader stepped down and the // successor has not propagated). Recoverable as soon as a new leader @@ -389,6 +390,16 @@ func (c *Coordinate) dispatchOnce(ctx context.Context, reqs *OperationGroup[OP]) // because it just lost leadership (ErrNotLeader / ErrLeadershipLost // / ErrLeadershipTransferInProgress). Recoverable by re-routing // through the new leader (redirect path). +// - Forwarded "not leader" / "leader not found" strings — when +// Coordinate.redirect forwards to a stale leader, the destination +// node returns adapter.ErrNotLeader (its own sentinel) which gRPC +// transports as a generic Unknown status carrying only the message +// "not leader". errors.Is cannot traverse that wire boundary, so we +// fall back to a case-insensitive substring match on the final error +// text. leaderErrorPhrases enumerates the exact phrases the adapter +// and coordinator layers emit so the match cannot accidentally +// swallow an unrelated business-logic error that happens to contain +// "leader" in its text. // // Business-logic failures (write conflict, validation, etc.) are NOT // covered here — those must surface to the caller unchanged so client @@ -401,7 +412,38 @@ func isTransientLeaderError(err error) bool { if errors.Is(err, ErrLeaderNotFound) { return true } - return isLeadershipLossError(err) + if isLeadershipLossError(err) { + return true + } + return hasTransientLeaderPhrase(err) +} + +// leaderErrorPhrases is the closed set of wire-level error strings the +// coordinator is willing to treat as transient once the typed sentinel +// has been dropped by a gRPC boundary. 
Keep this list tight — any +// addition must correspond to an error the system actually emits for +// leader churn, not a generic "failed" message that happens to mention +// leaders. +var leaderErrorPhrases = []string{ + "not leader", // adapter.ErrNotLeader, raftengine.ErrNotLeader, ErrLeadershipLost messages + "leader not found", // kv.ErrLeaderNotFound, adapter.ErrLeaderNotFound +} + +// hasTransientLeaderPhrase reports whether err.Error() contains one of +// the well-known transient-leader substrings. It is the last resort used +// after errors.Is fails across a gRPC boundary that strips the original +// sentinel chain. +func hasTransientLeaderPhrase(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + for _, phrase := range leaderErrorPhrases { + if strings.Contains(msg, phrase) { + return true + } + } + return false } // refreshLeaseAfterDispatch extends the lease only when the dispatch diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go new file mode 100644 index 00000000..a4e75c15 --- /dev/null +++ b/kv/coordinator_retry_test.go @@ -0,0 +1,215 @@ +package kv + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/bootjp/elastickv/internal/raftengine" + pb "github.com/bootjp/elastickv/proto" + cerrors "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// stubLeaderEngine reports State=Leader and a fixed Leader address. It is +// the minimum surface Coordinate.Dispatch needs to take the local-dispatch +// path (IsLeader() true) without engaging real raft machinery. Methods +// beyond that surface return zero values / nil; Dispatch's retry loop +// never calls them, and refreshLeaseAfterDispatch skips its LeaseProvider +// branch because this stub does not implement that interface. 
+type stubLeaderEngine struct{} + +func (stubLeaderEngine) Propose(context.Context, []byte) (*raftengine.ProposalResult, error) { + return &raftengine.ProposalResult{}, nil +} +func (stubLeaderEngine) State() raftengine.State { return raftengine.StateLeader } +func (stubLeaderEngine) Leader() raftengine.LeaderInfo { + return raftengine.LeaderInfo{ID: "self", Address: "127.0.0.1:0"} +} +func (stubLeaderEngine) VerifyLeader(context.Context) error { return nil } +func (stubLeaderEngine) LinearizableRead(context.Context) (uint64, error) { return 0, nil } +func (stubLeaderEngine) Status() raftengine.Status { + return raftengine.Status{State: raftengine.StateLeader} +} +func (stubLeaderEngine) Configuration(context.Context) (raftengine.Configuration, error) { + return raftengine.Configuration{}, nil +} +func (stubLeaderEngine) Close() error { return nil } + +// scriptedTransactional returns the first len(errs) entries from errs on +// successive Commit calls, then succeeds. It records every observed +// request so tests can assert on per-attempt StartTS values. Commit is +// called serially from the Dispatch retry loop, so a plain slice index +// (guarded by atomic.Int64 for race-detector friendliness) is enough. 
+type scriptedTransactional struct { + errs []error + commits atomic.Int64 + reqs [][]*pb.Request + onCommit func(call int64) // optional hook invoked inside Commit +} + +func (s *scriptedTransactional) Commit(reqs []*pb.Request) (*TransactionResponse, error) { + idx := s.commits.Add(1) - 1 + s.reqs = append(s.reqs, reqs) + if s.onCommit != nil { + s.onCommit(idx) + } + if int(idx) < len(s.errs) && s.errs[idx] != nil { + return nil, s.errs[idx] + } + return &TransactionResponse{CommitIndex: uint64(idx + 1)}, nil +} + +func (s *scriptedTransactional) Abort([]*pb.Request) (*TransactionResponse, error) { + return &TransactionResponse{}, nil +} + +func TestIsTransientLeaderError_Classification(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"kv.ErrLeaderNotFound", cerrors.WithStack(ErrLeaderNotFound), true}, + {"raftengine.ErrNotLeader", cerrors.WithStack(raftengine.ErrNotLeader), true}, + {"raftengine.ErrLeadershipLost", cerrors.WithStack(raftengine.ErrLeadershipLost), true}, + {"wire not-leader string", errors.New("not leader"), true}, + {"wire leader-not-found string", errors.New("leader not found"), true}, + {"gRPC status wrapping not leader", fmt.Errorf("rpc error: code = Unknown desc = not leader"), true}, + {"unrelated error", errors.New("write conflict"), false}, + {"validation error", errors.New("invalid request"), false}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, isTransientLeaderError(tc.err)) + }) + } +} + +// newRetryCoordinate wires a Coordinate against stubLeaderEngine and the +// supplied scripted transaction manager. No engine registration happens, +// so the caller does not need to call Close. 
+func newRetryCoordinate(tx Transactional) *Coordinate { + return &Coordinate{ + transactionManager: tx, + engine: stubLeaderEngine{}, + clock: NewHLC(), + } +} + +func TestCoordinateDispatch_RetriesTransientLeaderError(t *testing.T) { + t.Parallel() + + // First two Commit calls fail with transient leader signals (once via + // the typed raftengine sentinel, once via the wire-level string that + // gRPC would transport); the third succeeds. Dispatch must absorb + // both and return success without leaking the transient error. + tx := &scriptedTransactional{ + errs: []error{ + cerrors.WithStack(raftengine.ErrNotLeader), + errors.New("leader not found"), + }, + } + c := newRetryCoordinate(tx) + + start := time.Now() + resp, err := c.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}}, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.EqualValues(t, 3, tx.commits.Load()) + // Three attempts with a 25ms poll interval gives a realistic lower + // bound of ~50ms; anything shorter would mean the loop skipped its + // back-off. + require.GreaterOrEqual(t, time.Since(start), 2*dispatchLeaderRetryInterval) +} + +func TestCoordinateDispatch_NonTransientErrorSurfacesImmediately(t *testing.T) { + t.Parallel() + + business := errors.New("write conflict") + tx := &scriptedTransactional{errs: []error{business}} + c := newRetryCoordinate(tx) + + _, err := c.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}}, + }) + require.ErrorIs(t, err, business) + // Exactly one attempt — the retry loop must not re-drive non-transient + // errors. 
+ require.EqualValues(t, 1, tx.commits.Load()) +} + +func TestCoordinateDispatch_RefreshesStartTSOnRetry(t *testing.T) { + t.Parallel() + + tx := &scriptedTransactional{ + errs: []error{cerrors.WithStack(raftengine.ErrNotLeader)}, + } + c := newRetryCoordinate(tx) + + // Caller passes StartTS==0 — the contract is that the coordinator + // mints the timestamp. Each retry MUST reset StartTS to 0 so + // dispatchOnce re-mints against the post-churn clock; otherwise the + // FSM's LatestCommitTS > startTS check could reject the retry + // against a write that committed during the election window. + reqs := &OperationGroup[OP]{ + IsTxn: true, + Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}}, + } + _, err := c.Dispatch(context.Background(), reqs) + require.NoError(t, err) + require.EqualValues(t, 2, tx.commits.Load()) + require.Len(t, tx.reqs, 2) + require.Len(t, tx.reqs[0], 1) + require.Len(t, tx.reqs[1], 1) + + ts0 := tx.reqs[0][0].Ts + ts1 := tx.reqs[1][0].Ts + require.Greater(t, ts0, uint64(0), "first attempt must carry a minted StartTS") + require.Greater(t, ts1, ts0, "retry must mint a fresh, strictly greater StartTS") +} + +func TestCoordinateDispatch_CtxCancelDuringRetrySurfaces(t *testing.T) { + t.Parallel() + + // An always-transient failure keeps the retry loop alive; cancelling + // the context during the back-off must surface ctx.Err() to the + // caller rather than the transient leader error. gRPC clients rely + // on this to tell "I gave up" from "cluster unavailable". + ctx, cancel := context.WithCancel(context.Background()) + tx := &scriptedTransactional{ + errs: []error{ + cerrors.WithStack(raftengine.ErrNotLeader), + cerrors.WithStack(raftengine.ErrNotLeader), + cerrors.WithStack(raftengine.ErrNotLeader), + }, + onCommit: func(call int64) { + // Cancel after the first failed attempt so the next + // back-off select sees ctx.Done(). 
+ if call == 0 { + cancel() + } + }, + } + c := &Coordinate{ + transactionManager: tx, + engine: stubLeaderEngine{}, + clock: NewHLC(), + } + + _, err := c.Dispatch(ctx, &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}}, + }) + require.ErrorIs(t, err, context.Canceled) +} diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index 877ecdd3..31c69284 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -67,11 +67,18 @@ func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error) { // // - Forward-RPC failures are bounded by maxForwardRetries (each attempt // re-resolves the leader address inside forward()). -// - ErrLeaderNotFound (no leader published yet) is bounded by -// leaderProxyRetryBudget so a brief re-election window does not bubble -// up to gRPC clients as a hard failure. Linearizable callers expect -// the proxy to either commit atomically or fail definitively, not to -// leak transient raft-internal churn. +// - Transient leader-unavailable errors (no leader published yet, or +// the forwarded RPC landed on a stale leader that returned +// adapter.ErrNotLeader over the wire) are bounded by +// leaderProxyRetryBudget so a brief re-election window does not +// bubble up to gRPC clients as a hard failure. Linearizable callers +// expect the proxy to either commit atomically or fail definitively, +// not to leak transient raft-internal churn. +// +// The wire-level "not leader" string match is necessary because a stale +// leader's Internal.Forward returns adapter.ErrNotLeader whose error +// chain does not survive the gRPC boundary; errors.Is against +// ErrLeaderNotFound would miss it and exit after only the fast retries. 
func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse, error) { if len(reqs) == 0 { return &TransactionResponse{}, nil @@ -82,19 +89,20 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse for { // Each iteration of the outer loop runs up to maxForwardRetries // fast retries against whatever leader is currently visible. If - // none is, we sleep one leaderProxyRetryInterval and re-poll - // until leaderProxyRetryBudget elapses. + // none is (or the forward bounced off a stale leader), we sleep + // one leaderProxyRetryInterval and re-poll until + // leaderProxyRetryBudget elapses. for attempt := 0; attempt < maxForwardRetries; attempt++ { resp, err := p.forward(reqs) if err == nil { return resp, nil } lastErr = err - if errors.Is(err, ErrLeaderNotFound) { + if isTransientLeaderError(err) { break } } - if !errors.Is(lastErr, ErrLeaderNotFound) { + if !isTransientLeaderError(lastErr) { return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries) } if !time.Now().Before(deadline) { diff --git a/kv/leader_proxy_test.go b/kv/leader_proxy_test.go index ac9a668e..9e6f7187 100644 --- a/kv/leader_proxy_test.go +++ b/kv/leader_proxy_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "sync" + "sync/atomic" "testing" "time" @@ -144,3 +145,147 @@ func TestLeaderProxy_ForwardsWhenFollower(t *testing.T) { require.NotNil(t, svc.lastReq) require.Len(t, svc.lastReq.Requests, 1) } + +// togglingFollowerEngine starts out reporting no resolvable leader and +// flips to a caller-supplied address after setLeader() is invoked. It +// models the re-election window during which forward() must poll rather +// than return ErrLeaderNotFound immediately. 
+type togglingFollowerEngine struct { + addr atomic.Pointer[string] +} + +func (e *togglingFollowerEngine) setLeader(addr string) { + s := addr + e.addr.Store(&s) +} + +func (e *togglingFollowerEngine) Propose(context.Context, []byte) (*raftengine.ProposalResult, error) { + return nil, raftengine.ErrNotLeader +} +func (e *togglingFollowerEngine) State() raftengine.State { return raftengine.StateFollower } +func (e *togglingFollowerEngine) Leader() raftengine.LeaderInfo { + p := e.addr.Load() + if p == nil { + return raftengine.LeaderInfo{} + } + return raftengine.LeaderInfo{ID: "leader", Address: *p} +} +func (e *togglingFollowerEngine) VerifyLeader(context.Context) error { return raftengine.ErrNotLeader } +func (e *togglingFollowerEngine) LinearizableRead(context.Context) (uint64, error) { + return 0, raftengine.ErrNotLeader +} +func (e *togglingFollowerEngine) Status() raftengine.Status { + return raftengine.Status{State: raftengine.StateFollower, Leader: e.Leader()} +} +func (e *togglingFollowerEngine) Configuration(context.Context) (raftengine.Configuration, error) { + return raftengine.Configuration{}, nil +} +func (e *togglingFollowerEngine) Close() error { return nil } + +func TestLeaderProxy_ForwardsAfterLeaderPublishes(t *testing.T) { + t.Parallel() + + // Bring up a real fake gRPC Forward server; LeaderProxy.forward() + // dials it via the connCache. We leave the engine's leader address + // empty initially so the first few forward() attempts fail with + // ErrLeaderNotFound, then flip to the real address after a brief + // delay. forwardWithRetry must absorb the empty-address window and + // succeed once the engine publishes. 
+ var lc net.ListenConfig + lis, err := lc.Listen(context.Background(), "tcp", "127.0.0.1:0") + require.NoError(t, err) + + svc := &fakeInternal{resp: &pb.ForwardResponse{Success: true, CommitIndex: 42}} + srv := grpc.NewServer() + pb.RegisterInternalServer(srv, svc) + go func() { _ = srv.Serve(lis) }() + t.Cleanup(func() { + srv.Stop() + _ = lis.Close() + }) + + // Wait briefly so the gRPC server is ready. + dialer := &net.Dialer{Timeout: 100 * time.Millisecond} + require.Eventually(t, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + c, err := dialer.DialContext(ctx, "tcp", lis.Addr().String()) + if err != nil { + return false + } + _ = c.Close() + return true + }, 2*time.Second, 10*time.Millisecond) + + eng := &togglingFollowerEngine{} + p := NewLeaderProxyWithEngine(eng) + t.Cleanup(func() { _ = p.connCache.Close() }) + + // Publish the leader address after ~100ms (several poll intervals, + // well inside leaderProxyRetryBudget). forwardWithRetry should be + // mid-poll at that point and converge on success. + publishDelay := 100 * time.Millisecond + go func() { + time.Sleep(publishDelay) + eng.setLeader(lis.Addr().String()) + }() + + reqs := []*pb.Request{ + { + IsTxn: false, + Phase: pb.Phase_NONE, + Ts: 10, + Mutations: []*pb.Mutation{ + {Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v")}, + }, + }, + } + + start := time.Now() + resp, err := p.Commit(reqs) + elapsed := time.Since(start) + require.NoError(t, err) + require.Equal(t, uint64(42), resp.CommitIndex) + // The proxy must have waited at least until setLeader fired; + // otherwise it did not actually poll the missing-leader window. + require.GreaterOrEqual(t, elapsed, publishDelay) + // And it must have stopped polling well before the budget expires. 
+ require.Less(t, elapsed, leaderProxyRetryBudget) +} + +func TestLeaderProxy_FailsAfterLeaderBudgetElapses(t *testing.T) { + t.Parallel() + + // No gRPC server, no address ever published → every forward() + // returns ErrLeaderNotFound immediately. forwardWithRetry must loop + // until leaderProxyRetryBudget elapses and then surface the final + // ErrLeaderNotFound instead of hanging forever. To keep the test + // snappy we only need to assert that (a) the call eventually returns + // with an error, (b) the error chain still contains ErrLeaderNotFound + // after the budget is exhausted. + // + // We cap the retry budget at its package default (5s); that is the + // contract we want to pin. Running the full 5s here is acceptable as + // a single pinned test, especially since it runs in t.Parallel. + eng := &togglingFollowerEngine{} + p := NewLeaderProxyWithEngine(eng) + t.Cleanup(func() { _ = p.connCache.Close() }) + + reqs := []*pb.Request{ + { + IsTxn: false, + Phase: pb.Phase_NONE, + Ts: 10, + Mutations: []*pb.Mutation{ + {Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v")}, + }, + }, + } + + start := time.Now() + _, err := p.Commit(reqs) + elapsed := time.Since(start) + require.Error(t, err) + require.ErrorIs(t, err, ErrLeaderNotFound) + require.GreaterOrEqual(t, elapsed, leaderProxyRetryBudget) +} From 88a08bd6ece64f934a35693c1de62b8ebcbe4ce7 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 17:41:50 +0000 Subject: [PATCH 07/17] style(kv): appease golangci on coordinator_retry_test Three lint fixes on the new test file: - gci: align the method-receiver block (VerifyLeader vs. LinearizableRead) the formatter prefers. - gosec G115: CommitIndex conversion from the atomic.Int64 counter is safe (monotonic non-negative) but gosec can't see the invariant; annotate inline. - copyloopvar: drop the Go 1.22+ redundant tc := tc shadow inside the classification table loop. 
https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator_retry_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index a4e75c15..fb81b219 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -29,7 +29,7 @@ func (stubLeaderEngine) State() raftengine.State { return raftengine.StateLeader func (stubLeaderEngine) Leader() raftengine.LeaderInfo { return raftengine.LeaderInfo{ID: "self", Address: "127.0.0.1:0"} } -func (stubLeaderEngine) VerifyLeader(context.Context) error { return nil } +func (stubLeaderEngine) VerifyLeader(context.Context) error { return nil } func (stubLeaderEngine) LinearizableRead(context.Context) (uint64, error) { return 0, nil } func (stubLeaderEngine) Status() raftengine.Status { return raftengine.Status{State: raftengine.StateLeader} @@ -60,7 +60,11 @@ func (s *scriptedTransactional) Commit(reqs []*pb.Request) (*TransactionResponse if int(idx) < len(s.errs) && s.errs[idx] != nil { return nil, s.errs[idx] } - return &TransactionResponse{CommitIndex: uint64(idx + 1)}, nil + // idx is strictly non-negative (atomic.Add on an int64 that starts + // at 0 and only ever increments), so the conversion to uint64 for + // CommitIndex is safe. gosec G115 can't see that invariant, so + // silence it inline. 
+ return &TransactionResponse{CommitIndex: uint64(idx + 1)}, nil //nolint:gosec } func (s *scriptedTransactional) Abort([]*pb.Request) (*TransactionResponse, error) { @@ -87,7 +91,6 @@ func TestIsTransientLeaderError_Classification(t *testing.T) { } for _, tc := range cases { - tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() require.Equal(t, tc.want, isTransientLeaderError(tc.err)) From 4f105043c1483d26c9c55efd3850ba839e505e9d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 17:59:35 +0000 Subject: [PATCH 08/17] fix(kv): enforce retry-budget strictly in Dispatch and LeaderProxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both retry loops evaluated the budget only AFTER a full attempt had run, so the budget was advisory rather than strict. Codex flagged this as P2: - Coordinate.Dispatch (kv/coordinator.go:279) — after the retry back-off, the loop unconditionally ran another dispatchOnce. When the second-chance attempt took the redirect() path it could block for redirectForwardTimeout (5s), pushing total latency well past the advertised dispatchLeaderRetryBudget (5s). - LeaderProxy.forwardWithRetry (kv/leader_proxy.go:93) — after the inner maxForwardRetries cycle ended, the outer loop could sleep and then start another full cycle even if deadline had already passed. Each forward() inside that cycle carried its own 5s RPC timeout detached from the retry deadline, so the same 5s-over problem applied, potentially three times per cycle. Two changes: 1. Re-check the deadline immediately after the back-off sleep so a budget that expired during the sleep terminates the loop before issuing another attempt. Keep the existing pre-sleep check so the loop still short-circuits when the previous attempt itself ran the clock out. This preserves the happy-path (first attempt always runs even when the deadline is tight — deadline = now + budget on entry). 2. 
In LeaderProxy, derive a parent context with the retry deadline and plumb it into forward(); forward() then uses context.WithTimeout(parentCtx, leaderForwardTimeout) so its effective per-call timeout is min(leaderForwardTimeout, remaining budget). A forward() issued with <5s of budget left caps at exactly the remaining budget rather than the full RPC timeout. The inner fast-retry loop also now checks the deadline before each forward() to avoid kicking off extra RPCs as the budget dribbles away. Coordinate.dispatchOnce still calls transactionManager.Commit, which does not propagate a context down to engine.Propose today, so the local-leader path can still take longer than the budget if Commit itself blocks. That is an orthogonal refactor (the Transactional interface would need a ctx parameter) and is out of scope for this fix; the advertised budget guarantee now holds for the redirect path, which is the one clients observe over gRPC. Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok 6.9s - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 212s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 9 +++++++++ kv/leader_proxy.go | 34 +++++++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index 05803ba4..e5735c78 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -291,6 +291,15 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C if err := waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval); err != nil { return lastResp, err } + // Re-check the deadline AFTER the back-off sleep. 
If the budget + // expired while we slept, do not start another dispatchOnce: + // redirect()'s gRPC forward carries a 5s per-call timeout, so a + // fresh attempt post-budget could push the total call far past + // the advertised dispatchLeaderRetryBudget and break the + // bounded-failure contract gRPC clients rely on. + if !time.Now().Before(deadline) { + return lastResp, lastErr + } } } diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index 31c69284..9f9ddb53 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -79,12 +79,26 @@ func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error) { // leader's Internal.Forward returns adapter.ErrNotLeader whose error // chain does not survive the gRPC boundary; errors.Is against // ErrLeaderNotFound would miss it and exit after only the fast retries. +// +// The overall budget is strictly enforced: no new forward() attempt is +// started once time.Now() has passed deadline, and each per-attempt RPC +// is bounded by min(leaderForwardTimeout, remaining budget). Without +// that second bound, a single forward() could run for the full 5s RPC +// timeout AFTER the budget expired, pushing total latency well past +// leaderProxyRetryBudget. func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse, error) { if len(reqs) == 0 { return &TransactionResponse{}, nil } deadline := time.Now().Add(leaderProxyRetryBudget) + // Parent context carries the retry deadline so forward()'s per-call + // timeout (derived via context.WithTimeout(parentCtx, ...)) can + // never extend past it — context.WithTimeout picks the earlier of + // the two expirations. 
+ parentCtx, cancelParent := context.WithDeadline(context.Background(), deadline) + defer cancelParent() + var lastErr error for { // Each iteration of the outer loop runs up to maxForwardRetries @@ -93,7 +107,11 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse // one leaderProxyRetryInterval and re-poll until // leaderProxyRetryBudget elapses. for attempt := 0; attempt < maxForwardRetries; attempt++ { - resp, err := p.forward(reqs) + if !time.Now().Before(deadline) { + // Budget expired mid-cycle; do not start another RPC. + break + } + resp, err := p.forward(parentCtx, reqs) if err == nil { return resp, nil } @@ -109,10 +127,17 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse return nil, lastErr } time.Sleep(leaderProxyRetryInterval) + // Re-check the deadline AFTER the back-off: if the budget is + // exhausted, do not enter another maxForwardRetries cycle + // (which could issue up to three more RPCs, each bounded by + // leaderForwardTimeout relative to the now-expired deadline). + if !time.Now().Before(deadline) { + return nil, lastErr + } } } -func (p *LeaderProxy) forward(reqs []*pb.Request) (*TransactionResponse, error) { +func (p *LeaderProxy) forward(parentCtx context.Context, reqs []*pb.Request) (*TransactionResponse, error) { addr := leaderAddrFromEngine(p.engine) if addr == "" { return nil, errors.WithStack(ErrLeaderNotFound) @@ -124,7 +149,10 @@ func (p *LeaderProxy) forward(reqs []*pb.Request) (*TransactionResponse, error) } cli := pb.NewInternalClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), leaderForwardTimeout) + // context.WithTimeout on a deadline-bounded parent yields the + // earlier of the two — so a forward() issued with <5s of budget + // remaining caps at exactly the budget, not the full RPC timeout. 
+ ctx, cancel := context.WithTimeout(parentCtx, leaderForwardTimeout) defer cancel() resp, err := cli.Forward(ctx, &pb.ForwardRequest{ From 3b136d591179b1a97196b87d76ea5dd8819c23a1 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 18:13:50 +0000 Subject: [PATCH 09/17] fix(kv): cap each dispatchOnce attempt by remaining retry budget MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P2 follow-up to 4f10504: that commit added wall-clock checks around the retry loop, but the per-attempt context handed to dispatchOnce was still the caller's original ctx. A near-expiry iteration could legitimately enter dispatchOnce, dispatch through redirect(), and sit in cli.Forward for the full 5s redirectForwardTimeout — the post-sleep wall-clock check would trip on the next pass, but the offending call had already exceeded the advertised 5s dispatchLeaderRetryBudget. Derive a deadline-bounded boundedCtx via context.WithDeadline(ctx, deadline) once at the top of the retry loop and pass it to dispatchOnce. context.WithDeadline picks the earlier of the caller's and our own deadline, so callers with tighter deadlines keep their cancellation semantics. The waitForDispatchRetry call still uses the caller's ctx so caller cancellation surfaces cleanly via ctx.Done in the back-off select. Add an explicit ctx.Err() check after each dispatchOnce so that caller cancellation/deadline takes precedence over any gRPC error that wraps boundedCtx's DeadlineExceeded — without that check the caller would see a context-deadline error from inside the gRPC stack instead of their own ctx.Err(). The local-leader path (transactionManager.Commit -> engine.Propose) still cannot honor the bounded ctx today because Transactional.Commit does not take one; that is an orthogonal interface change. The budget guarantee now holds for the redirect path, which is what external gRPC clients observe. Test plan - go vet ./... 
— clean - go test -race -count=1 ./kv/ — ok 6.8s - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 201s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index e5735c78..6f754075 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -268,10 +268,32 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C // explicit drain. timer := time.NewTimer(dispatchLeaderRetryInterval) defer timer.Stop() + // boundedCtx caps every dispatchOnce call by the retry deadline so + // that a forward RPC inside redirect (which itself uses + // context.WithTimeout(ctx, redirectForwardTimeout)) can never run + // past the advertised dispatchLeaderRetryBudget. Without this bound, + // a near-expiry iteration could legitimately enter dispatchOnce and + // then sit in cli.Forward for the full 5s redirectForwardTimeout — + // the wall-clock check between iterations would trip on the next + // pass, but the offending call has already exceeded the budget. + // context.WithDeadline picks the earlier of the caller's deadline + // and ours, so callers with a tighter deadline still get their + // own cancellation semantics. + boundedCtx, cancelBounded := context.WithDeadline(ctx, deadline) + defer cancelBounded() var lastResp *CoordinateResponse var lastErr error for { - lastResp, lastErr = c.dispatchOnce(ctx, reqs) + lastResp, lastErr = c.dispatchOnce(boundedCtx, reqs) + // Caller-supplied ctx cancellation/deadline takes precedence + // over any error dispatchOnce returned (which may itself wrap + // context.Canceled / context.DeadlineExceeded propagated + // through boundedCtx). 
gRPC clients rely on the wrapped + // ctx.Err() to distinguish "I gave up" from "system was + // unavailable". + if err := ctx.Err(); err != nil { + return lastResp, errors.WithStack(err) + } if !shouldRetryDispatch(lastErr) { return lastResp, lastErr } @@ -292,11 +314,11 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C return lastResp, err } // Re-check the deadline AFTER the back-off sleep. If the budget - // expired while we slept, do not start another dispatchOnce: - // redirect()'s gRPC forward carries a 5s per-call timeout, so a - // fresh attempt post-budget could push the total call far past - // the advertised dispatchLeaderRetryBudget and break the - // bounded-failure contract gRPC clients rely on. + // expired while we slept, do not start another dispatchOnce — + // boundedCtx would just cancel it immediately, but exiting here + // keeps the surfaced error as the last transient leader signal + // instead of a context-deadline error from inside the gRPC + // stack. if !time.Now().Before(deadline) { return lastResp, lastErr } From afd8e301d28b00dea8e10b27e3afa66d835e310d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 18:32:38 +0000 Subject: [PATCH 10/17] fix(kv): never discard a committed dispatch; defensive nil-err + nits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two CodeRabbit-actionable issues plus the accompanying nit cluster. Actionable: Dispatch must not convert a successful commit into a ctx error (coordinator.go). The retry loop used to check ctx.Err() ahead of lastErr == nil. If dispatchOnce returned success concurrently with caller cancellation, the commit was already durable in the FSM, but the loop handed the caller ctx.Err() and discarded lastResp. A retrying client would then re-issue the write and — for non-idempotent ops — risk observable duplicate effects. 
Reorder so a nil error returns lastResp, nil immediately; ctx.Err() only wins on an error path. New test TestCoordinateDispatch_SuccessBeatsConcurrentCancel pins the contract by cancelling inside Commit just before it returns success and asserting NoError + non-zero CommitIndex. Actionable: LeaderProxy.forwardWithRetry defensive nil guard (leader_proxy.go). The inner fast-retry loop can, under a future shorter budget or a clock jump between the deadline computation and the first time.Now() check, exit without ever calling forward() and leave lastErr == nil. errors.Wrapf(nil, ...) is nil, so the next classification step would return (nil, nil) — a silent success the proxy never produced. Add an explicit if lastErr == nil { return nil, ErrLeaderNotFound } after the inner loop so that path surfaces a real error regardless of future refactors. Test-side nits: - adapter/grpc_test.go: move shutdown(nodes) to defer in the parallel test so a wedged worker or a test-timeout doesn't leak raft nodes into sibling tests. - adapter/grpc_test.go: swap continue -> break in the 9999-iteration sequence/transaction loops. A genuine regression used to emit thousands of duplicate assertion lines, drowning the root cause; now the first failure short-circuits the rest of the loop while defer shutdown still runs. - kv/leader_proxy_test.go: gate TestLeaderProxy_FailsAfterLeaderBudgetElapses behind testing.Short() so `go test -short ./kv/...` runs in ~100ms instead of 5s+. CI invocations without -short still exercise the budget-exhaust contract. - kv/coordinator_retry_test.go: TestIsTransientLeaderError_PinsRealSentinels asserts the three upstream sentinel values (kv.ErrLeaderNotFound, raftengine.ErrNotLeader, raftengine.ErrLeadershipLost) still classify as transient. A future rename of those error messages that drifts them out of leaderErrorPhrases would now fail CI instead of silently surfacing transient leader churn to gRPC clients. 
adapter.ErrNotLeader can't be pinned here without a kv->adapter test-time import cycle; a symmetric pin belongs in the adapter test package (out of scope for this commit). Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok 6.9s - go test -count=1 -short ./kv/ -run 'TestLeaderProxy' — 0.14s (5s budget-exhaust test correctly skipped) - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 245s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- adapter/grpc_test.go | 23 ++++++++----- kv/coordinator.go | 12 ++++++- kv/coordinator_retry_test.go | 64 ++++++++++++++++++++++++++++++++++++ kv/leader_proxy.go | 10 ++++++ kv/leader_proxy_test.go | 3 ++ 5 files changed, 103 insertions(+), 9 deletions(-) diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go index e309728d..3fd95dcd 100644 --- a/adapter/grpc_test.go +++ b/adapter/grpc_test.go @@ -150,6 +150,7 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) { t.Parallel() nodes, adders, _ := createNode(t, 3) c := rawKVClient(t, adders) + defer shutdown(nodes) wg := sync.WaitGroup{} const workers = 1000 @@ -179,7 +180,6 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) { }(i) } wg.Wait() - shutdown(nodes) } func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { @@ -196,18 +196,22 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) { context.Background(), &pb.RawPutRequest{Key: key, Value: want}, ) + // Stop at the first RPC failure instead of continuing: a + // genuine regression would otherwise cascade into 9998 more + // iterations, each reporting the same broken invariant, and + // drown the real cause in test-output noise. 
if !assert.NoError(t, err, "Put RPC failed") { - continue + break } _, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want}) if !assert.NoError(t, err, "Put RPC failed") { - continue + break } resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key}) if !assert.NoError(t, err, "Get RPC failed") { - continue + break } assert.Equal(t, want, resp.Value, "consistency check failed") @@ -228,23 +232,26 @@ func Test_grpc_transaction(t *testing.T) { context.Background(), &pb.PutRequest{Key: key, Value: want}, ) + // See Test_consistency_satisfy_write_after_read_sequence: + // break on first RPC failure so a single broken invariant + // does not amplify into thousands of assertion lines. if !assert.NoError(t, err, "Put RPC failed") { - continue + break } resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key}) if !assert.NoError(t, err, "Get RPC failed") { - continue + break } assert.Equal(t, want, resp.Value, "consistency check failed") _, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key}) if !assert.NoError(t, err, "Delete RPC failed") { - continue + break } resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key}) if !assert.NoError(t, err, "Get RPC failed") { - continue + break } assert.Nil(t, resp.Value, "consistency check failed") } diff --git a/kv/coordinator.go b/kv/coordinator.go index 6f754075..7d0a06e9 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -285,8 +285,18 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C var lastErr error for { lastResp, lastErr = c.dispatchOnce(boundedCtx, reqs) + // A successful dispatch means the commit already happened on + // the raft FSM. Do NOT let a racing caller cancellation + // convert that success into a reported failure: returning + // ctx.Err() now would mask the commit, and a client that + // retries a non-idempotent write could observe duplicate + // effects. 
This ordering MUST run before the ctx.Err() check + // below. + if lastErr == nil { + return lastResp, nil + } // Caller-supplied ctx cancellation/deadline takes precedence - // over any error dispatchOnce returned (which may itself wrap + // over the error dispatchOnce returned (which may itself wrap // context.Canceled / context.DeadlineExceeded propagated // through boundedCtx). gRPC clients rely on the wrapped // ctx.Err() to distinguish "I gave up" from "system was diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index fb81b219..ff7d2a54 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -216,3 +216,67 @@ func TestCoordinateDispatch_CtxCancelDuringRetrySurfaces(t *testing.T) { }) require.ErrorIs(t, err, context.Canceled) } + +func TestCoordinateDispatch_SuccessBeatsConcurrentCancel(t *testing.T) { + t.Parallel() + + // Inverse race of TestCoordinateDispatch_CtxCancelDuringRetrySurfaces: + // dispatchOnce returns SUCCESS on the same attempt where the caller + // cancels ctx. The commit already landed in the FSM, so the loop + // MUST report success rather than converting it into a + // context.Canceled error — doing so would make a retrying client + // re-issue the same write and risk duplicate effects for + // non-idempotent operations. Pins the fix for the CodeRabbit-major + // ordering bug in the retry loop (commit + cancel ordering). + ctx, cancel := context.WithCancel(context.Background()) + tx := &scriptedTransactional{ + onCommit: func(int64) { + // Cancel inside Commit, BEFORE Commit returns. Dispatch + // will then observe a successful Commit but a cancelled + // ctx on its first check. 
+ cancel() + }, + } + c := newRetryCoordinate(tx) + + resp, err := c.Dispatch(ctx, &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}}, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Greater(t, resp.CommitIndex, uint64(0)) +} + +// TestIsTransientLeaderError_PinsRealSentinels asserts that the real +// .Error() texts of the upstream sentinels we classify as transient +// still pass through isTransientLeaderError. If a future rename of +// these messages drifts them out of leaderErrorPhrases' closed list, +// this test catches it at CI time rather than during a production +// re-election window. Pinning kv.ErrLeaderNotFound covers the +// wire-level phrase "leader not found"; raftengine.ErrNotLeader +// covers both the errors.Is sentinel path AND the string-fallback +// path ("not leader" substring). +// +// adapter.ErrNotLeader is not pinned here because adapter imports kv, +// which would create a test-time import cycle. A symmetric pin lives +// in the adapter test package alongside that sentinel. 
+func TestIsTransientLeaderError_PinsRealSentinels(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + err error + }{ + {"kv.ErrLeaderNotFound", ErrLeaderNotFound}, + {"raftengine.ErrNotLeader", raftengine.ErrNotLeader}, + {"raftengine.ErrLeadershipLost", raftengine.ErrLeadershipLost}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.True(t, isTransientLeaderError(tc.err), + "sentinel %q (%q) no longer classified as transient — update leaderErrorPhrases or the classifier", + tc.name, tc.err.Error()) + }) + } +} diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index 9f9ddb53..c6ba36a6 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -120,6 +120,16 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse break } } + // Defensive: if the inner loop exited on the deadline guard + // before ever calling forward() (e.g. a future refactor + // shortens the budget, or the clock jumps forward between the + // outer deadline computation and the inner check), lastErr + // stays nil and errors.Wrapf(nil, ...) would silently yield + // nil — handing callers a (nil, nil) "success" that never + // happened. Surface a real error instead. + if lastErr == nil { + return nil, errors.WithStack(ErrLeaderNotFound) + } if !isTransientLeaderError(lastErr) { return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries) } diff --git a/kv/leader_proxy_test.go b/kv/leader_proxy_test.go index 9e6f7187..fba3c09f 100644 --- a/kv/leader_proxy_test.go +++ b/kv/leader_proxy_test.go @@ -255,6 +255,9 @@ func TestLeaderProxy_ForwardsAfterLeaderPublishes(t *testing.T) { func TestLeaderProxy_FailsAfterLeaderBudgetElapses(t *testing.T) { t.Parallel() + if testing.Short() { + t.Skip("exhausts the full leaderProxyRetryBudget (5s); skipped in -short mode") + } // No gRPC server, no address ever published → every forward() // returns ErrLeaderNotFound immediately. 
forwardWithRetry must loop From db58eef3e91bd100f63efe62b10ea250dbf457af Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 18:43:06 +0000 Subject: [PATCH 11/17] refactor(kv): extract retry helpers to satisfy cyclop on Dispatch + forwardWithRetry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit golangci cyclop flagged both Dispatch (11) and forwardWithRetry (11) after the CodeRabbit-major success-vs-cancel ordering fix and the defensive nil-err guard in the previous commit pushed them over the limit (10). Split out small single-purpose helpers so the hot-path functions stay readable without reshuffling semantics: - kv/coordinator.go: extract prepareDispatchRetry(ctx, reqs, leaderAssignsTS, timer) that bundles the "clear txn timestamps + wait one back-off interval" bookkeeping. Dispatch now does one helper call instead of an if-block + wait call, dropping its cyclomatic complexity from 11 to 10. - kv/leader_proxy.go: extract runForwardCycle(parentCtx, reqs, deadline) that runs the inner maxForwardRetries fast-retry cycle and returns (resp, err, done). done=true is the terminal branch (commit or non-transient error); done=false feeds the outer slow poll. forwardWithRetry drops from 11 to 7. Both defensive nil-err guards from afd8e30 are preserved: runForwardCycle only calls errors.Wrapf when lastErr != nil, and forwardWithRetry still surfaces ErrLeaderNotFound when the cycle exited without ever calling forward(). A Codex P2 flag on this path was a false-positive against afd8e30 (the nil-guard was already in place there). Test plan - go vet ./... 
— clean - go test -race -count=1 ./kv/ — ok 7.4s - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 283s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 28 +++++++++++-------- kv/leader_proxy.go | 69 ++++++++++++++++++++++++++++++---------------- 2 files changed, 62 insertions(+), 35 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index 7d0a06e9..f24f1eea 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -310,17 +310,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C if !time.Now().Before(deadline) { return lastResp, lastErr } - if leaderAssignsTS { - // Force dispatchOnce to mint a fresh StartTS on the next - // attempt; keep CommitTS tied to StartTS by clearing it too - // (the same invariant dispatchOnce enforces). Re-using a - // stale StartTS would race the OCC conflict check in - // fsm.validateConflicts (LatestCommitTS > startTS) against - // writes that committed during the election window. - reqs.StartTS = 0 - reqs.CommitTS = 0 - } - if err := waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval); err != nil { + if err := prepareDispatchRetry(ctx, reqs, leaderAssignsTS, timer); err != nil { return lastResp, err } // Re-check the deadline AFTER the back-off sleep. If the budget @@ -371,6 +361,22 @@ func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time. } } +// prepareDispatchRetry groups the pre-back-off bookkeeping Dispatch +// must do between attempts: clear StartTS/CommitTS when the caller +// asked the coordinator to mint them (so the next dispatchOnce issues +// against the post-churn HLC instead of a stale timestamp that could +// trip fsm.validateConflicts), then sleep one retry interval — or +// return ctx.Err() wrapped if the caller cancels during the sleep. 
+// Factored out of Dispatch to keep that function's cyclomatic +// complexity under the cyclop threshold without shuffling semantics. +func prepareDispatchRetry(ctx context.Context, reqs *OperationGroup[OP], leaderAssignsTS bool, timer *time.Timer) error { + if leaderAssignsTS { + reqs.StartTS = 0 + reqs.CommitTS = 0 + } + return waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval) +} + // dispatchOnce runs a single Dispatch attempt without retry. It is the // transactional unit retried by Dispatch on transient leader errors. // diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index c6ba36a6..240c7afd 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -101,38 +101,25 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse var lastErr error for { - // Each iteration of the outer loop runs up to maxForwardRetries - // fast retries against whatever leader is currently visible. If - // none is (or the forward bounced off a stale leader), we sleep - // one leaderProxyRetryInterval and re-poll until - // leaderProxyRetryBudget elapses. - for attempt := 0; attempt < maxForwardRetries; attempt++ { - if !time.Now().Before(deadline) { - // Budget expired mid-cycle; do not start another RPC. - break - } - resp, err := p.forward(parentCtx, reqs) - if err == nil { - return resp, nil - } - lastErr = err - if isTransientLeaderError(err) { - break - } + // runForwardCycle runs up to maxForwardRetries fast retries against + // whatever leader is currently visible and returns either a + // committed response, a terminal error, or the last transient + // leader error for the outer loop to re-poll on. + resp, err, done := p.runForwardCycle(parentCtx, reqs, deadline) + if done { + return resp, err } - // Defensive: if the inner loop exited on the deadline guard + lastErr = err + // Defensive: if runForwardCycle exited on the deadline guard // before ever calling forward() (e.g. 
a future refactor // shortens the budget, or the clock jumps forward between the // outer deadline computation and the inner check), lastErr - // stays nil and errors.Wrapf(nil, ...) would silently yield - // nil — handing callers a (nil, nil) "success" that never + // stays nil. errors.Wrapf(nil, ...) would silently yield nil + // — handing callers a (nil, nil) "success" that never // happened. Surface a real error instead. if lastErr == nil { return nil, errors.WithStack(ErrLeaderNotFound) } - if !isTransientLeaderError(lastErr) { - return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries) - } if !time.Now().Before(deadline) { return nil, lastErr } @@ -147,6 +134,40 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse } } +// runForwardCycle issues up to maxForwardRetries forward() attempts +// against the current leader, short-circuiting on the budget. +// Returns: +// - (resp, nil, true) on a committed success — caller should return it. +// - (nil, err, true) on a non-transient error wrapped with the +// retry-count context — caller should return it. +// - (nil, lastTransientErr, false) when every attempt failed with a +// transient leader-unavailable signal — caller should back off and +// retry the cycle. +// - (nil, nil, false) when the inner loop exited on the deadline +// guard before calling forward() at all; caller surfaces +// ErrLeaderNotFound for that defensive path. +func (p *LeaderProxy) runForwardCycle(parentCtx context.Context, reqs []*pb.Request, deadline time.Time) (*TransactionResponse, error, bool) { + var lastErr error + for attempt := 0; attempt < maxForwardRetries; attempt++ { + if !time.Now().Before(deadline) { + // Budget expired mid-cycle; do not start another RPC. 
+ break + } + resp, err := p.forward(parentCtx, reqs) + if err == nil { + return resp, nil, true + } + lastErr = err + if isTransientLeaderError(err) { + break + } + } + if lastErr != nil && !isTransientLeaderError(lastErr) { + return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries), true + } + return nil, lastErr, false +} + func (p *LeaderProxy) forward(parentCtx context.Context, reqs []*pb.Request) (*TransactionResponse, error) { addr := leaderAddrFromEngine(p.engine) if addr == "" { From 7ce64f5014a716c423cced0b90d34172db2b24a3 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 18:56:11 +0000 Subject: [PATCH 12/17] fix(kv): classify leadership-loss/transfer wire strings as transient MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 follow-up. The transient classifier whitelist was missing two sentinel strings: - raftengine.ErrLeadershipLost → "raft engine: leadership lost" - raftengine.ErrLeadershipTransferInProgress → "raft engine: leadership transfer in progress" When either sentinel crosses a gRPC boundary (e.g. a follower forwards to a leader that steps down mid-proposal, or Internal.Forward returns these to redirect()), the typed chain is dropped and only the message text survives. isLeadershipLossError's errors.Is check fails, and without a matching entry in leaderErrorPhrases the substring fallback also misses — so forwardWithRetry / Dispatch would treat a genuine re-election signal as terminal and leak the transient error to gRPC clients, defeating the bounded-retry contract of this PR. Extend leaderErrorPhrases to include "leadership lost" and "leadership transfer in progress", pair each phrase with the sentinel that emits it in the inline comment, and add four new classifier cases + one sentinel pin (ErrLeadershipTransferInProgress) so a future rename on either side fails CI rather than surfacing at production re-election time. Test plan - go vet ./... 
— clean - go test -race -count=1 ./kv/ -run 'TestIsTransientLeaderError' -v — ok, all 13 new/existing cases pass - go test -race -count=1 ./kv/ — ok 7.1s - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 256s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 13 +++++++++++-- kv/coordinator_retry_test.go | 9 +++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index f24f1eea..b675354e 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -471,9 +471,18 @@ func isTransientLeaderError(err error) bool { // addition must correspond to an error the system actually emits for // leader churn, not a generic "failed" message that happens to mention // leaders. +// +// Every string here MUST be paired with a kv/raftengine/adapter +// sentinel in isLeadershipLossError or isTransientLeaderError so the +// typed-sentinel path catches the same condition when the error chain +// is intact. TestIsTransientLeaderError_PinsRealSentinels locks the +// sentinel .Error() texts to this list; a rename on the sentinel side +// will fail that test and force a matching update here. 
var leaderErrorPhrases = []string{ - "not leader", // adapter.ErrNotLeader, raftengine.ErrNotLeader, ErrLeadershipLost messages - "leader not found", // kv.ErrLeaderNotFound, adapter.ErrLeaderNotFound + "not leader", // adapter.ErrNotLeader, raftengine.ErrNotLeader ("raft engine: not leader") + "leader not found", // kv.ErrLeaderNotFound, adapter.ErrLeaderNotFound + "leadership lost", // raftengine.ErrLeadershipLost ("raft engine: leadership lost") + "leadership transfer in progress", // raftengine.ErrLeadershipTransferInProgress } // hasTransientLeaderPhrase reports whether err.Error() contains one of diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index ff7d2a54..508b8e79 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -83,9 +83,16 @@ func TestIsTransientLeaderError_Classification(t *testing.T) { {"kv.ErrLeaderNotFound", cerrors.WithStack(ErrLeaderNotFound), true}, {"raftengine.ErrNotLeader", cerrors.WithStack(raftengine.ErrNotLeader), true}, {"raftengine.ErrLeadershipLost", cerrors.WithStack(raftengine.ErrLeadershipLost), true}, + {"raftengine.ErrLeadershipTransferInProgress", + cerrors.WithStack(raftengine.ErrLeadershipTransferInProgress), true}, {"wire not-leader string", errors.New("not leader"), true}, {"wire leader-not-found string", errors.New("leader not found"), true}, + {"wire leadership-lost string", errors.New("raft engine: leadership lost"), true}, + {"wire leadership-transfer string", + errors.New("raft engine: leadership transfer in progress"), true}, {"gRPC status wrapping not leader", fmt.Errorf("rpc error: code = Unknown desc = not leader"), true}, + {"gRPC status wrapping leadership lost", + fmt.Errorf("rpc error: code = Unknown desc = raft engine: leadership lost"), true}, {"unrelated error", errors.New("write conflict"), false}, {"validation error", errors.New("invalid request"), false}, } @@ -270,6 +277,8 @@ func TestIsTransientLeaderError_PinsRealSentinels(t *testing.T) { 
{"kv.ErrLeaderNotFound", ErrLeaderNotFound}, {"raftengine.ErrNotLeader", raftengine.ErrNotLeader}, {"raftengine.ErrLeadershipLost", raftengine.ErrLeadershipLost}, + {"raftengine.ErrLeadershipTransferInProgress", + raftengine.ErrLeadershipTransferInProgress}, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { From e41049010e1eaa91d9552751d53bb565924e2d01 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 19:09:36 +0000 Subject: [PATCH 13/17] fix(kv): tighten transient-leader phrase match to suffix only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P2 follow-up. hasTransientLeaderPhrase used strings.Contains, so any non-leader error whose text happened to embed one of the allowlisted phrases was misclassified as transient. The documented failure mode: store.WriteConflictError formats as "key: : write conflict", and a user-chosen key containing "not leader" (or any other allowlisted phrase) would trip the Contains match. Dispatch / forwardWithRetry would then spin retries for up to the 5s budget — reissuing a txn with fresh StartTS each time — instead of surfacing the original conflict immediately. Switched to strings.HasSuffix. I verified the actual rendered output of every wrapper the retry path produces: cerrors.Wrapf(leaderErr, "forward failed after %d retries", 3) -> "forward failed after 3 retries: leader not found" cerrors.WithStack(leaderErr) -> "leader not found" fmt.Errorf("forwarded: %w", leaderErr) -> "forwarded: leader not found" gRPC status Errorf -> "rpc error: code = Unknown desc = leader not found" gRPC wrap stacked under cerrors.Wrapf -> "forward: rpc error: code = Unknown desc = leader not found" Every observed wrapper places the original error at the END of the composed string, so a suffix check remains sufficient while rejecting WriteConflictError's middle-of-string leak. 
Pinned both "sneaky key" shapes in TestIsTransientLeaderError_Classification so the tradeoff can't silently regress. Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok 7.0s, includes two new "sneaky key" write-conflict cases that now correctly stay non-transient - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 187s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 22 +++++++++++++++++----- kv/coordinator_retry_test.go | 10 ++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index b675354e..3a16f28f 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -485,17 +485,29 @@ var leaderErrorPhrases = []string{ "leadership transfer in progress", // raftengine.ErrLeadershipTransferInProgress } -// hasTransientLeaderPhrase reports whether err.Error() contains one of -// the well-known transient-leader substrings. It is the last resort used -// after errors.Is fails across a gRPC boundary that strips the original -// sentinel chain. +// hasTransientLeaderPhrase reports whether err.Error() ENDS WITH one of +// the well-known transient-leader substrings. It is the last resort +// used after errors.Is fails across a gRPC boundary that strips the +// original sentinel chain. +// +// Suffix matching — not free-form Contains — is load-bearing here: +// several non-leader errors embed user-controlled text in the middle +// of their message. store.WriteConflictError, for example, formats as +// "key: : write conflict"; a conflicted key literally named +// "not leader" would trip a Contains-based classifier and cause the +// coordinator to spin retries (and reissue with fresh timestamps) on +// what is actually a terminal business failure. 
Every wrapper we +// observe in practice (cockroachdb Wrapf, fmt.Errorf %w-prefix, gRPC +// status.Errorf's "rpc error: code = X desc = " form, plus any +// stacking of those) places the original message at the END of the +// composed string, so a suffix check is both tight and sufficient. func hasTransientLeaderPhrase(err error) bool { if err == nil { return false } msg := strings.ToLower(err.Error()) for _, phrase := range leaderErrorPhrases { - if strings.Contains(msg, phrase) { + if strings.HasSuffix(msg, phrase) { return true } } diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index 508b8e79..414074c8 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -95,6 +95,16 @@ func TestIsTransientLeaderError_Classification(t *testing.T) { fmt.Errorf("rpc error: code = Unknown desc = raft engine: leadership lost"), true}, {"unrelated error", errors.New("write conflict"), false}, {"validation error", errors.New("invalid request"), false}, + // Codex P2 regression: before the HasSuffix switch this was + // misclassified as transient because the substring matcher + // saw "not leader" in the middle. store.WriteConflictError + // literally formats as "key: : write conflict", + // and a user-chosen key can embed any of the phrases. Must + // stay a terminal business error. + {"write conflict with sneaky key matching leader phrase", + errors.New("key: not leader: write conflict"), false}, + {"write conflict with key containing leadership lost", + errors.New("key: raft engine: leadership lost: write conflict"), false}, } for _, tc := range cases { From 640f009cee41019f197e6fcc86a08e9aa5494082 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 20:51:41 +0000 Subject: [PATCH 14/17] fix(kv): enforce retry budget strictly across the back-off sleep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Copilot points on e410490, all genuine: 1. 
(kv/coordinator.go) waitForDispatchRetry slept a fixed dispatchLeaderRetryInterval even when the remaining time in dispatchLeaderRetryBudget was smaller. Worst case: Dispatch exceeded its advertised budget by up to one full interval (25ms). Pass deadline into the helper and cap sleep at min(interval, time.Until(deadline)) so the budget is strictly bounded. 2. (kv/leader_proxy.go) time.Sleep(leaderProxyRetryInterval) was neither capped nor interruptible. It both slept past the budget by up to one interval AND ignored parentCtx, so a context cancellation in the budget-bounded parentCtx would still wait the full 25ms before tearing down. Replace with a timer + parentCtx select and cap at min(interval, remaining budget). 3. (kv/coordinator_retry_test.go) the scriptedTransactional commit counter was atomic.Int64 with a //nolint:gosec G115 suppression on the CommitIndex return. Switching to atomic.Uint64 drops both the unsigned conversion and the linter bypass — the counter is only ever monotonically incremented from zero so a signed type was never meaningful here. Test plan - go vet ./... 
— clean
- go test -race -count=1 ./kv/ — ok 6.8s
- go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 190s

https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap
---
 kv/coordinator.go            | 22 +++++++++++++++++-----
 kv/coordinator_retry_test.go | 14 +++++---------
 kv/leader_proxy.go           | 18 +++++++++++++++++-
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/kv/coordinator.go b/kv/coordinator.go
index 3a16f28f..e12fd28c 100644
--- a/kv/coordinator.go
+++ b/kv/coordinator.go
@@ -310,7 +310,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
 		if !time.Now().Before(deadline) {
 			return lastResp, lastErr
 		}
-		if err := prepareDispatchRetry(ctx, reqs, leaderAssignsTS, timer); err != nil {
+		if err := prepareDispatchRetry(ctx, reqs, leaderAssignsTS, timer, deadline); err != nil {
 			return lastResp, err
 		}
 		// Re-check the deadline AFTER the back-off sleep. If the budget
@@ -351,8 +351,20 @@ func shouldRetryDispatch(err error) bool {
 // whichever comes first. A ctx cancellation returns a wrapped ctx.Err()
 // so gRPC clients can distinguish "I gave up waiting" from "cluster is
 // unavailable"; timer expiry returns nil and the caller loops again.
-func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time.Duration) error {
-	timer.Reset(interval)
+//
+// The sleep is additionally capped at time.Until(deadline). Without
+// that cap, a caller that hits the retry budget with <interval
+// remaining before the deadline would still sleep the full interval
+// and overshoot the advertised budget by up to one interval (25ms).
+// Passing deadline in keeps the budget strictly bounded: the timer
+// is armed with min(interval, time.Until(deadline)) so the worst
+// case wakes exactly at the deadline.
+func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time.Duration, deadline time.Time) error {
+	sleep := interval
+	if until := time.Until(deadline); until > 0 && until < sleep {
+		sleep = until
+	}
+	timer.Reset(sleep)
 	select {
 	case <-ctx.Done():
 		return errors.WithStack(ctx.Err())
@@ -369,12 +381,12 @@ func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time.
 // return ctx.Err() wrapped if the caller cancels during the sleep.
 // Factored out of Dispatch to keep that function's cyclomatic
 // complexity under the cyclop threshold without shuffling semantics.
-func prepareDispatchRetry(ctx context.Context, reqs *OperationGroup[OP], leaderAssignsTS bool, timer *time.Timer) error { +func prepareDispatchRetry(ctx context.Context, reqs *OperationGroup[OP], leaderAssignsTS bool, timer *time.Timer, deadline time.Time) error { if leaderAssignsTS { reqs.StartTS = 0 reqs.CommitTS = 0 } - return waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval) + return waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval, deadline) } // dispatchOnce runs a single Dispatch attempt without retry. It is the diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index 414074c8..8a990ff4 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -46,9 +46,9 @@ func (stubLeaderEngine) Close() error { return nil } // (guarded by atomic.Int64 for race-detector friendliness) is enough. type scriptedTransactional struct { errs []error - commits atomic.Int64 + commits atomic.Uint64 reqs [][]*pb.Request - onCommit func(call int64) // optional hook invoked inside Commit + onCommit func(call uint64) // optional hook invoked inside Commit } func (s *scriptedTransactional) Commit(reqs []*pb.Request) (*TransactionResponse, error) { @@ -60,11 +60,7 @@ func (s *scriptedTransactional) Commit(reqs []*pb.Request) (*TransactionResponse if int(idx) < len(s.errs) && s.errs[idx] != nil { return nil, s.errs[idx] } - // idx is strictly non-negative (atomic.Add on an int64 that starts - // at 0 and only ever increments), so the conversion to uint64 for - // CommitIndex is safe. gosec G115 can't see that invariant, so - // silence it inline. 
- return &TransactionResponse{CommitIndex: uint64(idx + 1)}, nil //nolint:gosec + return &TransactionResponse{CommitIndex: idx + 1}, nil } func (s *scriptedTransactional) Abort([]*pb.Request) (*TransactionResponse, error) { @@ -214,7 +210,7 @@ func TestCoordinateDispatch_CtxCancelDuringRetrySurfaces(t *testing.T) { cerrors.WithStack(raftengine.ErrNotLeader), cerrors.WithStack(raftengine.ErrNotLeader), }, - onCommit: func(call int64) { + onCommit: func(call uint64) { // Cancel after the first failed attempt so the next // back-off select sees ctx.Done(). if call == 0 { @@ -247,7 +243,7 @@ func TestCoordinateDispatch_SuccessBeatsConcurrentCancel(t *testing.T) { // ordering bug in the retry loop (commit + cancel ordering). ctx, cancel := context.WithCancel(context.Background()) tx := &scriptedTransactional{ - onCommit: func(int64) { + onCommit: func(uint64) { // Cancel inside Commit, BEFORE Commit returns. Dispatch // will then observe a successful Commit but a cancelled // ctx on its first check. diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index 240c7afd..144486d5 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -123,7 +123,23 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse if !time.Now().Before(deadline) { return nil, lastErr } - time.Sleep(leaderProxyRetryInterval) + // Bounded, interruptible back-off. time.Sleep would both + // exceed the budget by up to one interval (if time.Until < + // interval remains) and ignore parentCtx — so a cancelled + // budget would wait out the full 25ms instead of tearing + // down immediately. Cap at min(interval, remaining) and + // wake on parentCtx.Done so the back-off tracks the + // deadline exactly. 
+ sleep := leaderProxyRetryInterval + if until := time.Until(deadline); until > 0 && until < sleep { + sleep = until + } + timer := time.NewTimer(sleep) + select { + case <-parentCtx.Done(): + timer.Stop() + case <-timer.C: + } // Re-check the deadline AFTER the back-off: if the budget is // exhausted, do not enter another maxForwardRetries cycle // (which could issue up to three more RPCs, each bounded by From 846125befcbf562bc898ec060b3156d0cc1d99aa Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 21:00:38 +0000 Subject: [PATCH 15/17] style(kv): drop gosec G115 + cyclop violations from retry back-off MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two golangci flags on 640f009: - kv/coordinator_retry_test.go G115: the atomic.Uint64 switch in the previous commit moved the conversion from "uint64 return" to "int(idx) < len(s.errs)" on the slice bounds check, which just reintroduces G115 on the other side. Change errs from []error to map[uint64]error so the whole counter / index path stays unsigned end-to-end. Map lookup returns the zero value (nil error) on absent keys, so no bounds check is required and no int<->uint64 conversion is needed anywhere. No //nolint:gosec suppression required now. - kv/leader_proxy.go cyclop: the new timer+select back-off pushed forwardWithRetry's branch count to 11 (max 10). Extract waitLeaderProxyBackoff(parentCtx, interval, deadline) that bundles the cap + interruptible select so forwardWithRetry stays at 7 branches. Behaviour is unchanged — sleep is still capped at min(interval, remaining budget) and the select still wakes on parentCtx.Done(). Four existing scriptedTransactional call sites updated from slice literals to indexed map literals (errs: map[uint64]error{0: ..., 1: ...}) to match the new API. Test plan - go vet ./... 
— clean - go test -race -count=1 ./kv/ — ok 6.8s - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 191s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator_retry_test.go | 37 +++++++++++++++++++----------------- kv/leader_proxy.go | 36 ++++++++++++++++++----------------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index 8a990ff4..71722d6d 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -39,13 +39,16 @@ func (stubLeaderEngine) Configuration(context.Context) (raftengine.Configuration } func (stubLeaderEngine) Close() error { return nil } -// scriptedTransactional returns the first len(errs) entries from errs on -// successive Commit calls, then succeeds. It records every observed -// request so tests can assert on per-attempt StartTS values. Commit is -// called serially from the Dispatch retry loop, so a plain slice index -// (guarded by atomic.Int64 for race-detector friendliness) is enough. +// scriptedTransactional returns the error registered in errs for the +// 0-indexed call that triggered Commit; calls without a registered +// entry succeed. Using a map keyed by uint64 rather than a []error +// keeps the whole counter path unsigned end-to-end — no gosec G115 +// conversions between int (for a slice length) and uint64 (the atomic +// counter), and no //nolint suppression required. Commit is called +// serially from the Dispatch retry loop, so atomic.Uint64 is only +// race-detector ceremony. 
type scriptedTransactional struct { - errs []error + errs map[uint64]error commits atomic.Uint64 reqs [][]*pb.Request onCommit func(call uint64) // optional hook invoked inside Commit @@ -57,8 +60,8 @@ func (s *scriptedTransactional) Commit(reqs []*pb.Request) (*TransactionResponse if s.onCommit != nil { s.onCommit(idx) } - if int(idx) < len(s.errs) && s.errs[idx] != nil { - return nil, s.errs[idx] + if err := s.errs[idx]; err != nil { + return nil, err } return &TransactionResponse{CommitIndex: idx + 1}, nil } @@ -130,9 +133,9 @@ func TestCoordinateDispatch_RetriesTransientLeaderError(t *testing.T) { // gRPC would transport); the third succeeds. Dispatch must absorb // both and return success without leaking the transient error. tx := &scriptedTransactional{ - errs: []error{ - cerrors.WithStack(raftengine.ErrNotLeader), - errors.New("leader not found"), + errs: map[uint64]error{ + 0: cerrors.WithStack(raftengine.ErrNotLeader), + 1: errors.New("leader not found"), }, } c := newRetryCoordinate(tx) @@ -154,7 +157,7 @@ func TestCoordinateDispatch_NonTransientErrorSurfacesImmediately(t *testing.T) { t.Parallel() business := errors.New("write conflict") - tx := &scriptedTransactional{errs: []error{business}} + tx := &scriptedTransactional{errs: map[uint64]error{0: business}} c := newRetryCoordinate(tx) _, err := c.Dispatch(context.Background(), &OperationGroup[OP]{ @@ -170,7 +173,7 @@ func TestCoordinateDispatch_RefreshesStartTSOnRetry(t *testing.T) { t.Parallel() tx := &scriptedTransactional{ - errs: []error{cerrors.WithStack(raftengine.ErrNotLeader)}, + errs: map[uint64]error{0: cerrors.WithStack(raftengine.ErrNotLeader)}, } c := newRetryCoordinate(tx) @@ -205,10 +208,10 @@ func TestCoordinateDispatch_CtxCancelDuringRetrySurfaces(t *testing.T) { // on this to tell "I gave up" from "cluster unavailable". 
ctx, cancel := context.WithCancel(context.Background()) tx := &scriptedTransactional{ - errs: []error{ - cerrors.WithStack(raftengine.ErrNotLeader), - cerrors.WithStack(raftengine.ErrNotLeader), - cerrors.WithStack(raftengine.ErrNotLeader), + errs: map[uint64]error{ + 0: cerrors.WithStack(raftengine.ErrNotLeader), + 1: cerrors.WithStack(raftengine.ErrNotLeader), + 2: cerrors.WithStack(raftengine.ErrNotLeader), }, onCommit: func(call uint64) { // Cancel after the first failed attempt so the next diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go index 144486d5..53b81210 100644 --- a/kv/leader_proxy.go +++ b/kv/leader_proxy.go @@ -123,23 +123,7 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse if !time.Now().Before(deadline) { return nil, lastErr } - // Bounded, interruptible back-off. time.Sleep would both - // exceed the budget by up to one interval (if time.Until < - // interval remains) and ignore parentCtx — so a cancelled - // budget would wait out the full 25ms instead of tearing - // down immediately. Cap at min(interval, remaining) and - // wake on parentCtx.Done so the back-off tracks the - // deadline exactly. 
- sleep := leaderProxyRetryInterval - if until := time.Until(deadline); until > 0 && until < sleep { - sleep = until - } - timer := time.NewTimer(sleep) - select { - case <-parentCtx.Done(): - timer.Stop() - case <-timer.C: - } + waitLeaderProxyBackoff(parentCtx, leaderProxyRetryInterval, deadline) // Re-check the deadline AFTER the back-off: if the budget is // exhausted, do not enter another maxForwardRetries cycle // (which could issue up to three more RPCs, each bounded by @@ -150,6 +134,24 @@ func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse } } +// waitLeaderProxyBackoff sleeps for up to interval but never past the +// remaining budget, and is interruptible via parentCtx — so a parent +// deadline or cancellation tears the back-off down immediately instead +// of waiting out the full interval. Factored out of forwardWithRetry +// so that function stays under the cyclop threshold. +func waitLeaderProxyBackoff(parentCtx context.Context, interval time.Duration, deadline time.Time) { + sleep := interval + if until := time.Until(deadline); until > 0 && until < sleep { + sleep = until + } + timer := time.NewTimer(sleep) + defer timer.Stop() + select { + case <-parentCtx.Done(): + case <-timer.C: + } +} + // runForwardCycle issues up to maxForwardRetries forward() attempts // against the current leader, short-circuiting on the budget. // Returns: From 29e1f2e93f6848a81269de7477a3e9df10d2c45f Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 21:14:26 +0000 Subject: [PATCH 16/17] fix(kv): surface transient leader error on retry-budget expiry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P2 on 846125b. When Dispatch's bounded retry budget fires while dispatchOnce is mid-call (for example redirect() waiting on cli.Forward), boundedCtx cancels the RPC and dispatchOnce returns a context.DeadlineExceeded wrapped through the gRPC stack. 
The previous code then took the "not transient, return as-is" path and handed that DeadlineExceeded to the gRPC caller — clients saw a generic internal timeout even though every prior attempt during the 5s window had reported a transient leader-churn signal, which is the behaviour client retry logic actually expects. Track the most recent TRANSIENT leader error we observed (lastTransientErr) separately from lastErr, and pick between them in a new finalDispatchErr helper when the retry loop terminates via shouldRetryDispatch == false. Rule: if our own retry budget has expired and we previously saw a transient leader error, surface that — the deadline is just how the loop noticed the budget ran out, not the real failure mode. Otherwise (business error inside the budget, or first attempt ever), surface lastErr unchanged. Pinned four cases in kv/coordinator_retry_test.go: 1. past deadline with a transient recorded → returns the transient (this is the Codex-flagged scenario) 2. past deadline with no transient seen → returns lastErr (prevents the helper from clobbering a genuine first-attempt failure with nil) 3. within budget, non-transient error → returns lastErr (write conflict / validation must not be masked by a stale transient) 4. within budget, nil transient → returns lastErr (nil-guard) Test plan - go vet ./... 
— clean - go test -race -count=1 ./kv/ — ok 6.7s (includes the new TestFinalDispatchErr_PrefersTransientOnBudgetExpiry pin) - go test -race -run '^(Test_consistency_satisfy_write_after_read_sequence|Test_grpc_transaction|Test_consistency_satisfy_write_after_read_for_parallel)$' ./adapter/ -count=1 — ok 182s https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 37 ++++++++++++++++++++++++++++++++++-- kv/coordinator_retry_test.go | 37 ++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index e12fd28c..d168ffd4 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -282,7 +282,13 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C boundedCtx, cancelBounded := context.WithDeadline(ctx, deadline) defer cancelBounded() var lastResp *CoordinateResponse - var lastErr error + // lastErr tracks the most recent dispatchOnce result. lastTransientErr + // separately retains the most recent TRANSIENT leader error we + // actually observed from a leader-routing failure, distinct from a + // context.DeadlineExceeded that boundedCtx propagates when our + // retry budget fires mid-attempt. The distinction matters for the + // final surfaced error: see finalDispatchErr. 
+ var lastErr, lastTransientErr error for { lastResp, lastErr = c.dispatchOnce(boundedCtx, reqs) // A successful dispatch means the commit already happened on @@ -305,8 +311,9 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C return lastResp, errors.WithStack(err) } if !shouldRetryDispatch(lastErr) { - return lastResp, lastErr + return lastResp, finalDispatchErr(lastErr, lastTransientErr, deadline) } + lastTransientErr = lastErr if !time.Now().Before(deadline) { return lastResp, lastErr } @@ -347,6 +354,32 @@ func shouldRetryDispatch(err error) bool { return isTransientLeaderError(err) } +// finalDispatchErr picks the error Dispatch surfaces when the retry +// loop terminates via shouldRetryDispatch == false. There are two +// cases: +// +// - The attempt returned a genuine non-transient error (write +// conflict, validation failure, etc.) while the budget was still +// healthy. Return lastErr unchanged; the caller needs the real +// failure reason, not a stale leader signal. +// - Our bounded retry budget fired mid-attempt, so dispatchOnce +// propagates a context.DeadlineExceeded from boundedCtx rather +// than a genuine failure. That deadline is just how the retry +// loop noticed it ran out; the *actual* failure mode is the +// transient leader churn we saw during the window. Return +// lastTransientErr so clients see "leader unavailable" instead +// of an internal gRPC timeout after bounded retries. +// +// The time.Now() check distinguishes the two: if now ≥ deadline, the +// budget is exhausted and the most recent non-transient lastErr is +// the deadline marker, not a real business error. +func finalDispatchErr(lastErr, lastTransientErr error, deadline time.Time) error { + if lastTransientErr != nil && !time.Now().Before(deadline) { + return lastTransientErr + } + return lastErr +} + // waitForDispatchRetry sleeps for interval on timer or until ctx fires, // whichever comes first. 
A ctx cancellation returns a wrapped ctx.Err() // so gRPC clients can distinguish "I gave up waiting" from "cluster is diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go index 71722d6d..3378fa5e 100644 --- a/kv/coordinator_retry_test.go +++ b/kv/coordinator_retry_test.go @@ -298,3 +298,40 @@ func TestIsTransientLeaderError_PinsRealSentinels(t *testing.T) { }) } } + +// TestFinalDispatchErr_PrefersTransientOnBudgetExpiry pins the +// Codex-P2 fix: when Dispatch's bounded retry budget fires +// mid-attempt, dispatchOnce returns a context.DeadlineExceeded +// propagated from boundedCtx rather than a genuine failure. The +// gRPC caller should see the last transient leader error collected +// during the retry window, not the internal deadline leak — that +// describes the true failure mode ("leader unavailable") whereas +// the deadline is just how the retry loop noticed it ran out. +func TestFinalDispatchErr_PrefersTransientOnBudgetExpiry(t *testing.T) { + t.Parallel() + + transient := cerrors.WithStack(raftengine.ErrNotLeader) + ctxDeadline := cerrors.WithStack(context.DeadlineExceeded) + + // Past deadline, transient error accumulated: surface the + // transient leader error, not the deadline marker. + past := time.Now().Add(-time.Millisecond) + require.ErrorIs(t, finalDispatchErr(ctxDeadline, transient, past), raftengine.ErrNotLeader) + + // Past deadline but no transient ever seen (very first attempt + // returned a non-retryable failure that happens to straddle the + // deadline): surface lastErr unchanged so callers see the real + // reason, not a misleading nil. + require.ErrorIs(t, finalDispatchErr(ctxDeadline, nil, past), context.DeadlineExceeded) + + // Still within budget, non-transient error: surface lastErr. + // This is the healthy-path case where a write conflict / validation + // failure must reach the caller unclobbered. 
+ future := time.Now().Add(5 * time.Second) + businessErr := errors.New("write conflict") + require.ErrorIs(t, finalDispatchErr(businessErr, transient, future), businessErr) + + // Within budget with no prior transient — same as above but + // exercises the nil-guard on lastTransientErr. + require.ErrorIs(t, finalDispatchErr(businessErr, nil, future), businessErr) +} From 26f086f6ea2d8f567afafa75aade54ac8af01622 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 24 Apr 2026 21:18:42 +0000 Subject: [PATCH 17/17] docs(kv): sync isTransientLeaderError doc + stabilise LeaderProxy test timing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two Copilot items on 29e1f2e, both small: - kv/coordinator.go:490: the doc-comment on isTransientLeaderError still said the wire-level fallback uses a "case-insensitive substring match". The implementation switched to strings.HasSuffix back in e410490 so WriteConflictError with a user-chosen key matching a leader phrase stays non-transient. Reword the doc to say SUFFIX match and spell out why (user-text in WriteConflictError keys). - kv/leader_proxy_test.go: in TestLeaderProxy_ForwardsAfterLeaderPublishes the start instant was captured after the `go func(){ time.Sleep(publishDelay) ...}()` launch, so the goroutine-scheduling delay ate into the observed elapsed and `require.GreaterOrEqual(t, elapsed, publishDelay)` could flake on slow runners even when the proxy did wait the full delay. Move `start := time.Now()` above the publish goroutine so the publishDelay lower bound is measured from the same instant as the Commit call. Test plan - go vet ./... — clean - go test -race -count=1 ./kv/ — ok 7.0s (incl. 
the now-stable TestLeaderProxy_ForwardsAfterLeaderPublishes) https://claude.ai/code/session_012boXutHkKDuYHfBeabQCap --- kv/coordinator.go | 15 ++++++++++----- kv/leader_proxy_test.go | 21 ++++++++++++--------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/kv/coordinator.go b/kv/coordinator.go index d168ffd4..6a18899b 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -487,11 +487,16 @@ func (c *Coordinate) dispatchOnce(ctx context.Context, reqs *OperationGroup[OP]) // node returns adapter.ErrNotLeader (its own sentinel) which gRPC // transports as a generic Unknown status carrying only the message // "not leader". errors.Is cannot traverse that wire boundary, so we -// fall back to a case-insensitive substring match on the final error -// text. leaderErrorPhrases enumerates the exact phrases the adapter -// and coordinator layers emit so the match cannot accidentally -// swallow an unrelated business-logic error that happens to contain -// "leader" in its text. +// fall back to a case-insensitive SUFFIX match on the final error +// text (see hasTransientLeaderPhrase). Suffix-only — not a free +// Contains — because store.WriteConflictError formats as +// "key: : write conflict" and a user-chosen key embedding +// "not leader" must not be misclassified as transient. +// leaderErrorPhrases enumerates the exact phrases the adapter and +// coordinator layers emit; every observed wrapper (cockroachdb +// Wrapf, fmt.Errorf %w-prefix, gRPC status Errorf) places the +// original message at the end of the composed string so suffix +// matching catches the real churn signals without false positives. // // Business-logic failures (write conflict, validation, etc.) 
are NOT // covered here — those must surface to the caller unchanged so client diff --git a/kv/leader_proxy_test.go b/kv/leader_proxy_test.go index fba3c09f..92a26360 100644 --- a/kv/leader_proxy_test.go +++ b/kv/leader_proxy_test.go @@ -221,15 +221,6 @@ func TestLeaderProxy_ForwardsAfterLeaderPublishes(t *testing.T) { p := NewLeaderProxyWithEngine(eng) t.Cleanup(func() { _ = p.connCache.Close() }) - // Publish the leader address after ~100ms (several poll intervals, - // well inside leaderProxyRetryBudget). forwardWithRetry should be - // mid-poll at that point and converge on success. - publishDelay := 100 * time.Millisecond - go func() { - time.Sleep(publishDelay) - eng.setLeader(lis.Addr().String()) - }() - reqs := []*pb.Request{ { IsTxn: false, @@ -241,7 +232,19 @@ func TestLeaderProxy_ForwardsAfterLeaderPublishes(t *testing.T) { }, } + // Anchor start BEFORE launching the publish goroutine so the + // publishDelay lower bound on elapsed is measured from the same + // instant as the Commit call. Capturing start after the goroutine + // launch would subtract the goroutine-scheduling time from + // elapsed and can let the GreaterOrEqual(elapsed, publishDelay) + // assertion flake when the proxy really did wait the full delay. + publishDelay := 100 * time.Millisecond start := time.Now() + go func() { + time.Sleep(publishDelay) + eng.setLeader(lis.Addr().String()) + }() + resp, err := p.Commit(reqs) elapsed := time.Since(start) require.NoError(t, err)