Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@

type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error

// maxConsensusMsgSize caps the wire size of an incoming QBFTConsensusMsg, well below
// the 128MB default p2p frame limit. A legitimate message carries at most a handful of
// small justification sub-messages (bounded in handle) plus its values, the largest of
// which is a single block proposal (a few MB on mainnet); 32MB leaves ample margin while
// bounding the receive/decode/allocation cost a malicious peer can inflict per message.
const maxConsensusMsgSize = 32 * 1024 * 1024 // 32 MB.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope you know this is sufficient.. I saw full blocks ~20Mb in our metrics, it is close to 32...


var supportedCompareDuties = []core.DutyType{core.DutyAttester}

// newDefinition returns a qbft definition (this is constant across all consensus instances).
Expand Down Expand Up @@ -365,7 +372,7 @@
func (c *Consensus) Start(ctx context.Context) {
p2p.RegisterHandler("qbft", c.p2pNode, protocols.QBFTv2ProtocolID,
func() proto.Message { return new(pbv1.QBFTConsensusMsg) },
c.handle)
c.handle, p2p.WithReadLimit(maxConsensusMsgSize))

go func() {
for {
Expand Down Expand Up @@ -659,7 +666,7 @@
}

// handle processes an incoming consensus wire message.
func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (proto.Message, bool, error) {

Check failure on line 669 in core/consensus/qbft/qbft.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ68fYPWk8ZjXgreu3Tp&open=AZ68fYPWk8ZjXgreu3Tp&pullRequest=4560
t0 := time.Now()

pbMsg, ok := req.(*pbv1.QBFTConsensusMsg)
Expand All @@ -678,27 +685,18 @@
return nil, false, errors.New("invalid duty", z.Any("duty", duty))
}

// Bound justification and value counts before doing any expensive per-element
// work: each justification requires an ECDSA signature recovery and each value
// a proto unmarshal + hash. Without this, a single authenticated peer could send
// a large message (up to the 128MB p2p frame) packed with hundreds of thousands
// of sub-messages to exhaust CPU/memory on every peer (amplification DoS).
//
// A legitimate justification set contains at most a quorum of ROUND-CHANGE
// messages plus a quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded
// above by 2*nodes. Each message (the main message plus each justification)
// references at most two values (value and prepared value), so the values map is
// bounded by 2*(justifications+1).
nodes := len(c.pubkeys)
if n := len(pbMsg.GetJustification()); n > 2*nodes {
return nil, false, errors.New("too many justifications", z.Int("count", n), z.Int("max", 2*nodes))
}

if n, maxValues := len(pbMsg.GetValues()), 2*(len(pbMsg.GetJustification())+1); n > maxValues {
return nil, false, errors.New("too many values", z.Int("count", n), z.Int("max", maxValues))
if err := verifyMsgLimits(pbMsg, len(c.pubkeys)); err != nil {
return nil, false, err
}

for _, justification := range pbMsg.GetJustification() {
// Bail out as soon as the receive deadline fires rather than burning the full
// CPU budget on signature recovery for every justification in a large message.
if ctx.Err() != nil {
return nil, false, errors.Wrap(ctx.Err(), "receive cancelled during justification verification",
z.Any("duty", duty), z.Any("after", time.Since(t0)))
}

if err := verifyMsg(justification, c.pubkeys); err != nil {
return nil, false, errors.Wrap(err, "invalid justification")
}
Expand Down Expand Up @@ -796,6 +794,29 @@
return peerIdx, nil
}

// verifyMsgLimits bounds the justification and value counts of a consensus message
// before any expensive per-element work (each justification requires an ECDSA
// signature recovery, each value a proto unmarshal + hash). Without it a single
// authenticated peer could pack one large message with many sub-messages to exhaust
// CPU/memory on every peer (amplification DoS).
func verifyMsgLimits(pbMsg *pbv1.QBFTConsensusMsg, nodes int) error {
// A legitimate justification set contains at most a quorum of ROUND-CHANGE plus a
// quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded above by 2*nodes.
maxJust := 2 * nodes
if n := len(pbMsg.GetJustification()); n > maxJust {
return errors.New("too many justifications", z.Int("count", n), z.Int("max", maxJust))
}

// Each message (the main message plus each justification) references at most two
// values (value and prepared value), so the values are bounded by 2*(justifications+1).
maxValues := 2 * (len(pbMsg.GetJustification()) + 1)
if n := len(pbMsg.GetValues()); n > maxValues {
return errors.New("too many values", z.Int("count", n), z.Int("max", maxValues))
}

return nil
}

func verifyMsg(msg *pbv1.QBFTMsg, pubkeys map[int64]*k1.PublicKey) error {
if msg == nil || msg.GetDuty() == nil {
return errors.New("invalid consensus message")
Expand Down
53 changes: 30 additions & 23 deletions core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,30 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
return true
}

// allowDecidedResend reports whether a post-decision ROUND-CHANGE from source at
// round may trigger a MsgDecided rebroadcast, recording it when it does. It permits
// at most one rebroadcast per source per strictly-increasing round, capped at
// maxDecidedResends per source, so duplicate, replayed or maliciously
// round-incremented messages can't repeatedly trigger a large rebroadcast
// (amplification DoS), while a peer advancing to a genuinely new round still gets
// served. The transport is expected to authenticate sources, but tracked sources
// are capped at d.Nodes as defense in depth so forged source IDs can't grow this
// state (and the total resends) without bound.
allowDecidedResend := func(source, round int64) bool {
resend, ok := decidedResends[source]
if !ok && len(decidedResends) >= d.Nodes {
return false
}

if round <= resend.Round || resend.Count >= maxDecidedResends {
return false
}

decidedResends[source] = decidedResend{Round: round, Count: resend.Count + 1}

return true
}
Comment on lines +301 to +314

// changeRound updates round and clears the rule dedup state.
changeRound := func(newRound int64, rule UponRule) {
if round == newRound {
Expand Down Expand Up @@ -332,30 +356,13 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
inputValueCh = nil // Don't read from this channel again.

case msg := <-t.Receive:
// Just send Qcommit if consensus already decided
// Just send Qcommit if consensus already decided. The resend is rate-limited
// (see allowDecidedResend) to bound amplification; note this runs before the
// isJustified check, so the ROUND-CHANGE need not even be justified.
if len(qCommit) > 0 {
if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17
// Rebroadcast at most once per source per (strictly increasing)
// round, capped at maxDecidedResends per source: duplicate,
// replayed or maliciously round-incremented ROUND-CHANGE messages
// can't repeatedly trigger a large MsgDecided rebroadcast
// (amplification DoS). Note this path runs before the isJustified
// check, so the ROUND-CHANGE need not even be justified. A peer
// advancing to a genuinely new round still gets a fresh resend
// (up to the cap).
//
// The transport is expected to authenticate sources, but cap the
// tracked sources at d.Nodes as defense in depth so forged source
// IDs can't grow this state (and the total resends) without bound.
resend, ok := decidedResends[msg.Source()]
if !ok && len(decidedResends) >= d.Nodes {
break
}

if msg.Round() > resend.Round && resend.Count < maxDecidedResends {
decidedResends[msg.Source()] = decidedResend{Round: msg.Round(), Count: resend.Count + 1}
err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit)
}
if msg.Source() != process && msg.Type() == MsgRoundChange && // Algorithm 3:17
allowDecidedResend(msg.Source(), msg.Round()) {
err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit)
}

break
Expand Down
15 changes: 15 additions & 0 deletions p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,21 @@ func WithDelimitedProtocol(pID protocol.ID) func(*sendRecvOpts) {
}
}

// WithReadLimit returns an option that caps the maximum size in bytes of a single
// message read for the registered protocol(s), overriding the default maxMsgSize (128MB).
// Use a tighter limit for protocols whose legitimate messages are known to be much
// smaller, to bound the receive/decode/allocation cost of oversized (potentially
// malicious) messages before they ever reach the handler.
func WithReadLimit(limit int) func(*sendRecvOpts) {
return func(opts *sendRecvOpts) {
for _, pID := range opts.protocols {
opts.readersByProtocol[pID] = func(s network.Stream) pbio.Reader {
return pbio.NewDelimitedReader(s, limit)
}
}
}
Comment on lines +252 to +258
}

// SetFuzzerDefaultsUnsafe sets default reader and writer functions to fuzzed versions of the same if p2p fuzz is enabled.
//
// The fuzzReaderWriter is responsible for creating a customized reader and writer for each network stream
Expand Down
28 changes: 28 additions & 0 deletions p2p/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package p2p_test

import (
"context"
"math"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -42,6 +44,32 @@ func TestWithReceiveTimeout(t *testing.T) {
}
}

func TestWithReadLimit(t *testing.T) {
servers := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))}
clients := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))}

for i := range len(servers) {
client, server := clients[i], servers[i]

client.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Hour)

protocolID := protocol.ID("testprotocol")

var handled atomic.Bool
p2p.RegisterHandler("test", server, protocolID, func() proto.Message { return new(pbv1.Duty) },
func(context.Context, peer.ID, proto.Message) (proto.Message, bool, error) {
handled.Store(true) // Must never run: the message exceeds the read limit.
return nil, false, nil
}, p2p.WithReadLimit(4)) // Tiny limit so any real message trips it.

// Slot serializes to an 11 byte varint, well over the 4 byte read limit.
err := p2p.SendReceive(context.Background(), client, server.ID(),
&pbv1.Duty{Slot: math.MaxUint64}, new(pbv1.Duty), protocolID)
require.Error(t, err)
require.False(t, handled.Load(), "handler must not run when message exceeds read limit")
}
}

func TestWithSendTimeout(t *testing.T) {
servers := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))}
clients := []host.Host{testutil.CreateHost(t, testutil.AvailableAddr(t)), testutil.CreateQUICHost(t, testutil.AvailableUDPAddr(t))}
Expand Down
Loading