From b34ba48ecef55435eed7063eed1e7179e6d17ca2 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Fri, 7 Feb 2025 14:19:16 +1000 Subject: [PATCH 1/4] refactor(beacon-chain): negate potential for race to write callbacks --- beacon_chain.go | 26 ++++++++- beacon_chain_test.go | 130 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 148 insertions(+), 8 deletions(-) diff --git a/beacon_chain.go b/beacon_chain.go index 4bc9300..21c78eb 100644 --- a/beacon_chain.go +++ b/beacon_chain.go @@ -1,6 +1,7 @@ package ethwallclock import ( + "sync" "time" ) @@ -8,6 +9,7 @@ type EthereumBeaconChain struct { slots *DefaultSlotCreator epochs *DefaultEpochCreator + mu sync.RWMutex // protects callback slices epochChangedCallbacks []func(current Epoch) slotChangedCallbacks []func(current Slot) @@ -38,7 +40,15 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl time.Sleep(time.Until(slot.TimeWindow().End())) slot = e.slots.Current() - for _, callback := range e.slotChangedCallbacks { + + // Take a read lock and copy the callbacks. + e.mu.RLock() + callbacks := make([]func(current Slot), len(e.slotChangedCallbacks)) + copy(callbacks, e.slotChangedCallbacks) + e.mu.RUnlock() + + // Execute callbacks from our copy. + for _, callback := range callbacks { go callback(slot) } } @@ -56,7 +66,15 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl time.Sleep(time.Until(epoch.TimeWindow().End())) epoch = e.epochs.Current() - for _, callback := range e.epochChangedCallbacks { + + // Take a read lock and copy the callbacks. + e.mu.RLock() + callbacks := make([]func(current Epoch), len(e.epochChangedCallbacks)) + copy(callbacks, e.epochChangedCallbacks) + e.mu.RUnlock() + + // Execute callbacks from our copy. + for _, callback := range callbacks { go callback(epoch) } } @@ -89,11 +107,15 @@ func (e *EthereumBeaconChain) Epochs() *DefaultEpochCreator { } func (e *EthereumBeaconChain) OnEpochChanged(callback func(current Epoch)) { + e.mu.Lock() e.epochChangedCallbacks = append(e.epochChangedCallbacks, callback) + e.mu.Unlock() } func (e *EthereumBeaconChain) OnSlotChanged(callback func(current Slot)) { + e.mu.Lock() e.slotChangedCallbacks = append(e.slotChangedCallbacks, callback) + e.mu.Unlock() } func (e *EthereumBeaconChain) Stop() { diff --git a/beacon_chain_test.go b/beacon_chain_test.go index 834d39f..bd82b8e 100644 --- a/beacon_chain_test.go +++ b/beacon_chain_test.go @@ -1,6 +1,7 @@ package ethwallclock import ( + "sync" "testing" "time" ) @@ -9,27 +10,144 @@ func TestBeaconChainEventCallbacks(t *testing.T) { beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2) t.Run("Event callbacks", func(t *testing.T) { - epochCallbacks := 0 - slotCallbacks := 0 + var ( + mu sync.Mutex + epochCallbacks int + slotCallbacks int + ) beacon.OnEpochChanged(func(epoch Epoch) { + mu.Lock() epochCallbacks++ + mu.Unlock() }) beacon.OnSlotChanged(func(slot Slot) { + mu.Lock() slotCallbacks++ + mu.Unlock() }) time.Sleep(5100 * time.Millisecond) - if epochCallbacks != 2 { - t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCallbacks, 2) + mu.Lock() + epochCount := epochCallbacks + slotCount := slotCallbacks + mu.Unlock() + + if epochCount != 2 { + t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCount, 2) } - if slotCallbacks != 5 { - t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCallbacks, 5) + if slotCount != 5 { + t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCount, 5) } }) beacon.Stop() } + +func TestConcurrentCallbackRegistration(t *testing.T) { + // This test verifies two potential race conditions: + // 1. Concurrent registration of callbacks (modifying the callback slices) + // 2. Concurrent execution of callbacks (callbacks running in parallel) + + const ( + numCallbacks = 100 + slotDuration = 10 * time.Millisecond + testDuration = 200 * time.Millisecond + ) + + beacon := NewEthereumBeaconChain(time.Now(), slotDuration, 2) + defer beacon.Stop() + + var ( + wg sync.WaitGroup + mu sync.Mutex + callbacksExecuted int + callbacksRegistered int + ) + + // Concurrently register callbacks while slots are ticking + for i := 0; i < numCallbacks; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + beacon.OnSlotChanged(func(slot Slot) { + // Track both registration and execution + mu.Lock() + callbacksExecuted++ + mu.Unlock() + }) + + mu.Lock() + callbacksRegistered++ + mu.Unlock() + }() + } + + // Let the test run for a fixed duration + time.Sleep(testDuration) + + // Wait for all registration goroutines to complete + wg.Wait() + + // Check results + mu.Lock() + registered := callbacksRegistered + executed := callbacksExecuted + mu.Unlock() + + // Verify all callbacks were registered + if registered != numCallbacks { + t.Errorf("not all callbacks were registered: got %d, want %d", registered, numCallbacks) + } + + // Verify callbacks were actually executed + // We should have at least some executions given our test duration and slot duration + expectedMinExecutions := int(testDuration/slotDuration) * numCallbacks / 2 + if executed < expectedMinExecutions { + t.Errorf("too few callback executions: got %d, want at least %d", executed, expectedMinExecutions) + } + + t.Logf("Test completed: registered %d callbacks, executed %d times", registered, executed) +} + +/* +BenchmarkCallbackRegistration-10 8334013 217.0 ns/op 65 B/op 0 allocs/op +BenchmarkCallbackRegistration-10 7444645 258.7 ns/op 77 B/op 0 allocs/op +BenchmarkCallbackRegistration-10 7049173 269.3 ns/op 80 B/op 0 allocs/op +BenchmarkCallbackRegistration-10 7852233 230.5 ns/op 70 B/op 0 allocs/op +BenchmarkCallbackRegistration-10 7653772 387.5 ns/op 93 B/op 1 allocs/op +*/ +func BenchmarkCallbackRegistration(b *testing.B) { + beacon := NewEthereumBeaconChain(time.Now(), time.Millisecond, 2) + defer beacon.Stop() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + beacon.OnSlotChanged(func(slot Slot) {}) + } + }) +} + +/* +BenchmarkCallbackRegistrationSequential-10 19174398 191.2 ns/op 66 B/op 0 allocs/op +BenchmarkCallbackRegistrationSequential-10 22953746 159.0 ns/op 65 B/op 0 allocs/op +BenchmarkCallbackRegistrationSequential-10 17665375 221.3 ns/op 74 B/op 0 allocs/op +BenchmarkCallbackRegistrationSequential-10 16663228 234.8 ns/op 80 B/op 0 allocs/op +BenchmarkCallbackRegistrationSequential-10 22371304 187.7 ns/op 69 B/op 0 allocs/op +*/ +func BenchmarkCallbackRegistrationSequential(b *testing.B) { + beacon := NewEthereumBeaconChain(time.Now(), time.Millisecond, 2) + defer beacon.Stop() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + beacon.OnSlotChanged(func(slot Slot) {}) + } +} From bb6e5d4bee850a8a1c38f4094ef1301f974a4ed8 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Fri, 7 Feb 2025 14:20:49 +1000 Subject: [PATCH 2/4] feat: Add CODEOWNERS file and Go workflow files --- .github/CODEOWNERS | 3 +++ .github/workflows/go-setup/action.yml | 17 +++++++++++++++++ .github/workflows/go-test.yml | 18 ++++++++++++++++++ 3 files changed, 38 insertions(+) create mode 100644 .github/CODEOWNERS create mode 100644 .github/workflows/go-setup/action.yml create mode 100644 .github/workflows/go-test.yml diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..7f618d5 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,3 @@ +* @samcm +* @savid +* @mattevans \ No newline at end of file diff --git a/.github/workflows/go-setup/action.yml b/.github/workflows/go-setup/action.yml new file mode 100644 index 0000000..99ff074 --- /dev/null +++ b/.github/workflows/go-setup/action.yml @@ -0,0 +1,17 @@ +name: 'Go Setup' +description: 'Sets up Go environment with caching' + +inputs: + go-version: + description: 'Go version to use' + required: false + default: '1.23.4' + +runs: + using: "composite" + steps: + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: ${{ inputs.go-version }} + cache: true \ No newline at end of file diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml new file mode 100644 index 0000000..77811e8 --- /dev/null +++ b/.github/workflows/go-test.yml @@ -0,0 +1,18 @@ +name: go-test + +on: + push: + branches: + - master + pull_request: + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: ./.github/workflows/go-setup + - run: go test -v -race ./... \ No newline at end of file From 37b96a3a4666394a6ab8f38b6827debd14e0efe0 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Fri, 7 Feb 2025 14:31:18 +1000 Subject: [PATCH 3/4] refactor: Remove unnecessary comments and white space --- beacon_chain.go | 2 +- beacon_chain_test.go | 105 ------------------------------------------- 2 files changed, 1 insertion(+), 106 deletions(-) diff --git a/beacon_chain.go b/beacon_chain.go index 21c78eb..b8d1561 100644 --- a/beacon_chain.go +++ b/beacon_chain.go @@ -9,7 +9,7 @@ type EthereumBeaconChain struct { slots *DefaultSlotCreator epochs *DefaultEpochCreator - mu sync.RWMutex // protects callback slices + mu sync.RWMutex epochChangedCallbacks []func(current Epoch) slotChangedCallbacks []func(current Slot) diff --git a/beacon_chain_test.go b/beacon_chain_test.go index bd82b8e..f1569e3 100644 --- a/beacon_chain_test.go +++ b/beacon_chain_test.go @@ -46,108 +46,3 @@ func TestBeaconChainEventCallbacks(t *testing.T) { beacon.Stop() } - -func TestConcurrentCallbackRegistration(t *testing.T) { - // This test verifies two potential race conditions: - // 1. Concurrent registration of callbacks (modifying the callback slices) - // 2. Concurrent execution of callbacks (callbacks running in parallel) - - const ( - numCallbacks = 100 - slotDuration = 10 * time.Millisecond - testDuration = 200 * time.Millisecond - ) - - beacon := NewEthereumBeaconChain(time.Now(), slotDuration, 2) - defer beacon.Stop() - - var ( - wg sync.WaitGroup - mu sync.Mutex - callbacksExecuted int - callbacksRegistered int - ) - - // Concurrently register callbacks while slots are ticking - for i := 0; i < numCallbacks; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - beacon.OnSlotChanged(func(slot Slot) { - // Track both registration and execution - mu.Lock() - callbacksExecuted++ - mu.Unlock() - }) - - mu.Lock() - callbacksRegistered++ - mu.Unlock() - }() - } - - // Let the test run for a fixed duration - time.Sleep(testDuration) - - // Wait for all registration goroutines to complete - wg.Wait() - - // Check results - mu.Lock() - registered := callbacksRegistered - executed := callbacksExecuted - mu.Unlock() - - // Verify all callbacks were registered - if registered != numCallbacks { - t.Errorf("not all callbacks were registered: got %d, want %d", registered, numCallbacks) - } - - // Verify callbacks were actually executed - // We should have at least some executions given our test duration and slot duration - expectedMinExecutions := int(testDuration/slotDuration) * numCallbacks / 2 - if executed < expectedMinExecutions { - t.Errorf("too few callback executions: got %d, want at least %d", executed, expectedMinExecutions) - } - - t.Logf("Test completed: registered %d callbacks, executed %d times", registered, executed) -} - -/* -BenchmarkCallbackRegistration-10 8334013 217.0 ns/op 65 B/op 0 allocs/op -BenchmarkCallbackRegistration-10 7444645 258.7 ns/op 77 B/op 0 allocs/op -BenchmarkCallbackRegistration-10 7049173 269.3 ns/op 80 B/op 0 allocs/op -BenchmarkCallbackRegistration-10 7852233 230.5 ns/op 70 B/op 0 allocs/op -BenchmarkCallbackRegistration-10 7653772 387.5 ns/op 93 B/op 1 allocs/op -*/ -func BenchmarkCallbackRegistration(b *testing.B) { - beacon := NewEthereumBeaconChain(time.Now(), time.Millisecond, 2) - defer beacon.Stop() - - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - beacon.OnSlotChanged(func(slot Slot) {}) - } - }) -} - -/* -BenchmarkCallbackRegistrationSequential-10 19174398 191.2 ns/op 66 B/op 0 allocs/op -BenchmarkCallbackRegistrationSequential-10 22953746 159.0 ns/op 65 B/op 0 allocs/op -BenchmarkCallbackRegistrationSequential-10 17665375 221.3 ns/op 74 B/op 0 allocs/op -BenchmarkCallbackRegistrationSequential-10 16663228 234.8 ns/op 80 B/op 0 allocs/op -BenchmarkCallbackRegistrationSequential-10 22371304 187.7 ns/op 69 B/op 0 allocs/op -*/ -func BenchmarkCallbackRegistrationSequential(b *testing.B) { - beacon := NewEthereumBeaconChain(time.Now(), time.Millisecond, 2) - defer beacon.Stop() - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - beacon.OnSlotChanged(func(slot Slot) {}) - } -} From 38456f7da8be84fb8457508c06d24cabdb9f1962 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Wed, 23 Apr 2025 15:40:36 +1000 Subject: [PATCH 4/4] feat(beacon_chain): add stop mechanism to EthereumBeaconChain test(beacon_chain): add tests for concurrent stop and nil wallclock scenarios Adds a `Stop` method to the `EthereumBeaconChain` to allow for graceful shutdown of the background goroutines. This prevents goroutine leaks when the beacon chain is no longer needed. Adds tests to ensure that calling `Stop` concurrently with callback registration does not cause a race condition or panic. Adds a test to specifically address a production issue where calling `OnEpochChanged` on a nil `EthereumBeaconChain` receiver caused a panic. This test simulates the scenario where the wallclock becomes nil and verifies that the expected panic occurs, highlighting the need for nil checks before calling methods on the wallclock. --- beacon_chain.go | 67 +++++++++++++++++++++++++++++++++++--- beacon_chain_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 4 deletions(-) diff --git a/beacon_chain.go b/beacon_chain.go index b8d1561..508c641 100644 --- a/beacon_chain.go +++ b/beacon_chain.go @@ -15,6 +15,8 @@ type EthereumBeaconChain struct { slotCh chan struct{} epochCh chan struct{} + stopCh chan struct{} + stopped bool } func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, slotsPerEpoch uint64) *EthereumBeaconChain { @@ -27,6 +29,8 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl slotCh: make(chan struct{}), epochCh: make(chan struct{}), + stopCh: make(chan struct{}), + stopped: false, } go func() { @@ -34,6 +38,8 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl select { case <-e.slotCh: return + case <-e.stopCh: + return default: slot := e.slots.Current() @@ -43,6 +49,13 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl // Take a read lock and copy the callbacks. e.mu.RLock() + + if e.stopped { + e.mu.RUnlock() + + return + } + callbacks := make([]func(current Slot), len(e.slotChangedCallbacks)) copy(callbacks, e.slotChangedCallbacks) e.mu.RUnlock() @@ -60,6 +73,8 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl select { case <-e.epochCh: return + case <-e.stopCh: + return default: epoch := e.epochs.Current() @@ -69,6 +84,13 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl // Take a read lock and copy the callbacks. e.mu.RLock() + + if e.stopped { + e.mu.RUnlock() + + return + } + callbacks := make([]func(current Epoch), len(e.epochChangedCallbacks)) copy(callbacks, e.epochChangedCallbacks) e.mu.RUnlock() @@ -108,19 +130,56 @@ func (e *EthereumBeaconChain) Epochs() *DefaultEpochCreator { func (e *EthereumBeaconChain) OnEpochChanged(callback func(current Epoch)) { e.mu.Lock() + defer e.mu.Unlock() + + if e.stopped { + return + } + e.epochChangedCallbacks = append(e.epochChangedCallbacks, callback) - e.mu.Unlock() } func (e *EthereumBeaconChain) OnSlotChanged(callback func(current Slot)) { e.mu.Lock() + defer e.mu.Unlock() + + if e.stopped { + return + } + e.slotChangedCallbacks = append(e.slotChangedCallbacks, callback) - e.mu.Unlock() } func (e *EthereumBeaconChain) Stop() { - e.slotCh <- struct{}{} - e.epochCh <- struct{}{} + e.mu.Lock() + + if e.stopped { + e.mu.Unlock() + + return + } + + e.stopped = true + e.mu.Unlock() + + close(e.stopCh) + + // Send a signal to the other channels, but don't close them yet + // to avoid "send on closed channel" panics from any other goroutines. + select { + case e.slotCh <- struct{}{}: + default: + } + + select { + case e.epochCh <- struct{}{}: + default: + } + + // Small delay to allow goroutines to exit + time.Sleep(100 * time.Millisecond) + + // Now safe to close close(e.slotCh) close(e.epochCh) } diff --git a/beacon_chain_test.go b/beacon_chain_test.go index f1569e3..2b6be40 100644 --- a/beacon_chain_test.go +++ b/beacon_chain_test.go @@ -6,6 +6,16 @@ import ( "time" ) +// MetadataService mocks the metadata service we use across xatu. +type MetadataService struct { + wallclock *EthereumBeaconChain +} + +// Wallclock returns the wallclock instance (can be nil). +func (m *MetadataService) Wallclock() *EthereumBeaconChain { + return m.wallclock +} + func TestBeaconChainEventCallbacks(t *testing.T) { beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2) @@ -46,3 +56,70 @@ func TestBeaconChainEventCallbacks(t *testing.T) { beacon.Stop() } + +// TestConcurrentStopAndCallback tests that there's no race condition +// between stopping the beacon chain and registering/executing callbacks. +func TestConcurrentStopAndCallback(t *testing.T) { + beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2) + + // Set up a sync WaitGroup to coordinate goroutines. + var wg sync.WaitGroup + + // Start multiple goroutines that try to register callbacks. + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + // Try to register a callback - should not panic even if Stop is called concurrently. + beacon.OnEpochChanged(func(epoch Epoch) {}) + }(i) + } + + // Start a goroutine that stops the beacon chain. + wg.Add(1) + go func() { + defer wg.Done() + // Small delay to increase chance of concurrent execution + time.Sleep(5 * time.Millisecond) + beacon.Stop() + }() + + // Wait for all goroutines to finish. + wg.Wait() +} + +// TestNilWallclockScenario specifically tests for the panic seen in production: +// when OnEpochChanged is called on a nil receiver. +func TestNilWallclockScenario(t *testing.T) { + // Create a metadata service with a valid wallclock + metadata := &MetadataService{ + wallclock: NewEthereumBeaconChain(time.Now(), time.Second*1, 2), + } + + wc := metadata.Wallclock() + if wc == nil { + t.Fatal("Wallclock should not be nil") + } + + // Register a callback. + wc.OnEpochChanged(func(epoch Epoch) {}) + + // If the beacon chain connection fails/is-lost, the wallclock becomes nil. + // Subsequent callbacks then attempt to call the nil wallclock, which panics. + metadata.wallclock = nil + + shouldPanic := func() { + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic when using nil wallclock, but no panic occurred") + } else { + t.Logf("Got expected panic: %v", r) + } + }() + + wc := metadata.Wallclock() // Get nil wallclock. + wc.OnEpochChanged(func(epoch Epoch) {}) + } + + shouldPanic() +}