diff --git a/app/app.go b/app/app.go index 227885c9c0..ccc14ea69f 100644 --- a/app/app.go +++ b/app/app.go @@ -513,7 +513,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, } sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent) - sseListener.SubscribeBlockEvent(sched.HandleBlockEvent) + sseListener.SubscribeHeadEvent(sched.HandleHeadEvent) sched.SubscribeSlots(setFeeRecipient(eth2Cl, builderRegSvc.FeeRecipient)) @@ -614,6 +614,12 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } + // Invalidate early-fetched (head-event-triggered) attestation data on reorgs, since the cached + // data was verified against a head that may no longer be canonical. + if featureset.Enabled(featureset.FetchAttOnBlock) || featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { + sseListener.SubscribeChainReorgEvent(fetch.HandleChainReorg) + } + dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb")) vapi, err := validatorapi.NewComponent(eth2Cl, allPubSharesByKey, nodeIdx.ShareIdx, builderRegSvc.FeeRecipient, conf.BuilderAPI, lock.TargetGasLimit) diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index 569fb621cb..07121b95a6 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -71,8 +71,11 @@ const ( // In case they differ, Charon does not sign the attestation. ChainSplitHalt = "chain_split_halt" - // FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE. - // Fallback to T=1/3 if block event is not received in time. + // FetchAttOnBlock enables fetching attestation data early upon the SSE "head" event from the beacon node + // (fork-choice head updated), rather than waiting for the scheduled deadline. Triggering on the head event + // (instead of the "block" event) ensures the beacon node's head has settled onto the new block before + // fetching, avoiding stale attestation data at epoch boundaries. Fetched data is dropped if it does not + // vote for the head from the event. Falls back to T=1/3 if no head event is received in time. FetchAttOnBlock = "fetch_att_on_block" // FetchAttOnBlockWithDelay enables fetching attestation data with 300ms delay. diff --git a/app/sse/listener.go b/app/sse/listener.go index 3e50d2e3bb..2f6f2fc33b 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -4,10 +4,12 @@ package sse import ( "context" + "encoding/hex" "encoding/json" "math" "net/http" "strconv" + "strings" "sync" "time" @@ -22,19 +24,19 @@ import ( type ( ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) - BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string) + HeadEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, blockRoot eth2p0.Root, bnAddr string) ) type Listener interface { SubscribeChainReorgEvent(ChainReorgEventHandlerFunc) - SubscribeBlockEvent(BlockEventHandlerFunc) + SubscribeHeadEvent(HeadEventHandlerFunc) } type listener struct { sync.Mutex chainReorgSubs []ChainReorgEventHandlerFunc - blockSubs []BlockEventHandlerFunc + headSubs []HeadEventHandlerFunc lastReorgEpoch eth2p0.Epoch // blockGossipTimes stores timestamps of block gossip events per slot and beacon node address @@ -64,7 +66,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), - blockSubs: make([]BlockEventHandlerFunc, 0), + headSubs: make([]HeadEventHandlerFunc, 0), blockGossipTimes: make(map[uint64]map[string]time.Time), genesisTime: genesisTime, slotDuration: slotDuration, @@ -105,11 +107,11 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc) p.chainReorgSubs = append(p.chainReorgSubs, handler) } -func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) { +func (p *listener) SubscribeHeadEvent(handler HeadEventHandlerFunc) { p.Lock() defer p.Unlock() - p.blockSubs = append(p.blockSubs, handler) + p.headSubs = append(p.headSubs, handler) } func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error { @@ -167,6 +169,13 @@ func (p *listener) handleHeadEvent(ctx context.Context, event *event, addr strin z.Str("prev_ddr", head.PreviousDutyDependentRoot), z.Str("curr_ddr", head.CurrentDutyDependentRoot)) + blockRoot, err := parseRoot(head.Block) + if err != nil { + return errors.Wrap(err, "parse head block root", z.Str("addr", addr)) + } + + p.notifyHeadEvent(ctx, eth2p0.Slot(slot), blockRoot, addr) + return nil } @@ -270,8 +279,6 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds()) - p.notifyBlockEvent(ctx, eth2p0.Slot(slot), addr) - return nil } @@ -289,15 +296,32 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) { } } -func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { +func (p *listener) notifyHeadEvent(ctx context.Context, slot eth2p0.Slot, blockRoot eth2p0.Root, bnAddr string) { p.Lock() defer p.Unlock() - for _, sub := range p.blockSubs { - sub(ctx, slot, bnAddr) + for _, sub := range p.headSubs { + sub(ctx, slot, blockRoot, bnAddr) } } +// parseRoot parses a 0x-prefixed hex string into an eth2p0.Root. +func parseRoot(hexRoot string) (eth2p0.Root, error) { + b, err := hex.DecodeString(strings.TrimPrefix(hexRoot, "0x")) + if err != nil { + return eth2p0.Root{}, errors.Wrap(err, "decode hex root") + } + + var root eth2p0.Root + if len(b) != len(root) { + return eth2p0.Root{}, errors.New("invalid root length", z.Int("length", len(b))) + } + + copy(root[:], b) + + return root, nil +} + // computeDelay computes the delay between start of the slot and receiving the event. func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) { slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration) diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 77caacff56..731dd68aeb 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -116,7 +116,7 @@ func TestHandleEvents(t *testing.T) { t.Run(test.name, func(t *testing.T) { l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), - blockSubs: make([]BlockEventHandlerFunc, 0), + headSubs: make([]HeadEventHandlerFunc, 0), slotDuration: 12 * time.Second, slotsPerEpoch: 32, genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC), @@ -161,21 +161,22 @@ func TestSubscribeNotifyChainReorg(t *testing.T) { require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1]) } -func TestSubscribeNotifyBlockEvent(t *testing.T) { +func TestSubscribeNotifyHeadEvent(t *testing.T) { ctx := t.Context() l := &listener{ - blockSubs: make([]BlockEventHandlerFunc, 0), + headSubs: make([]HeadEventHandlerFunc, 0), } reportedSlots := make([]eth2p0.Slot, 0) - l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot, bnAddr string) { + l.SubscribeHeadEvent(func(_ context.Context, slot eth2p0.Slot, _ eth2p0.Root, bnAddr string) { reportedSlots = append(reportedSlots, slot) }) - l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") - l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") // Duplicate should be reported (no dedup for block events) - l.notifyBlockEvent(ctx, eth2p0.Slot(101), "http://test-bn:5052") + root := eth2p0.Root{0x01} + l.notifyHeadEvent(ctx, eth2p0.Slot(100), root, "http://test-bn:5052") + l.notifyHeadEvent(ctx, eth2p0.Slot(100), root, "http://test-bn:5052") // Duplicate should be reported (no dedup for head events) + l.notifyHeadEvent(ctx, eth2p0.Slot(101), root, "http://test-bn:5052") require.Len(t, reportedSlots, 3) require.Equal(t, eth2p0.Slot(100), reportedSlots[0]) diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 97a31686bc..5d30e1d959 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -55,23 +55,64 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat } // FetchOnly fetches attestation data and caches it without triggering subscribers. -// This allows early fetching on block events while deferring consensus to the scheduled time. -func (f *Fetcher) FetchOnly(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet, bnAddr string) error { +// This allows early fetching on head events while deferring consensus to the scheduled time. +// The data is only cached if it votes for headBlockRoot (the head from the SSE head event); +// otherwise it is dropped so consensus re-fetches fresh data at the scheduled deadline. +func (f *Fetcher) FetchOnly(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet, bnAddr string, headBlockRoot eth2p0.Root) error { if duty.Type != core.DutyAttester { return errors.New("unsupported duty", z.Str("type", duty.Type.String())) } + // Evict stale cache entries: head events arrive in increasing slot order, so any cached slot + // below the current one is past its consensus deadline and was either already consumed by Fetch + // or permanently orphaned (e.g. a late head event arriving after consensus already ran). This + // bounds the cache without relying on reorg events. + f.attDataCache.Range(func(key, _ any) bool { + if s, ok := key.(uint64); ok && s < duty.Slot { + f.attDataCache.Delete(s) + } + + return true + }) + unsignedSet, err := f.fetchAttesterDataFrom(ctx, duty.Slot, defSet, bnAddr) if err != nil { return errors.Wrap(err, "fetch attester data for early cache") } + // Verify the fetched attestation data votes for the head block reported by the SSE head event. + // If the beacon node served data for a different head (e.g. the head moved on between the event + // and the request), skip caching so consensus re-fetches fresh data at the scheduled deadline. + for _, data := range unsignedSet { + attData, ok := data.(core.AttestationData) + if !ok { + return errors.New("invalid attestation data type") + } + + if attData.Data.BeaconBlockRoot != headBlockRoot { + log.Debug(ctx, "Skipping early attestation cache: fetched head differs from head event", + z.U64("slot", duty.Slot), z.Str("bn_addr", bnAddr), + z.Str("head_event_root", headBlockRoot.String()), + z.Str("fetched_root", attData.Data.BeaconBlockRoot.String())) + + return nil + } + } + f.attDataCache.Store(duty.Slot, unsignedSet) log.Debug(ctx, "Early attestation data fetched and cached", z.U64("slot", duty.Slot), z.Str("bn_addr", bnAddr)) return nil } +// HandleChainReorg invalidates the early-fetch cache upon a chain reorg, since cached +// attestation data was verified against a head that may no longer be canonical. +// Consensus then re-fetches fresh data at the scheduled deadline. +func (f *Fetcher) HandleChainReorg(ctx context.Context, epoch eth2p0.Epoch) { + f.attDataCache.Clear() + log.Debug(ctx, "Early attestation data cache invalidated due to chain reorg", z.U64("epoch", uint64(epoch))) +} + // Fetch triggers fetching of a proposed duty data set. func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error { var ( diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go index 07f8d12ce8..94a995463a 100644 --- a/core/fetcher/fetcher_test.go +++ b/core/fetcher/fetcher_test.go @@ -678,7 +678,7 @@ func TestFetchOnly(t *testing.T) { }) // FetchOnly should cache the attestation data without triggering subscribers - err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address()) + err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address(), earlyHeadRoot(ctx, t, bmock, slot)) require.NoError(t, err) require.False(t, subscriberCalled, "FetchOnly should not trigger subscribers") @@ -711,7 +711,7 @@ func TestFetchOnly(t *testing.T) { fetch := mustCreateFetcher(t, bmock) proposerDuty := core.NewProposerDuty(slot) - err = fetch.FetchOnly(ctx, proposerDuty, defSet, bmock.Address()) + err = fetch.FetchOnly(ctx, proposerDuty, defSet, bmock.Address(), eth2p0.Root{}) require.Error(t, err) require.Contains(t, err.Error(), "unsupported duty") }) @@ -723,7 +723,7 @@ func TestFetchOnly(t *testing.T) { fetch := mustCreateFetcher(t, bmock) // FetchOnly should cache the data - err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address()) + err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address(), earlyHeadRoot(ctx, t, bmock, slot)) require.NoError(t, err) // Now call Fetch with the cached data - should work fine @@ -737,4 +737,184 @@ func TestFetchOnly(t *testing.T) { err = fetch.Fetch(ctx, duty, defSet) require.NoError(t, err) }) + + t.Run("head mismatch skips cache", func(t *testing.T) { + bmock, err := beaconmock.New(t.Context()) + require.NoError(t, err) + + fetch := mustCreateFetcher(t, bmock) + + // FetchOnly with a head root that doesn't match the fetched data must not cache it. + err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address(), eth2p0.Root{0xde, 0xad}) + require.NoError(t, err) + + // Fetch falls back to re-fetching fresh data (cache was not populated). + called := false + + fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { + called = true + + require.Len(t, resDataSet, 2) + + return nil + }) + + err = fetch.Fetch(ctx, duty, defSet) + require.NoError(t, err) + require.True(t, called) + }) +} + +// earlyHeadRoot returns the beacon block root the beacon mock votes for at the given slot, +// used as the expected head root for FetchOnly's verification. +func earlyHeadRoot(ctx context.Context, t *testing.T, bmock beaconmock.Mock, slot uint64) eth2p0.Root { + t.Helper() + + resp, err := bmock.AttestationData(ctx, ð2api.AttestationDataOpts{Slot: eth2p0.Slot(slot), CommitteeIndex: 0}) + require.NoError(t, err) + + return resp.Data.BeaconBlockRoot +} + +func TestFetchOnlyReorgInvalidatesCache(t *testing.T) { + ctx := context.Background() + + const ( + slot = 1 + vIdx = 2 + notZero = 99 + ) + + pubkey := testutil.RandomCorePubKey(t) + attDuty := eth2v1.AttesterDuty{ + Slot: slot, + ValidatorIndex: vIdx, + CommitteeIndex: vIdx, + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + } + defSet := core.DutyDefinitionSet{pubkey: core.NewAttesterDefinition(&attDuty)} + duty := core.NewAttesterDuty(slot) + + bmock, err := beaconmock.New(t.Context()) + require.NoError(t, err) + + // The beacon node returns originalRoot as head until a reorg, then sentinel (the new canonical head). + var returnSentinel bool + + originalRoot := eth2p0.Root{0x11} + sentinel := eth2p0.Root{0xaa} + bmock.AttestationDataFunc = func(_ context.Context, s eth2p0.Slot, idx eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) { + root := originalRoot + if returnSentinel { + root = sentinel + } + + return ð2p0.AttestationData{ + Slot: s, + Index: idx, + BeaconBlockRoot: root, + Source: ð2p0.Checkpoint{Root: eth2p0.Root{0x01}}, + Target: ð2p0.Checkpoint{Root: eth2p0.Root{0x02}}, + }, nil + } + + fetch := mustCreateFetcher(t, bmock) + + // Early-fetch and cache the attestation data for the slot. + err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address(), earlyHeadRoot(ctx, t, bmock, slot)) + require.NoError(t, err) + + // A reorg invalidates the cache, and the beacon node now serves a different (post-reorg) head. + fetch.HandleChainReorg(ctx, 0) + + returnSentinel = true + + // Fetch must re-fetch fresh data (cache was cleared), returning the new head rather than the stale cached one. + called := false + + fetch.Subscribe(func(_ context.Context, _ core.Duty, resDataSet core.UnsignedDataSet) error { + called = true + attData, ok := resDataSet[pubkey].(core.AttestationData) + require.True(t, ok) + require.Equal(t, sentinel, attData.Data.BeaconBlockRoot) + + return nil + }) + + err = fetch.Fetch(ctx, duty, defSet) + require.NoError(t, err) + require.True(t, called) +} + +func TestFetchOnlyEvictsStaleSlots(t *testing.T) { + ctx := context.Background() + + const ( + slot1 = 1 + slot2 = 2 + vIdx = 2 + notZero = 99 + ) + + pubkey := testutil.RandomCorePubKey(t) + defSetForSlot := func(slot uint64) core.DutyDefinitionSet { + return core.DutyDefinitionSet{pubkey: core.NewAttesterDefinition(ð2v1.AttesterDuty{ + Slot: eth2p0.Slot(slot), + ValidatorIndex: vIdx, + CommitteeIndex: vIdx, + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + })} + } + + bmock, err := beaconmock.New(t.Context()) + require.NoError(t, err) + + // The beacon node returns originalRoot as head until toggled, then sentinel. + var returnSentinel bool + + originalRoot := eth2p0.Root{0x11} + sentinel := eth2p0.Root{0xaa} + bmock.AttestationDataFunc = func(_ context.Context, s eth2p0.Slot, idx eth2p0.CommitteeIndex) (*eth2p0.AttestationData, error) { + root := originalRoot + if returnSentinel { + root = sentinel + } + + return ð2p0.AttestationData{ + Slot: s, + Index: idx, + BeaconBlockRoot: root, + Source: ð2p0.Checkpoint{Root: eth2p0.Root{0x01}}, + Target: ð2p0.Checkpoint{Root: eth2p0.Root{0x02}}, + }, nil + } + + fetch := mustCreateFetcher(t, bmock) + + // Early-fetch and cache slot1. + err = fetch.FetchOnly(ctx, core.NewAttesterDuty(slot1), defSetForSlot(slot1), bmock.Address(), earlyHeadRoot(ctx, t, bmock, slot1)) + require.NoError(t, err) + + // A later head event (slot2) must evict the stale slot1 entry. + returnSentinel = true + err = fetch.FetchOnly(ctx, core.NewAttesterDuty(slot2), defSetForSlot(slot2), bmock.Address(), earlyHeadRoot(ctx, t, bmock, slot2)) + require.NoError(t, err) + + // Fetching slot1 must re-fetch fresh (its cache was evicted), returning the new head, not the stale one. + called := false + + fetch.Subscribe(func(_ context.Context, _ core.Duty, resDataSet core.UnsignedDataSet) error { + called = true + attData, ok := resDataSet[pubkey].(core.AttestationData) + require.True(t, ok) + require.Equal(t, sentinel, attData.Data.BeaconBlockRoot) + + return nil + }) + + err = fetch.Fetch(ctx, core.NewAttesterDuty(slot1), defSetForSlot(slot1)) + require.NoError(t, err) + require.True(t, called) } diff --git a/core/interfaces.go b/core/interfaces.go index 03925eed88..2ce712072d 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -26,7 +26,7 @@ type Scheduler interface { GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error) // RegisterFetcherFetchOnly registers the fetcher's FetchOnly method. - RegisterFetcherFetchOnly(func(context.Context, Duty, DutyDefinitionSet, string) error) + RegisterFetcherFetchOnly(func(context.Context, Duty, DutyDefinitionSet, string, eth2p0.Root) error) } // Fetcher fetches proposed unsigned duty data. @@ -35,7 +35,7 @@ type Fetcher interface { Fetch(context.Context, Duty, DutyDefinitionSet) error // FetchOnly fetches attestation data and caches it without triggering subscribers. - FetchOnly(context.Context, Duty, DutyDefinitionSet, string) error + FetchOnly(context.Context, Duty, DutyDefinitionSet, string, eth2p0.Root) error // Subscribe registers a callback for proposed unsigned duty data sets. Subscribe(func(context.Context, Duty, UnsignedDataSet) error) @@ -248,9 +248,9 @@ type wireFuncs struct { SchedulerSubscribeDuties func(func(context.Context, Duty, DutyDefinitionSet) error) SchedulerSubscribeSlots func(func(context.Context, Slot) error) SchedulerGetDutyDefinition func(context.Context, Duty) (DutyDefinitionSet, error) - SchedulerRegisterFetcherFetchOnly func(func(context.Context, Duty, DutyDefinitionSet, string) error) + SchedulerRegisterFetcherFetchOnly func(func(context.Context, Duty, DutyDefinitionSet, string, eth2p0.Root) error) FetcherFetch func(context.Context, Duty, DutyDefinitionSet) error - FetcherFetchOnly func(context.Context, Duty, DutyDefinitionSet, string) error + FetcherFetchOnly func(context.Context, Duty, DutyDefinitionSet, string, eth2p0.Root) error FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error) FetcherRegisterAggSigDB func(func(context.Context, Duty, PubKey) (SignedData, error)) FetcherRegisterAwaitAttData func(func(ctx context.Context, slot uint64, commIdx uint64) (*eth2p0.AttestationData, error)) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index e297869dd2..7767982d7d 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -95,11 +95,11 @@ type Scheduler struct { dutiesMutex sync.RWMutex dutySubs []func(context.Context, core.Duty, core.DutyDefinitionSet) error slotSubs []func(context.Context, core.Slot) error - fetcherFetchOnly func(context.Context, core.Duty, core.DutyDefinitionSet, string) error + fetcherFetchOnly func(context.Context, core.Duty, core.DutyDefinitionSet, string, eth2p0.Root) error builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution - eventTriggeredAttestations sync.Map // Track attestation duties triggered via sse block event (map[uint64]bool) + eventTriggeredAttestations sync.Map // Track attestation duties triggered via sse head event (map[uint64]bool) } // SubscribeDuties subscribes a callback function for triggered duties. @@ -110,7 +110,7 @@ func (s *Scheduler) SubscribeDuties(fn func(context.Context, core.Duty, core.Dut // RegisterFetcherFetchOnly registers the fetcher's FetchOnly method for early attestation fetching. // Note this should be called *before* Start. -func (s *Scheduler) RegisterFetcherFetchOnly(fn func(context.Context, core.Duty, core.DutyDefinitionSet, string) error) { +func (s *Scheduler) RegisterFetcherFetchOnly(fn func(context.Context, core.Duty, core.DutyDefinitionSet, string, eth2p0.Root) error) { s.fetcherFetchOnly = fn } @@ -197,8 +197,10 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc } } -// HandleBlockEvent handles SSE "block" events (block imported to fork choice) and triggers early attestation data fetching. -func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { +// HandleHeadEvent handles SSE "head" events (fork-choice head updated) and triggers early attestation data fetching. +// Triggering on the head event (rather than the block event) ensures the beacon node's head has settled onto the +// new block before we fetch, avoiding stale attestation data at epoch boundaries. +func (s *Scheduler) HandleHeadEvent(ctx context.Context, slot eth2p0.Slot, blockRoot eth2p0.Root, bnAddr string) { if s.fetcherFetchOnly == nil { log.Warn(ctx, "Early attestation data fetch skipped, fetcher fetch-only function not registered", nil, z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) return @@ -232,14 +234,14 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAd return } - log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) + log.Debug(ctx, "Early attestation data fetch triggered by SSE head event", z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) // Fetch attestation data early without triggering consensus // Use background context to prevent cancellation if SSE connection drops //nolint:gosec // The use of background context is intentional. go func() { fetchCtx := log.CopyFields(context.Background(), ctx) - if err := s.fetcherFetchOnly(fetchCtx, duty, clonedDefSet, bnAddr); err != nil { + if err := s.fetcherFetchOnly(fetchCtx, duty, clonedDefSet, bnAddr, blockRoot); err != nil { log.Warn(fetchCtx, "Early attestation data fetch failed", err, z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) } }() @@ -349,7 +351,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { // Special handling for attester duties when FetchAttOnBlock features are enabled if duty.Type == core.DutyAttester && (featureset.Enabled(featureset.FetchAttOnBlock) || featureset.Enabled(featureset.FetchAttOnBlockWithDelay)) { - if !s.waitForBlockEventOrTimeout(dutyCtx, slot) { + if !s.waitForEarlyFetchOrTimeout(dutyCtx, slot) { return // context cancelled } @@ -402,10 +404,12 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } -// waitForBlockEventOrTimeout waits until the fallback timeout is reached. +// waitForEarlyFetchOrTimeout waits until the fallback timeout is reached. +// The head-event-triggered early fetch (HandleHeadEvent) runs concurrently and populates the +// attestation data cache before this deadline in the happy path. // If FetchAttOnBlockWithDelay is enabled, timeout is T=1/3+300ms, otherwise T=1/3. // Returns false if the context is cancelled, true otherwise. -func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool { +func (s *Scheduler) waitForEarlyFetchOrTimeout(ctx context.Context, slot core.Slot) bool { // Calculate fallback timeout fn, ok := slotOffsets[core.DutyAttester] if !ok { @@ -428,10 +432,10 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl // Check if block event triggered early fetch if _, triggered := s.eventTriggeredAttestations.Load(slot.Slot); !triggered { if featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { - log.Debug(ctx, "Proceeding with attestation at T=1/3+300ms (no early block event)", + log.Debug(ctx, "Proceeding with attestation at T=1/3+300ms (no early head event)", z.U64("slot", slot.Slot)) } else { - log.Debug(ctx, "Proceeding with attestation at T=1/3 (no early block event)", + log.Debug(ctx, "Proceeding with attestation at T=1/3 (no early head event)", z.U64("slot", slot.Slot)) } }