Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
}

sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent)
sseListener.SubscribeBlockEvent(sched.HandleBlockEvent)

feeRecipientFunc := func(pubkey core.PubKey) string {
return feeRecipientAddrByCorePubkey[pubkey]
Expand Down
12 changes: 6 additions & 6 deletions app/featureset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ func EnableForT(t *testing.T, feature Feature) {

initMu.Lock()
defer initMu.Unlock()

cache := state[feature]
state[feature] = enable

t.Cleanup(func() {
initMu.Lock()
defer initMu.Unlock()
state[feature] = cache
})

state[feature] = enable
}

// DisableForT disables a feature for testing.
Expand All @@ -133,12 +133,12 @@ func DisableForT(t *testing.T, feature Feature) {

initMu.Lock()
defer initMu.Unlock()

cache := state[feature]
state[feature] = disable

t.Cleanup(func() {
initMu.Lock()
defer initMu.Unlock()
state[feature] = cache
})

state[feature] = disable
}
5 changes: 5 additions & 0 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ const (
// ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation.
// In case they differ, Charon does not sign the attestation.
ChainSplitHalt = "chain_split_halt"

// FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE.
// Fallback to T=1/3+300ms if block event is not received in time.
FetchAttOnBlock = "fetch_att_on_block"
)

var (
Expand All @@ -88,6 +92,7 @@ var (
QUIC: statusAlpha,
FetchOnlyCommIdx0: statusAlpha,
ChainSplitHalt: statusAlpha,
FetchAttOnBlock: statusAlpha,
// Add all features and their status here.
}

Expand Down
22 changes: 22 additions & 0 deletions app/sse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import (
)

type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch)
type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot)

type Listener interface {
SubscribeChainReorgEvent(ChainReorgEventHandlerFunc)
SubscribeBlockEvent(BlockEventHandlerFunc)
}

type listener struct {
sync.Mutex

chainReorgSubs []ChainReorgEventHandlerFunc
blockSubs []BlockEventHandlerFunc
lastReorgEpoch eth2p0.Epoch

// immutable fields
Expand All @@ -55,6 +58,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade

l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockSubs: make([]BlockEventHandlerFunc, 0),
genesisTime: genesisTime,
slotDuration: slotDuration,
slotsPerEpoch: slotsPerEpoch,
Expand Down Expand Up @@ -94,6 +98,13 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc)
p.chainReorgSubs = append(p.chainReorgSubs, handler)
}

func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) {
p.Lock()
defer p.Unlock()

p.blockSubs = append(p.blockSubs, handler)
}

func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error {
switch event.Event {
case sseHeadEvent:
Expand Down Expand Up @@ -247,6 +258,8 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri

sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds())

p.notifyBlockEvent(ctx, eth2p0.Slot(slot))

return nil
}

Expand All @@ -264,6 +277,15 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) {
}
}

func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot) {
p.Lock()
defer p.Unlock()

for _, sub := range p.blockSubs {
sub(ctx, slot)
}
}

// computeDelay computes the delay between start of the slot and receiving the event.
func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) {
slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration)
Expand Down
50 changes: 50 additions & 0 deletions app/sse/listener_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,40 @@ func TestHandleEvents(t *testing.T) {
},
err: errors.New("parse depth to uint64"),
},
{
name: "block event happy path",
event: &event{
Event: sseBlockEvent,
Data: []byte(`{"slot":"42", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`),
Timestamp: time.Now(),
},
err: nil,
},
{
name: "block event incompatible data payload",
event: &event{
Event: sseBlockEvent,
Data: []byte(`"error"`),
Timestamp: time.Now(),
},
err: errors.New("unmarshal SSE block event"),
},
{
name: "block event parse slot",
event: &event{
Event: sseBlockEvent,
Data: []byte(`{"slot":"invalid", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`),
Timestamp: time.Now(),
},
err: errors.New("parse slot to uint64"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockSubs: make([]BlockEventHandlerFunc, 0),
slotDuration: 12 * time.Second,
slotsPerEpoch: 32,
genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC),
Expand Down Expand Up @@ -133,6 +161,28 @@ func TestSubscribeNotifyChainReorg(t *testing.T) {
require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1])
}

func TestSubscribeNotifyBlockEvent(t *testing.T) {
ctx := t.Context()
l := &listener{
blockSubs: make([]BlockEventHandlerFunc, 0),
}

reportedSlots := make([]eth2p0.Slot, 0)

l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot) {
reportedSlots = append(reportedSlots, slot)
})

l.notifyBlockEvent(ctx, eth2p0.Slot(100))
l.notifyBlockEvent(ctx, eth2p0.Slot(100)) // Duplicate should be reported (no dedup for block events)
l.notifyBlockEvent(ctx, eth2p0.Slot(101))

require.Len(t, reportedSlots, 3)
require.Equal(t, eth2p0.Slot(100), reportedSlots[0])
require.Equal(t, eth2p0.Slot(100), reportedSlots[1])
require.Equal(t, eth2p0.Slot(101), reportedSlots[2])
}

func TestComputeDelay(t *testing.T) {
genesisTimeString := "2020-12-01T12:00:23+00:00"
genesisTime, err := time.Parse(time.RFC3339, genesisTimeString)
Expand Down
134 changes: 110 additions & 24 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type Scheduler struct {
builderEnabled bool
schedSlotFunc schedSlotFunc
epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution
eventTriggeredAttestations sync.Map // Track attestation duties triggered via sse block event (map[uint64]bool)
}

// SubscribeDuties subscribes a callback function for triggered duties.
Expand Down Expand Up @@ -185,6 +186,57 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc
}
}

// triggerDuty triggers all duty subscribers with the provided duty and definition set.
func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) {
instrumentDuty(duty, defSet)

dutyCtx := log.WithCtx(ctx, z.Any("duty", duty))
if duty.Type == core.DutyProposer {
var span trace.Span
dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the trace name is right .scheduleSlot..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't modify this name, it was already like that in scheduleSlot

dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")
but I extracted that logic to a separate function to reuse in HandleBlockEvent
Are you suggesting changing to .triggerDuty ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No suggestion, whatever you think will be useful when browsing trace data in Grafana.

defer span.End()
}

for _, sub := range s.dutySubs {
clone, err := defSet.Clone()
if err != nil {
log.Error(dutyCtx, "Failed to clone duty definition set", err)
return
}

if err := sub(dutyCtx, duty, clone); err != nil {
log.Error(dutyCtx, "Failed to trigger duty subscriber", err)
}
}
}

// HandleBlockEvent handles block processing events from SSE and triggers early attestation data fetching.
func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) {
if !featureset.Enabled(featureset.FetchAttOnBlock) {
return
}

duty := core.Duty{
Slot: uint64(slot),
Type: core.DutyAttester,
}
defSet, ok := s.getDutyDefinitionSet(duty)
if !ok {
// Nothing for this duty
return
}

_, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true)
if alreadyTriggered {
return
}

log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot)))

// Trigger duty immediately (early fetch)
go s.triggerDuty(ctx, duty, defSet)
}

// emitCoreSlot calls all slot subscriptions asynchronously with the provided slot.
func (s *Scheduler) emitCoreSlot(ctx context.Context, slot core.Slot) {
for _, sub := range s.slotSubs {
Expand Down Expand Up @@ -276,33 +328,22 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) {
}

// Trigger duty async
go func() {
if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
return // context cancelled
}

instrumentDuty(duty, defSet)

dutyCtx := log.WithCtx(ctx, z.Any("duty", duty))
if duty.Type == core.DutyProposer {
var span trace.Span

dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot")
defer span.End()
}

for _, sub := range s.dutySubs {
clone, err := defSet.Clone() // Clone for each subscriber.
if err != nil {
log.Error(dutyCtx, "Failed to clone duty definition set", err)
return
go func(duty core.Duty, defSet core.DutyDefinitionSet) {
// Special handling for attester duties when FetchAttOnBlock is enabled
if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) {
if !s.waitForBlockEventOrTimeout(ctx, slot) {
return // context cancelled
}

if err := sub(dutyCtx, duty, clone); err != nil {
log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot))
_, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(slot.Slot, true)
if alreadyTriggered {
return // already triggered via block event
}
} else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
return // context cancelled
}
}()

s.triggerDuty(ctx, duty, defSet)
}(duty, defSet)
}

if slot.LastInEpoch() {
Expand Down Expand Up @@ -333,6 +374,27 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF
}
}

// waitForBlockEventOrTimeout waits until the fallback timeout (T=1/3 + 300ms) is reached.
// Returns false if the context is cancelled, true otherwise.
func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool {
// Calculate fallback timeout: 1/3 + 300ms
fn, ok := slotOffsets[core.DutyAttester]
if !ok {
return true
}
offset := fn(slot.SlotDuration) + 300*time.Millisecond
fallbackDeadline := slot.Time.Add(offset)

select {
case <-ctx.Done():
return false
case <-s.clock.After(time.Until(fallbackDeadline)):
log.Debug(ctx, "Fallback timeout reached for attestation, no block event received, possibly fetching stale head",
z.U64("slot", slot.Slot))
return true
}
}

// resolveDuties resolves the duties for the slot's epoch, caching the results.
func (s *Scheduler) resolveDuties(ctx context.Context, slot core.Slot) error {
s.setResolvingEpoch(slot.Epoch())
Expand Down Expand Up @@ -690,6 +752,30 @@ func (s *Scheduler) trimDuties(epoch uint64) {
}

delete(s.dutiesByEpoch, epoch)

if featureset.Enabled(featureset.FetchAttOnBlock) {
s.trimEventTriggeredAttestations(epoch)
}
}

// trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations.
func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) {
_, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(context.Background(), s.eth2Cl)
if err != nil {
return
}

minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch
s.eventTriggeredAttestations.Range(func(key, _ any) bool {
slot, ok := key.(uint64)
if !ok {
return true // continue iteration
}
if slot < minSlotToKeep {
s.eventTriggeredAttestations.Delete(slot)
}
return true // continue iteration
})
}

// submitValidatorRegistrations submits the validator registrations for all DVs.
Expand Down
Loading
Loading