From 2d66d969dcc58fa8501f2b176c7e8543a79b39f1 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Thu, 11 Jun 2026 17:57:53 +0300 Subject: [PATCH 1/2] core: qbft improvements --- core/consensus/qbft/qbft.go | 20 ++++ core/consensus/qbft/qbft_internal_test.go | 122 +++++++++++++++++++++ core/qbft/qbft.go | 38 ++++++- core/qbft/qbft_internal_test.go | 127 ++++++++++++++++++++++ 4 files changed, 306 insertions(+), 1 deletion(-) diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index b274cee9d0..256e872a40 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -678,6 +678,26 @@ func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (p return nil, false, errors.New("invalid duty", z.Any("duty", duty)) } + // Bound justification and value counts before doing any expensive per-element + // work: each justification requires an ECDSA signature recovery and each value + // a proto unmarshal + hash. Without this, a single authenticated peer could send + // a large message (up to the 128MB p2p frame) packed with hundreds of thousands + // of sub-messages to exhaust CPU/memory on every peer (amplification DoS). + // + // A legitimate justification set contains at most a quorum of ROUND-CHANGE + // messages plus a quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded + // above by 2*nodes. Each message (the main message plus each justification) + // references at most two values (value and prepared value), so the values map is + // bounded by 2*(justifications+1). + nodes := len(c.pubkeys) + if n := len(pbMsg.GetJustification()); n > 2*nodes { + return nil, false, errors.New("too many justifications", z.Int("count", n), z.Int("max", 2*nodes)) + } + + if n, maxValues := len(pbMsg.GetValues()), 2*(len(pbMsg.GetJustification())+1); n > maxValues { + return nil, false, errors.New("too many values", z.Int("count", n), z.Int("max", maxValues)) + } + for _, justification := range pbMsg.GetJustification() { if err := verifyMsg(justification, c.pubkeys); err != nil { return nil, false, errors.Wrap(err, "invalid justification") diff --git a/core/consensus/qbft/qbft_internal_test.go b/core/consensus/qbft/qbft_internal_test.go index 4d1d2f2344..21dc9bfe09 100644 --- a/core/consensus/qbft/qbft_internal_test.go +++ b/core/consensus/qbft/qbft_internal_test.go @@ -706,6 +706,128 @@ func TestQBFTConsensusHandle(t *testing.T) { } } +// TestQBFTConsensusHandleAmplificationLimits verifies that handle rejects messages +// carrying more justifications or values than a legitimate consensus message ever +// needs, before doing the expensive per-element signature recovery / unmarshalling. +// This caps the CPU/memory amplification a single authenticated peer can inflict. +func TestQBFTConsensusHandleAmplificationLimits(t *testing.T) { + // newConsensus returns a single-node consensus, so max justifications = 2*nodes = 2. + newConsensus := func(t *testing.T) (*Consensus, *k1.PrivateKey) { + t.Helper() + + var c Consensus + + deadliner := coremocks.NewDeadliner(t) + deadliner.On("Add", mock.Anything).Maybe().Return(core.DeadlineScheduled) + c.deadliner = deadliner + c.gaterFunc = func(core.Duty) bool { return true } + c.mutable.instances = make(map[core.Duty]*instance.IO[Msg]) + + p2pKey := testutil.GenerateInsecureK1Key(t, 0) + c.pubkeys = make(map[int64]*k1.PublicKey) + c.pubkeys[0] = p2pKey.PubKey() + + return &c, p2pKey + } + + // signedBase returns a validly-signed main message so verification reaches the limit checks. + signedBase := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTConsensusMsg { + t.Helper() + + base := &pbv1.QBFTConsensusMsg{Msg: newRandomQBFTMsg(t)} + base.Msg.PeerIdx = 0 + base.Msg.Round = 1 + base.Msg.Duty = &pbv1.Duty{Slot: 42, Type: 1} + + msgHash, err := hashProto(base.GetMsg()) + require.NoError(t, err) + + sign, err := k1util.Sign(p2pKey, msgHash[:]) + require.NoError(t, err) + + base.Msg.Signature = sign + + return base + } + + // signedJustification returns a validly-signed justification matching the base message's duty. + signedJustification := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTMsg { + t.Helper() + + j := newRandomQBFTMsg(t) + j.PeerIdx = 0 + j.Duty = &pbv1.Duty{Slot: 42, Type: 1} + + jHash, err := hashProto(j) + require.NoError(t, err) + + j.Signature, err = k1util.Sign(p2pKey, jHash[:]) + require.NoError(t, err) + + return j + } + + t.Run("too many justifications rejected", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // 3 justifications > 2*nodes (2). Content is irrelevant since the count + // check runs before any per-justification verification. + for range 3 { + base.Justification = append(base.Justification, &pbv1.QBFTMsg{}) + } + + _, _, err := c.handle(context.Background(), "peerID", base) + require.ErrorContains(t, err, "too many justifications") + }) + + t.Run("max justifications accepted", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // Exactly 2*nodes (2) justifications must not be rejected by the count check. + for range 2 { + base.Justification = append(base.Justification, signedJustification(t, p2pKey)) + } + + _, _, err := c.handle(context.Background(), "peerID", base) + require.NoError(t, err) + }) + + t.Run("too many values rejected", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // 0 justifications => max values = 2*(0+1) = 2. Provide 3. + base.Values = []*anypb.Any{{}, {}, {}} + + _, _, err := c.handle(context.Background(), "peerID", base) + require.ErrorContains(t, err, "too many values") + }) + + t.Run("max values accepted", func(t *testing.T) { + c, p2pKey := newConsensus(t) + base := signedBase(t, p2pKey) + + // 2 justifications => max values = 2*(2+1) = 6. A message carrying exactly + // the maximum must pass the count check and the rest of handle, guarding + // against the bound being tightened below the legitimate maximum. + for range 2 { + base.Justification = append(base.Justification, signedJustification(t, p2pKey)) + } + + for i := range 6 { + value, err := anypb.New(&pbv1.Duty{Slot: uint64(i + 1)}) + require.NoError(t, err) + + base.Values = append(base.Values, value) + } + + _, _, err := c.handle(context.Background(), "peerID", base) + require.NoError(t, err) + }) +} + func TestInstanceIO_MaybeStart(t *testing.T) { t.Run("MaybeStart for new instance", func(t *testing.T) { inst1 := instance.NewIO[Msg]() diff --git a/core/qbft/qbft.go b/core/qbft/qbft.go index f1093f02a1..f51253d72e 100644 --- a/core/qbft/qbft.go +++ b/core/qbft/qbft.go @@ -158,6 +158,21 @@ type dedupKey struct { Round int64 } +// maxDecidedResends bounds the number of MsgDecided rebroadcasts that +// post-decision ROUND-CHANGE messages from a single peer can trigger. +// A lagging peer re-sends ROUND-CHANGE with an increasing round on each +// timeout until it learns the decided value, so a handful of resends is +// ample for liveness, while the cap stops a malicious peer minting +// ever-higher rounds from extracting unlimited large rebroadcasts. +const maxDecidedResends = 16 + +// decidedResend tracks the MsgDecided rebroadcasts triggered by a peer's +// post-decision ROUND-CHANGE messages. +type decidedResend struct { + Round int64 // Highest round a rebroadcast was triggered for. + Count int // Total rebroadcasts triggered by the peer. +} + // errors var ( errCompare = errors.New("compare leader value with local value failed") @@ -211,6 +226,7 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], qCommit []Msg[I, V, C] buffer = make(map[int64][]Msg[I, V, C]) dedupRules = make(map[dedupKey]bool) + decidedResends = make(map[int64]decidedResend) // Bounds MsgDecided rebroadcasts by peer source. timerChan <-chan time.Time stopTimer func() ) @@ -319,7 +335,27 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], // Just send Qcommit if consensus already decided if len(qCommit) > 0 { if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17 - err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) + // Rebroadcast at most once per source per (strictly increasing) + // round, capped at maxDecidedResends per source: duplicate, + // replayed or maliciously round-incremented ROUND-CHANGE messages + // can't repeatedly trigger a large MsgDecided rebroadcast + // (amplification DoS). Note this path runs before the isJustified + // check, so the ROUND-CHANGE need not even be justified. A peer + // advancing to a genuinely new round still gets a fresh resend + // (up to the cap). + // + // The transport is expected to authenticate sources, but cap the + // tracked sources at d.Nodes as defense in depth so forged source + // IDs can't grow this state (and the total resends) without bound. + resend, ok := decidedResends[msg.Source()] + if !ok && len(decidedResends) >= d.Nodes { + break + } + + if msg.Round() > resend.Round && resend.Count < maxDecidedResends { + decidedResends[msg.Source()] = decidedResend{Round: msg.Round(), Count: resend.Count + 1} + err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) + } } break diff --git a/core/qbft/qbft_internal_test.go b/core/qbft/qbft_internal_test.go index a078d6742b..67429c4775 100644 --- a/core/qbft/qbft_internal_test.go +++ b/core/qbft/qbft_internal_test.go @@ -620,6 +620,133 @@ func (m msg) Justification() []Msg[int64, int64, int64] { return resp } +// TestDecidedRebroadcastLimits verifies that once consensus has decided, post-decision +// ROUND-CHANGE messages trigger at most one MsgDecided rebroadcast per source per +// (strictly increasing) round, capped at maxDecidedResends per source. This bounds +// amplification while still serving lagging peers that advance to new rounds. +func TestDecidedRebroadcastLimits(t *testing.T) { + const ( + n = 4 + process = 0 + value = 42 + ) + + // Build a justified MsgDecided: quorum (3) commits for round 1, value 42. + commits := []msg{ + {msgType: MsgCommit, peerIdx: 1, round: 1, value: value}, + {msgType: MsgCommit, peerIdx: 2, round: 1, value: value}, + {msgType: MsgCommit, peerIdx: 3, round: 1, value: value}, + } + decided := msg{msgType: MsgDecided, peerIdx: 1, round: 1, value: value, justify: commits} + + rc := func(source, round int64) msg { + return msg{msgType: MsgRoundChange, peerIdx: source, round: round} + } + + // runDecidedInstance starts a qbft instance, sends it the decided message and + // returns a synchronous send function plus the channel collecting MsgDecided + // broadcasts. The receive channel is unbuffered, so the instance only accepts + // a send once it has fully processed all earlier messages (the just-sent + // message may still be in flight, hence tests end with an inert flush send). + // This makes broadcast-count assertions deterministic. + runDecidedInstance := func(t *testing.T) (func(msg), chan MsgType) { + t.Helper() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + recv := make(chan Msg[int64, int64, int64]) + decidedBroadcasts := make(chan MsgType, 100) + + def := noopDef + def.Nodes = n + def.FIFOLimit = 100 + def.Decide = func(context.Context, int64, int64, []Msg[int64, int64, int64]) {} + + trans := Transport[int64, int64, int64]{ + Broadcast: func(_ context.Context, typ MsgType, _ int64, _ int64, _ int64, _ int64, + _ int64, _ int64, _ []Msg[int64, int64, int64], + ) error { + if typ == MsgDecided { + decidedBroadcasts <- typ + } + + return nil + }, + Receive: recv, + } + + // Never-delivering input channels (this process is not a leader and proposes nothing). + go func() { + _ = Run(ctx, def, trans, 0, process, make(chan int64), make(chan int64)) + }() + + send := func(m msg) { + select { + case recv <- m: + case <-time.After(5 * time.Second): + require.Fail(t, "timeout sending message to qbft instance") + } + } + + send(decided) + + return send, decidedBroadcasts + } + + t.Run("dedup duplicates and stale rounds", func(t *testing.T) { + send, broadcasts := runDecidedInstance(t) + + for _, m := range []msg{ + rc(2, 2), // Rebroadcast #1. + rc(2, 2), // Duplicate, no rebroadcast. + rc(2, 2), // Duplicate, no rebroadcast. + rc(3, 2), // Rebroadcast #2 (other source). + rc(3, 2), // Duplicate, no rebroadcast. + rc(2, 1), // Stale round (already rebroadcast for round 2), no rebroadcast. + rc(2, 3), // Rebroadcast #3 (source advanced to a new round). + } { + send(m) + } + + // Flush with an inert message: once this send returns, all messages above + // have been fully processed, so the broadcast count is final. + send(rc(2, 1)) + + require.Len(t, broadcasts, 3) + }) + + t.Run("resend cap per source", func(t *testing.T) { + send, broadcasts := runDecidedInstance(t) + + // One peer keeps advancing rounds: only the first maxDecidedResends + // ROUND-CHANGE messages may trigger a rebroadcast. + for round := int64(2); round < 2+maxDecidedResends+5; round++ { + send(rc(2, round)) + } + + // Flush with an inert message (stale round, never rebroadcast). + send(rc(2, 1)) + + require.Len(t, broadcasts, maxDecidedResends) + }) + + t.Run("forged sources capped", func(t *testing.T) { + send, broadcasts := runDecidedInstance(t) + + // Distinct forged source IDs: at most n sources are tracked, so only + // the first n may trigger a rebroadcast; further new sources are ignored. + for source := int64(100); source < 100+n+5; source++ { + send(rc(source, 2)) + } + + // Flush with an inert message (stale round, never rebroadcast). + send(rc(100, 1)) + + require.Len(t, broadcasts, n) + }) +} + func TestIsJustifiedPrePrepare(t *testing.T) { const ( n = 4 From 42e3beacfa6473304630e1a9fe11b760b0713247 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Thu, 11 Jun 2026 18:09:11 +0300 Subject: [PATCH 2/2] core/consensus/qbft: pin justification round in test helper Set Round explicitly in signedJustification so the test does not depend on newRandomQBFTMsg's random round being non-zero (verifyMsg rejects round <= 0), matching the adjacent signedBase helper. Co-Authored-By: Claude Fable 5 --- core/consensus/qbft/qbft_internal_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/consensus/qbft/qbft_internal_test.go b/core/consensus/qbft/qbft_internal_test.go index 21dc9bfe09..599daaefa2 100644 --- a/core/consensus/qbft/qbft_internal_test.go +++ b/core/consensus/qbft/qbft_internal_test.go @@ -756,6 +756,7 @@ func TestQBFTConsensusHandleAmplificationLimits(t *testing.T) { j := newRandomQBFTMsg(t) j.PeerIdx = 0 + j.Round = 1 // verifyMsg requires round > 0, don't rely on the random value. j.Duty = &pbv1.Duty{Slot: 42, Type: 1} jHash, err := hashProto(j)