Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
}

sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent)
sseListener.SubscribeBlockEvent(sched.HandleBlockEvent)
sseListener.SubscribeHeadEvent(sched.HandleHeadEvent)

sched.SubscribeSlots(setFeeRecipient(eth2Cl, builderRegSvc.FeeRecipient))

Expand Down Expand Up @@ -614,6 +614,12 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

// Invalidate early-fetched (head-event-triggered) attestation data on reorgs, since the cached
// data was verified against a head that may no longer be canonical.
if featureset.Enabled(featureset.FetchAttOnBlock) || featureset.Enabled(featureset.FetchAttOnBlockWithDelay) {
sseListener.SubscribeChainReorgEvent(fetch.HandleChainReorg)
}

dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb"))

vapi, err := validatorapi.NewComponent(eth2Cl, allPubSharesByKey, nodeIdx.ShareIdx, builderRegSvc.FeeRecipient, conf.BuilderAPI, lock.TargetGasLimit)
Expand Down
7 changes: 5 additions & 2 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ const (
// In case they differ, Charon does not sign the attestation.
ChainSplitHalt = "chain_split_halt"

// FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE.
// Fallback to T=1/3 if block event is not received in time.
// FetchAttOnBlock enables fetching attestation data early upon the SSE "head" event from the beacon node
// (fork-choice head updated), rather than waiting for the scheduled deadline. Triggering on the head event
// (instead of the "block" event) ensures the beacon node's head has settled onto the new block before
// fetching, avoiding stale attestation data at epoch boundaries. Fetched data is dropped if it does not
// vote for the head from the event. Falls back to T=1/3 if no head event is received in time.
FetchAttOnBlock = "fetch_att_on_block"

// FetchAttOnBlockWithDelay enables fetching attestation data with 300ms delay.
Expand Down
46 changes: 35 additions & 11 deletions app/sse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package sse

import (
"context"
"encoding/hex"
"encoding/json"
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -22,19 +24,19 @@ import (

type (
ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch)
BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string)
HeadEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, blockRoot eth2p0.Root, bnAddr string)
)

type Listener interface {
SubscribeChainReorgEvent(ChainReorgEventHandlerFunc)
SubscribeBlockEvent(BlockEventHandlerFunc)
SubscribeHeadEvent(HeadEventHandlerFunc)
}

type listener struct {
sync.Mutex

chainReorgSubs []ChainReorgEventHandlerFunc
blockSubs []BlockEventHandlerFunc
headSubs []HeadEventHandlerFunc
lastReorgEpoch eth2p0.Epoch

// blockGossipTimes stores timestamps of block gossip events per slot and beacon node address
Expand Down Expand Up @@ -64,7 +66,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade

l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockSubs: make([]BlockEventHandlerFunc, 0),
headSubs: make([]HeadEventHandlerFunc, 0),
blockGossipTimes: make(map[uint64]map[string]time.Time),
genesisTime: genesisTime,
slotDuration: slotDuration,
Expand Down Expand Up @@ -105,11 +107,11 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc)
p.chainReorgSubs = append(p.chainReorgSubs, handler)
}

func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) {
func (p *listener) SubscribeHeadEvent(handler HeadEventHandlerFunc) {
p.Lock()
defer p.Unlock()

p.blockSubs = append(p.blockSubs, handler)
p.headSubs = append(p.headSubs, handler)
}

func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error {
Expand Down Expand Up @@ -167,6 +169,13 @@ func (p *listener) handleHeadEvent(ctx context.Context, event *event, addr strin
z.Str("prev_ddr", head.PreviousDutyDependentRoot),
z.Str("curr_ddr", head.CurrentDutyDependentRoot))

blockRoot, err := parseRoot(head.Block)
if err != nil {
return errors.Wrap(err, "parse head block root", z.Str("addr", addr))
}

p.notifyHeadEvent(ctx, eth2p0.Slot(slot), blockRoot, addr)

return nil
}

Expand Down Expand Up @@ -270,8 +279,6 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri

sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds())

p.notifyBlockEvent(ctx, eth2p0.Slot(slot), addr)

return nil
}

Expand All @@ -289,15 +296,32 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) {
}
}

func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) {
func (p *listener) notifyHeadEvent(ctx context.Context, slot eth2p0.Slot, blockRoot eth2p0.Root, bnAddr string) {
p.Lock()
defer p.Unlock()

for _, sub := range p.blockSubs {
sub(ctx, slot, bnAddr)
for _, sub := range p.headSubs {
sub(ctx, slot, blockRoot, bnAddr)
}
}

// parseRoot parses a 0x-prefixed hex string into an eth2p0.Root.
func parseRoot(hexRoot string) (eth2p0.Root, error) {
b, err := hex.DecodeString(strings.TrimPrefix(hexRoot, "0x"))
if err != nil {
return eth2p0.Root{}, errors.Wrap(err, "decode hex root")
}

var root eth2p0.Root
if len(b) != len(root) {
return eth2p0.Root{}, errors.New("invalid root length", z.Int("length", len(b)))
}

copy(root[:], b)

return root, nil
}

// computeDelay computes the delay between start of the slot and receiving the event.
func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) {
slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration)
Expand Down
15 changes: 8 additions & 7 deletions app/sse/listener_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestHandleEvents(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockSubs: make([]BlockEventHandlerFunc, 0),
headSubs: make([]HeadEventHandlerFunc, 0),
slotDuration: 12 * time.Second,
slotsPerEpoch: 32,
genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC),
Expand Down Expand Up @@ -161,21 +161,22 @@ func TestSubscribeNotifyChainReorg(t *testing.T) {
require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1])
}

func TestSubscribeNotifyBlockEvent(t *testing.T) {
func TestSubscribeNotifyHeadEvent(t *testing.T) {
ctx := t.Context()
l := &listener{
blockSubs: make([]BlockEventHandlerFunc, 0),
headSubs: make([]HeadEventHandlerFunc, 0),
}

reportedSlots := make([]eth2p0.Slot, 0)

l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot, bnAddr string) {
l.SubscribeHeadEvent(func(_ context.Context, slot eth2p0.Slot, _ eth2p0.Root, bnAddr string) {
reportedSlots = append(reportedSlots, slot)
})

l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052")
l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") // Duplicate should be reported (no dedup for block events)
l.notifyBlockEvent(ctx, eth2p0.Slot(101), "http://test-bn:5052")
root := eth2p0.Root{0x01}
l.notifyHeadEvent(ctx, eth2p0.Slot(100), root, "http://test-bn:5052")
l.notifyHeadEvent(ctx, eth2p0.Slot(100), root, "http://test-bn:5052") // Duplicate should be reported (no dedup for head events)
l.notifyHeadEvent(ctx, eth2p0.Slot(101), root, "http://test-bn:5052")

require.Len(t, reportedSlots, 3)
require.Equal(t, eth2p0.Slot(100), reportedSlots[0])
Expand Down
45 changes: 43 additions & 2 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,64 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat
}

// FetchOnly fetches attestation data and caches it without triggering subscribers.
// This allows early fetching on block events while deferring consensus to the scheduled time.
func (f *Fetcher) FetchOnly(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet, bnAddr string) error {
// This allows early fetching on head events while deferring consensus to the scheduled time.
// The data is only cached if it votes for headBlockRoot (the head from the SSE head event);
// otherwise it is dropped so consensus re-fetches fresh data at the scheduled deadline.
func (f *Fetcher) FetchOnly(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet, bnAddr string, headBlockRoot eth2p0.Root) error {
if duty.Type != core.DutyAttester {
return errors.New("unsupported duty", z.Str("type", duty.Type.String()))
}

// Evict stale cache entries: head events arrive in increasing slot order, so any cached slot
// below the current one is past its consensus deadline and was either already consumed by Fetch
// or permanently orphaned (e.g. a late head event arriving after consensus already ran). This
// bounds the cache without relying on reorg events.
f.attDataCache.Range(func(key, _ any) bool {
if s, ok := key.(uint64); ok && s < duty.Slot {
f.attDataCache.Delete(s)
}

return true
})

unsignedSet, err := f.fetchAttesterDataFrom(ctx, duty.Slot, defSet, bnAddr)
if err != nil {
return errors.Wrap(err, "fetch attester data for early cache")
}

// Verify the fetched attestation data votes for the head block reported by the SSE head event.
// If the beacon node served data for a different head (e.g. the head moved on between the event
// and the request), skip caching so consensus re-fetches fresh data at the scheduled deadline.
for _, data := range unsignedSet {
attData, ok := data.(core.AttestationData)
if !ok {
return errors.New("invalid attestation data type")
}

if attData.Data.BeaconBlockRoot != headBlockRoot {
log.Debug(ctx, "Skipping early attestation cache: fetched head differs from head event",
z.U64("slot", duty.Slot), z.Str("bn_addr", bnAddr),
z.Str("head_event_root", headBlockRoot.String()),
z.Str("fetched_root", attData.Data.BeaconBlockRoot.String()))

return nil
}
}

f.attDataCache.Store(duty.Slot, unsignedSet)
log.Debug(ctx, "Early attestation data fetched and cached", z.U64("slot", duty.Slot), z.Str("bn_addr", bnAddr))

return nil
}

// HandleChainReorg invalidates the early-fetch cache upon a chain reorg, since cached
// attestation data was verified against a head that may no longer be canonical.
// Consensus then re-fetches fresh data at the scheduled deadline.
func (f *Fetcher) HandleChainReorg(ctx context.Context, epoch eth2p0.Epoch) {
f.attDataCache.Clear()
log.Debug(ctx, "Early attestation data cache invalidated due to chain reorg", z.U64("epoch", uint64(epoch)))
}

// Fetch triggers fetching of a proposed duty data set.
func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error {
var (
Expand Down
Loading
Loading