diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 2a8ccc4d..644f2d4b 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -16,6 +16,7 @@ import ( "sync/atomic" "time" + internalutil "github.com/bootjp/elastickv/internal" "github.com/bootjp/elastickv/internal/raftengine" "github.com/cockroachdb/errors" etcdstorage "go.etcd.io/etcd/server/v3/storage" @@ -40,19 +41,106 @@ const ( // queue. Total buffered memory is bounded by // O(numPeers × MaxInflightMsg × avgMsgSize). // - // Raised from 256 → 1024 to absorb short CPU bursts without forcing + // Raised from 256 → 512 to absorb short CPU bursts without forcing // peers to reject with "etcd raft inbound step queue is full". // Under production congestion we observed the 256-slot inbound // stepCh on followers filling up while their event loop was held // up by adapter-side pebble seek storms (PRs #560, #562, #563, - // #565 removed most of that CPU); 1024 is a 4× safety margin. - // Note that with the current defaultMaxSizePerMsg of 1 MiB, the - // true worst-case bound can be much larger (up to roughly 1 GiB - // per peer if every slot held a max-sized message). In practice, - // typical MsgApp payloads are far smaller, so expected steady-state - // memory remains much lower than that worst-case bound. - defaultMaxInflightMsg = 1024 - defaultMaxSizePerMsg = 1 << 20 + // #565 removed most of that CPU); 512 is a 2× safety margin. + // + // We intentionally do NOT raise this in lock-step with the 2 MiB + // defaultMaxSizePerMsg: the two knobs multiply, and 1024 × 2 MiB + // is a 2 GiB per-peer worst-case product that a bursty multi-peer + // deployment can plausibly realise under TCP backpressure loss. 
+ // 512 × 2 MiB halves that to 1 GiB per peer (4 GiB on a 5-node + // leader with 4 followers), which fits comfortably inside the + // 4–16 GiB RAM envelope of typical elastickv nodes while still + // preserving the MsgApp-batching win that motivates raising the + // byte cap above etcd/raft's 1 MiB upstream default on small-entry + // workloads. Operators who need deeper pipelines (large clusters + // with plenty of RAM) can raise this via + // ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a rebuild; operators + // who need a smaller memory ceiling can lower MaxSizePerMsg via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. + defaultMaxInflightMsg = 512 + // minInboundChannelCap is the floor applied when sizing the engine's + // inbound stepCh / dispatchReportCh from the resolved MaxInflightMsg. + // Even if a (misconfigured) caller drops MaxInflightMsg below this, + // we keep at least this much buffering so that a single tick burst + // doesn't trip errStepQueueFull on the inbound side. 256 matches the + // pre-#529 compiled-in default that was known to be survivable. + minInboundChannelCap = 256 + // defaultMaxSizePerMsg caps the byte size of a single MsgApp payload. + // Set to 2 MiB — double etcd/raft's 1 MiB upstream default — so each + // MsgApp amortises more entries under small-entry workloads + // (Redis-style KV, median entry ~500 B; 2 MiB / 500 B ≈ 4000 entries + // per MsgApp already saturates the per-RPC batching benefit). + // Fewer MsgApps per committed byte means fewer dispatcher wake-ups + // on the leader and fewer recv syscalls on the follower; the + // follower's apply loop also contends less with the read path. + // + // Set to 2 MiB (rather than 4 MiB) in tandem with defaultMaxInflightMsg=512 + // to cap per-peer worst-case buffered Raft traffic at 1 GiB + // (512 × 2 MiB), i.e. 4 GiB on a 5-node leader with 4 followers. 
+ // A 4 MiB cap would produce a 2 GiB/peer, 8 GiB/leader + // worst case that is too tight for the 4–16 GiB RAM envelope + // typical elastickv nodes operate in; the batching win of 4 MiB + // over 2 MiB is marginal on small-entry workloads. + defaultMaxSizePerMsg = 2 << 20 + // maxInflightMsgEnvVar / maxSizePerMsgEnvVar let operators tune the + // Raft-level flow-control knobs without a rebuild. Parsed once at + // Open and passed through normalizeLimitConfig; invalid values fall + // back to the defaults with a warning. MaxSizePerMsg is expressed + // as an integer byte count for consistency with the other numeric + // knobs in this package. + maxInflightMsgEnvVar = "ELASTICKV_RAFT_MAX_INFLIGHT_MSGS" + maxSizePerMsgEnvVar = "ELASTICKV_RAFT_MAX_SIZE_PER_MSG" + // minMaxSizePerMsg is the lower bound accepted from the environment + // override. A payload cap below ~1 KiB makes MsgApp batching + // degenerate (one entry per message) which defeats the whole point + // of the knob; fall back to the default rather than rejecting so that a + // fat-fingered operator doesn't take out the engine. + minMaxSizePerMsg uint64 = 1 << 10 + // maxMaxInflightMsg caps the environment override for + // MaxInflightMsg. Open() uses the resolved value to allocate + // stepCh, dispatchReportCh, and every per-peer dispatch queue, so + // a fat-fingered ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=1e8 triggers + // multi-GB channel allocations and crashes the process before the + // node even becomes healthy. 8192 is ~16× the compiled-in default + // (512) — plenty of headroom for pipelining experiments, and well + // below the point where channel allocation alone would OOM a + // 32 GiB runner. Values above this clamp back to the default with + // a warning (same policy as sub-1 values) so a misconfigured + // operator never hard-breaks startup. 
+ maxMaxInflightMsg = 8192 + // maxMaxSizePerMsg caps the environment override for + // MaxSizePerMsg at HALF of the Raft transport's per-message budget + // (internal.GRPCMaxMessageBytes). Half — not an arbitrary subtraction + // — because etcd-raft's limitSize counts only Entry.Size() per entry + // when deciding whether to extend a batch, but the serialized + // raftpb.Message that actually crosses the wire ALSO carries + // per-entry framing inside its Entries repeated field + // (`1 + len(varint) + entry`), plus the outer Message envelope + // (Term/Index/From/To/LogTerm/Commit/etc.). + // + // Worst-case ratio analysis: a minimal Entry (no Data) has + // Size() ≈ 4 B (Term + Index + Type tags+varints). Its outer- + // Message framing is `1 + sovRaft(4)` = 2 B. So tiny-entry + // batches encode at up to 1.5× the entries-data budget on the + // wire. Realistic data-bearing entries (e.g. 100 B+ payloads) + // shrink this ratio to <1.05×, but the cap MUST cover the + // worst case to make the env override a safe bound rather than + // pushing the failure mode out to "ResourceExhausted under + // tiny-entry workloads". + // + // Halving the transport budget admits the worst-case 1.5× growth + // (32 MiB entries-data → 48 MiB on wire ≪ 64 MiB transport limit) + // and leaves ample headroom for realistic batching. Operators + // who genuinely need a higher cap should raise GRPCMaxMessageBytes + // in a deliberate, paired action and adjust this divisor with the + // same worst-case analysis. + maxMaxSizePerMsgDivisor uint64 = 2 + maxMaxSizePerMsg = uint64(internalutil.GRPCMaxMessageBytes) / maxMaxSizePerMsgDivisor // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. 
// It carries low-frequency control traffic: heartbeats, votes, read-index, // leader-transfer, and their corresponding response messages @@ -142,13 +230,21 @@ type OpenConfig struct { ElectionTick int HeartbeatTick int StateMachine StateMachine + // MaxSizePerMsg caps the byte size of a single MsgApp payload (Raft-level + // flow control). Default: 2 MiB (see defaultMaxSizePerMsg). Larger values + // amortise more entries per MsgApp under small-entry workloads; smaller + // values tighten worst-case memory. Operators can override at runtime via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG (integer byte count) without a + // rebuild; the env var takes precedence over the caller-supplied value. MaxSizePerMsg uint64 // MaxInflightMsg controls how many MsgApp messages Raft may have in-flight // per peer before waiting for an acknowledgement (Raft-level flow control). // It also sets the per-peer dispatch channel capacity, so total buffered // memory is bounded by O(numPeers * MaxInflightMsg * avgMsgSize). - // Default: 256. Increase for deeper pipelining on high-bandwidth links; - // lower in memory-constrained clusters. + // Default: 512 (see defaultMaxInflightMsg). Increase for deeper pipelining + // on high-bandwidth links; lower in memory-constrained clusters. Operators + // can override at runtime via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS without a + // rebuild; the env var takes precedence over the caller-supplied value. 
MaxInflightMsg int } @@ -419,24 +515,34 @@ func Open(ctx context.Context, cfg OpenConfig) (*Engine, error) { }() engine := &Engine{ - nodeID: prepared.cfg.NodeID, - localID: prepared.cfg.LocalID, - localAddress: prepared.cfg.LocalAddress, - dataDir: prepared.cfg.DataDir, - fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), - tickInterval: prepared.cfg.TickInterval, - electionTick: prepared.cfg.ElectionTick, - storage: prepared.disk.Storage, - rawNode: rawNode, - persist: prepared.disk.Persist, - fsm: prepared.cfg.StateMachine, - peers: peerMap, - transport: prepared.cfg.Transport, - proposeCh: make(chan proposalRequest), - readCh: make(chan readRequest), - adminCh: make(chan adminRequest), - stepCh: make(chan raftpb.Message, defaultMaxInflightMsg), - dispatchReportCh: make(chan dispatchReport, defaultMaxInflightMsg), + nodeID: prepared.cfg.NodeID, + localID: prepared.cfg.LocalID, + localAddress: prepared.cfg.LocalAddress, + dataDir: prepared.cfg.DataDir, + fsmSnapDir: filepath.Join(prepared.cfg.DataDir, fsmSnapDirName), + tickInterval: prepared.cfg.TickInterval, + electionTick: prepared.cfg.ElectionTick, + storage: prepared.disk.Storage, + rawNode: rawNode, + persist: prepared.disk.Persist, + fsm: prepared.cfg.StateMachine, + peers: peerMap, + transport: prepared.cfg.Transport, + proposeCh: make(chan proposalRequest), + readCh: make(chan readRequest), + adminCh: make(chan adminRequest), + // Size the inbound step / dispatch-report channels from the + // resolved MaxInflightMsg (post-normalizeLimitConfig, which has + // already applied the env override and compiled-in default) so + // that operators raising ELASTICKV_RAFT_MAX_INFLIGHT_MSGS above + // the default actually get the extra buffering they asked for. 
+ // Using defaultMaxInflightMsg here would silently cap the + // channel at 512 (the current compiled-in default) even when + // the Raft layer has been told to keep 2048 in flight, + // re-triggering errStepQueueFull under the exact bursty + // conditions this knob is meant to absorb. + stepCh: make(chan raftpb.Message, inboundChannelCap(prepared.cfg.MaxInflightMsg)), + dispatchReportCh: make(chan dispatchReport, inboundChannelCap(prepared.cfg.MaxInflightMsg)), closeCh: make(chan struct{}), doneCh: make(chan struct{}), startedCh: make(chan struct{}), @@ -2581,6 +2687,19 @@ func normalizeTimingConfig(cfg OpenConfig) OpenConfig { return cfg } +// inboundChannelCap returns the capacity to use when allocating the +// engine's inbound stepCh and dispatchReportCh. It mirrors the resolved +// MaxInflightMsg but clamps to minInboundChannelCap so that a caller +// passing a tiny value doesn't shrink the buffers below a survivable +// floor. maxInflight is expected to be the post-normalizeLimitConfig +// value (compiled default or env override applied). +func inboundChannelCap(maxInflight int) int { + if maxInflight < minInboundChannelCap { + return minInboundChannelCap + } + return maxInflight +} + func normalizeLimitConfig(cfg OpenConfig) OpenConfig { if cfg.MaxInflightMsg <= 0 { cfg.MaxInflightMsg = defaultMaxInflightMsg @@ -2588,6 +2707,39 @@ func normalizeLimitConfig(cfg OpenConfig) OpenConfig { if cfg.MaxSizePerMsg == 0 { cfg.MaxSizePerMsg = defaultMaxSizePerMsg } + // Env overrides win over caller-supplied values so that operators can + // retune replication flow-control without a rebuild. This mirrors the + // behaviour of ELASTICKV_RAFT_SNAPSHOT_COUNT and + // ELASTICKV_RAFT_MAX_WAL_FILES. Invalid values fall back to the + // compiled-in defaults with a warning. 
+ if v, ok := maxInflightMsgFromEnv(); ok { + cfg.MaxInflightMsg = v + } + if v, ok := maxSizePerMsgFromEnv(); ok { + cfg.MaxSizePerMsg = v + } + // Clamp the resolved values against the same upper bounds we use + // for env overrides. Without this, a programmatic caller passing + // a fat-fingered OpenConfig{MaxInflightMsg: 100_000_000} would + // bypass the env-side guard and trigger multi-GB channel + // allocations at Open() — defeating the whole point of + // maxMaxInflightMsg / maxMaxSizePerMsg. Symmetric to the env + // fallback policy: clamp to the compiled-in default, log a + // warning so the misconfiguration is auditable. + if cfg.MaxInflightMsg > maxMaxInflightMsg { + slog.Warn("OpenConfig.MaxInflightMsg exceeds safe cap; clamping to default", + "value", cfg.MaxInflightMsg, "max", maxMaxInflightMsg, "default", defaultMaxInflightMsg) + cfg.MaxInflightMsg = defaultMaxInflightMsg + } + if cfg.MaxSizePerMsg > maxMaxSizePerMsg { + slog.Warn("OpenConfig.MaxSizePerMsg exceeds transport budget; clamping to default", + "value", cfg.MaxSizePerMsg, "max", maxMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) + cfg.MaxSizePerMsg = uint64(defaultMaxSizePerMsg) + } + slog.Info("etcd raft engine: message size limits", + "max_inflight_msgs", cfg.MaxInflightMsg, + "max_size_per_msg_bytes", cfg.MaxSizePerMsg, + ) return cfg } @@ -2845,6 +2997,72 @@ func snapshotEveryFromEnv() uint64 { return n } +// maxInflightMsgFromEnv parses ELASTICKV_RAFT_MAX_INFLIGHT_MSGS. Returns +// (value, true) when the env var is set to a valid integer in +// [1, maxMaxInflightMsg]. Returns (0, false) when the var is unset so +// the caller can keep the existing cfg.MaxInflightMsg (which +// normalizeLimitConfig has already defaulted to defaultMaxInflightMsg). 
+// Invalid values (non-numeric, negative, zero, or above +// maxMaxInflightMsg) are logged at warn level and return +// (defaultMaxInflightMsg, true) so the engine actually applies the +// compiled-in default the log message promises — otherwise a malformed +// env var would silently let an unrelated caller-supplied value win. +// The upper cap is load-bearing: Open() allocates stepCh, +// dispatchReportCh, and every per-peer dispatch queue from this +// value, so a fat-fingered `ELASTICKV_RAFT_MAX_INFLIGHT_MSGS=100000000` +// would try to allocate multi-GB of channel memory before the node +// becomes healthy. Clamping to the default preserves operator intent +// ("I asked for a larger pipeline") far better than crashing startup. +func maxInflightMsgFromEnv() (int, bool) { + v := strings.TrimSpace(os.Getenv(maxInflightMsgEnvVar)) + if v == "" { + return 0, false + } + n, err := strconv.Atoi(v) + if err != nil || n < 1 { + slog.Warn("invalid ELASTICKV_RAFT_MAX_INFLIGHT_MSGS; using default", + "value", v, "default", defaultMaxInflightMsg) + return defaultMaxInflightMsg, true + } + if n > maxMaxInflightMsg { + slog.Warn("ELASTICKV_RAFT_MAX_INFLIGHT_MSGS exceeds safe cap; using default", + "value", v, "max", maxMaxInflightMsg, "default", defaultMaxInflightMsg) + return defaultMaxInflightMsg, true + } + return n, true +} + +// maxSizePerMsgFromEnv parses ELASTICKV_RAFT_MAX_SIZE_PER_MSG as a plain +// integer byte count. Returns (value, true) when the env var is set to a +// valid integer in [minMaxSizePerMsg, maxMaxSizePerMsg] +// (1 KiB .. 32 MiB — half the 64 MiB gRPC transport budget). Returns (0, false) when the var is unset so +// normalizeLimitConfig can keep its earlier default. 
Invalid, too-small, +// or too-large values fall back to the compiled-in default with a +// warning and return (defaultMaxSizePerMsg, true) so the override +// actually applies the default the warning promises; a sub-KiB cap +// would make MsgApp batching degenerate, and a value above the gRPC +// transport's GRPCMaxMessageBytes budget would make Raft emit payloads +// the transport cannot carry, causing repeated send failures under +// large batches. +func maxSizePerMsgFromEnv() (uint64, bool) { + v := strings.TrimSpace(os.Getenv(maxSizePerMsgEnvVar)) + if v == "" { + return 0, false + } + n, err := strconv.ParseUint(v, 10, 64) + if err != nil || n < minMaxSizePerMsg { + slog.Warn("invalid ELASTICKV_RAFT_MAX_SIZE_PER_MSG; using default", + "value", v, "min", minMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) + return uint64(defaultMaxSizePerMsg), true + } + if n > maxMaxSizePerMsg { + slog.Warn("ELASTICKV_RAFT_MAX_SIZE_PER_MSG exceeds transport budget; using default", + "value", v, "max", maxMaxSizePerMsg, "default", uint64(defaultMaxSizePerMsg)) + return uint64(defaultMaxSizePerMsg), true + } + return n, true +} + // dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has // been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value // is parsed with strconv.ParseBool, which accepts the standard tokens diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 3e0119a4..aa3478c0 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1841,3 +1841,332 @@ func TestDispatcherLanesEnabledFromEnv(t *testing.T) { require.Equalf(t, c.want, dispatcherLanesEnabledFromEnv(), "env=%q", c.val) } } + +// TestMaxInflightMsgFromEnv_Unset pins the "no env var => caller wins" +// contract of maxInflightMsgFromEnv. normalizeLimitConfig relies on the +// second return to decide whether to overwrite the caller-supplied value. 
+func TestMaxInflightMsgFromEnv_Unset(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + n, ok := maxInflightMsgFromEnv() + require.False(t, ok) + require.Equal(t, 0, n) +} + +// TestMaxInflightMsgFromEnv_ReadsOverride pins that a valid positive +// integer is parsed and surfaced to the caller verbatim. +func TestMaxInflightMsgFromEnv_ReadsOverride(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "2048") + n, ok := maxInflightMsgFromEnv() + require.True(t, ok) + require.Equal(t, 2048, n) +} + +// TestMaxInflightMsgFromEnv_FallsBackOnInvalid pins the safety behaviour: +// a non-numeric, zero, or negative value is refused and the compiled-in +// default is surfaced (ok=true) so that normalizeLimitConfig actually +// applies the default the warning log promises, instead of letting a +// caller-supplied value silently win. +func TestMaxInflightMsgFromEnv_FallsBackOnInvalid(t *testing.T) { + cases := []string{"not-a-number", "0", "-3"} + for _, v := range cases { + t.Setenv(maxInflightMsgEnvVar, v) + n, ok := maxInflightMsgFromEnv() + require.Truef(t, ok, "env=%q", v) + require.Equalf(t, defaultMaxInflightMsg, n, "env=%q", v) + } +} + +// TestMaxSizePerMsgFromEnv_Unset pins the "no env var => caller wins" +// contract, symmetric with maxInflightMsgFromEnv above. +func TestMaxSizePerMsgFromEnv_Unset(t *testing.T) { + t.Setenv(maxSizePerMsgEnvVar, "") + n, ok := maxSizePerMsgFromEnv() + require.False(t, ok) + require.Equal(t, uint64(0), n) +} + +// TestMaxSizePerMsgFromEnv_ReadsOverride pins that a valid byte count +// >= minMaxSizePerMsg is accepted and surfaced to the caller verbatim. +func TestMaxSizePerMsgFromEnv_ReadsOverride(t *testing.T) { + t.Setenv(maxSizePerMsgEnvVar, "8388608") // 8 MiB + n, ok := maxSizePerMsgFromEnv() + require.True(t, ok) + require.Equal(t, uint64(8388608), n) +} + +// TestMaxSizePerMsgFromEnv_FallsBackOnInvalid covers the three failure +// modes: non-numeric, zero, and below-floor. 
The floor is minMaxSizePerMsg +// (1 KiB) — a smaller cap would make MsgApp batching degenerate. On +// failure the helper returns (defaultMaxSizePerMsg, true) so the caller +// actually applies the compiled-in default the log line promises, rather +// than silently letting a caller-supplied value win. +func TestMaxSizePerMsgFromEnv_FallsBackOnInvalid(t *testing.T) { + cases := []string{"not-a-number", "0", "512"} + for _, v := range cases { + t.Setenv(maxSizePerMsgEnvVar, v) + n, ok := maxSizePerMsgFromEnv() + require.Truef(t, ok, "env=%q", v) + require.Equalf(t, uint64(defaultMaxSizePerMsg), n, "env=%q", v) + } +} + +// TestMaxInflightMsgFromEnv_ClampsAboveCap pins the upper-bound safety +// behaviour: a value above maxMaxInflightMsg is refused (it would +// trigger multi-GB channel allocations at Open() and crash the process +// before the node becomes healthy) and the compiled-in default is +// surfaced with ok=true so normalizeLimitConfig actually applies it. +func TestMaxInflightMsgFromEnv_ClampsAboveCap(t *testing.T) { + cases := []string{ + strconv.Itoa(maxMaxInflightMsg + 1), + "100000000", // fat-fingered value from the Codex P2 report + } + for _, v := range cases { + t.Setenv(maxInflightMsgEnvVar, v) + n, ok := maxInflightMsgFromEnv() + require.Truef(t, ok, "env=%q", v) + require.Equalf(t, defaultMaxInflightMsg, n, "env=%q", v) + } +} + +// TestMaxInflightMsgFromEnv_AcceptsAtCap pins the boundary: exactly +// maxMaxInflightMsg must parse through unchanged. Catches an off-by-one +// regression that would silently clamp an operator-tuned value. +func TestMaxInflightMsgFromEnv_AcceptsAtCap(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, strconv.Itoa(maxMaxInflightMsg)) + n, ok := maxInflightMsgFromEnv() + require.True(t, ok) + require.Equal(t, maxMaxInflightMsg, n) +} + +// TestMaxSizePerMsgFromEnv_ClampsAboveTransportBudget pins that a +// MaxSizePerMsg above the gRPC transport's message-size budget +// (GRPCMaxMessageBytes) is refused. 
Without this clamp the override +// would make Raft emit MsgApp frames the transport physically cannot +// carry, producing repeated send failures under large batches. +func TestMaxSizePerMsgFromEnv_ClampsAboveTransportBudget(t *testing.T) { + over := strconv.FormatUint(maxMaxSizePerMsg+1, 10) + t.Setenv(maxSizePerMsgEnvVar, over) + n, ok := maxSizePerMsgFromEnv() + require.True(t, ok) + require.Equal(t, uint64(defaultMaxSizePerMsg), n) +} + +// TestMaxSizePerMsgFromEnv_AcceptsAtCap covers the boundary symmetrically +// with the inflight test above. +func TestMaxSizePerMsgFromEnv_AcceptsAtCap(t *testing.T) { + t.Setenv(maxSizePerMsgEnvVar, strconv.FormatUint(maxMaxSizePerMsg, 10)) + n, ok := maxSizePerMsgFromEnv() + require.True(t, ok) + require.Equal(t, maxMaxSizePerMsg, n) +} + +// TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom pins the invariant that +// the MaxSizePerMsg upper cap is at most half the gRPC transport budget. +// etcd-raft's limitSize counts only Entry.Size() per entry, but the +// serialized raftpb.Message also carries per-entry framing (`1 + len- +// varint + entry`) inside its Entries repeated field. For minimal +// 4-byte entries, the outer framing is 2 bytes per entry, so the wire +// representation can be up to 1.5x the entries-data budget. Halving +// the transport budget covers that worst case (32 MiB entries-data → +// 48 MiB on wire, well below the 64 MiB transport limit) and is the +// minimum cap that makes the env override safe under tiny-entry +// workloads. Raising GRPCMaxMessageBytes without re-deriving this +// divisor MUST be a deliberate, paired action. 
+func TestMaxMaxSizePerMsg_ReservesEnvelopeHeadroom(t *testing.T) { + require.Equal(t, uint64(internalutil.GRPCMaxMessageBytes)/maxMaxSizePerMsgDivisor, maxMaxSizePerMsg, + "maxMaxSizePerMsg must be GRPCMaxMessageBytes / maxMaxSizePerMsgDivisor") + require.LessOrEqual(t, maxMaxSizePerMsg, uint64(internalutil.GRPCMaxMessageBytes)/2, + "maxMaxSizePerMsg must be at most half the transport budget to absorb worst-case per-entry framing on tiny-entry batches") + require.Less(t, maxMaxSizePerMsg, uint64(internalutil.GRPCMaxMessageBytes), + "maxMaxSizePerMsg must leave strict headroom below the transport budget") +} + +// TestMaxMaxSizePerMsg_AbsorbsTinyEntryFraming pins the *quantitative* +// safety claim: even at the worst-case per-entry framing ratio +// (minimum Entry.Size() = 4 B + 2 B framing = 1.5x growth), a fully +// packed MsgApp at maxMaxSizePerMsg fits inside GRPCMaxMessageBytes +// with margin to spare. Catches a future change that raises +// maxMaxSizePerMsgDivisor below 2 (which would re-introduce the +// "tiny-entry workloads can ResourceExhaust gRPC" failure mode Codex +// flagged). +func TestMaxMaxSizePerMsg_AbsorbsTinyEntryFraming(t *testing.T) { + const minEntrySize = uint64(4) // Term + Index + Type tags+varints, no Data + const minFraming = uint64(2) // 1 (field tag) + sovRaft(4) = 1 (length varint) + const worstCaseRatio = minEntrySize + minFraming + maxWireBytes := maxMaxSizePerMsg / minEntrySize * worstCaseRatio + require.LessOrEqual(t, maxWireBytes, uint64(internalutil.GRPCMaxMessageBytes), + "worst-case wire size for tiny-entry batches must fit inside the transport budget") +} + +// TestNormalizeLimitConfig_ClampsCallerOverlimitInflight pins that a +// programmatic caller passing OpenConfig.MaxInflightMsg above +// maxMaxInflightMsg is clamped to defaultMaxInflightMsg, mirroring the +// env-side guard. 
Without this, Open()'s stepCh / dispatchReportCh / +// per-peer dispatch queue allocations would consume multi-GB on a +// fat-fingered programmatic config and crash the process before the +// node became healthy. +func TestNormalizeLimitConfig_ClampsCallerOverlimitInflight(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: 100_000_000, // fat-fingered value from the Codex P2 report + }) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) +} + +// TestNormalizeLimitConfig_ClampsCallerOverlimitSize is the symmetric +// pin for caller-supplied OpenConfig.MaxSizePerMsg above the transport +// envelope budget. +func TestNormalizeLimitConfig_ClampsCallerOverlimitSize(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{ + MaxSizePerMsg: maxMaxSizePerMsg + 1, + }) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_AcceptsCallerAtCap is the boundary check: +// exactly maxMaxInflightMsg / maxMaxSizePerMsg from a caller must +// pass through unchanged. Catches an off-by-one regression that +// would silently clamp a legitimate operator-tuned programmatic +// config. +func TestNormalizeLimitConfig_AcceptsCallerAtCap(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: maxMaxInflightMsg, + MaxSizePerMsg: maxMaxSizePerMsg, + }) + require.Equal(t, maxMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, maxMaxSizePerMsg, got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_DefaultsWhenUnset pins the production defaults +// that reach raft.Config when neither the caller nor the operator has +// overridden them: 512 inflight msgs and 2 MiB per msg. 
The combination +// bounds worst-case per-peer buffered Raft traffic at 1 GiB (512 × 2 MiB); +// see defaultMaxInflightMsg for the memory-footprint rationale. +func TestNormalizeLimitConfig_DefaultsWhenUnset(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + t.Setenv(maxSizePerMsgEnvVar, "") + got := normalizeLimitConfig(OpenConfig{}) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) + require.Equal(t, 512, got.MaxInflightMsg) + require.Equal(t, uint64(2<<20), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_EnvOverridesCaller pins that a valid env var +// takes precedence over any caller-supplied cfg value. This is how an +// operator retunes a running deployment without a rebuild. +func TestNormalizeLimitConfig_EnvOverridesCaller(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "2048") + t.Setenv(maxSizePerMsgEnvVar, "8388608") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: 256, + MaxSizePerMsg: 1 << 20, + }) + require.Equal(t, 2048, got.MaxInflightMsg) + require.Equal(t, uint64(8388608), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault pins that a +// malformed env override does NOT leak through to raft.Config; the +// compiled-in defaults are applied (even when the caller supplied a +// different value) so the operator-visible warning log matches reality. 
+func TestNormalizeLimitConfig_InvalidEnvFallsBackToDefault(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "not-a-number") + t.Setenv(maxSizePerMsgEnvVar, "-1") + got := normalizeLimitConfig(OpenConfig{}) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) +} + +// TestNormalizeLimitConfig_InvalidEnvOverridesCaller pins the fix for +// the "log message is a lie" gemini reviewer finding: when the env var +// is malformed, the helper warns "using default" — so the default MUST +// actually win, even if the caller supplied a non-default value. Prior +// to the fix the caller's 256 would silently survive, contradicting the +// log line. +func TestNormalizeLimitConfig_InvalidEnvOverridesCaller(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "garbage") + t.Setenv(maxSizePerMsgEnvVar, "also-garbage") + got := normalizeLimitConfig(OpenConfig{ + MaxInflightMsg: 256, + MaxSizePerMsg: 1 << 20, + }) + require.Equal(t, defaultMaxInflightMsg, got.MaxInflightMsg) + require.Equal(t, uint64(defaultMaxSizePerMsg), got.MaxSizePerMsg) +} + +// TestInboundChannelCap verifies the floor/passthrough behaviour of the +// stepCh / dispatchReportCh sizing helper: the resolved MaxInflightMsg +// drives capacity, but never below minInboundChannelCap. 
+func TestInboundChannelCap(t *testing.T) { + require.Equal(t, minInboundChannelCap, inboundChannelCap(0)) + require.Equal(t, minInboundChannelCap, inboundChannelCap(1)) + require.Equal(t, minInboundChannelCap, inboundChannelCap(minInboundChannelCap-1)) + require.Equal(t, minInboundChannelCap, inboundChannelCap(minInboundChannelCap)) + require.Equal(t, 1024, inboundChannelCap(1024)) + require.Equal(t, 2048, inboundChannelCap(2048)) +} + +// TestOpen_InboundChannelsHonourMaxInflightEnv pins the codex P1 fix: +// when ELASTICKV_RAFT_MAX_INFLIGHT_MSGS is raised above the compiled-in +// default, the engine's inbound stepCh and dispatchReportCh must be +// sized from the override, not the default. Previously these were hard- +// wired to defaultMaxInflightMsg, so a larger override would still hit +// errStepQueueFull at the compiled-in cap, silently defeating the whole +// tuning knob. +func TestOpen_InboundChannelsHonourMaxInflightEnv(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "2048") + // Open() normalizes BOTH raft limit env vars; leaving the size var + // unset here would let an ambient ELASTICKV_RAFT_MAX_SIZE_PER_MSG + // in the shell the test runs in influence config resolution and + // confuse an unrelated failure diagnosis. Pin it to "" so the size + // path always falls through to the caller's OpenConfig value. 
+ t.Setenv(maxSizePerMsgEnvVar, "") + fsm := &testStateMachine{} + engine, err := Open(context.Background(), OpenConfig{ + NodeID: 1, + LocalID: "n1", + LocalAddress: "127.0.0.1:7001", + DataDir: t.TempDir(), + Bootstrap: true, + StateMachine: fsm, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) + require.Equal(t, 2048, cap(engine.stepCh), + "stepCh capacity must reflect the env-overridden MaxInflightMsg") + require.Equal(t, 2048, cap(engine.dispatchReportCh), + "dispatchReportCh capacity must reflect the env-overridden MaxInflightMsg") +} + +// TestOpen_InboundChannelsDefaultCap pins that with no env override the +// inbound channels are sized from the compiled-in default (512), the +// current production value. +func TestOpen_InboundChannelsDefaultCap(t *testing.T) { + t.Setenv(maxInflightMsgEnvVar, "") + // See TestOpen_InboundChannelsHonourMaxInflightEnv for why the size + // env var is cleared here too. + t.Setenv(maxSizePerMsgEnvVar, "") + fsm := &testStateMachine{} + engine, err := Open(context.Background(), OpenConfig{ + NodeID: 1, + LocalID: "n1", + LocalAddress: "127.0.0.1:7001", + DataDir: t.TempDir(), + Bootstrap: true, + StateMachine: fsm, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, engine.Close()) + }) + require.Equal(t, defaultMaxInflightMsg, cap(engine.stepCh)) + require.Equal(t, defaultMaxInflightMsg, cap(engine.dispatchReportCh)) +} diff --git a/main.go b/main.go index 484869c3..f52feca6 100644 --- a/main.go +++ b/main.go @@ -38,8 +38,25 @@ const ( etcdTickInterval = 10 * time.Millisecond etcdHeartbeatMinTicks = 1 etcdElectionMinTicks = 2 - etcdMaxSizePerMsg = 1 << 20 - etcdMaxInflightMsg = 256 + // etcdMaxSizePerMsg caps bytes per MsgApp. 
2 MiB (double etcd/raft's + // 1 MiB upstream default) reduces MsgApp count per committed byte + // under small-entry KV workloads, cutting dispatcher wake-ups on the + // leader and recv syscalls on the follower, while keeping the + // per-peer worst-case buffered memory (MaxInflightMsg × MaxSizePerMsg + // = 512 × 2 MiB = 1 GiB) inside the RAM envelope of typical + // elastickv nodes (4–16 GiB). Operators can override via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG at runtime. + etcdMaxSizePerMsg = 2 << 20 + // etcdMaxInflightMsg caps in-flight MsgApps per peer. 512 gives a 2x + // safety margin over the pre-#529 default of 256 to absorb short CPU + // bursts, without letting the per-peer worst-case buffered memory + // (MaxInflightMsg × MaxSizePerMsg) grow beyond 1 GiB on the current + // 2 MiB MaxSizePerMsg — i.e. 4 GiB on a 5-node leader with 4 + // followers. Operators with wide-bandwidth LAN clusters and headroom + // can raise this via ELASTICKV_RAFT_MAX_INFLIGHT_MSGS at runtime; + // operators with tighter memory budgets can lower the byte cap via + // ELASTICKV_RAFT_MAX_SIZE_PER_MSG. + etcdMaxInflightMsg = 512 ) func newRaftFactory(engineType raftEngineType) (raftengine.Factory, error) {