diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
new file mode 100644
index 0000000..7f618d5
--- /dev/null
+++ b/.github/CODEOWNERS
@@ -0,0 +1,2 @@
+# CODEOWNERS uses the last matching pattern, so all default owners share one line.
+* @samcm @savid @mattevans
\ No newline at end of file
diff --git a/.github/workflows/go-setup/action.yml b/.github/workflows/go-setup/action.yml
new file mode 100644
index 0000000..99ff074
--- /dev/null
+++ b/.github/workflows/go-setup/action.yml
@@ -0,0 +1,17 @@
+name: 'Go Setup'
+description: 'Sets up Go environment with caching'
+
+inputs:
+  go-version:
+    description: 'Go version to use'
+    required: false
+    default: '1.23.4'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Set up Go
+      uses: actions/setup-go@v5
+      with:
+        go-version: ${{ inputs.go-version }}
+        cache: true
\ No newline at end of file
diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml
new file mode 100644
index 0000000..77811e8
--- /dev/null
+++ b/.github/workflows/go-test.yml
@@ -0,0 +1,18 @@
+name: go-test
+
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+
+permissions:
+  contents: read
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: ./.github/workflows/go-setup
+      - run: go test -v -race ./...
\ No newline at end of file
diff --git a/beacon_chain.go b/beacon_chain.go
index 4bc9300..508c641 100644
--- a/beacon_chain.go
+++ b/beacon_chain.go
@@ -1,6 +1,7 @@
 package ethwallclock
 
 import (
+	"sync"
 	"time"
 )
 
@@ -8,11 +9,14 @@ type EthereumBeaconChain struct {
 	slots  *DefaultSlotCreator
 	epochs *DefaultEpochCreator
 
+	mu                    sync.RWMutex
 	epochChangedCallbacks []func(current Epoch)
 	slotChangedCallbacks  []func(current Slot)
 
 	slotCh  chan struct{}
 	epochCh chan struct{}
+	stopCh  chan struct{}
+	stopped bool
 }
 
 func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, slotsPerEpoch uint64) *EthereumBeaconChain {
@@ -25,6 +29,8 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
 
 		slotCh:  make(chan struct{}),
 		epochCh: make(chan struct{}),
+		stopCh:  make(chan struct{}),
+		stopped: false,
 	}
 
 	go func() {
@@ -32,13 +38,39 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
 			select {
 			case <-e.slotCh:
 				return
+			case <-e.stopCh:
+				return
 			default:
 				slot := e.slots.Current()
 
-				time.Sleep(time.Until(slot.TimeWindow().End()))
+				// Wait for the slot to end, but wake early if Stop is called.
+				timer := time.NewTimer(time.Until(slot.TimeWindow().End()))
+
+				select {
+				case <-timer.C:
+				case <-e.stopCh:
+					timer.Stop()
+
+					return
+				}
 
 				slot = e.slots.Current()
-				for _, callback := range e.slotChangedCallbacks {
+
+				// Take a read lock and copy the callbacks.
+				e.mu.RLock()
+
+				if e.stopped {
+					e.mu.RUnlock()
+
+					return
+				}
+
+				callbacks := make([]func(current Slot), len(e.slotChangedCallbacks))
+				copy(callbacks, e.slotChangedCallbacks)
+				e.mu.RUnlock()
+
+				// Execute callbacks from our copy.
+				for _, callback := range callbacks {
 					go callback(slot)
 				}
 			}
@@ -50,13 +82,39 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl
 			select {
 			case <-e.epochCh:
 				return
+			case <-e.stopCh:
+				return
 			default:
 				epoch := e.epochs.Current()
 
-				time.Sleep(time.Until(epoch.TimeWindow().End()))
+				// Wait for the epoch to end, but wake early if Stop is called.
+				timer := time.NewTimer(time.Until(epoch.TimeWindow().End()))
+
+				select {
+				case <-timer.C:
+				case <-e.stopCh:
+					timer.Stop()
+
+					return
+				}
 
 				epoch = e.epochs.Current()
-				for _, callback := range e.epochChangedCallbacks {
+
+				// Take a read lock and copy the callbacks.
+				e.mu.RLock()
+
+				if e.stopped {
+					e.mu.RUnlock()
+
+					return
+				}
+
+				callbacks := make([]func(current Epoch), len(e.epochChangedCallbacks))
+				copy(callbacks, e.epochChangedCallbacks)
+				e.mu.RUnlock()
+
+				// Execute callbacks from our copy.
+				for _, callback := range callbacks {
 					go callback(epoch)
 				}
 			}
@@ -89,16 +147,48 @@ func (e *EthereumBeaconChain) Epochs() *DefaultEpochCreator {
 }
 
+// OnEpochChanged registers a callback that is invoked, in its own goroutine,
+// each time the epoch changes. Registrations made after Stop are ignored.
 func (e *EthereumBeaconChain) OnEpochChanged(callback func(current Epoch)) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if e.stopped {
+		return
+	}
+
 	e.epochChangedCallbacks = append(e.epochChangedCallbacks, callback)
 }
 
+// OnSlotChanged registers a callback that is invoked, in its own goroutine,
+// each time the slot changes. Registrations made after Stop are ignored.
 func (e *EthereumBeaconChain) OnSlotChanged(callback func(current Slot)) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if e.stopped {
+		return
+	}
+
 	e.slotChangedCallbacks = append(e.slotChangedCallbacks, callback)
 }
 
+// Stop shuts down the background slot and epoch goroutines. It is safe to
+// call more than once and concurrently with callback registration; only the
+// first call has any effect.
 func (e *EthereumBeaconChain) Stop() {
-	e.slotCh <- struct{}{}
-	e.epochCh <- struct{}{}
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if e.stopped {
+		return
+	}
+
+	e.stopped = true
+
+	// Closing a channel wakes every receiver without blocking the caller, so
+	// no signalling sends or grace-period sleeps are needed. The stopped flag
+	// guarantees each channel is closed exactly once.
+	close(e.stopCh)
 	close(e.slotCh)
 	close(e.epochCh)
 }
diff --git a/beacon_chain_test.go b/beacon_chain_test.go
index 834d39f..2b6be40 100644
--- a/beacon_chain_test.go
+++ b/beacon_chain_test.go
@@ -1,35 +1,128 @@
 package ethwallclock
 
 import (
+	"sync"
 	"testing"
 	"time"
 )
 
+// MetadataService mocks the metadata service we use across xatu.
+type MetadataService struct {
+	wallclock *EthereumBeaconChain
+}
+
+// Wallclock returns the wallclock instance (can be nil).
+func (m *MetadataService) Wallclock() *EthereumBeaconChain {
+	return m.wallclock
+}
+
 func TestBeaconChainEventCallbacks(t *testing.T) {
 	beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2)
 
 	t.Run("Event callbacks", func(t *testing.T) {
-		epochCallbacks := 0
-		slotCallbacks := 0
+		var (
+			mu             sync.Mutex
+			epochCallbacks int
+			slotCallbacks  int
+		)
 
 		beacon.OnEpochChanged(func(epoch Epoch) {
+			mu.Lock()
 			epochCallbacks++
+			mu.Unlock()
 		})
 
 		beacon.OnSlotChanged(func(slot Slot) {
+			mu.Lock()
 			slotCallbacks++
+			mu.Unlock()
 		})
 
 		time.Sleep(5100 * time.Millisecond)
 
-		if epochCallbacks != 2 {
-			t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCallbacks, 2)
+		mu.Lock()
+		epochCount := epochCallbacks
+		slotCount := slotCallbacks
+		mu.Unlock()
+
+		if epochCount != 2 {
+			t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCount, 2)
 		}
 
-		if slotCallbacks != 5 {
-			t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCallbacks, 5)
+		if slotCount != 5 {
+			t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCount, 5)
 		}
 	})
 
 	beacon.Stop()
 }
+
+// TestConcurrentStopAndCallback tests that there's no race condition
+// between stopping the beacon chain and registering/executing callbacks.
+func TestConcurrentStopAndCallback(t *testing.T) {
+	beacon := NewEthereumBeaconChain(time.Now(), time.Second*1, 2)
+
+	// Set up a sync WaitGroup to coordinate goroutines.
+	var wg sync.WaitGroup
+
+	// Start multiple goroutines that try to register callbacks.
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+			// Try to register a callback - should not panic even if Stop is called concurrently.
+			beacon.OnEpochChanged(func(epoch Epoch) {})
+		}(i)
+	}
+
+	// Start a goroutine that stops the beacon chain.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// Small delay to increase chance of concurrent execution
+		time.Sleep(5 * time.Millisecond)
+		beacon.Stop()
+	}()
+
+	// Wait for all goroutines to finish.
+	wg.Wait()
+}
+
+// TestNilWallclockScenario specifically tests for the panic seen in production:
+// when OnEpochChanged is called on a nil receiver.
+func TestNilWallclockScenario(t *testing.T) {
+	// Create a metadata service with a valid wallclock
+	metadata := &MetadataService{
+		wallclock: NewEthereumBeaconChain(time.Now(), time.Second*1, 2),
+	}
+
+	wc := metadata.Wallclock()
+	if wc == nil {
+		t.Fatal("Wallclock should not be nil")
+	}
+
+	// Stop the wallclock's background goroutines when the test finishes.
+	defer wc.Stop()
+
+	// Register a callback.
+	wc.OnEpochChanged(func(epoch Epoch) {})
+
+	// If the beacon chain connection fails/is-lost, the wallclock becomes nil.
+	// Subsequent callbacks then attempt to call the nil wallclock, which panics.
+	metadata.wallclock = nil
+
+	shouldPanic := func() {
+		defer func() {
+			if r := recover(); r == nil {
+				t.Error("Expected panic when using nil wallclock, but no panic occurred")
+			} else {
+				t.Logf("Got expected panic: %v", r)
+			}
+		}()
+
+		wc := metadata.Wallclock() // Get nil wallclock.
+		wc.OnEpochChanged(func(epoch Epoch) {})
+	}
+
+	shouldPanic()
+}