Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@
}

// handle processes an incoming consensus wire message.
func (c *Consensus) handle(ctx context.Context, _ peer.ID, req proto.Message) (proto.Message, bool, error) {

Check failure on line 662 in core/consensus/qbft/qbft.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ63Mc3PIgwHeUuLnncl&open=AZ63Mc3PIgwHeUuLnncl&pullRequest=4557
t0 := time.Now()

pbMsg, ok := req.(*pbv1.QBFTConsensusMsg)
Expand All @@ -678,6 +678,26 @@
return nil, false, errors.New("invalid duty", z.Any("duty", duty))
}

// Bound justification and value counts before doing any expensive per-element
// work: each justification requires an ECDSA signature recovery and each value
// a proto unmarshal + hash. Without this, a single authenticated peer could send
// a large message (up to the 128MB p2p frame) packed with hundreds of thousands
// of sub-messages to exhaust CPU/memory on every peer (amplification DoS).
//
// A legitimate justification set contains at most a quorum of ROUND-CHANGE
// messages plus a quorum of PREPARE messages (see qbft.getJustifiedQrc), bounded
// above by 2*nodes. Each message (the main message plus each justification)
// references at most two values (value and prepared value), so the values map is
// bounded by 2*(justifications+1).
nodes := len(c.pubkeys)
if n := len(pbMsg.GetJustification()); n > 2*nodes {
return nil, false, errors.New("too many justifications", z.Int("count", n), z.Int("max", 2*nodes))
}

if n, maxValues := len(pbMsg.GetValues()), 2*(len(pbMsg.GetJustification())+1); n > maxValues {
return nil, false, errors.New("too many values", z.Int("count", n), z.Int("max", maxValues))
}

for _, justification := range pbMsg.GetJustification() {
if err := verifyMsg(justification, c.pubkeys); err != nil {
return nil, false, errors.Wrap(err, "invalid justification")
Expand Down
123 changes: 123 additions & 0 deletions core/consensus/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,129 @@ func TestQBFTConsensusHandle(t *testing.T) {
}
}

// TestQBFTConsensusHandleAmplificationLimits verifies that handle rejects messages
// carrying more justifications or values than a legitimate consensus message ever
// needs, before doing the expensive per-element signature recovery / unmarshalling.
// This caps the CPU/memory amplification a single authenticated peer can inflict.
func TestQBFTConsensusHandleAmplificationLimits(t *testing.T) {
// newConsensus returns a single-node consensus, so max justifications = 2*nodes = 2.
newConsensus := func(t *testing.T) (*Consensus, *k1.PrivateKey) {
t.Helper()

var c Consensus

deadliner := coremocks.NewDeadliner(t)
deadliner.On("Add", mock.Anything).Maybe().Return(core.DeadlineScheduled)
c.deadliner = deadliner
c.gaterFunc = func(core.Duty) bool { return true }
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])

p2pKey := testutil.GenerateInsecureK1Key(t, 0)
c.pubkeys = make(map[int64]*k1.PublicKey)
c.pubkeys[0] = p2pKey.PubKey()

return &c, p2pKey
}

// signedBase returns a validly-signed main message so verification reaches the limit checks.
signedBase := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTConsensusMsg {
t.Helper()

base := &pbv1.QBFTConsensusMsg{Msg: newRandomQBFTMsg(t)}
base.Msg.PeerIdx = 0
base.Msg.Round = 1
base.Msg.Duty = &pbv1.Duty{Slot: 42, Type: 1}

msgHash, err := hashProto(base.GetMsg())
require.NoError(t, err)

sign, err := k1util.Sign(p2pKey, msgHash[:])
require.NoError(t, err)

base.Msg.Signature = sign

return base
}

// signedJustification returns a validly-signed justification matching the base message's duty.
signedJustification := func(t *testing.T, p2pKey *k1.PrivateKey) *pbv1.QBFTMsg {
t.Helper()

j := newRandomQBFTMsg(t)
j.PeerIdx = 0
j.Round = 1 // verifyMsg requires round > 0, don't rely on the random value.
j.Duty = &pbv1.Duty{Slot: 42, Type: 1}

jHash, err := hashProto(j)
require.NoError(t, err)

j.Signature, err = k1util.Sign(p2pKey, jHash[:])
require.NoError(t, err)

return j
}

t.Run("too many justifications rejected", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// 3 justifications > 2*nodes (2). Content is irrelevant since the count
// check runs before any per-justification verification.
for range 3 {
base.Justification = append(base.Justification, &pbv1.QBFTMsg{})
}

_, _, err := c.handle(context.Background(), "peerID", base)
require.ErrorContains(t, err, "too many justifications")
})

t.Run("max justifications accepted", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// Exactly 2*nodes (2) justifications must not be rejected by the count check.
for range 2 {
base.Justification = append(base.Justification, signedJustification(t, p2pKey))
}

_, _, err := c.handle(context.Background(), "peerID", base)
require.NoError(t, err)
})

t.Run("too many values rejected", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// 0 justifications => max values = 2*(0+1) = 2. Provide 3.
base.Values = []*anypb.Any{{}, {}, {}}

_, _, err := c.handle(context.Background(), "peerID", base)
require.ErrorContains(t, err, "too many values")
})

t.Run("max values accepted", func(t *testing.T) {
c, p2pKey := newConsensus(t)
base := signedBase(t, p2pKey)

// 2 justifications => max values = 2*(2+1) = 6. A message carrying exactly
// the maximum must pass the count check and the rest of handle, guarding
// against the bound being tightened below the legitimate maximum.
for range 2 {
base.Justification = append(base.Justification, signedJustification(t, p2pKey))
}

for i := range 6 {
value, err := anypb.New(&pbv1.Duty{Slot: uint64(i + 1)})
require.NoError(t, err)

base.Values = append(base.Values, value)
}

_, _, err := c.handle(context.Background(), "peerID", base)
require.NoError(t, err)
})
}

func TestInstanceIO_MaybeStart(t *testing.T) {
t.Run("MaybeStart for new instance", func(t *testing.T) {
inst1 := instance.NewIO[Msg]()
Expand Down
38 changes: 37 additions & 1 deletion core/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,21 @@ type dedupKey struct {
Round int64
}

// maxDecidedResends bounds the number of MsgDecided rebroadcasts that
// post-decision ROUND-CHANGE messages from a single peer can trigger.
// A lagging peer re-sends ROUND-CHANGE with an increasing round on each
// timeout until it learns the decided value, so a handful of resends is
// ample for liveness, while the cap stops a malicious peer minting
// ever-higher rounds from extracting unlimited large rebroadcasts.
const maxDecidedResends = 16

// decidedResend tracks the MsgDecided rebroadcasts triggered by a peer's
// post-decision ROUND-CHANGE messages.
type decidedResend struct {
Round int64 // Highest round a rebroadcast was triggered for.
Count int // Total rebroadcasts triggered by the peer.
}

// errors
var (
errCompare = errors.New("compare leader value with local value failed")
Expand Down Expand Up @@ -211,6 +226,7 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
qCommit []Msg[I, V, C]
buffer = make(map[int64][]Msg[I, V, C])
dedupRules = make(map[dedupKey]bool)
decidedResends = make(map[int64]decidedResend) // Bounds MsgDecided rebroadcasts by peer source.
timerChan <-chan time.Time
stopTimer func()
)
Expand Down Expand Up @@ -319,7 +335,27 @@ func Run[I any, V comparable, C any](ctx context.Context, d Definition[I, V, C],
// Just send Qcommit if consensus already decided
if len(qCommit) > 0 {
if msg.Source() != process && msg.Type() == MsgRoundChange { // Algorithm 3:17
err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit)
// Rebroadcast at most once per source per (strictly increasing)
// round, capped at maxDecidedResends per source: duplicate,
// replayed or maliciously round-incremented ROUND-CHANGE messages
// can't repeatedly trigger a large MsgDecided rebroadcast
// (amplification DoS). Note this path runs before the isJustified
// check, so the ROUND-CHANGE need not even be justified. A peer
// advancing to a genuinely new round still gets a fresh resend
// (up to the cap).
//
// The transport is expected to authenticate sources, but cap the
// tracked sources at d.Nodes as defense in depth so forged source
// IDs can't grow this state (and the total resends) without bound.
resend, ok := decidedResends[msg.Source()]
if !ok && len(decidedResends) >= d.Nodes {
break
}

if msg.Round() > resend.Round && resend.Count < maxDecidedResends {
decidedResends[msg.Source()] = decidedResend{Round: msg.Round(), Count: resend.Count + 1}
err = broadcastMsg(MsgDecided, qCommit[0].Value(), qCommit)
}
}

break
Expand Down
127 changes: 127 additions & 0 deletions core/qbft/qbft_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,133 @@ func (m msg) Justification() []Msg[int64, int64, int64] {
return resp
}

// TestDecidedRebroadcastLimits verifies that once consensus has decided, post-decision
// ROUND-CHANGE messages trigger at most one MsgDecided rebroadcast per source per
// (strictly increasing) round, capped at maxDecidedResends per source. This bounds
// amplification while still serving lagging peers that advance to new rounds.
func TestDecidedRebroadcastLimits(t *testing.T) {
const (
n = 4
process = 0
value = 42
)

// Build a justified MsgDecided: quorum (3) commits for round 1, value 42.
commits := []msg{
{msgType: MsgCommit, peerIdx: 1, round: 1, value: value},
{msgType: MsgCommit, peerIdx: 2, round: 1, value: value},
{msgType: MsgCommit, peerIdx: 3, round: 1, value: value},
}
decided := msg{msgType: MsgDecided, peerIdx: 1, round: 1, value: value, justify: commits}

rc := func(source, round int64) msg {
return msg{msgType: MsgRoundChange, peerIdx: source, round: round}
}

// runDecidedInstance starts a qbft instance, sends it the decided message and
// returns a synchronous send function plus the channel collecting MsgDecided
// broadcasts. The receive channel is unbuffered, so the instance only accepts
// a send once it has fully processed all earlier messages (the just-sent
// message may still be in flight, hence tests end with an inert flush send).
// This makes broadcast-count assertions deterministic.
runDecidedInstance := func(t *testing.T) (func(msg), chan MsgType) {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

recv := make(chan Msg[int64, int64, int64])
decidedBroadcasts := make(chan MsgType, 100)

def := noopDef
def.Nodes = n
def.FIFOLimit = 100
def.Decide = func(context.Context, int64, int64, []Msg[int64, int64, int64]) {}

trans := Transport[int64, int64, int64]{
Broadcast: func(_ context.Context, typ MsgType, _ int64, _ int64, _ int64, _ int64,
_ int64, _ int64, _ []Msg[int64, int64, int64],
) error {
if typ == MsgDecided {
decidedBroadcasts <- typ
}

return nil
},
Receive: recv,
}

// Never-delivering input channels (this process is not a leader and proposes nothing).
go func() {
_ = Run(ctx, def, trans, 0, process, make(chan int64), make(chan int64))
}()

send := func(m msg) {
select {
case recv <- m:
case <-time.After(5 * time.Second):
require.Fail(t, "timeout sending message to qbft instance")
}
}

send(decided)

return send, decidedBroadcasts
}

t.Run("dedup duplicates and stale rounds", func(t *testing.T) {
send, broadcasts := runDecidedInstance(t)

for _, m := range []msg{
rc(2, 2), // Rebroadcast #1.
rc(2, 2), // Duplicate, no rebroadcast.
rc(2, 2), // Duplicate, no rebroadcast.
rc(3, 2), // Rebroadcast #2 (other source).
rc(3, 2), // Duplicate, no rebroadcast.
rc(2, 1), // Stale round (already rebroadcast for round 2), no rebroadcast.
rc(2, 3), // Rebroadcast #3 (source advanced to a new round).
} {
send(m)
}

// Flush with an inert message: once this send returns, all messages above
// have been fully processed, so the broadcast count is final.
send(rc(2, 1))

require.Len(t, broadcasts, 3)
})

t.Run("resend cap per source", func(t *testing.T) {
send, broadcasts := runDecidedInstance(t)

// One peer keeps advancing rounds: only the first maxDecidedResends
// ROUND-CHANGE messages may trigger a rebroadcast.
for round := int64(2); round < 2+maxDecidedResends+5; round++ {
send(rc(2, round))
}

// Flush with an inert message (stale round, never rebroadcast).
send(rc(2, 1))

require.Len(t, broadcasts, maxDecidedResends)
})

t.Run("forged sources capped", func(t *testing.T) {
send, broadcasts := runDecidedInstance(t)

// Distinct forged source IDs: at most n sources are tracked, so only
// the first n may trigger a rebroadcast; further new sources are ignored.
for source := int64(100); source < 100+n+5; source++ {
send(rc(source, 2))
}

// Flush with an inert message (stale round, never rebroadcast).
send(rc(100, 1))

require.Len(t, broadcasts, n)
})
}

func TestIsJustifiedPrePrepare(t *testing.T) {
const (
n = 4
Expand Down
Loading