From d15b2eddfa6babad23ff9386e2c2bacb7bce390f Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 13 Nov 2025 18:58:09 +0000 Subject: [PATCH 1/6] fetch attestations on block event --- app/app.go | 1 + app/featureset/config.go | 12 +-- app/featureset/featureset.go | 5 ++ app/sse/listener.go | 22 +++++ app/sse/listener_internal_test.go | 50 +++++++++++ core/scheduler/scheduler.go | 142 +++++++++++++++++++++++------- core/scheduler/scheduler_test.go | 82 +++++++++++++++++ 7 files changed, 277 insertions(+), 37 deletions(-) diff --git a/app/app.go b/app/app.go index d5f11e83bd..634f729a91 100644 --- a/app/app.go +++ b/app/app.go @@ -477,6 +477,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, } sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent) + sseListener.SubscribeBlockEvent(sched.HandleBlockEvent) feeRecipientFunc := func(pubkey core.PubKey) string { return feeRecipientAddrByCorePubkey[pubkey] diff --git a/app/featureset/config.go b/app/featureset/config.go index 95f2957b51..e6367f7eb6 100644 --- a/app/featureset/config.go +++ b/app/featureset/config.go @@ -117,14 +117,14 @@ func EnableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() - cache := state[feature] + state[feature] = enable t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) - - state[feature] = enable } // DisableForT disables a feature for testing. @@ -133,12 +133,12 @@ func DisableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() - cache := state[feature] + state[feature] = disable t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) - - state[feature] = disable } diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index dabc3422f7..f9c13e2271 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -70,6 +70,10 @@ const ( // ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation. // In case they differ, Charon does not sign the attestation. ChainSplitHalt = "chain_split_halt" + + // FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE. + // Fallback to T=1/3+300ms if block event is not received in time. + FetchAttOnBlock = "fetch_att_on_block" ) var ( @@ -88,6 +92,7 @@ var ( QUIC: statusAlpha, FetchOnlyCommIdx0: statusAlpha, ChainSplitHalt: statusAlpha, + FetchAttOnBlock: statusAlpha, // Add all features and their status here. } diff --git a/app/sse/listener.go b/app/sse/listener.go index a16afc37f9..5b993054d2 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -21,15 +21,18 @@ import ( ) type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) +type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot) type Listener interface { SubscribeChainReorgEvent(ChainReorgEventHandlerFunc) + SubscribeBlockEvent(BlockEventHandlerFunc) } type listener struct { sync.Mutex chainReorgSubs []ChainReorgEventHandlerFunc + blockSubs []BlockEventHandlerFunc lastReorgEpoch eth2p0.Epoch // immutable fields @@ -55,6 +58,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), genesisTime: genesisTime, slotDuration: slotDuration, slotsPerEpoch: slotsPerEpoch, @@ -94,6 +98,13 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc) p.chainReorgSubs = append(p.chainReorgSubs, handler) } +func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) { + p.Lock() + defer p.Unlock() + + p.blockSubs = append(p.blockSubs, handler) +} + func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error { switch event.Event { case sseHeadEvent: @@ -247,6 +258,8 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds()) + p.notifyBlockEvent(ctx, eth2p0.Slot(slot)) + return nil } @@ -264,6 +277,15 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) { } } +func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot) { + p.Lock() + defer p.Unlock() + + for _, sub := range p.blockSubs { + sub(ctx, slot) + } +} + // computeDelay computes the delay between start of the slot and receiving the event. func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) { slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration) diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 1f03f7a9ac..8ecbe8338a 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -83,12 +83,40 @@ func TestHandleEvents(t *testing.T) { }, err: errors.New("parse depth to uint64"), }, + { + name: "block event happy path", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`{"slot":"42", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`), + Timestamp: time.Now(), + }, + err: nil, + }, + { + name: "block event incompatible data payload", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`"error"`), + Timestamp: time.Now(), + }, + err: errors.New("unmarshal SSE block event"), + }, + { + name: "block event parse slot", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`{"slot":"invalid", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`), + Timestamp: time.Now(), + }, + err: errors.New("parse slot to uint64"), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), slotDuration: 12 * time.Second, slotsPerEpoch: 32, genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC), @@ -133,6 +161,28 @@ func TestSubscribeNotifyChainReorg(t *testing.T) { require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1]) } +func TestSubscribeNotifyBlockEvent(t *testing.T) { + ctx := t.Context() + l := &listener{ + blockSubs: make([]BlockEventHandlerFunc, 0), + } + + reportedSlots := make([]eth2p0.Slot, 0) + + l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot) { + reportedSlots = append(reportedSlots, slot) + }) + + l.notifyBlockEvent(ctx, eth2p0.Slot(100)) + l.notifyBlockEvent(ctx, eth2p0.Slot(100)) // Duplicate should be reported (no dedup for block events) + l.notifyBlockEvent(ctx, eth2p0.Slot(101)) + + require.Len(t, reportedSlots, 3) + require.Equal(t, eth2p0.Slot(100), reportedSlots[0]) + require.Equal(t, eth2p0.Slot(100), reportedSlots[1]) + require.Equal(t, eth2p0.Slot(101), reportedSlots[2]) +} + func TestComputeDelay(t *testing.T) { genesisTimeString := "2020-12-01T12:00:23+00:00" genesisTime, err := time.Parse(time.RFC3339, genesisTimeString) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 91cb96bc70..1cdf6fcdb3 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -81,13 +81,14 @@ func New(builderRegistrations []cluster.BuilderRegistration, eth2Cl eth2wrap.Cli } return &Scheduler{ - eth2Cl: eth2Cl, - builderRegistrations: registrations, - quit: make(chan struct{}), - duties: make(map[core.Duty]core.DutyDefinitionSet), - dutiesByEpoch: make(map[uint64][]core.Duty), - epochResolved: make(map[uint64]chan struct{}), - clock: clockwork.NewRealClock(), + eth2Cl: eth2Cl, + builderRegistrations: registrations, + quit: make(chan struct{}), + duties: make(map[core.Duty]core.DutyDefinitionSet), + dutiesByEpoch: make(map[uint64][]core.Duty), + epochResolved: make(map[uint64]chan struct{}), + eventTriggeredAttestations: make(map[uint64]bool), + clock: clockwork.NewRealClock(), delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time { return time.After(time.Until(deadline)) }, @@ -116,6 +117,8 @@ type Scheduler struct { builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution + eventTriggeredAttestations map[uint64]bool // Track attestation duties triggered via sse block event + eventTriggeredMutex sync.Mutex } // SubscribeDuties subscribes a callback function for triggered duties. @@ -185,6 +188,71 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc } } +// markAttestationEventTriggered checks if an attestation at this slot was already triggered and marks it as triggered. +// Returns true if the attestation was already triggered before. +func (s *Scheduler) markAttestationEventTriggered(slot uint64) bool { + s.eventTriggeredMutex.Lock() + defer s.eventTriggeredMutex.Unlock() + if s.eventTriggeredAttestations[slot] { + return true + } + s.eventTriggeredAttestations[slot] = true + return false +} + +// triggerDuty triggers all duty subscribers with the provided duty and definition set. +func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) { + instrumentDuty(duty, defSet) + + dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) + if duty.Type == core.DutyProposer { + var span trace.Span + dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") + defer span.End() + } + + for _, sub := range s.dutySubs { + clone, err := defSet.Clone() + if err != nil { + log.Error(dutyCtx, "Failed to clone duty definition set", err) + return + } + + if err := sub(dutyCtx, duty, clone); err != nil { + log.Error(dutyCtx, "Failed to trigger duty subscriber", err) + } + } +} + +// HandleBlockEvent handles block processing events from SSE and triggers early attestation data fetching. +func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { + if !featureset.Enabled(featureset.FetchAttOnBlock) { + return + } + + if s.markAttestationEventTriggered(uint64(slot)) { + return + } + + duty := core.Duty{ + Slot: uint64(slot), + Type: core.DutyAttester, + } + defSet, ok := s.getDutyDefinitionSet(duty) + if !ok { + // No attester duties for this slot, ignore + log.Debug(ctx, "No attester duties for slot, skipping early fetch", + z.U64("slot", uint64(slot))) + return + } + + log.Debug(ctx, "Early attestation fetch triggered by SSE block event", + z.U64("slot", uint64(slot))) + + // Trigger duty immediately (early fetch) + go s.triggerDuty(ctx, duty, defSet) +} + // emitCoreSlot calls all slot subscriptions asynchronously with the provided slot. func (s *Scheduler) emitCoreSlot(ctx context.Context, slot core.Slot) { for _, sub := range s.slotSubs { @@ -276,33 +344,23 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { } // Trigger duty async - go func() { - if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { - return // context cancelled - } - - instrumentDuty(duty, defSet) - - dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) - if duty.Type == core.DutyProposer { - var span trace.Span - - dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") - defer span.End() - } - - for _, sub := range s.dutySubs { - clone, err := defSet.Clone() // Clone for each subscriber. - if err != nil { - log.Error(dutyCtx, "Failed to clone duty definition set", err) - return + go func(duty core.Duty, defSet core.DutyDefinitionSet) { + // Special handling for attester duties when FetchAttOnBlock is enabled + if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) { + if !s.waitForBlockEventOrTimeout(ctx, slot, duty) { + return // context cancelled } - - if err := sub(dutyCtx, duty, clone); err != nil { - log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot)) + if s.markAttestationEventTriggered(duty.Slot) { + return // already triggered via block event + } + } else { + if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { + return // context cancelled } } - }() + + s.triggerDuty(ctx, duty, defSet) + }(duty, defSet) } if slot.LastInEpoch() { @@ -333,6 +391,28 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } +// waitForBlockEventOrTimeout waits for attestation duty with timeout fallback. +// Returns immediately if the duty was already triggered via block event. +// Otherwise waits until T=1/3 + 300ms (fallback timeout). +func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot, duty core.Duty) bool { + // Calculate fallback timeout: 1/3 + 300ms + fn, ok := slotOffsets[core.DutyAttester] + if !ok { + return true + } + offset := fn(slot.SlotDuration) + 300*time.Millisecond + fallbackDeadline := slot.Time.Add(offset) + + select { + case <-ctx.Done(): + return false + case <-s.clock.After(time.Until(fallbackDeadline)): + log.Debug(ctx, "Fallback timeout reached for attestation, no block event received, possibly fetching stale head", + z.U64("slot", slot.Slot)) + return true + } +} + // resolveDuties resolves the duties for the slot's epoch, caching the results. func (s *Scheduler) resolveDuties(ctx context.Context, slot core.Slot) error { s.setResolvingEpoch(slot.Epoch()) diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 230e1b53bb..d69af60994 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -506,6 +506,88 @@ func TestHandleChainReorgEvent(t *testing.T) { require.NoError(t, <-doneCh) } +func TestHandleBlockEvent(t *testing.T) { + var ( + t0 time.Time + valSet = beaconmock.ValidatorSetA + ) + + featureset.EnableForT(t, featureset.FetchAttOnBlock) + + // Configure beacon mock. + eth2Cl, err := beaconmock.New( + t.Context(), + beaconmock.WithValidatorSet(valSet), + beaconmock.WithGenesisTime(t0), + beaconmock.WithDeterministicAttesterDuties(1), + beaconmock.WithSlotsPerEpoch(4), + ) + require.NoError(t, err) + + // Construct scheduler. + schedSlotCh := make(chan core.Slot) + schedSlotFunc := func(ctx context.Context, slot core.Slot) { + select { + case <-ctx.Done(): + return + case schedSlotCh <- slot: + } + } + clock := newTestClock(t0) + dd := new(delayer) + valRegs := beaconmock.BuilderRegistrationSetA + sched := scheduler.NewForT(t, clock, dd.delay, valRegs, eth2Cl, schedSlotFunc, false) + + // Track triggered duties + var triggeredDuties []core.Duty + var dutyMux sync.Mutex + + sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { + dutyMux.Lock() + defer dutyMux.Unlock() + triggeredDuties = append(triggeredDuties, duty) + return nil + }) + + doneCh := make(chan error, 1) + + go func() { + doneCh <- sched.Run() + close(schedSlotCh) + }() + + for slot := range schedSlotCh { + clock.Pause() + + switch slot.Slot { + case 1: + // Trigger block event for slot 1 (should trigger attester duty for slot 1 early) + sched.HandleBlockEvent(t.Context(), 1) + + // Give a moment for async trigger + time.Sleep(50 * time.Millisecond) + + dutyMux.Lock() + hasAttester1 := false + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 1 { + hasAttester1 = true + break + } + } + dutyMux.Unlock() + + require.True(t, hasAttester1, "Attester duty for slot 1 should be triggered early by block event for slot 1") + + sched.Stop() + } + + clock.Resume() + } + + require.NoError(t, <-doneCh) +} + func TestSubmitValidatorRegistrations(t *testing.T) { // The test uses hard-coded validator registrations from beaconmock.BuilderRegistrationSetA. // The scheduler advances through 3 epochs to ensure it triggers the registration submission. From 45051b8232ce525ae1938ac8cf20e92545ea3d9b Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Fri, 14 Nov 2025 13:30:31 +0000 Subject: [PATCH 2/6] fix golangci lint --- core/scheduler/scheduler.go | 17 ++++++----------- core/scheduler/scheduler_test.go | 3 +-- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 1cdf6fcdb3..49fd2b4b8b 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -240,14 +240,11 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { } defSet, ok := s.getDutyDefinitionSet(duty) if !ok { - // No attester duties for this slot, ignore - log.Debug(ctx, "No attester duties for slot, skipping early fetch", - z.U64("slot", uint64(slot))) + // Nothing for this duty. return } - log.Debug(ctx, "Early attestation fetch triggered by SSE block event", - z.U64("slot", uint64(slot))) + log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot))) // Trigger duty immediately (early fetch) go s.triggerDuty(ctx, duty, defSet) @@ -347,16 +344,14 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { go func(duty core.Duty, defSet core.DutyDefinitionSet) { // Special handling for attester duties when FetchAttOnBlock is enabled if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) { - if !s.waitForBlockEventOrTimeout(ctx, slot, duty) { + if !s.waitForBlockEventOrTimeout(ctx, slot) { return // context cancelled } if s.markAttestationEventTriggered(duty.Slot) { return // already triggered via block event } - } else { - if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { - return // context cancelled - } + } else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { + return // context cancelled } s.triggerDuty(ctx, duty, defSet) @@ -394,7 +389,7 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF // waitForBlockEventOrTimeout waits for attestation duty with timeout fallback. // Returns immediately if the duty was already triggered via block event. // Otherwise waits until T=1/3 + 300ms (fallback timeout). -func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot, duty core.Duty) bool { +func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool { // Calculate fallback timeout: 1/3 + 300ms fn, ok := slotOffsets[core.DutyAttester] if !ok { diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index d69af60994..969a40367d 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -559,8 +559,7 @@ func TestHandleBlockEvent(t *testing.T) { for slot := range schedSlotCh { clock.Pause() - switch slot.Slot { - case 1: + if slot.Slot == 1 { // Trigger block event for slot 1 (should trigger attester duty for slot 1 early) sched.HandleBlockEvent(t.Context(), 1) From 168559f762ac69244d1ddd440b455c963f38f827 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 17 Nov 2025 14:42:43 +0000 Subject: [PATCH 3/6] update eventtriggeredattestations to use sync.map --- core/scheduler/scheduler.go | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 49fd2b4b8b..548e8c3a30 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -81,14 +81,13 @@ func New(builderRegistrations []cluster.BuilderRegistration, eth2Cl eth2wrap.Cli } return &Scheduler{ - eth2Cl: eth2Cl, - builderRegistrations: registrations, - quit: make(chan struct{}), - duties: make(map[core.Duty]core.DutyDefinitionSet), - dutiesByEpoch: make(map[uint64][]core.Duty), - epochResolved: make(map[uint64]chan struct{}), - eventTriggeredAttestations: make(map[uint64]bool), - clock: clockwork.NewRealClock(), + eth2Cl: eth2Cl, + builderRegistrations: registrations, + quit: make(chan struct{}), + duties: make(map[core.Duty]core.DutyDefinitionSet), + dutiesByEpoch: make(map[uint64][]core.Duty), + epochResolved: make(map[uint64]chan struct{}), + clock: clockwork.NewRealClock(), delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time { return time.After(time.Until(deadline)) }, @@ -117,8 +116,7 @@ type Scheduler struct { builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution - eventTriggeredAttestations map[uint64]bool // Track attestation duties triggered via sse block event - eventTriggeredMutex sync.Mutex + eventTriggeredAttestations sync.Map // Track attestation duties triggered via sse block event (map[uint64]bool) } // SubscribeDuties subscribes a callback function for triggered duties. @@ -188,18 +186,6 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc } } -// markAttestationEventTriggered checks if an attestation at this slot was already triggered and marks it as triggered. -// Returns true if the attestation was already triggered before. -func (s *Scheduler) markAttestationEventTriggered(slot uint64) bool { - s.eventTriggeredMutex.Lock() - defer s.eventTriggeredMutex.Unlock() - if s.eventTriggeredAttestations[slot] { - return true - } - s.eventTriggeredAttestations[slot] = true - return false -} - // triggerDuty triggers all duty subscribers with the provided duty and definition set. func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) { instrumentDuty(duty, defSet) @@ -230,7 +216,8 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { return } - if s.markAttestationEventTriggered(uint64(slot)) { + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true) + if alreadyTriggered { return } @@ -347,7 +334,8 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { if !s.waitForBlockEventOrTimeout(ctx, slot) { return // context cancelled } - if s.markAttestationEventTriggered(duty.Slot) { + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(slot.Slot, true) + if alreadyTriggered { return // already triggered via block event } } else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { From 498162789c2fb493d9c2405bd9ff315677903f19 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 17 Nov 2025 16:23:12 +0000 Subject: [PATCH 4/6] more tests and trim eventTriggeredAttestations map --- core/scheduler/scheduler.go | 41 ++++++++++++++++----- core/scheduler/scheduler_test.go | 62 ++++++++++++++++++++++---------- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 548e8c3a30..f1ca88be46 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -216,18 +216,18 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { return } - _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true) - if alreadyTriggered { - return - } - duty := core.Duty{ Slot: uint64(slot), Type: core.DutyAttester, } defSet, ok := s.getDutyDefinitionSet(duty) if !ok { - // Nothing for this duty. + // Nothing for this duty + return + } + + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true) + if alreadyTriggered { return } @@ -374,9 +374,8 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } -// waitForBlockEventOrTimeout waits for attestation duty with timeout fallback. -// Returns immediately if the duty was already triggered via block event. -// Otherwise waits until T=1/3 + 300ms (fallback timeout). +// waitForBlockEventOrTimeout waits until the fallback timeout (T=1/3 + 300ms) is reached. +// Returns false if the context is cancelled, true otherwise. func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool { // Calculate fallback timeout: 1/3 + 300ms fn, ok := slotOffsets[core.DutyAttester] @@ -753,6 +752,30 @@ func (s *Scheduler) trimDuties(epoch uint64) { } delete(s.dutiesByEpoch, epoch) + + if featureset.Enabled(featureset.FetchAttOnBlock) { + s.trimEventTriggeredAttestations(epoch) + } +} + +// trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations. +func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl) + if err != nil { + return + } + + minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch + s.eventTriggeredAttestations.Range(func(key, value interface{}) bool { + slot := key.(uint64) + if slot < minSlotToKeep { + s.eventTriggeredAttestations.Delete(slot) + } + return true // continue iteration + }) } // submitValidatorRegistrations submits the validator registrations for all DVs. diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 969a40367d..adaf1a6af0 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -506,7 +506,7 @@ func TestHandleChainReorgEvent(t *testing.T) { require.NoError(t, <-doneCh) } -func TestHandleBlockEvent(t *testing.T) { +func TestFetchAttOnBlock(t *testing.T) { var ( t0 time.Time valSet = beaconmock.ValidatorSetA @@ -519,7 +519,7 @@ func TestHandleBlockEvent(t *testing.T) { t.Context(), beaconmock.WithValidatorSet(valSet), beaconmock.WithGenesisTime(t0), - beaconmock.WithDeterministicAttesterDuties(1), + beaconmock.WithDeterministicAttesterDuties(1), // Duties in slots 0, 1, 2 beaconmock.WithSlotsPerEpoch(4), ) require.NoError(t, err) @@ -557,31 +557,55 @@ func TestHandleBlockEvent(t *testing.T) { }() for slot := range schedSlotCh { - clock.Pause() + if slot.Slot == 0 { + // Test case 1: Happy path - single block event triggers early + sched.HandleBlockEvent(t.Context(), 0) + + require.Eventually(t, func() bool { + dutyMux.Lock() + defer dutyMux.Unlock() + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 0 { + return true + } + } + return false + }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 0 should be triggered by block event") + } if slot.Slot == 1 { - // Trigger block event for slot 1 (should trigger attester duty for slot 1 early) + // Test case 2: Deduplication - two block events should only trigger once sched.HandleBlockEvent(t.Context(), 1) - - // Give a moment for async trigger - time.Sleep(50 * time.Millisecond) - - dutyMux.Lock() - hasAttester1 := false - for _, d := range triggeredDuties { - if d.Type == core.DutyAttester && d.Slot == 1 { - hasAttester1 = true - break + sched.HandleBlockEvent(t.Context(), 1) // Duplicate + + require.Eventually(t, func() bool { + dutyMux.Lock() + defer dutyMux.Unlock() + count := 0 + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 1 { + count++ + } } - } - dutyMux.Unlock() + return count == 1 + }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 1 should only be triggered once despite duplicate block events") + } - require.True(t, hasAttester1, "Attester duty for slot 1 should be triggered early by block event for slot 1") + if slot.Slot == 2 { + // Test case 3: Fallback - no block event, timeout should trigger + require.Eventually(t, func() bool { + dutyMux.Lock() + defer dutyMux.Unlock() + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 2 { + return true + } + } + return false + }, 2*time.Second, 50*time.Millisecond, "Attester duty for slot 2 should be triggered by fallback timeout") sched.Stop() } - - clock.Resume() } require.NoError(t, <-doneCh) From 136ac08ae47eaa0818d941054ce5b1ab339595e3 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 17 Nov 2025 16:38:14 +0000 Subject: [PATCH 5/6] fix linting --- core/scheduler/scheduler.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index f1ca88be46..bc97be3373 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -769,8 +769,11 @@ func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { } minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch - s.eventTriggeredAttestations.Range(func(key, value interface{}) bool { - slot := key.(uint64) + s.eventTriggeredAttestations.Range(func(key, _ any) bool { + slot, ok := key.(uint64) + if !ok { + return true // continue iteration + } if slot < minSlotToKeep { s.eventTriggeredAttestations.Delete(slot) } From 14add244938155c83f2d9d43c3865db9936c11cd Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 24 Nov 2025 15:33:26 +0000 Subject: [PATCH 6/6] Remove unnecessary context timeout --- core/scheduler/scheduler.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index bc97be3373..2c468ade87 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -760,10 +760,7 @@ func (s *Scheduler) trimDuties(epoch uint64) { // trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations. func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl) + _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(context.Background(), s.eth2Cl) if err != nil { return }