diff --git a/app/app.go b/app/app.go index d5f11e83bd..634f729a91 100644 --- a/app/app.go +++ b/app/app.go @@ -477,6 +477,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, } sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent) + sseListener.SubscribeBlockEvent(sched.HandleBlockEvent) feeRecipientFunc := func(pubkey core.PubKey) string { return feeRecipientAddrByCorePubkey[pubkey] diff --git a/app/featureset/config.go b/app/featureset/config.go index 95f2957b51..e6367f7eb6 100644 --- a/app/featureset/config.go +++ b/app/featureset/config.go @@ -117,14 +117,14 @@ func EnableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() - cache := state[feature] + state[feature] = enable t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) - - state[feature] = enable } // DisableForT disables a feature for testing. @@ -133,12 +133,12 @@ func DisableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() - cache := state[feature] + state[feature] = disable t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) - - state[feature] = disable } diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index dabc3422f7..f9c13e2271 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -70,6 +70,10 @@ const ( // ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation. // In case they differ, Charon does not sign the attestation. ChainSplitHalt = "chain_split_halt" + + // FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE. + // Fallback to T=1/3+300ms if block event is not received in time. 
+ FetchAttOnBlock = "fetch_att_on_block" ) var ( @@ -88,6 +92,7 @@ var ( QUIC: statusAlpha, FetchOnlyCommIdx0: statusAlpha, ChainSplitHalt: statusAlpha, + FetchAttOnBlock: statusAlpha, // Add all features and their status here. } diff --git a/app/sse/listener.go b/app/sse/listener.go index a16afc37f9..5b993054d2 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -21,15 +21,18 @@ import ( ) type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) +type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot) type Listener interface { SubscribeChainReorgEvent(ChainReorgEventHandlerFunc) + SubscribeBlockEvent(BlockEventHandlerFunc) } type listener struct { sync.Mutex chainReorgSubs []ChainReorgEventHandlerFunc + blockSubs []BlockEventHandlerFunc lastReorgEpoch eth2p0.Epoch // immutable fields @@ -55,6 +58,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), genesisTime: genesisTime, slotDuration: slotDuration, slotsPerEpoch: slotsPerEpoch, @@ -94,6 +98,13 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc) p.chainReorgSubs = append(p.chainReorgSubs, handler) } +func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) { + p.Lock() + defer p.Unlock() + + p.blockSubs = append(p.blockSubs, handler) +} + func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error { switch event.Event { case sseHeadEvent: @@ -247,6 +258,8 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds()) + p.notifyBlockEvent(ctx, eth2p0.Slot(slot)) + return nil } @@ -264,6 +277,15 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) { } } +func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot) { + p.Lock() 
+ defer p.Unlock() + + for _, sub := range p.blockSubs { + sub(ctx, slot) + } +} + // computeDelay computes the delay between start of the slot and receiving the event. func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) { slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration) diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 1f03f7a9ac..8ecbe8338a 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -83,12 +83,40 @@ func TestHandleEvents(t *testing.T) { }, err: errors.New("parse depth to uint64"), }, + { + name: "block event happy path", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`{"slot":"42", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`), + Timestamp: time.Now(), + }, + err: nil, + }, + { + name: "block event incompatible data payload", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`"error"`), + Timestamp: time.Now(), + }, + err: errors.New("unmarshal SSE block event"), + }, + { + name: "block event parse slot", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`{"slot":"invalid", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`), + Timestamp: time.Now(), + }, + err: errors.New("parse slot to uint64"), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), slotDuration: 12 * time.Second, slotsPerEpoch: 32, genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC), @@ -133,6 +161,28 @@ func TestSubscribeNotifyChainReorg(t *testing.T) { require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1]) } +func TestSubscribeNotifyBlockEvent(t *testing.T) { + ctx := t.Context() + l := &listener{ + blockSubs: 
make([]BlockEventHandlerFunc, 0), + } + + reportedSlots := make([]eth2p0.Slot, 0) + + l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot) { + reportedSlots = append(reportedSlots, slot) + }) + + l.notifyBlockEvent(ctx, eth2p0.Slot(100)) + l.notifyBlockEvent(ctx, eth2p0.Slot(100)) // Duplicate should be reported (no dedup for block events) + l.notifyBlockEvent(ctx, eth2p0.Slot(101)) + + require.Len(t, reportedSlots, 3) + require.Equal(t, eth2p0.Slot(100), reportedSlots[0]) + require.Equal(t, eth2p0.Slot(100), reportedSlots[1]) + require.Equal(t, eth2p0.Slot(101), reportedSlots[2]) +} + func TestComputeDelay(t *testing.T) { genesisTimeString := "2020-12-01T12:00:23+00:00" genesisTime, err := time.Parse(time.RFC3339, genesisTimeString) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 91cb96bc70..2c468ade87 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -116,6 +116,7 @@ type Scheduler struct { builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution + eventTriggeredAttestations sync.Map // Track attestation duties triggered via sse block event (map[uint64]bool) } // SubscribeDuties subscribes a callback function for triggered duties. @@ -185,6 +186,57 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc } } +// triggerDuty triggers all duty subscribers with the provided duty and definition set. 
+func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) {
+	instrumentDuty(duty, defSet)
+
+	// Attach the duty to the logging context; proposer duties additionally get a trace span.
+	subCtx := log.WithCtx(ctx, z.Any("duty", duty))
+	if duty.Type == core.DutyProposer {
+		var dutySpan trace.Span
+		subCtx, dutySpan = core.StartDutyTrace(subCtx, duty, "core/scheduler.scheduleSlot")
+		defer dutySpan.End() // Ends when triggerDuty returns.
+	}
+
+	for _, notify := range s.dutySubs {
+		defCopy, err := defSet.Clone() // Every subscriber receives its own copy.
+		if err != nil {
+			log.Error(subCtx, "Failed to clone duty definition set", err)
+			return
+		}
+		if err = notify(subCtx, duty, defCopy); err != nil {
+			log.Error(subCtx, "Failed to trigger duty subscriber", err)
+		}
+	}
+}
+
+// HandleBlockEvent handles block events received from the beacon node via SSE.
+// When the FetchAttOnBlock feature is enabled and an attester duty is found for the slot,
+// it triggers that duty immediately, unless the slot is already marked as triggered.
+func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) {
+	if !featureset.Enabled(featureset.FetchAttOnBlock) {
+		return
+	}
+
+	slotNum := uint64(slot)
+	duty := core.Duty{Slot: slotNum, Type: core.DutyAttester}
+
+	defSet, found := s.getDutyDefinitionSet(duty)
+	if !found {
+		return // Nothing for this duty.
+	}
+
+	// LoadOrStore reports true when the slot was already marked, i.e. the duty was triggered before.
+	if _, seen := s.eventTriggeredAttestations.LoadOrStore(slotNum, true); seen {
+		return
+	}
+
+	log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", slotNum))
+
+	// Run the subscribers on a separate goroutine so the caller is not blocked.
+	go s.triggerDuty(ctx, duty, defSet)
+}
+
 // emitCoreSlot calls all slot subscriptions asynchronously with the provided slot.
func (s *Scheduler) emitCoreSlot(ctx context.Context, slot core.Slot) { for _, sub := range s.slotSubs { @@ -276,33 +328,22 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { } // Trigger duty async - go func() { - if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { - return // context cancelled - } - - instrumentDuty(duty, defSet) - - dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) - if duty.Type == core.DutyProposer { - var span trace.Span - - dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") - defer span.End() - } - - for _, sub := range s.dutySubs { - clone, err := defSet.Clone() // Clone for each subscriber. - if err != nil { - log.Error(dutyCtx, "Failed to clone duty definition set", err) - return + go func(duty core.Duty, defSet core.DutyDefinitionSet) { + // Special handling for attester duties when FetchAttOnBlock is enabled + if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) { + if !s.waitForBlockEventOrTimeout(ctx, slot) { + return // context cancelled } - - if err := sub(dutyCtx, duty, clone); err != nil { - log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot)) + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(slot.Slot, true) + if alreadyTriggered { + return // already triggered via block event } + } else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { + return // context cancelled } - }() + + s.triggerDuty(ctx, duty, defSet) + }(duty, defSet) } if slot.LastInEpoch() { @@ -333,6 +374,27 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } +// waitForBlockEventOrTimeout waits until the fallback timeout (T=1/3 + 300ms) is reached. +// Returns false if the context is cancelled, true otherwise. 
+func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool {
+	fn, ok := slotOffsets[core.DutyAttester]
+	if !ok {
+		return true
+	}
+	// Fallback deadline: slot.Time plus the attester slot offset plus 300ms.
+	fallbackDeadline := slot.Time.Add(fn(slot.SlotDuration) + 300*time.Millisecond)
+	select {
+	case <-ctx.Done():
+		return false
+	case <-s.clock.After(time.Until(fallbackDeadline)): // NOTE(review): time.Until reads the wall clock, not s.clock - confirm intended.
+		// The slot may already be marked as triggered (e.g. by HandleBlockEvent); only report a fallback when it is not.
+		if _, marked := s.eventTriggeredAttestations.Load(slot.Slot); !marked {
+			log.Debug(ctx, "Fallback timeout reached for attestation, no block event received, possibly fetching stale head", z.U64("slot", slot.Slot))
+		}
+		return true
+	}
+}
+
 // resolveDuties resolves the duties for the slot's epoch, caching the results.
 func (s *Scheduler) resolveDuties(ctx context.Context, slot core.Slot) error {
 	s.setResolvingEpoch(slot.Epoch())
@@ -690,6 +752,33 @@ func (s *Scheduler) trimDuties(epoch uint64) {
 	}
 
 	delete(s.dutiesByEpoch, epoch)
+
+	if featureset.Enabled(featureset.FetchAttOnBlock) {
+		s.trimEventTriggeredAttestations(epoch)
+	}
+}
+
+// trimEventTriggeredAttestations removes from eventTriggeredAttestations every slot entry that
+// precedes the first slot of the epoch following the given one.
+func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) {
+	ctx := context.Background()
+
+	_, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl)
+	if err != nil {
+		// Nothing is deleted in this case; surface the failure instead of hiding it.
+		log.Warn(ctx, "Failed to fetch slots config for trimming event triggered attestations", err)
+		return
+	}
+
+	minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch
+	s.eventTriggeredAttestations.Range(func(key, _ any) bool {
+		// Keys that are not uint64 are ignored. Returning true continues the iteration.
+		if slot, ok := key.(uint64); ok && slot < minSlotToKeep {
+			s.eventTriggeredAttestations.Delete(slot)
+		}
+
+		return true
+	})
 }
 
 // submitValidatorRegistrations submits the validator registrations for all DVs.
diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go
index 230e1b53bb..adaf1a6af0 100644
--- a/core/scheduler/scheduler_test.go
+++ b/core/scheduler/scheduler_test.go
@@ -506,6 +506,111 @@ func TestHandleChainReorgEvent(t *testing.T) {
 	require.NoError(t, <-doneCh)
 }
 
+func TestFetchAttOnBlock(t *testing.T) {
+	var (
+		t0     time.Time
+		valSet = beaconmock.ValidatorSetA
+	)
+
+	featureset.EnableForT(t, featureset.FetchAttOnBlock)
+
+	// Configure beacon mock.
+	eth2Cl, err := beaconmock.New(
+		t.Context(),
+		beaconmock.WithValidatorSet(valSet),
+		beaconmock.WithGenesisTime(t0),
+		beaconmock.WithDeterministicAttesterDuties(1), // Duties in slots 0, 1, 2
+		beaconmock.WithSlotsPerEpoch(4),
+	)
+	require.NoError(t, err)
+
+	// Construct scheduler.
+	schedSlotCh := make(chan core.Slot)
+	schedSlotFunc := func(ctx context.Context, slot core.Slot) {
+		select {
+		case <-ctx.Done():
+			return
+		case schedSlotCh <- slot:
+		}
+	}
+	clock := newTestClock(t0)
+	dd := new(delayer)
+	valRegs := beaconmock.BuilderRegistrationSetA
+	sched := scheduler.NewForT(t, clock, dd.delay, valRegs, eth2Cl, schedSlotFunc, false)
+
+	// Track triggered duties; dutyMux guards triggeredDuties since subscribers run on other goroutines.
+	var (
+		dutyMux         sync.Mutex
+		triggeredDuties []core.Duty
+	)
+
+	sched.SubscribeDuties(func(_ context.Context, duty core.Duty, _ core.DutyDefinitionSet) error {
+		dutyMux.Lock()
+		defer dutyMux.Unlock()
+
+		triggeredDuties = append(triggeredDuties, duty)
+
+		return nil
+	})
+
+	// attesterCount returns how many times the attester duty of the slot has been triggered so far.
+	attesterCount := func(slot uint64) int {
+		dutyMux.Lock()
+		defer dutyMux.Unlock()
+
+		var count int
+		for _, d := range triggeredDuties {
+			if d.Type == core.DutyAttester && d.Slot == slot {
+				count++
+			}
+		}
+
+		return count
+	}
+
+	doneCh := make(chan error, 1)
+
+	go func() {
+		doneCh <- sched.Run()
+		close(schedSlotCh)
+	}()
+
+	for slot := range schedSlotCh {
+		switch slot.Slot {
+		case 0:
+			// Test case 1: Happy path - single block event triggers early
+			sched.HandleBlockEvent(t.Context(), 0)
+
+			require.Eventually(t, func() bool {
+				return attesterCount(0) > 0
+			}, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 0 should be triggered by block event")
+		case 1:
+			// Test case 2: Deduplication - two block events should only trigger once
+			sched.HandleBlockEvent(t.Context(), 1)
+			sched.HandleBlockEvent(t.Context(), 1) // Duplicate
+
+			require.Eventually(t, func() bool {
+				return attesterCount(1) == 1
+			}, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 1 should only be triggered once despite duplicate block events")
+		case 2:
+			// Test case 3: Fallback - no block event, timeout should trigger
+			require.Eventually(t, func() bool {
+				return attesterCount(2) > 0
+			}, 2*time.Second, 50*time.Millisecond, "Attester duty for slot 2 should be triggered by fallback timeout")
+
+			sched.Stop()
+		}
+	}
+
+	require.NoError(t, <-doneCh)
+
+	// The Eventually checks above pass as soon as the first trigger lands, so they cannot observe a
+	// duplicate that arrives later. Verify exactly-once per slot now that the scheduler has stopped.
+	for _, slot := range []uint64{0, 1, 2} {
+		require.Equalf(t, 1, attesterCount(slot), "Attester duty for slot %d should be triggered exactly once", slot)
+	}
+}
+
 func TestSubmitValidatorRegistrations(t *testing.T) {
 	// The test uses hard-coded validator registrations from beaconmock.BuilderRegistrationSetA.
 	// The scheduler advances through 3 epochs to ensure it triggers the registration submission.