diff --git a/adapter/grpc_test.go b/adapter/grpc_test.go
index d9597472..3fd95dcd 100644
--- a/adapter/grpc_test.go
+++ b/adapter/grpc_test.go
@@ -1,9 +1,7 @@
 package adapter
 
 import (
-	"bytes"
 	"context"
-	"fmt"
 	"strconv"
 	"sync"
 	"testing"
@@ -152,60 +150,36 @@ func Test_consistency_satisfy_write_after_read_for_parallel(t *testing.T) {
 	t.Parallel()
 	nodes, adders, _ := createNode(t, 3)
 	c := rawKVClient(t, adders)
+	defer shutdown(nodes)
 
-	// 1000 concurrent clients × 3 RPCs saturates the single raft leader
-	// hard enough to provoke brief quorum checks to fail on CI, so retry
-	// transient leader-unavailable errors. The *Eventually helpers are
-	// intentionally NOT used here: they end in require.NoError, and
-	// require calls t.FailNow() which must run on the main test goroutine.
-	// Workers use retryNotLeader + an errors channel instead so all
-	// require/assert calls happen on the main goroutine after wg.Wait().
-	ctx := context.Background()
-	const workers = 1000
-	errCh := make(chan error, workers)
 	wg := sync.WaitGroup{}
+	const workers = 1000
 	wg.Add(workers)
 	for i := range workers {
 		go func(i int) {
 			defer wg.Done()
 			key := []byte("test-key-parallel" + strconv.Itoa(i))
 			want := []byte(strconv.Itoa(i))
-			put := func() error {
-				_, err := c.RawPut(ctx, &pb.RawPutRequest{Key: key, Value: want})
-				return err
-			}
-			if err := retryNotLeader(ctx, put); err != nil {
-				errCh <- err
+			_, err := c.RawPut(
+				context.Background(),
+				&pb.RawPutRequest{Key: key, Value: want},
+			)
+			if !assert.NoError(t, err, "Put RPC failed") {
 				return
 			}
-			if err := retryNotLeader(ctx, put); err != nil {
-				errCh <- err
+			_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
+			if !assert.NoError(t, err, "Put RPC failed") {
 				return
 			}
-			var resp *pb.RawGetResponse
-			err := retryNotLeader(ctx, func() error {
-				r, err := c.RawGet(ctx, &pb.RawGetRequest{Key: key})
-				if err != nil {
-					return err
-				}
-				resp = r
-				return nil
-			})
-			if err != nil {
-				errCh <- err
+
+			resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
+			if !assert.NoError(t, err, "Get RPC failed") {
 				return
 			}
-			if !bytes.Equal(want, resp.Value) {
-				errCh <- fmt.Errorf("consistency check failed for key %s: want %q got %q", key, want, resp.Value)
-			}
+			assert.Equal(t, want, resp.Value, "consistency check failed")
 		}(i)
 	}
 	wg.Wait()
-	close(errCh)
-	for err := range errCh {
-		assert.NoError(t, err)
-	}
-	shutdown(nodes)
 }
 
 func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) {
@@ -216,18 +190,30 @@ func Test_consistency_satisfy_write_after_read_sequence(t *testing.T) {
 
 	key := []byte("test-key-sequence")
 
-	// Use *Eventually helpers because a 9999-iteration loop across three
-	// t.Parallel adapter tests loads CI enough that the raft leader can
-	// briefly lose quorum and step down mid-run, surfacing as transient
-	// "not leader" / "leader not found" RPC errors. The helpers retry
-	// only those transient errors; any other error still fails the test.
-	ctx := context.Background()
 	for i := range 9999 {
 		want := []byte("sequence" + strconv.Itoa(i))
-		rawPutEventually(t, ctx, c, &pb.RawPutRequest{Key: key, Value: want})
-		rawPutEventually(t, ctx, c, &pb.RawPutRequest{Key: key, Value: want})
+		_, err := c.RawPut(
+			context.Background(),
+			&pb.RawPutRequest{Key: key, Value: want},
+		)
+		// Stop at the first RPC failure instead of continuing: a
+		// genuine regression would otherwise cascade into 9998 more
+		// iterations, each reporting the same broken invariant, and
+		// drown the real cause in test-output noise.
+		if !assert.NoError(t, err, "Put RPC failed") {
+			break
+		}
+
+		_, err = c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: want})
+		if !assert.NoError(t, err, "Put RPC failed") {
+			break
+		}
+
+		resp, err := c.RawGet(context.Background(), &pb.RawGetRequest{Key: key})
+		if !assert.NoError(t, err, "Get RPC failed") {
+			break
+		}
 
-		resp := rawGetEventually(t, ctx, c, &pb.RawGetRequest{Key: key})
 		assert.Equal(t, want, resp.Value, "consistency check failed")
 	}
 }
@@ -240,18 +226,33 @@ func Test_grpc_transaction(t *testing.T) {
 
 	key := []byte("test-key-sequence")
 
-	// See Test_consistency_satisfy_write_after_read_sequence for why the
-	// *Eventually helpers are necessary here.
-	ctx := context.Background()
 	for i := range 9999 {
 		want := []byte("sequence" + strconv.Itoa(i))
-		txnPutEventually(t, ctx, c, &pb.PutRequest{Key: key, Value: want})
-		resp := txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key})
+		_, err := c.Put(
+			context.Background(),
+			&pb.PutRequest{Key: key, Value: want},
+		)
+		// See Test_consistency_satisfy_write_after_read_sequence:
+		// break on first RPC failure so a single broken invariant
+		// does not amplify into thousands of assertion lines.
+		if !assert.NoError(t, err, "Put RPC failed") {
+			break
+		}
+		resp, err := c.Get(context.Background(), &pb.GetRequest{Key: key})
+		if !assert.NoError(t, err, "Get RPC failed") {
+			break
+		}
 		assert.Equal(t, want, resp.Value, "consistency check failed")
 
-		txnDeleteEventually(t, ctx, c, &pb.DeleteRequest{Key: key})
+		_, err = c.Delete(context.Background(), &pb.DeleteRequest{Key: key})
+		if !assert.NoError(t, err, "Delete RPC failed") {
+			break
+		}
 
-		resp = txnGetEventually(t, ctx, c, &pb.GetRequest{Key: key})
+		resp, err = c.Get(context.Background(), &pb.GetRequest{Key: key})
+		if !assert.NoError(t, err, "Get RPC failed") {
+			break
+		}
 		assert.Nil(t, resp.Value, "consistency check failed")
 	}
 }
diff --git a/adapter/test_util.go b/adapter/test_util.go
index 951698d9..4de2c47a 100644
--- a/adapter/test_util.go
+++ b/adapter/test_util.go
@@ -606,64 +606,3 @@ func lpushEventually(t *testing.T, ctx context.Context, rdb *redis.Client, key s
 		return rdb.LPush(ctx, key, vals...).Err()
 	})
 }
-
-// rawPutEventually wraps RawKV.RawPut in doEventually so transient leader
-// churn (either at startup or in the middle of a long-running loop) does
-// not fail the test with "not leader" / "leader not found".
-func rawPutEventually(t *testing.T, ctx context.Context, c pb.RawKVClient, req *pb.RawPutRequest) {
-	t.Helper()
-	doEventually(t, func() error {
-		_, err := c.RawPut(ctx, req)
-		return err
-	})
-}
-
-// rawGetEventually wraps RawKV.RawGet in doEventually and returns the
-// response only after a successful (non-"not leader") call.
-func rawGetEventually(t *testing.T, ctx context.Context, c pb.RawKVClient, req *pb.RawGetRequest) *pb.RawGetResponse {
-	t.Helper()
-	var resp *pb.RawGetResponse
-	doEventually(t, func() error {
-		r, err := c.RawGet(ctx, req)
-		if err != nil {
-			return err
-		}
-		resp = r
-		return nil
-	})
-	return resp
-}
-
-// txnPutEventually wraps TransactionalKV.Put in doEventually.
-func txnPutEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.PutRequest) {
-	t.Helper()
-	doEventually(t, func() error {
-		_, err := c.Put(ctx, req)
-		return err
-	})
-}
-
-// txnGetEventually wraps TransactionalKV.Get in doEventually and returns the
-// response only after a successful (non-"not leader") call.
-func txnGetEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.GetRequest) *pb.GetResponse {
-	t.Helper()
-	var resp *pb.GetResponse
-	doEventually(t, func() error {
-		r, err := c.Get(ctx, req)
-		if err != nil {
-			return err
-		}
-		resp = r
-		return nil
-	})
-	return resp
-}
-
-// txnDeleteEventually wraps TransactionalKV.Delete in doEventually.
-func txnDeleteEventually(t *testing.T, ctx context.Context, c pb.TransactionalKVClient, req *pb.DeleteRequest) {
-	t.Helper()
-	doEventually(t, func() error {
-		_, err := c.Delete(ctx, req)
-		return err
-	})
-}
diff --git a/kv/coordinator.go b/kv/coordinator.go
index 4648314e..6a18899b 100644
--- a/kv/coordinator.go
+++ b/kv/coordinator.go
@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"log/slog"
 	"reflect"
+	"strings"
 	"time"
 
 	"github.com/bootjp/elastickv/internal/monoclock"
@@ -16,6 +17,23 @@
 
 const redirectForwardTimeout = 5 * time.Second
 
+// dispatchLeaderRetryBudget bounds how long Dispatch keeps absorbing
+// transient leader-unavailable errors (no leader resolvable yet, local
+// node just stepped down, forwarded RPC bounced off a stale leader).
+// gRPC callers expect linearizable semantics — i.e. an operation either
+// commits atomically or fails definitively — so the coordinator hides
+// raft-internal leader churn behind a bounded retry instead of leaking
+// "not leader" / "leader not found" errors out through the API.
+//
+// The budget is large enough to cover one or two complete re-elections
+// even on a slow runner (etcd/raft randomised election timeout up to
+// ~1s), and small enough that a permanent loss of quorum still surfaces
+// to the caller in bounded time.
+const dispatchLeaderRetryBudget = 5 * time.Second
+
+// dispatchLeaderRetryInterval is the poll interval between retries.
+const dispatchLeaderRetryInterval = 25 * time.Millisecond
+
 // hlcPhysicalWindowMs is the duration in milliseconds that the Raft-agreed
 // physical ceiling extends ahead of the current wall clock. Modelled after
 // TiDB's TSO 3-second window: the leader commits ceiling = now + window, and
@@ -225,10 +243,194 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
 	}
 
 	// Validate the request before any use to avoid panics on malformed input.
+	// Validation errors are not retryable, so do this once outside the loop.
 	if err := validateOperationGroup(reqs); err != nil {
 		return nil, err
 	}
 
+	// Wrap the actual dispatch in a bounded retry loop so that transient
+	// leader-unavailable errors (no leader resolvable yet, local node just
+	// stepped down, forwarded RPC bounced off a stale leader) are absorbed
+	// inside the coordinator instead of leaking out through the gRPC API.
+	// The gRPC contract is linearizable: a single client call either
+	// commits atomically or returns a definitive error. "Leader not found"
+	// during a re-election is neither, so we wait briefly for the cluster
+	// to re-stabilise. Transient leader errors that outlive the retry
+	// budget, and all non-transient errors immediately, are surfaced
+	// unchanged for callers to observe.
+	leaderAssignsTS := coordinatorAssignsTimestamps(reqs)
+	deadline := time.Now().Add(dispatchLeaderRetryBudget)
+	// Reuse a single Timer across retries. time.After would allocate a
+	// fresh timer per iteration whose Go runtime entry lingers until the
+	// interval elapses, producing a short-term leak proportional to the
+	// retry rate. Under heavy mid-dispatch leader churn this is a hot
+	// loop, so we Reset the timer in place instead. Go 1.23+ timer
+	// semantics make Reset on an unfired/expired Timer safe without an
+	// explicit drain.
+	timer := time.NewTimer(dispatchLeaderRetryInterval)
+	defer timer.Stop()
+	// boundedCtx caps every dispatchOnce call by the retry deadline so
+	// that a forward RPC inside redirect (which itself uses
+	// context.WithTimeout(ctx, redirectForwardTimeout)) can never run
+	// past the advertised dispatchLeaderRetryBudget. Without this bound,
+	// a near-expiry iteration could legitimately enter dispatchOnce and
+	// then sit in cli.Forward for the full 5s redirectForwardTimeout —
+	// the wall-clock check between iterations would trip on the next
+	// pass, but the offending call has already exceeded the budget.
+	// context.WithDeadline picks the earlier of the caller's deadline
+	// and ours, so callers with a tighter deadline still get their
+	// own cancellation semantics.
+	boundedCtx, cancelBounded := context.WithDeadline(ctx, deadline)
+	defer cancelBounded()
+	var lastResp *CoordinateResponse
+	// lastErr tracks the most recent dispatchOnce result. lastTransientErr
+	// separately retains the most recent TRANSIENT leader error we
+	// actually observed from a leader-routing failure, distinct from a
+	// context.DeadlineExceeded that boundedCtx propagates when our
+	// retry budget fires mid-attempt. The distinction matters for the
+	// final surfaced error: see finalDispatchErr.
+	var lastErr, lastTransientErr error
+	for {
+		lastResp, lastErr = c.dispatchOnce(boundedCtx, reqs)
+		// A successful dispatch means the commit already happened on
+		// the raft FSM. Do NOT let a racing caller cancellation
+		// convert that success into a reported failure: returning
+		// ctx.Err() now would mask the commit, and a client that
+		// retries a non-idempotent write could observe duplicate
+		// effects. This ordering MUST run before the ctx.Err() check
+		// below.
+		if lastErr == nil {
+			return lastResp, nil
+		}
+		// Caller-supplied ctx cancellation/deadline takes precedence
+		// over the error dispatchOnce returned (which may itself wrap
+		// context.Canceled / context.DeadlineExceeded propagated
+		// through boundedCtx). gRPC clients rely on the wrapped
+		// ctx.Err() to distinguish "I gave up" from "system was
+		// unavailable".
+		if err := ctx.Err(); err != nil {
+			return lastResp, errors.WithStack(err)
+		}
+		if !shouldRetryDispatch(lastErr) {
+			return lastResp, finalDispatchErr(lastErr, lastTransientErr, deadline)
+		}
+		lastTransientErr = lastErr
+		if !time.Now().Before(deadline) {
+			return lastResp, lastErr
+		}
+		if err := prepareDispatchRetry(ctx, reqs, leaderAssignsTS, timer, deadline); err != nil {
+			return lastResp, err
+		}
+		// Re-check the deadline AFTER the back-off sleep. If the budget
+		// expired while we slept, do not start another dispatchOnce —
+		// boundedCtx would just cancel it immediately, but exiting here
+		// keeps the surfaced error as the last transient leader signal
+		// instead of a context-deadline error from inside the gRPC
+		// stack.
+		if !time.Now().Before(deadline) {
+			return lastResp, lastErr
+		}
+	}
+}
+
+// coordinatorAssignsTimestamps reports whether the caller expects the
+// coordinator to mint StartTS/CommitTS on this dispatch. When true, each
+// retry MUST reset the timestamps back to zero so dispatchOnce re-issues
+// against the post-churn leader's HLC.
+func coordinatorAssignsTimestamps(reqs *OperationGroup[OP]) bool {
+	if !reqs.IsTxn {
+		return false
+	}
+	return reqs.StartTS == 0
+}
+
+// shouldRetryDispatch reports whether Dispatch should loop again on the
+// error returned by dispatchOnce. Only transient leader-unavailable
+// signals qualify; a nil error and every other non-retryable error
+// (write conflict, validation, etc.) must surface unchanged.
+func shouldRetryDispatch(err error) bool {
+	if err == nil {
+		return false
+	}
+	return isTransientLeaderError(err)
+}
+
+// finalDispatchErr picks the error Dispatch surfaces when the retry
+// loop terminates via shouldRetryDispatch == false. There are two
+// cases:
+//
+//   - The attempt returned a genuine non-transient error (write
+//     conflict, validation failure, etc.) while the budget was still
+//     healthy. Return lastErr unchanged; the caller needs the real
+//     failure reason, not a stale leader signal.
+//   - Our bounded retry budget fired mid-attempt, so dispatchOnce
+//     propagates a context.DeadlineExceeded from boundedCtx rather
+//     than a genuine failure. That deadline is just how the retry
+//     loop noticed it ran out; the *actual* failure mode is the
+//     transient leader churn we saw during the window. Return
+//     lastTransientErr so clients see "leader unavailable" instead
+//     of an internal gRPC timeout after bounded retries.
+//
+// The time.Now() check distinguishes the two: if now ≥ deadline, the
+// budget is exhausted and the most recent non-transient lastErr is
+// the deadline marker, not a real business error.
+func finalDispatchErr(lastErr, lastTransientErr error, deadline time.Time) error {
+	if lastTransientErr != nil && !time.Now().Before(deadline) {
+		return lastTransientErr
+	}
+	return lastErr
+}
+
+// waitForDispatchRetry sleeps for interval on timer or until ctx fires,
+// whichever comes first. A ctx cancellation returns a wrapped ctx.Err()
+// so gRPC clients can distinguish "I gave up waiting" from "cluster is
+// unavailable"; timer expiry returns nil and the caller loops again.
+//
+// The sleep is additionally capped at time.Until(deadline). Without
+// that cap, a caller that hits the retry budget with <interval of it
+// remaining would sleep past the deadline before the loop's wall-clock
+// check could notice.
+func waitForDispatchRetry(ctx context.Context, timer *time.Timer, interval time.Duration, deadline time.Time) error {
+	sleep := interval
+	if until := time.Until(deadline); until > 0 && until < sleep {
+		sleep = until
+	}
+	timer.Reset(sleep)
+	select {
+	case <-ctx.Done():
+		return errors.WithStack(ctx.Err())
+	case <-timer.C:
+		return nil
+	}
+}
+
+// prepareDispatchRetry groups the pre-back-off bookkeeping Dispatch
+// must do between attempts: clear StartTS/CommitTS when the caller
+// asked the coordinator to mint them (so the next dispatchOnce issues
+// against the post-churn HLC instead of a stale timestamp that could
+// trip fsm.validateConflicts), then sleep one retry interval — or
+// return ctx.Err() wrapped if the caller cancels during the sleep.
+// Factored out of Dispatch to keep that function's cyclomatic
+// complexity under the cyclop threshold without shuffling semantics.
+func prepareDispatchRetry(ctx context.Context, reqs *OperationGroup[OP], leaderAssignsTS bool, timer *time.Timer, deadline time.Time) error {
+	if leaderAssignsTS {
+		reqs.StartTS = 0
+		reqs.CommitTS = 0
+	}
+	return waitForDispatchRetry(ctx, timer, dispatchLeaderRetryInterval, deadline)
+}
+
+// dispatchOnce runs a single Dispatch attempt without retry. It is the
+// transactional unit retried by Dispatch on transient leader errors.
+//
+// StartTS issuance is intentionally inside the per-attempt path: if a
+// previous attempt was rejected by a stale leader, the new leader's
+// HLC must mint a fresh StartTS so it floors above any committed
+// physical-ceiling lease. Re-using the previous StartTS could violate
+// monotonicity across the leader transition.
+func (c *Coordinate) dispatchOnce(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error) {
 	if !c.IsLeader() {
 		return c.redirect(ctx, reqs)
 	}
@@ -268,6 +470,100 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
 	return resp, err
 }
 
+// isTransientLeaderError reports whether err is a transient
+// leader-unavailable signal worth retrying inside Dispatch.
+//
+// Three distinct conditions qualify:
+//   - ErrLeaderNotFound — no leader address resolvable on this node yet
+//     (election in progress, or the previous leader stepped down and the
+//     successor has not propagated). Recoverable as soon as a new leader
+//     publishes its identity.
+//   - isLeadershipLossError — the etcd/raft engine rejected a Propose
+//     because it just lost leadership (ErrNotLeader / ErrLeadershipLost
+//     / ErrLeadershipTransferInProgress). Recoverable by re-routing
+//     through the new leader (redirect path).
+//   - Forwarded "not leader" / "leader not found" strings — when
+//     Coordinate.redirect forwards to a stale leader, the destination
+//     node returns adapter.ErrNotLeader (its own sentinel) which gRPC
+//     transports as a generic Unknown status carrying only the message
+//     "not leader". errors.Is cannot traverse that wire boundary, so we
+//     fall back to a case-insensitive SUFFIX match on the final error
+//     text (see hasTransientLeaderPhrase). Suffix-only — not a free
+//     Contains — because store.WriteConflictError formats as
+//     "key: <key>: write conflict" and a user-chosen key embedding
+//     "not leader" must not be misclassified as transient.
+//     leaderErrorPhrases enumerates the exact phrases the adapter and
+//     coordinator layers emit; every observed wrapper (cockroachdb
+//     Wrapf, fmt.Errorf %w-prefix, gRPC status Errorf) places the
+//     original message at the end of the composed string so suffix
+//     matching catches the real churn signals without false positives.
+//
+// Business-logic failures (write conflict, validation, etc.) are NOT
+// covered here — those must surface to the caller unchanged so client
+// retry logic can distinguish "system was unavailable" from "your write
+// was rejected on its merits".
+func isTransientLeaderError(err error) bool {
+	if err == nil {
+		return false
+	}
+	if errors.Is(err, ErrLeaderNotFound) {
+		return true
+	}
+	if isLeadershipLossError(err) {
+		return true
+	}
+	return hasTransientLeaderPhrase(err)
+}
+
+// leaderErrorPhrases is the closed set of wire-level error strings the
+// coordinator is willing to treat as transient once the typed sentinel
+// has been dropped by a gRPC boundary. Keep this list tight — any
+// addition must correspond to an error the system actually emits for
+// leader churn, not a generic "failed" message that happens to mention
+// leaders.
+//
+// Every string here MUST be paired with a kv/raftengine/adapter
+// sentinel in isLeadershipLossError or isTransientLeaderError so the
+// typed-sentinel path catches the same condition when the error chain
+// is intact. TestIsTransientLeaderError_PinsRealSentinels locks the
+// sentinel .Error() texts to this list; a rename on the sentinel side
+// will fail that test and force a matching update here.
+var leaderErrorPhrases = []string{
+	"not leader",                      // adapter.ErrNotLeader, raftengine.ErrNotLeader ("raft engine: not leader")
+	"leader not found",                // kv.ErrLeaderNotFound, adapter.ErrLeaderNotFound
+	"leadership lost",                 // raftengine.ErrLeadershipLost ("raft engine: leadership lost")
+	"leadership transfer in progress", // raftengine.ErrLeadershipTransferInProgress
+}
+
+// hasTransientLeaderPhrase reports whether err.Error() ENDS WITH one of
+// the well-known transient-leader substrings. It is the last resort
+// used after errors.Is fails across a gRPC boundary that strips the
+// original sentinel chain.
+//
+// Suffix matching — not free-form Contains — is load-bearing here:
+// several non-leader errors embed user-controlled text in the middle
+// of their message. store.WriteConflictError, for example, formats as
+// "key: <key>: write conflict"; a conflicted key literally named
+// "not leader" would trip a Contains-based classifier and cause the
+// coordinator to spin retries (and reissue with fresh timestamps) on
+// what is actually a terminal business failure. Every wrapper we
+// observe in practice (cockroachdb Wrapf, fmt.Errorf %w-prefix, gRPC
+// status.Errorf's "rpc error: code = X desc = <msg>" form, plus any
+// stacking of those) places the original message at the END of the
+// composed string, so a suffix check is both tight and sufficient.
+func hasTransientLeaderPhrase(err error) bool {
+	if err == nil {
+		return false
+	}
+	msg := strings.ToLower(err.Error())
+	for _, phrase := range leaderErrorPhrases {
+		if strings.HasSuffix(msg, phrase) {
+			return true
+		}
+	}
+	return false
+}
+
 // refreshLeaseAfterDispatch extends the lease only when the dispatch
 // produced a real Raft commit. CommitIndex == 0 means the underlying
 // transaction manager short-circuited (empty-input Commit, no-op
diff --git a/kv/coordinator_retry_test.go b/kv/coordinator_retry_test.go
new file mode 100644
index 00000000..3378fa5e
--- /dev/null
+++ b/kv/coordinator_retry_test.go
@@ -0,0 +1,337 @@
+package kv
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/bootjp/elastickv/internal/raftengine"
+	pb "github.com/bootjp/elastickv/proto"
+	cerrors "github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+// stubLeaderEngine reports State=Leader and a fixed Leader address. It is
+// the minimum surface Coordinate.Dispatch needs to take the local-dispatch
+// path (IsLeader() true) without engaging real raft machinery. Methods
+// beyond that surface return zero values / nil; Dispatch's retry loop
+// never calls them, and refreshLeaseAfterDispatch skips its LeaseProvider
+// branch because this stub does not implement that interface.
+type stubLeaderEngine struct{}
+
+func (stubLeaderEngine) Propose(context.Context, []byte) (*raftengine.ProposalResult, error) {
+	return &raftengine.ProposalResult{}, nil
+}
+func (stubLeaderEngine) State() raftengine.State { return raftengine.StateLeader }
+func (stubLeaderEngine) Leader() raftengine.LeaderInfo {
+	return raftengine.LeaderInfo{ID: "self", Address: "127.0.0.1:0"}
+}
+func (stubLeaderEngine) VerifyLeader(context.Context) error              { return nil }
+func (stubLeaderEngine) LinearizableRead(context.Context) (uint64, error) { return 0, nil }
+func (stubLeaderEngine) Status() raftengine.Status {
+	return raftengine.Status{State: raftengine.StateLeader}
+}
+func (stubLeaderEngine) Configuration(context.Context) (raftengine.Configuration, error) {
+	return raftengine.Configuration{}, nil
+}
+func (stubLeaderEngine) Close() error { return nil }
+
+// scriptedTransactional returns the error registered in errs for the
+// 0-indexed call that triggered Commit; calls without a registered
+// entry succeed. Using a map keyed by uint64 rather than a []error
+// keeps the whole counter path unsigned end-to-end — no gosec G115
+// conversions between int (for a slice length) and uint64 (the atomic
+// counter), and no //nolint suppression required. Commit is called
+// serially from the Dispatch retry loop, so atomic.Uint64 is only
+// race-detector ceremony.
+type scriptedTransactional struct {
+	errs     map[uint64]error
+	commits  atomic.Uint64
+	reqs     [][]*pb.Request
+	onCommit func(call uint64) // optional hook invoked inside Commit
+}
+
+func (s *scriptedTransactional) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
+	idx := s.commits.Add(1) - 1
+	s.reqs = append(s.reqs, reqs)
+	if s.onCommit != nil {
+		s.onCommit(idx)
+	}
+	if err := s.errs[idx]; err != nil {
+		return nil, err
+	}
+	return &TransactionResponse{CommitIndex: idx + 1}, nil
+}
+
+func (s *scriptedTransactional) Abort([]*pb.Request) (*TransactionResponse, error) {
+	return &TransactionResponse{}, nil
+}
+
+func TestIsTransientLeaderError_Classification(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{"nil", nil, false},
+		{"kv.ErrLeaderNotFound", cerrors.WithStack(ErrLeaderNotFound), true},
+		{"raftengine.ErrNotLeader", cerrors.WithStack(raftengine.ErrNotLeader), true},
+		{"raftengine.ErrLeadershipLost", cerrors.WithStack(raftengine.ErrLeadershipLost), true},
+		{"raftengine.ErrLeadershipTransferInProgress",
+			cerrors.WithStack(raftengine.ErrLeadershipTransferInProgress), true},
+		{"wire not-leader string", errors.New("not leader"), true},
+		{"wire leader-not-found string", errors.New("leader not found"), true},
+		{"wire leadership-lost string", errors.New("raft engine: leadership lost"), true},
+		{"wire leadership-transfer string",
+			errors.New("raft engine: leadership transfer in progress"), true},
+		{"gRPC status wrapping not leader", fmt.Errorf("rpc error: code = Unknown desc = not leader"), true},
+		{"gRPC status wrapping leadership lost",
+			fmt.Errorf("rpc error: code = Unknown desc = raft engine: leadership lost"), true},
+		{"unrelated error", errors.New("write conflict"), false},
+		{"validation error", errors.New("invalid request"), false},
+		// Codex P2 regression: before the HasSuffix switch this was
+		// misclassified as transient because the substring matcher
+		// saw "not leader" in the middle. store.WriteConflictError
+		// literally formats as "key: <key>: write conflict",
+		// and a user-chosen key can embed any of the phrases. Must
+		// stay a terminal business error.
+		{"write conflict with sneaky key matching leader phrase",
+			errors.New("key: not leader: write conflict"), false},
+		{"write conflict with key containing leadership lost",
+			errors.New("key: raft engine: leadership lost: write conflict"), false},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			require.Equal(t, tc.want, isTransientLeaderError(tc.err))
+		})
+	}
+}
+
+// newRetryCoordinate wires a Coordinate against stubLeaderEngine and the
+// supplied scripted transaction manager. No engine registration happens,
+// so the caller does not need to call Close.
+func newRetryCoordinate(tx Transactional) *Coordinate {
+	return &Coordinate{
+		transactionManager: tx,
+		engine:             stubLeaderEngine{},
+		clock:              NewHLC(),
+	}
+}
+
+func TestCoordinateDispatch_RetriesTransientLeaderError(t *testing.T) {
+	t.Parallel()
+
+	// First two Commit calls fail with transient leader signals (once via
+	// the typed raftengine sentinel, once via the wire-level string that
+	// gRPC would transport); the third succeeds. Dispatch must absorb
+	// both and return success without leaking the transient error.
+	tx := &scriptedTransactional{
+		errs: map[uint64]error{
+			0: cerrors.WithStack(raftengine.ErrNotLeader),
+			1: errors.New("leader not found"),
+		},
+	}
+	c := newRetryCoordinate(tx)
+
+	start := time.Now()
+	resp, err := c.Dispatch(context.Background(), &OperationGroup[OP]{
+		Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}},
+	})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.EqualValues(t, 3, tx.commits.Load())
+	// Three attempts separated by the 25ms poll interval give a realistic
+	// lower bound of ~50ms; anything shorter would mean the loop skipped
+	// its back-off.
+	require.GreaterOrEqual(t, time.Since(start), 2*dispatchLeaderRetryInterval)
+}
+
+func TestCoordinateDispatch_NonTransientErrorSurfacesImmediately(t *testing.T) {
+	t.Parallel()
+
+	business := errors.New("write conflict")
+	tx := &scriptedTransactional{errs: map[uint64]error{0: business}}
+	c := newRetryCoordinate(tx)
+
+	_, err := c.Dispatch(context.Background(), &OperationGroup[OP]{
+		Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}},
+	})
+	require.ErrorIs(t, err, business)
+	// Exactly one attempt — the retry loop must not re-drive non-transient
+	// errors.
+	require.EqualValues(t, 1, tx.commits.Load())
+}
+
+func TestCoordinateDispatch_RefreshesStartTSOnRetry(t *testing.T) {
+	t.Parallel()
+
+	tx := &scriptedTransactional{
+		errs: map[uint64]error{0: cerrors.WithStack(raftengine.ErrNotLeader)},
+	}
+	c := newRetryCoordinate(tx)
+
+	// Caller passes StartTS==0 — the contract is that the coordinator
+	// mints the timestamp. Each retry MUST reset StartTS to 0 so
+	// dispatchOnce re-mints against the post-churn clock; otherwise the
+	// FSM's LatestCommitTS > startTS check could reject the retry
+	// against a write that committed during the election window.
+	reqs := &OperationGroup[OP]{
+		IsTxn: true,
+		Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}},
+	}
+	_, err := c.Dispatch(context.Background(), reqs)
+	require.NoError(t, err)
+	require.EqualValues(t, 2, tx.commits.Load())
+	require.Len(t, tx.reqs, 2)
+	require.Len(t, tx.reqs[0], 1)
+	require.Len(t, tx.reqs[1], 1)
+
+	ts0 := tx.reqs[0][0].Ts
+	ts1 := tx.reqs[1][0].Ts
+	require.Greater(t, ts0, uint64(0), "first attempt must carry a minted StartTS")
+	require.Greater(t, ts1, ts0, "retry must mint a fresh, strictly greater StartTS")
+}
+
+func TestCoordinateDispatch_CtxCancelDuringRetrySurfaces(t *testing.T) {
+	t.Parallel()
+
+	// An always-transient failure keeps the retry loop alive; cancelling
+	// the context during the back-off must surface ctx.Err() to the
+	// caller rather than the transient leader error. gRPC clients rely
+	// on this to tell "I gave up" from "cluster unavailable".
+	ctx, cancel := context.WithCancel(context.Background())
+	tx := &scriptedTransactional{
+		errs: map[uint64]error{
+			0: cerrors.WithStack(raftengine.ErrNotLeader),
+			1: cerrors.WithStack(raftengine.ErrNotLeader),
+			2: cerrors.WithStack(raftengine.ErrNotLeader),
+		},
+		onCommit: func(call uint64) {
+			// Cancel after the first failed attempt so the next
+			// back-off select sees ctx.Done().
+			if call == 0 {
+				cancel()
+			}
+		},
+	}
+	c := &Coordinate{
+		transactionManager: tx,
+		engine:             stubLeaderEngine{},
+		clock:              NewHLC(),
+	}
+
+	_, err := c.Dispatch(ctx, &OperationGroup[OP]{
+		Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}},
+	})
+	require.ErrorIs(t, err, context.Canceled)
+}
+
+func TestCoordinateDispatch_SuccessBeatsConcurrentCancel(t *testing.T) {
+	t.Parallel()
+
+	// Inverse race of TestCoordinateDispatch_CtxCancelDuringRetrySurfaces:
+	// dispatchOnce returns SUCCESS on the same attempt where the caller
+	// cancels ctx. The commit already landed in the FSM, so the loop
+	// MUST report success rather than converting it into a
+	// context.Canceled error — doing so would make a retrying client
+	// re-issue the same write and risk duplicate effects for
+	// non-idempotent operations. Pins the fix for the CodeRabbit-major
+	// ordering bug in the retry loop (commit + cancel ordering).
+	ctx, cancel := context.WithCancel(context.Background())
+	tx := &scriptedTransactional{
+		onCommit: func(uint64) {
+			// Cancel inside Commit, BEFORE Commit returns. Dispatch
+			// will then observe a successful Commit but a cancelled
+			// ctx on its first check.
+			cancel()
+		},
+	}
+	c := newRetryCoordinate(tx)
+
+	resp, err := c.Dispatch(ctx, &OperationGroup[OP]{
+		Elems: []*Elem[OP]{{Op: Put, Key: []byte("k"), Value: []byte("v")}},
+	})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+	require.Greater(t, resp.CommitIndex, uint64(0))
+}
+
+// TestIsTransientLeaderError_PinsRealSentinels asserts that the real
+// .Error() texts of the upstream sentinels we classify as transient
+// still pass through isTransientLeaderError. If a future rename of
+// these messages drifts them out of leaderErrorPhrases' closed list,
+// this test catches it at CI time rather than during a production
+// re-election window. Pinning kv.ErrLeaderNotFound covers the
+// wire-level phrase "leader not found"; raftengine.ErrNotLeader
+// covers both the errors.Is sentinel path AND the string-fallback
+// path ("not leader" substring).
+//
+// adapter.ErrNotLeader is not pinned here because adapter imports kv,
+// which would create a test-time import cycle. A symmetric pin lives
+// in the adapter test package alongside that sentinel.
+func TestIsTransientLeaderError_PinsRealSentinels(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name string
+		err  error
+	}{
+		{"kv.ErrLeaderNotFound", ErrLeaderNotFound},
+		{"raftengine.ErrNotLeader", raftengine.ErrNotLeader},
+		{"raftengine.ErrLeadershipLost", raftengine.ErrLeadershipLost},
+		{"raftengine.ErrLeadershipTransferInProgress",
+			raftengine.ErrLeadershipTransferInProgress},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			require.True(t, isTransientLeaderError(tc.err),
+				"sentinel %q (%q) no longer classified as transient — update leaderErrorPhrases or the classifier",
+				tc.name, tc.err.Error())
+		})
+	}
+}
+
+// TestFinalDispatchErr_PrefersTransientOnBudgetExpiry pins the
+// Codex-P2 fix: when Dispatch's bounded retry budget fires
+// mid-attempt, dispatchOnce returns a context.DeadlineExceeded
+// propagated from boundedCtx rather than a genuine failure. The
+// gRPC caller should see the last transient leader error collected
+// during the retry window, not the internal deadline leak — that
+// describes the true failure mode ("leader unavailable") whereas
+// the deadline is just how the retry loop noticed it ran out.
+func TestFinalDispatchErr_PrefersTransientOnBudgetExpiry(t *testing.T) {
+	t.Parallel()
+
+	transient := cerrors.WithStack(raftengine.ErrNotLeader)
+	ctxDeadline := cerrors.WithStack(context.DeadlineExceeded)
+
+	// Past deadline, transient error accumulated: surface the
+	// transient leader error, not the deadline marker.
+	past := time.Now().Add(-time.Millisecond)
+	require.ErrorIs(t, finalDispatchErr(ctxDeadline, transient, past), raftengine.ErrNotLeader)
+
+	// Past deadline but no transient ever seen (very first attempt
+	// returned a non-retryable failure that happens to straddle the
+	// deadline): surface lastErr unchanged so callers see the real
+	// reason, not a misleading nil.
+	require.ErrorIs(t, finalDispatchErr(ctxDeadline, nil, past), context.DeadlineExceeded)
+
+	// Still within budget, non-transient error: surface lastErr.
+	// This is the healthy-path case where a write conflict / validation
+	// failure must reach the caller unclobbered.
+	future := time.Now().Add(5 * time.Second)
+	businessErr := errors.New("write conflict")
+	require.ErrorIs(t, finalDispatchErr(businessErr, transient, future), businessErr)
+
+	// Within budget with no prior transient — same as above but
+	// exercises the nil-guard on lastTransientErr.
+	require.ErrorIs(t, finalDispatchErr(businessErr, nil, future), businessErr)
+}
diff --git a/kv/leader_proxy.go b/kv/leader_proxy.go
index 3e675cd5..53b81210 100644
--- a/kv/leader_proxy.go
+++ b/kv/leader_proxy.go
@@ -13,6 +13,16 @@ import (
 const leaderForwardTimeout = 5 * time.Second
 const maxForwardRetries = 3
 
+// leaderProxyRetryBudget bounds how long forwardWithRetry keeps polling
+// for a leader address while none is published yet. gRPC callers expect
+// linearizable semantics, so the proxy hides brief re-election windows
+// behind a bounded retry instead of returning ErrLeaderNotFound on the
+// very first attempt.
+const leaderProxyRetryBudget = 5 * time.Second
+
+// leaderProxyRetryInterval paces re-resolution of the leader address.
+const leaderProxyRetryInterval = 25 * time.Millisecond
+
 // LeaderProxy forwards transactional requests to the current raft leader when
 // the local node is not the leader.
 type LeaderProxy struct {
@@ -51,30 +61,132 @@ func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error) {
 	return p.tm.Abort(reqs)
 }
 
-// forwardWithRetry attempts to forward to the leader up to maxForwardRetries
-// times, re-fetching the leader address on each failure to handle leadership
-// changes between attempts.
+// forwardWithRetry attempts to forward to the leader, re-fetching the
+// leader address on each failure to handle leadership changes between
+// attempts. Two retry signals are interleaved:
+//
+//   - Forward-RPC failures are bounded by maxForwardRetries (each attempt
+//     re-resolves the leader address inside forward()).
+//   - Transient leader-unavailable errors (no leader published yet, or
+//     the forwarded RPC landed on a stale leader that returned
+//     adapter.ErrNotLeader over the wire) are bounded by
+//     leaderProxyRetryBudget so a brief re-election window does not
+//     bubble up to gRPC clients as a hard failure. Linearizable callers
+//     expect the proxy to either commit atomically or fail definitively,
+//     not to leak transient raft-internal churn.
+//
+// The wire-level "not leader" string match is necessary because a stale
+// leader's Internal.Forward returns adapter.ErrNotLeader whose error
+// chain does not survive the gRPC boundary; errors.Is against
+// ErrLeaderNotFound would miss it and exit after only the fast retries.
+//
+// The overall budget is strictly enforced: no new forward() attempt is
+// started once time.Now() has passed deadline, and each per-attempt RPC
+// is bounded by min(leaderForwardTimeout, remaining budget). Without
+// that second bound, a single forward() could run for the full 5s RPC
+// timeout AFTER the budget expired, pushing total latency well past
+// leaderProxyRetryBudget.
 func (p *LeaderProxy) forwardWithRetry(reqs []*pb.Request) (*TransactionResponse, error) {
 	if len(reqs) == 0 {
 		return &TransactionResponse{}, nil
 	}
+	deadline := time.Now().Add(leaderProxyRetryBudget)
+	// Parent context carries the retry deadline so forward()'s per-call
+	// timeout (derived via context.WithTimeout(parentCtx, ...)) can
+	// never extend past it — context.WithTimeout picks the earlier of
+	// the two expirations.
+	parentCtx, cancelParent := context.WithDeadline(context.Background(), deadline)
+	defer cancelParent()
+
+	var lastErr error
+	for {
+		// runForwardCycle runs up to maxForwardRetries fast retries against
+		// whatever leader is currently visible and returns either a
+		// committed response, a terminal error, or the last transient
+		// leader error for the outer loop to re-poll on.
+		resp, err, done := p.runForwardCycle(parentCtx, reqs, deadline)
+		if done {
+			return resp, err
+		}
+		lastErr = err
+		// Defensive: if runForwardCycle exited on the deadline guard
+		// before ever calling forward() (e.g. a future refactor
+		// shortens the budget, or the clock jumps forward between the
+		// outer deadline computation and the inner check), lastErr
+		// stays nil. errors.Wrapf(nil, ...) would silently yield nil
+		// — handing callers a (nil, nil) "success" that never
+		// happened. Surface a real error instead.
+		if lastErr == nil {
+			return nil, errors.WithStack(ErrLeaderNotFound)
+		}
+		if !time.Now().Before(deadline) {
+			return nil, lastErr
+		}
+		waitLeaderProxyBackoff(parentCtx, leaderProxyRetryInterval, deadline)
+		// Re-check the deadline AFTER the back-off: if the budget is
+		// exhausted, do not enter another maxForwardRetries cycle
+		// (which could issue up to three more RPCs, each bounded by
+		// leaderForwardTimeout relative to the now-expired deadline).
+		if !time.Now().Before(deadline) {
+			return nil, lastErr
+		}
+	}
+}
+
+// waitLeaderProxyBackoff sleeps for up to interval but never past the
+// remaining budget, and is interruptible via parentCtx — so a parent
+// deadline or cancellation tears the back-off down immediately instead
+// of waiting out the full interval. Factored out of forwardWithRetry
+// so that function stays under the cyclop threshold.
+func waitLeaderProxyBackoff(parentCtx context.Context, interval time.Duration, deadline time.Time) {
+	sleep := interval
+	if until := time.Until(deadline); until > 0 && until < sleep {
+		sleep = until
+	}
+	timer := time.NewTimer(sleep)
+	defer timer.Stop()
+	select {
+	case <-parentCtx.Done():
+	case <-timer.C:
+	}
+}
+
+// runForwardCycle issues up to maxForwardRetries forward() attempts
+// against the current leader, short-circuiting on the budget.
+// Returns:
+//   - (resp, nil, true) on a committed success — caller should return it.
+//   - (nil, err, true) on a non-transient error wrapped with the
+//     retry-count context — caller should return it.
+//   - (nil, lastTransientErr, false) when an attempt failed with a
+//     transient leader-unavailable signal (the cycle stops at the first
+//     one) — caller should back off and retry the cycle.
+//   - (nil, nil, false) when the inner loop exited on the deadline
+//     guard before calling forward() at all; caller surfaces
+//     ErrLeaderNotFound for that defensive path.
+func (p *LeaderProxy) runForwardCycle(parentCtx context.Context, reqs []*pb.Request, deadline time.Time) (*TransactionResponse, error, bool) {
 	var lastErr error
 	for attempt := 0; attempt < maxForwardRetries; attempt++ {
-		resp, err := p.forward(reqs)
+		if !time.Now().Before(deadline) {
+			// Budget expired mid-cycle; do not start another RPC.
+			break
+		}
+		resp, err := p.forward(parentCtx, reqs)
 		if err == nil {
-			return resp, nil
+			return resp, nil, true
 		}
 		lastErr = err
-		// If the leader is simply not found, retry won't help immediately.
-		if errors.Is(err, ErrLeaderNotFound) {
-			return nil, err
+		if isTransientLeaderError(err) {
+			break
 		}
 	}
-	return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries)
+	if lastErr != nil && !isTransientLeaderError(lastErr) {
+		return nil, errors.Wrapf(lastErr, "leader forward failed after %d retries", maxForwardRetries), true
+	}
+	return nil, lastErr, false
 }
 
-func (p *LeaderProxy) forward(reqs []*pb.Request) (*TransactionResponse, error) {
+func (p *LeaderProxy) forward(parentCtx context.Context, reqs []*pb.Request) (*TransactionResponse, error) {
 	addr := leaderAddrFromEngine(p.engine)
 	if addr == "" {
 		return nil, errors.WithStack(ErrLeaderNotFound)
@@ -86,7 +198,10 @@ func (p *LeaderProxy) forward(reqs []*pb.Request) (*TransactionResponse, error)
 	}
 
 	cli := pb.NewInternalClient(conn)
-	ctx, cancel := context.WithTimeout(context.Background(), leaderForwardTimeout)
+	// context.WithTimeout on a deadline-bounded parent yields the
+	// earlier of the two — so a forward() issued with <5s of budget
+	// remaining caps at exactly the budget, not the full RPC timeout.
+	ctx, cancel := context.WithTimeout(parentCtx, leaderForwardTimeout)
 	defer cancel()
 
 	resp, err := cli.Forward(ctx, &pb.ForwardRequest{
diff --git a/kv/leader_proxy_test.go b/kv/leader_proxy_test.go
index ac9a668e..92a26360 100644
--- a/kv/leader_proxy_test.go
+++ b/kv/leader_proxy_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"net"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -144,3 +145,153 @@ func TestLeaderProxy_ForwardsWhenFollower(t *testing.T) {
 	require.NotNil(t, svc.lastReq)
 	require.Len(t, svc.lastReq.Requests, 1)
 }
+
+// togglingFollowerEngine starts out reporting no resolvable leader and
+// flips to a caller-supplied address after setLeader() is invoked. It
+// models the re-election window during which forward() must poll rather
+// than return ErrLeaderNotFound immediately.
+type togglingFollowerEngine struct {
+	addr atomic.Pointer[string]
+}
+
+func (e *togglingFollowerEngine) setLeader(addr string) {
+	s := addr
+	e.addr.Store(&s)
+}
+
+func (e *togglingFollowerEngine) Propose(context.Context, []byte) (*raftengine.ProposalResult, error) {
+	return nil, raftengine.ErrNotLeader
+}
+func (e *togglingFollowerEngine) State() raftengine.State { return raftengine.StateFollower }
+func (e *togglingFollowerEngine) Leader() raftengine.LeaderInfo {
+	p := e.addr.Load()
+	if p == nil {
+		return raftengine.LeaderInfo{}
+	}
+	return raftengine.LeaderInfo{ID: "leader", Address: *p}
+}
+func (e *togglingFollowerEngine) VerifyLeader(context.Context) error { return raftengine.ErrNotLeader }
+func (e *togglingFollowerEngine) LinearizableRead(context.Context) (uint64, error) {
+	return 0, raftengine.ErrNotLeader
+}
+func (e *togglingFollowerEngine) Status() raftengine.Status {
+	return raftengine.Status{State: raftengine.StateFollower, Leader: e.Leader()}
+}
+func (e *togglingFollowerEngine) Configuration(context.Context) (raftengine.Configuration, error) {
+	return raftengine.Configuration{}, nil
+}
+func (e *togglingFollowerEngine) Close() error { return nil }
+
+func TestLeaderProxy_ForwardsAfterLeaderPublishes(t *testing.T) {
+	t.Parallel()
+
+	// Bring up a real gRPC server with a fake Forward handler;
+	// LeaderProxy.forward() dials it via the connCache. We leave the
+	// engine's leader address empty initially so the first few forward()
+	// attempts fail with ErrLeaderNotFound, then flip to the real
+	// address after a brief delay. forwardWithRetry must absorb the
+	// empty-address window and succeed once the engine publishes.
+	var lc net.ListenConfig
+	lis, err := lc.Listen(context.Background(), "tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+
+	svc := &fakeInternal{resp: &pb.ForwardResponse{Success: true, CommitIndex: 42}}
+	srv := grpc.NewServer()
+	pb.RegisterInternalServer(srv, svc)
+	go func() { _ = srv.Serve(lis) }()
+	t.Cleanup(func() {
+		srv.Stop()
+		_ = lis.Close()
+	})
+
+	// Wait until the gRPC server is accepting connections.
+	dialer := &net.Dialer{Timeout: 100 * time.Millisecond}
+	require.Eventually(t, func() bool {
+		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+		defer cancel()
+		c, err := dialer.DialContext(ctx, "tcp", lis.Addr().String())
+		if err != nil {
+			return false
+		}
+		_ = c.Close()
+		return true
+	}, 2*time.Second, 10*time.Millisecond)
+
+	eng := &togglingFollowerEngine{}
+	p := NewLeaderProxyWithEngine(eng)
+	t.Cleanup(func() { _ = p.connCache.Close() })
+
+	reqs := []*pb.Request{
+		{
+			IsTxn: false,
+			Phase: pb.Phase_NONE,
+			Ts:    10,
+			Mutations: []*pb.Mutation{
+				{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v")},
+			},
+		},
+	}
+
+	// Anchor start BEFORE launching the publish goroutine so the
+	// publishDelay lower bound on elapsed is measured from the same
+	// instant as the Commit call. Capturing start after the goroutine
+	// launch would subtract the goroutine-scheduling time from
+	// elapsed and can let the GreaterOrEqual(elapsed, publishDelay)
+	// assertion flake when the proxy really did wait the full delay.
+	publishDelay := 100 * time.Millisecond
+	start := time.Now()
+	go func() {
+		time.Sleep(publishDelay)
+		eng.setLeader(lis.Addr().String())
+	}()
+
+	resp, err := p.Commit(reqs)
+	elapsed := time.Since(start)
+	require.NoError(t, err)
+	require.Equal(t, uint64(42), resp.CommitIndex)
+	// The proxy must have waited at least until setLeader fired;
+	// otherwise it did not actually poll the missing-leader window.
+	require.GreaterOrEqual(t, elapsed, publishDelay)
+	// And it must have stopped polling well before the budget expires.
+	require.Less(t, elapsed, leaderProxyRetryBudget)
+}
+
+func TestLeaderProxy_FailsAfterLeaderBudgetElapses(t *testing.T) {
+	t.Parallel()
+	if testing.Short() {
+		t.Skip("exhausts the full leaderProxyRetryBudget (5s); skipped in -short mode")
+	}
+
+	// No gRPC server, no address ever published → every forward()
+	// returns ErrLeaderNotFound immediately. forwardWithRetry must loop
+	// until leaderProxyRetryBudget elapses and then surface the final
+	// ErrLeaderNotFound instead of hanging forever. The test only needs
+	// to assert that (a) the call eventually returns with an error, and
+	// (b) the error chain still contains ErrLeaderNotFound after the
+	// budget is exhausted.
+	//
+	// We keep the retry budget at its package default (5s); that is the
+	// contract we want to pin. Running the full 5s here is acceptable as
+	// a single pinned test, especially since it runs under t.Parallel.
+	eng := &togglingFollowerEngine{}
+	p := NewLeaderProxyWithEngine(eng)
+	t.Cleanup(func() { _ = p.connCache.Close() })
+
+	reqs := []*pb.Request{
+		{
+			IsTxn: false,
+			Phase: pb.Phase_NONE,
+			Ts:    10,
+			Mutations: []*pb.Mutation{
+				{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v")},
+			},
+		},
+	}
+
+	start := time.Now()
+	_, err := p.Commit(reqs)
+	elapsed := time.Since(start)
+	require.Error(t, err)
+	require.ErrorIs(t, err, ErrLeaderNotFound)
+	require.GreaterOrEqual(t, elapsed, leaderProxyRetryBudget)
+}
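
Reviewer note, appended after the patch rather than as part of it: the suffix-only classification that coordinator.go and leader_proxy.go both lean on is easy to exercise in isolation. Below is a minimal, self-contained Go sketch; the phrase list and the two helper names are local stand-ins re-declared for illustration, not the kv package's actual exports.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local stand-in for leaderErrorPhrases.
var phrases = []string{
	"not leader",
	"leader not found",
	"leadership lost",
	"leadership transfer in progress",
}

// bySuffix mirrors the patch's hasTransientLeaderPhrase: match only at
// the END of the message, because the wrappers the patch cites
// (cockroachdb Wrapf, fmt.Errorf %w, gRPC status text) all prepend
// their context, leaving the original message last.
func bySuffix(err error) bool {
	msg := strings.ToLower(err.Error())
	for _, p := range phrases {
		if strings.HasSuffix(msg, p) {
			return true
		}
	}
	return false
}

// byContains is the rejected alternative: free-form substring matching
// that user-controlled key material can spoof.
func byContains(err error) bool {
	msg := strings.ToLower(err.Error())
	for _, p := range phrases {
		if strings.Contains(msg, p) {
			return true
		}
	}
	return false
}

func main() {
	// A genuine churn signal as it crosses the gRPC boundary.
	wire := errors.New("rpc error: code = Unknown desc = not leader")
	// A write conflict whose key is literally named "not leader",
	// mirroring the regression case pinned in the tests.
	conflict := errors.New("key: not leader: write conflict")

	fmt.Println(bySuffix(wire))       // true: transient, retried
	fmt.Println(bySuffix(conflict))   // false: terminal, surfaces to the caller
	fmt.Println(byContains(conflict)) // true: the false positive suffix matching avoids
}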