From 114c4bb637b851b87e4f9d16752c21dfff3242de Mon Sep 17 00:00:00 2001 From: kalo <24719519+KaloyanTanev@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:35:15 +0300 Subject: [PATCH 1/3] Refactor to be closer to the rest of qbft code --- core/consensus/qbft/qbft.go | 43 +++++++++++++++++------------- core/qbft/qbft.go | 53 +++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index 256e872a4..8f2931330 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -678,24 +678,8 @@ func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (p return nil, false, errors.New("invalid duty", z.Any("duty", duty)) } - // Bound justification and value counts before doing any expensive per-element - // work: each justification requires an ECDSA signature recovery and each value - // a proto unmarshal + hash. Without this, a single authenticated peer could send - // a large message (up to the 128MB p2p frame) packed with hundreds of thousands - // of sub-messages to exhaust CPU/memory on every peer (amplification DoS). - // - // A legitimate justification set contains at most a quorum of ROUND-CHANGE - // messages plus a quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded - // above by 2*nodes. Each message (the main message plus each justification) - // references at most two values (value and prepared value), so the values map is - // bounded by 2*(justifications+1). - nodes := len(c.pubkeys) - if n := len(pbMsg.GetJustification()); n > 2*nodes { - return nil, false, errors.New("too many justifications", z.Int("count", n), z.Int("max", 2*nodes)) - } - - if n, maxValues := len(pbMsg.GetValues()), 2*(len(pbMsg.GetJustification())+1); n > maxValues { - return nil, false, errors.New("too many values", z.Int("count", n), z.Int("max", maxValues)) + if err := verifyMsgLimits(pbMsg, len(c.pubkeys)); err != nil { + return nil, false, err } for _, justification := range pbMsg.GetJustification() { @@ -796,6 +780,29 @@ func (c *Consensus) getPeerIdx() (int64, error) { return peerIdx, nil } +// verifyMsgLimits bounds the justification and value counts of a consensus message +// before any expensive per-element work (each justification requires an ECDSA +// signature recovery, each value a proto unmarshal + hash). Without it a single +// authenticated peer could pack one large message with many sub-messages to exhaust +// CPU/memory on every peer (amplification DoS). +func verifyMsgLimits(pbMsg *pbv1.QBFTConsensusMsg, nodes int) error { + // A legitimate justification set contains at most a quorum of ROUND-CHANGE plus a + // quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded above by 2*nodes. + maxJust := 2 * nodes + if n := len(pbMsg.GetJustification()); n > maxJust { + return errors.New("too many justifications", z.Int("count", n), z.Int("max", maxJust)) + } + + // Each message (the main message plus each justification) references at most two + // values (value and prepared value), so the values are bounded by 2*(justifications+1). + maxValues := 2 * (len(pbMsg.GetJustification()) + 1) + if n := len(pbMsg.GetValues()); n > maxValues { + return errors.New("too many values", z.Int("count", n), z.Int("max", maxValues)) + } + + return nil +} + func verifyMsg(msg *pbv1.QBFTMsg, pubkeys map[int64]*k1.PublicKey) error { if msg == nil || msg.GetDuty() == nil { return errors.New("invalid consensus message") diff --git a/core/qbft/qbft.go b/core/qbft/qbft.go index f51253d72..9de5cdc75 100644 --- a/core/qbft/qbft.go +++ b/core/qbft/qbft.go @@ -289,6 +289,30 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], return true } + // allowDecidedResend reports whether a post-decision ROUND-CHANGE from source at + // round may trigger a MsgDecided rebroadcast, recording it when it does. It permits + // at most one rebroadcast per source per strictly-increasing round, capped at + // maxDecidedResends per source, so duplicate, replayed or maliciously + // round-incremented messages can't repeatedly trigger a large rebroadcast + // (amplification DoS), while a peer advancing to a genuinely new round still gets + // served. The transport is expected to authenticate sources, but tracked sources + // are capped at d.Nodes as defense in depth so forged source IDs can't grow this + // state (and the total resends) without bound. + allowDecidedResend := func(source, round int64) bool { + resend, ok := decidedResends[source] + if !ok && len(decidedResends) >= d.Nodes { + return false + } + + if round <= resend.Round || resend.Count >= maxDecidedResends { + return false + } + + decidedResends[source] = decidedResend{Round: round, Count: resend.Count + 1} + + return true + } + // changeRound updates round and clears the rule dedup state. changeRound := func(newRound int64, rule UponRule) { if round == newRound { @@ -332,30 +356,13 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C], inputValueCh = nil // Don't read from this channel again. case msg := <-t.Receive: - // Just send Qcommit if consensus already decided + // Just send Qcommit if consensus already decided. The resend is rate-limited + // (see allowDecidedResend) to bound amplification; note this runs before the + // isJustified check, so the ROUND-CHANGE need not even be justified. if len(qCommit) > 0 { - if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17 - // Rebroadcast at most once per source per (strictly increasing) - // round, capped at maxDecidedResends per source: duplicate, - // replayed or maliciously round-incremented ROUND-CHANGE messages - // can't repeatedly trigger a large MsgDecided rebroadcast - // (amplification DoS). Note this path runs before the isJustified - // check, so the ROUND-CHANGE need not even be justified. A peer - // advancing to a genuinely new round still gets a fresh resend - // (up to the cap). - // - // The transport is expected to authenticate sources, but cap the - // tracked sources at d.Nodes as defense in depth so forged source - // IDs can't grow this state (and the total resends) without bound. - resend, ok := decidedResends[msg.Source()] - if !ok && len(decidedResends) >= d.Nodes { - break - } - - if msg.Round() > resend.Round && resend.Count < maxDecidedResends { - decidedResends[msg.Source()] = decidedResend{Round: msg.Round(), Count: resend.Count + 1} - err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) - } + if msg.Source() != process && msg.Type() == MsgRoundChange && // Algorithm 3:17 + allowDecidedResend(msg.Source(), msg.Round()) { + err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit) } break From 495d5a2cd3b8a60204fab51c25adcba561f547d3 Mon Sep 17 00:00:00 2001 From: kalo <24719519+KaloyanTanev@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:37:50 +0300 Subject: [PATCH 2/3] Cap incoming consensus message size --- core/consensus/qbft/qbft.go | 9 ++++++++- p2p/sender.go | 15 +++++++++++++++ p2p/sender_test.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index 8f2931330..7f2cc0133 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -37,6 +37,13 @@ import ( type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error +// maxConsensusMsgSize caps the wire size of an incoming QBFTConsensusMsg, well below +// the 128MB default p2p frame limit. A legitimate message carries at most a handful of +// small justification sub-messages (bounded in handle) plus its values, the largest of +// which is a single block proposal (a few MB on mainnet); 32MB leaves ample margin while +// bounding the receive/decode/allocation cost a malicious peer can inflict per message. +const maxConsensusMsgSize = 32 * 1024 * 1024 // 32 MB. + var supportedCompareDuties = []core.DutyType{core.DutyAttester} // newDefinition returns a qbft definition (this is constant across all consensus instances). @@ -365,7 +372,7 @@ func (c *Consensus) SubscribePriority(fn func(ctx context.Context, duty core.Dut func (c *Consensus) Start(ctx context.Context) { p2p.RegisterHandler("qbft", c.p2pNode, protocols.QBFTv2ProtocolID, func() proto.Message { return new(pbv1.QBFTConsensusMsg) }, - c.handle) + c.handle, p2p.WithReadLimit(maxConsensusMsgSize)) go func() { for { diff --git a/p2p/sender.go b/p2p/sender.go index b23f07b79..a36a69d15 100644 --- a/p2p/sender.go +++ b/p2p/sender.go @@ -243,6 +243,21 @@ func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) { } } +// WithReadLimit returns an option that caps the maximum size in bytes of a single +// message read for the registered protocol(s), overriding the default maxMsgSize (128MB). +// Use a tighter limit for protocols whose legitimate messages are known to be much +// smaller, to bound the receive/decode/allocation cost of oversized (potentially +// malicious) messages before they ever reach the handler. +func WithReadLimit(limit int) func(*sendRecvOpts) { + return func(opts *sendRecvOpts) { + for _, pID := range opts.protocols { + opts.readersByProtocol[pID] = func(s network.Stream) pbio.Reader { + return pbio.NewDelimitedReader(s, limit) + } + } + } +} + // SetFuzzerDefaultsUnsafe sets default reader and writer functions to fuzzed versions of the same if p2p fuzz is enabled. // // The fuzzReaderWriter is responsible for creating a customized reader and writer for each network stream diff --git a/p2p/sender_test.go b/p2p/sender_test.go index c0eb4bad4..58943ffad 100644 --- a/p2p/sender_test.go +++ b/p2p/sender_test.go @@ -4,6 +4,8 @@ package p2p_test import ( "context" + "math" + "sync/atomic" "testing" "time" @@ -42,6 +44,32 @@ func TestWithReceiveTimeout(t *testing.T) { } } +func TestWithReadLimit(t *testing.T) { + servers := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} + clients := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} + + for i := range len(servers) { + client, server := clients[i], servers[i] + + client.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Hour) + + protocolID := protocol.ID("testprotocol") + + var handled atomic.Bool + p2p.RegisterHandler("test", server, protocolID, func() proto.Message { return new(pbv1.Duty) }, + func(context.Context, peer.ID, proto.Message) (proto.Message, bool, error) { + handled.Store(true) // Must never run: the message exceeds the read limit. + return nil, false, nil + }, p2p.WithReadLimit(4)) // Tiny limit so any real message trips it. + + // Slot serializes to an 11 byte varint, well over the 4 byte read limit. + err := p2p.SendReceive(context.Background(), client, server.ID(), + &pbv1.Duty{Slot: math.MaxUint64}, new(pbv1.Duty), protocolID) + require.Error(t, err) + require.False(t, handled.Load(), "handler must not run when message exceeds read limit") + } +} + func TestWithSendTimeout(t *testing.T) { servers := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} clients := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))} From 0b0a6ca613dcdc374a5adf005e65fda3ad051d80 Mon Sep 17 00:00:00 2001 From: kalo <24719519+KaloyanTanev@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:38:35 +0300 Subject: [PATCH 3/3] Fail fast on timeout --- core/consensus/qbft/qbft.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/consensus/qbft/qbft.go b/core/consensus/qbft/qbft.go index 7f2cc0133..1e4557fc8 100644 --- a/core/consensus/qbft/qbft.go +++ b/core/consensus/qbft/qbft.go @@ -690,6 +690,13 @@ func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (p } for _, justification := range pbMsg.GetJustification() { + // Bail out as soon as the receive deadline fires rather than burning the full + // CPU budget on signature recovery for every justification in a large message. + if ctx.Err() != nil { + return nil, false, errors.Wrap(ctx.Err(), "receive cancelled during justification verification", + z.Any("duty", duty), z.Any("after", time.Since(t0))) + } + if err := verifyMsg(justification, c.pubkeys); err != nil { return nil, false, errors.Wrap(err, "invalid justification") }