Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions libs/chainconsensus/oracle/error_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ var errInsufficientErrorOb = fmt.Errorf("insufficient number of errors")

// modeForError returns a slice of common errors for a given request when:
// 1. The total number of observations is at least (N+F)/2+1, and
// 2. The number of observed errors is at least F+1.
// If no single error was observed by at least F+1 nodes, it returns a slice
// of the most frequently observed errors whose combined observation count equals F+1.
// 2. The number of observed errors is at least minMatching.
// If no single error was observed by at least minMatching nodes, it returns a slice
// of the most frequently observed errors whose combined observation count equals minMatching.
// The returned int is the count of the most frequently observed error(s) (not necessarily identical).
func modeForError(N, F int, requestID string, aos []attributedObservation) ([][]byte, int, error) {
func modeForError(N, F, minMatching int, requestID string, aos []attributedObservation) ([][]byte, int, error) {
type keyT [sha256.Size]byte
counters := make(map[keyT]*counter[[]byte])
var totalNum int
Expand Down Expand Up @@ -82,13 +82,13 @@ func modeForError(N, F int, requestID string, aos []attributedObservation) ([][]
for _, c := range sortedCounters {
result = append(result, c.value)
count += c.count
if count >= F+1 {
if count >= minMatching {
break
}
}

if count < F+1 {
return nil, count, fmt.Errorf("%w: expected %d, got %d", errInsufficientErrorOb, F+1, count)
if count < minMatching {
return nil, count, fmt.Errorf("%w: expected %d, got %d", errInsufficientErrorOb, minMatching, count)
}

return result, count, nil
Expand Down
85 changes: 84 additions & 1 deletion libs/chainconsensus/oracle/error_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestErrorMode(t *testing.T) {
Observation: strToObservation(ob),
}
}
rawActualErrors, actualCount, err := modeForError(N, tc.F, requestID, aos)
rawActualErrors, actualCount, err := modeForError(N, tc.F, tc.F+1, requestID, aos)
if tc.ExpectedError != "" {
require.ErrorContains(t, err, tc.ExpectedError)
} else {
Expand Down Expand Up @@ -153,3 +153,86 @@ func TestErrorMode(t *testing.T) {
runTest(t, newObservation)
})
}

func TestErrorMode_TwoFPlusOneThreshold(t *testing.T) {
// N=7, F=2: with minMatching=5 (2F+1), we need 5 matching errors to succeed.
const requestID = "req-2f1"
N, F := 7, 2
minMatching := 2*F + 1 // 5

makeAos := func(errors []string) []attributedObservation {
aos := make([]attributedObservation, len(errors))
for i, e := range errors {
aos[i] = attributedObservation{
//nolint:gosec
Observer: commontypes.OracleID(i),
Observation: &types.Observation{
Observations: map[string]*types.RequestObservation{
requestID: {Observation: &types.RequestObservation_Error{Error: []byte(e)}},
},
},
}
}
return aos
}

t.Run("succeeds when minMatching number of identical errors are reported", func(t *testing.T) {
errors := []string{"err", "err", "err", "err", "err", "other", "other"}
result, actualCount, err := modeForError(N, F, minMatching, requestID, makeAos(errors))
require.NoError(t, err)
require.Equal(t, 5, actualCount)
require.Equal(t, []string{"err"}, func() []string {
s := make([]string, len(result))
for i, b := range result {
s[i] = string(b)
}
return s
}())
})

t.Run("succeeds when minMatching number of different errors are reported", func(t *testing.T) {
errors := []string{"err-a", "err-a", "err-b", "err-b", "err-c", "err-d", "err-e"}
result, actualCount, err := modeForError(N, F, minMatching, requestID, makeAos(errors))
require.NoError(t, err)
require.Equal(t, 5, actualCount)
require.Equal(t, []string{"err-a", "err-b", "err-c"}, func() []string {
s := make([]string, len(result))
for i, b := range result {
s[i] = string(b)
}
return s
}())
})

t.Run("fails when combined error observations don't reach minMatching", func(t *testing.T) {
// Only 4 error observations total (3 distinct errors), non-errors fill the remaining 3 slots
allObs := make([]attributedObservation, N)
errPayloads := []string{"err-a", "err-b", "err-a", "err-c"}
for i, e := range errPayloads {
allObs[i] = attributedObservation{
//nolint:gosec
Observer: commontypes.OracleID(i),
Observation: &types.Observation{
Observations: map[string]*types.RequestObservation{
requestID: {Observation: &types.RequestObservation_Error{Error: []byte(e)}},
},
},
}
}
// remaining 3 are non-error (EventuallyConsistent) — count toward totalNum but not error count
for i := len(errPayloads); i < N; i++ {
allObs[i] = attributedObservation{
//nolint:gosec
Observer: commontypes.OracleID(i),
Observation: &types.Observation{
Observations: map[string]*types.RequestObservation{
requestID: {Observation: &types.RequestObservation_EventuallyConsistent{EventuallyConsistent: []byte("value")}},
},
},
}
}
_, actualCount, err := modeForError(N, F, minMatching, requestID, allObs)
require.ErrorContains(t, err, "insufficient number of errors")
require.Equal(t, 4, actualCount)
})
}
8 changes: 4 additions & 4 deletions libs/chainconsensus/oracle/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type observation[keyT comparable, valueT any] struct {
}

// mode - returns most frequent value and its support count, if total number of observations is at least (N+F)/2+1 and
// number of values with identical keys is at least F+1. Returns error, otherwise.
// number of values with identical keys is at least minMatching. Returns error, otherwise.
// If multiple values have identical number of observations, prefers value reported by oracle with the lowest oracleID.
// The returned int is the count of nodes that observed the winning value (i.e. the number of identical responses).
func mode[keyT comparable, valueT any](N, F int, observations iter.Seq2[commontypes.OracleID, *observation[keyT, valueT]]) (valueT, int, error) {
func mode[keyT comparable, valueT any](N, F, minMatching int, observations iter.Seq2[commontypes.OracleID, *observation[keyT, valueT]]) (valueT, int, error) {
counters := make(map[keyT]*counter[valueT])
var totalNum int
for nodeID, nodeObservation := range observations {
Expand Down Expand Up @@ -66,9 +66,9 @@ func mode[keyT comparable, valueT any](N, F int, observations iter.Seq2[commonty
return zero, 0, errors.New("unexpected state: highestCounter is nil")
}

if highestCounter.count < F+1 {
if highestCounter.count < minMatching {
var zero valueT
return zero, highestCounter.count, fmt.Errorf("insufficient number of identical observations: expected %d, got %d", F+1, highestCounter.count)
return zero, highestCounter.count, fmt.Errorf("insufficient number of identical observations: expected %d, got %d", minMatching, highestCounter.count)
}

return highestCounter.value, highestCounter.count, nil
Expand Down
71 changes: 71 additions & 0 deletions libs/chainconsensus/oracle/mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package oracle

import (
"iter"
"testing"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/stretchr/testify/require"
)

// stringObs returns an iterator over the given values, attributed to sequential oracle IDs.
// An empty string produces a nil observation (counts toward total but not toward any key).
func stringObs(values ...string) iter.Seq2[commontypes.OracleID, *observation[string, string]] {
return func(yield func(commontypes.OracleID, *observation[string, string]) bool) {
for i, v := range values {
//nolint:gosec
id := commontypes.OracleID(i)
if v == "" {
if !yield(id, nil) {
return
}
continue
}
if !yield(id, &observation[string, string]{Key: v, Value: v}) {
return
}
}
}
}

func TestMode_DefaultFPlusOne(t *testing.T) {
// N=7, F=2 → byzQuorumSize=5, default minMatching=F+1=3
N, F := 7, 2
got, actualCount, err := mode[string, string](N, F, F+1, stringObs("a", "a", "a", "b", "b", "c", ""))
require.NoError(t, err)
require.Equal(t, "a", got)
require.Equal(t, 3, actualCount)
}

func TestMode_DefaultFPlusOne_InsufficientMatching(t *testing.T) {
// N=7, F=2 → byzQuorumSize=5; default minMatching=F+1=3
N, F := 7, 2
_, actualCount, err := mode[string, string](N, F, F+1, stringObs("a", "a", "b", "b", "c", "c", "d"))
require.ErrorContains(t, err, "insufficient number of identical observations: expected 3, got 2")
require.Equal(t, 2, actualCount)
}

func TestMode_TwoFPlusOne(t *testing.T) {
// N=7, F=2 → minMatching=5; 5 agree on "a"
N, F := 7, 2
got, actualCount, err := mode[string, string](N, F, 2*F+1, stringObs("a", "a", "a", "a", "a", "b", "c"))
require.NoError(t, err)
require.Equal(t, "a", got)
require.Equal(t, 5, actualCount)
}

func TestMode_TwoFPlusOne_InsufficientMatching(t *testing.T) {
// N=7, F=2 → minMatching=5; only 4 agree on "a" → fail
N, F := 7, 2
_, actualCount, err := mode[string, string](N, F, 2*F+1, stringObs("a", "a", "a", "a", "b", "b", "c"))
require.ErrorContains(t, err, "insufficient number of identical observations: expected 5, got 4")
require.Equal(t, 4, actualCount)
}

func TestMode_InsufficientTotalObservations(t *testing.T) {
// N=7, F=2 → byzQuorumSize=5; only 4 provided → fail before matching check
N, F := 7, 2
_, actualCount, err := mode[string, string](N, F, F+1, stringObs("a", "a", "a", "a"))
require.ErrorContains(t, err, "insufficient number of observations: expected 5, got 4")
require.Equal(t, 0, actualCount)
}
32 changes: 22 additions & 10 deletions libs/chainconsensus/oracle/reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,19 @@ var _ ocr3types.ReportingPlugin[[]byte] = (*reportingPlugin)(nil)

type Config struct {
ocr3types.ReportingPluginConfig
MaxBatchSize int // max number of requests that this node will try to process in a single round
MaxObservationLength int // max length of observation in bytes
MaxBatchSize int // max number of requests that this node will try to process in a single round
MaxObservationLength int // max length of observation in bytes
MinResponsesToAggregate int // minimum responses to aggregate to accept a read value; 0 means use F+1
}

// matchingThreshold returns the minimum number of nodes that must report identical
// observations for a read value to be accepted. When MinIdenticalObservations is
// zero the default of F+1 is used.
func (c Config) matchingThreshold() int {
if c.MinResponsesToAggregate > 0 {
return c.MinResponsesToAggregate
}
return c.F + 1
}

type reportingPlugin struct {
Expand Down Expand Up @@ -455,7 +466,7 @@ func (rp *reportingPlugin) agreeOnObservationType(requestID string, aos []attrib
}
}

value, _, err := mode[ctypes.ObservationType, ctypes.ObservationType](rp.config.N, rp.config.F, iterator)
value, _, err := mode[ctypes.ObservationType, ctypes.ObservationType](rp.config.N, rp.config.F, rp.config.matchingThreshold(), iterator)
return value, err
}

Expand Down Expand Up @@ -521,18 +532,19 @@ func (rp *reportingPlugin) agreeOnAggregationMethod(requestID string, aos []attr
}
}

value, _, err := mode[string, string](rp.config.N, rp.config.F, iterator)
value, _, err := mode[string, string](rp.config.N, rp.config.F, rp.config.matchingThreshold(), iterator)
return value, err
}

func (rp *reportingPlugin) agreeOnMissingRequestIDs(aos []attributedObservation) ([]string, error) {
counter := make(map[string]int)
var result []string
minMatching := rp.config.matchingThreshold()
for _, ob := range aos {
// MissingRequestIDs are guaranteed to be unique per observation by ValidateObservation
for _, missingRequestID := range ob.Observation.MissingRequestIDs {
counter[missingRequestID]++
if counter[missingRequestID] == rp.config.F+1 {
if counter[missingRequestID] == minMatching {
result = append(result, missingRequestID)
}
}
Expand Down Expand Up @@ -567,7 +579,7 @@ func (rp *reportingPlugin) agreeOnEventuallyConsistentValue(requestID string, ao
}
}

return mode[[32]byte, []byte](rp.config.N, rp.config.F, iterator)
return mode[[32]byte, []byte](rp.config.N, rp.config.F, rp.config.matchingThreshold(), iterator)
}

func (rp *reportingPlugin) agreeOnHashableValue(requestID string, aos []attributedObservation) ([]byte, int, error) {
Expand Down Expand Up @@ -604,7 +616,7 @@ func (rp *reportingPlugin) agreeOnHashableValue(requestID string, aos []attribut
}
}

return mode[[32]byte, []byte](rp.config.N, rp.config.F, iterator)
return mode[[32]byte, []byte](rp.config.N, rp.config.F, rp.config.matchingThreshold(), iterator)
}

func medianInt64(heights []int64) float64 {
Expand Down Expand Up @@ -689,7 +701,7 @@ func (rp *reportingPlugin) agreeOnVolatileValue(requestID string, aos []attribut

var best *volatileOutcomeCandidate
for _, candidate := range candidates {
if candidate.supporters < rp.config.F+1 {
if candidate.supporters < rp.config.matchingThreshold() {
continue
}
if best == nil || isVolatileCandidateABetter(&candidate, best) {
Expand All @@ -703,7 +715,7 @@ func (rp *reportingPlugin) agreeOnVolatileValue(requestID string, aos []attribut
}, best.supporters, nil
}

errPayload, errorCount, err := modeForError(rp.config.N, rp.config.F, requestID, aos)
errPayload, errorCount, err := modeForError(rp.config.N, rp.config.F, rp.config.matchingThreshold(), requestID, aos)
if err != nil {
if errors.Is(err, errInsufficientErrorOb) {
return nil, errorCount, errors.New("no volatile outcome candidate reached F+1 supporters")
Expand Down Expand Up @@ -798,7 +810,7 @@ func (rp *reportingPlugin) Outcome(
Outcome: &ctypes.RequestOutcome_LockableToBlock{LockableToBlock: &emptypb.Empty{}},
})
case ctypes.ObservationType_ERROR:
requestErrors, identicalCount, err := modeForError(rp.config.N, rp.config.F, requestID, aos)
requestErrors, identicalCount, err := modeForError(rp.config.N, rp.config.F, rp.config.matchingThreshold(), requestID, aos)
rp.metrics.RecordIdenticalResponseCount(ctx, identicalCount, observationType.String())
if err != nil {
rp.logger.Infow("Could not determine request error", "requestID", requestID, "err", err)
Expand Down
13 changes: 10 additions & 3 deletions libs/chainconsensus/oracle/reporting_plugin_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,19 @@ func (rpf *ReportingPluginFactory) NewReportingPlugin(
return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("failed to read reporting plugin config: %w", err)
}

//nolint:gosec // F and N values will never exceed uint32 max
// Allow MinResponsesToAggregate to be set to 0 to use the default of F+1. If set, it must be between F+1 and N.
if offchainCfg.MinResponsesToAggregate != 0 && (offchainCfg.MinResponsesToAggregate < uint32(config.F+1) || offchainCfg.MinResponsesToAggregate > uint32(config.N)) {
return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("invalid MinResponsesToAggregate: %d; must be gte to %d (F+1) and lte to %d (N)", offchainCfg.MinResponsesToAggregate, config.F+1, config.N)
}

rpf.logger.Infof("Using reporting plugin config: %+v", offchainCfg)

cfg := Config{
ReportingPluginConfig: config,
MaxBatchSize: int(offchainCfg.MaxBatchSize),
MaxObservationLength: int(offchainCfg.MaxObservationLengthBytes),
ReportingPluginConfig: config,
MaxBatchSize: int(offchainCfg.MaxBatchSize),
MaxObservationLength: int(offchainCfg.MaxObservationLengthBytes),
MinResponsesToAggregate: int(offchainCfg.MinResponsesToAggregate),
}

return newReportingPlugin(cfg, rpf.logger, rpf.blocksProvider, rpf.requestsStore, rpf.metrics), ocr3types.ReportingPluginInfo{
Expand Down
Loading
Loading