diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 83455879..978cc9ef 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -29,6 +29,7 @@ from lean_spec.subspecs.networking import NetworkEventSource, NetworkService from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.sync import BlockCache, NetworkRequester, PeerManager, SyncService +from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService from lean_spec.types import Bytes32, Uint64 if TYPE_CHECKING: @@ -71,6 +72,17 @@ class NodeConfig: Use \":memory:\" for in-memory database (testing only). """ + validator_registry: ValidatorRegistry | None = field(default=None) + """ + Optional validator registry with secret keys. + + If provided, the node will participate in consensus by: + - Proposing blocks when scheduled + - Creating attestations every slot + + If None, the node runs in passive mode (sync only). + """ + @dataclass(slots=True) class Node: @@ -99,6 +111,9 @@ class Node: api_server: ApiServer | None = field(default=None) """Optional API server for checkpoint sync and status endpoints.""" + validator_service: ValidatorService | None = field(default=None) + """Optional validator service for block/attestation production.""" + _shutdown: asyncio.Event = field(default_factory=asyncio.Event) """Event signaling shutdown request.""" @@ -195,6 +210,18 @@ def from_genesis(cls, config: NodeConfig) -> Node: store_getter=lambda: sync_service.store, ) + # Create validator service if registry provided. + # + # Validators need keys to sign blocks and attestations. + # Without a registry, the node runs in passive mode. + validator_service: ValidatorService | None = None + if config.validator_registry is not None: + validator_service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=config.validator_registry, + ) + return cls( store=store, clock=clock, @@ -202,6 +229,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: chain_service=chain_service, network_service=network_service, api_server=api_server, + validator_service=validator_service, ) @staticmethod @@ -297,6 +325,8 @@ async def run(self, *, install_signal_handlers: bool = True) -> None: tg.create_task(self.network_service.run()) if self.api_server is not None: tg.create_task(self.api_server.run()) + if self.validator_service is not None: + tg.create_task(self.validator_service.run()) tg.create_task(self._wait_shutdown()) def _install_signal_handlers(self) -> None: @@ -332,6 +362,8 @@ async def _wait_shutdown(self) -> None: self.network_service.stop() if self.api_server is not None: self.api_server.stop() + if self.validator_service is not None: + self.validator_service.stop() def stop(self) -> None: """ diff --git a/src/lean_spec/subspecs/validator/__init__.py b/src/lean_spec/subspecs/validator/__init__.py new file mode 100644 index 00000000..81967679 --- /dev/null +++ b/src/lean_spec/subspecs/validator/__init__.py @@ -0,0 +1,9 @@ +"""Validator service module for producing blocks and attestations.""" + +from .registry import ValidatorRegistry +from .service import ValidatorService + +__all__ = [ + "ValidatorService", + "ValidatorRegistry", +] diff --git a/src/lean_spec/subspecs/validator/registry.py b/src/lean_spec/subspecs/validator/registry.py new file mode 100644 index 00000000..19a689c2 --- /dev/null +++ b/src/lean_spec/subspecs/validator/registry.py @@ -0,0 +1,197 @@ +""" +Validator registry for managing validator keys. + +Loads validator keys from JSON configuration files. + +The registry supports two configuration files: + +1. **validators.json** - Maps node IDs to validator indices: + ```json + { + "node_0": [0, 1], + "node_1": [2] + } + ``` + +2. **validator-keys-manifest.json** - Contains key metadata and paths: + ```json + { + "num_validators": 3, + "validators": [ + {"index": 0, "pubkey_hex": "0xe2a03c...", "privkey_file": "validator_0_sk.ssz"} + ] + } + ``` +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path + +from lean_spec.subspecs.xmss import SecretKey +from lean_spec.types import Uint64 + + +@dataclass(frozen=True, slots=True) +class ValidatorEntry: + """ + A single validator's key material. + + Holds both the index and the secret key needed for signing. + """ + + index: Uint64 + """Validator index in the registry.""" + + secret_key: SecretKey + """XMSS secret key for signing.""" + + +@dataclass(slots=True) +class ValidatorRegistry: + """ + Registry of validator keys controlled by this node. + + The registry holds secret keys for validators assigned to this node. + It provides lookup by validator index for signing operations. + """ + + _validators: dict[Uint64, ValidatorEntry] = field(default_factory=dict) + """Map from validator index to entry.""" + + def add(self, entry: ValidatorEntry) -> None: + """ + Add a validator entry to the registry. + + Args: + entry: Validator entry to add. + """ + self._validators[entry.index] = entry + + def get(self, index: Uint64) -> ValidatorEntry | None: + """ + Get validator entry by index. + + Args: + index: Validator index to look up. + + Returns: + Validator entry if found, None otherwise. + """ + return self._validators.get(index) + + def has(self, index: Uint64) -> bool: + """ + Check if we control this validator. + + Args: + index: Validator index to check. + + Returns: + True if we have keys for this validator. + """ + return index in self._validators + + def indices(self) -> list[Uint64]: + """ + Get all validator indices we control. + + Returns: + List of validator indices. + """ + return list(self._validators.keys()) + + def __len__(self) -> int: + """Number of validators in the registry.""" + return len(self._validators) + + @classmethod + def from_json( + cls, + node_id: str, + validators_path: Path | str, + manifest_path: Path | str, + ) -> ValidatorRegistry: + """ + Load validator registry from JSON configuration files. + + The loading process: + 1. Read validators.json to find indices assigned to this node + 2. Read manifest to get key file paths + 3. Load secret keys from SSZ files + + Args: + node_id: Identifier for this node in validators.json. + validators_path: Path to validators.json. + manifest_path: Path to validator-keys-manifest.json. + + Returns: + Registry populated with validator keys for this node. + """ + validators_path = Path(validators_path) + manifest_path = Path(manifest_path) + + # Load node-to-validator mapping. + with validators_path.open() as f: + validators_config = json.load(f) + + # Get indices assigned to this node. + # + # If node not in config, return empty registry. + assigned_indices = validators_config.get(node_id, []) + if not assigned_indices: + return cls() + + # Load manifest with key metadata. + with manifest_path.open() as f: + manifest = json.load(f) + + # Build index-to-entry lookup from manifest. + manifest_entries = {v["index"]: v for v in manifest.get("validators", [])} + + # Load keys for assigned validators. + registry = cls() + manifest_dir = manifest_path.parent + + for index in assigned_indices: + entry = manifest_entries.get(index) + if entry is None: + continue + + # Load secret key from SSZ file. + privkey_file = manifest_dir / entry["privkey_file"] + secret_key = SecretKey.decode_bytes(privkey_file.read_bytes()) + + registry.add( + ValidatorEntry( + index=Uint64(index), + secret_key=secret_key, + ) + ) + + return registry + + @classmethod + def from_secret_keys(cls, keys: dict[int, SecretKey]) -> ValidatorRegistry: + """ + Create registry from a dictionary of secret keys. + + Convenience method for testing or programmatic key loading. + + Args: + keys: Mapping from validator index to secret key. + + Returns: + Registry populated with provided keys. + """ + registry = cls() + for index, secret_key in keys.items(): + registry.add( + ValidatorEntry( + index=Uint64(index), + secret_key=secret_key, + ) + ) + return registry diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py new file mode 100644 index 00000000..2d8e277b --- /dev/null +++ b/src/lean_spec/subspecs/validator/service.py @@ -0,0 +1,377 @@ +""" +Validator service for producing blocks and attestations. + +The Validator Problem +--------------------- +Ethereum consensus requires active participation from validators. +At specific intervals within each slot, validators must: + +- Interval 0: Propose blocks (if scheduled) +- Interval 1: Create attestations + +This service drives validator duties by monitoring the slot clock +and triggering production at the appropriate intervals. + +How It Works +------------ +1. Sleep until next interval boundary +2. Check if any validator we control has duties +3. For interval 0: Check proposer schedule, produce block if our turn +4. For interval 1: Produce attestations for all our validators +5. Emit produced blocks/attestations via callbacks +6. Repeat forever +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from lean_spec.subspecs.chain.clock import SlotClock +from lean_spec.subspecs.chain.config import SECONDS_PER_INTERVAL +from lean_spec.subspecs.containers import ( + Attestation, + AttestationData, + Block, + SignedAttestation, + SignedBlockWithAttestation, +) +from lean_spec.subspecs.containers.block import ( + AttestationSignatures, + BlockSignatures, + BlockWithAttestation, +) +from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME +from lean_spec.types import Uint64 +from lean_spec.types.validator import is_proposer + +from .registry import ValidatorRegistry + +if TYPE_CHECKING: + from lean_spec.subspecs.sync import SyncService + + +# Callback types for publishing produced blocks and attestations. +BlockPublisher = Callable[[SignedBlockWithAttestation], Awaitable[None]] +"""Callback for publishing produced blocks.""" +AttestationPublisher = Callable[[SignedAttestation], Awaitable[None]] +"""Callback for publishing produced attestations.""" + + +async def _noop_block_publisher(block: SignedBlockWithAttestation) -> None: # noqa: ARG001 + """Default no-op block publisher.""" + + +async def _noop_attestation_publisher(attestation: SignedAttestation) -> None: # noqa: ARG001 + """Default no-op attestation publisher.""" + + +@dataclass(slots=True) +class ValidatorService: + """ + Drives validator duties based on the slot clock. + + The service: + - monitors interval boundaries + - triggers block production or attestation creation when our validators are scheduled. + """ + + sync_service: SyncService + """Sync service providing access to the forkchoice store.""" + + clock: SlotClock + """Slot clock for time calculation.""" + + registry: ValidatorRegistry + """Registry of validators we control.""" + + on_block: BlockPublisher = field(default=_noop_block_publisher) + """Callback invoked when a block is produced.""" + + on_attestation: AttestationPublisher = field(default=_noop_attestation_publisher) + """Callback invoked when an attestation is produced.""" + + _running: bool = field(default=False, repr=False) + """Whether the service is running.""" + + _blocks_produced: int = field(default=0, repr=False) + """Counter for produced blocks.""" + + _attestations_produced: int = field(default=0, repr=False) + """Counter for produced attestations.""" + + async def run(self) -> None: + """ + Main loop - check duties every interval. + + The loop: + 1. Sleeps until the next interval boundary + 2. Checks current interval within the slot + 3. Triggers appropriate duties + 4. Repeats until stopped + """ + self._running = True + + while self._running: + # Sleep until next interval boundary for precise timing. + await self._sleep_until_next_interval() + + # Skip if we have no validators to manage. + if len(self.registry) == 0: + continue + + # Get current slot and interval. + # + # Interval determines which duty type to check: + # - Interval 0: Block production + # - Interval 1: Attestation production + slot = self.clock.current_slot() + interval = self.clock.current_interval() + + if interval == 0: + # Block production interval. + # + # Check if any of our validators is the proposer. + await self._maybe_produce_block(slot) + + elif interval == 1: + # Attestation interval. + # + # All validators should attest to current head. + await self._produce_attestations(slot) + + async def _maybe_produce_block(self, slot: Slot) -> None: + """ + Produce a block if we are the proposer for this slot. + + Checks the proposer schedule against our validator registry. + If one of our validators should propose, produces and emits the block. + + Args: + slot: Current slot number. + """ + store = self.sync_service.store + head_state = store.states.get(store.head) + if head_state is None: + return + + num_validators = Uint64(len(head_state.validators)) + + # Check each validator we control. + # + # Only one validator can be the proposer per slot. + for validator_index in self.registry.indices(): + if not is_proposer(validator_index, slot, num_validators): + continue + + # We are the proposer. + # + # Produce the block using Store's production method. + try: + new_store, block, signatures = store.produce_block_with_signatures( + slot=slot, + validator_index=validator_index, + ) + + # Update the store through sync service. + # + # This ensures the block is integrated into forkchoice. + self.sync_service.store = new_store + + # Create signed block wrapper for publishing. + signed_block = self._sign_block(block, validator_index, signatures) + self._blocks_produced += 1 + + # Emit the block for network propagation. + await self.on_block(signed_block) + + except AssertionError: + # Proposer validation failed. + # + # This can happen during slot boundary transitions. + pass + + # Only one proposer per slot. + break + + async def _produce_attestations(self, slot: Slot) -> None: + """ + Produce attestations for all validators we control. + + Every validator should attest once per slot. + + Args: + slot: Current slot number. + """ + store = self.sync_service.store + + for validator_index in self.registry.indices(): + # Produce attestation data using Store's method. + # + # This calculates head, target, and source checkpoints. + attestation_data = store.produce_attestation_data(slot) + + # Sign the attestation using our secret key. + signed_attestation = self._sign_attestation(attestation_data, validator_index) + self._attestations_produced += 1 + + # Emit the attestation for network propagation. + await self.on_attestation(signed_attestation) + + def _sign_block( + self, + block: Block, + validator_index: Uint64, + attestation_signatures: list, + ) -> SignedBlockWithAttestation: + """ + Sign a block and wrap it for publishing. + + Creates the proposer attestation, signs it, and wraps everything + in SignedBlockWithAttestation. + + Args: + block: The block to sign. + validator_index: Index of the proposing validator. + attestation_signatures: Aggregated signatures for included attestations. + + Returns: + Signed block ready for publishing. + """ + store = self.sync_service.store + + # Create the proposer's attestation for this slot. + # + # The proposer also attests to the chain head they see. + proposer_attestation_data = store.produce_attestation_data(block.slot) + proposer_attestation = Attestation( + validator_id=validator_index, + data=proposer_attestation_data, + ) + + # Sign the proposer's attestation. + # + # Uses XMSS signature scheme from the validator's secret key. + entry = self.registry.get(validator_index) + if entry is None: + raise ValueError(f"No secret key for validator {validator_index}") + + message_bytes = proposer_attestation_data.data_root_bytes() + proposer_signature = TARGET_SIGNATURE_SCHEME.sign( + entry.secret_key, + block.slot, + bytes(message_bytes), + ) + + # Create the message wrapper. + # + # Bundles the block with the proposer's attestation. + message = BlockWithAttestation( + block=block, + proposer_attestation=proposer_attestation, + ) + + # Create the signature payload. + # + # Contains signatures for all included attestations plus the proposer's. + signature = BlockSignatures( + attestation_signatures=AttestationSignatures(data=attestation_signatures), + proposer_signature=proposer_signature, + ) + + return SignedBlockWithAttestation( + message=message, + signature=signature, + ) + + def _sign_attestation( + self, + attestation_data: AttestationData, + validator_index: Uint64, + ) -> SignedAttestation: + """ + Sign an attestation for publishing. + + Uses XMSS signature scheme with the validator's secret key. + + Args: + attestation_data: The attestation data to sign. + validator_index: Index of the attesting validator. + + Returns: + Signed attestation ready for publishing. + """ + # Get the secret key for this validator. + entry = self.registry.get(validator_index) + if entry is None: + raise ValueError(f"No secret key for validator {validator_index}") + + # Sign the attestation data root. + # + # Uses XMSS one-time signature for the current epoch (slot). + message_bytes = attestation_data.data_root_bytes() + signature = TARGET_SIGNATURE_SCHEME.sign( + entry.secret_key, + attestation_data.slot, + bytes(message_bytes), + ) + + return SignedAttestation( + validator_id=validator_index, + message=attestation_data, + signature=signature, + ) + + async def _sleep_until_next_interval(self) -> None: + """ + Sleep until the next interval boundary. + + Calculates the precise sleep duration to wake up at the start + of the next interval. + """ + now = self.clock._time_fn() + genesis = int(self.clock.genesis_time) + + elapsed = now - genesis + + if elapsed < 0: + # Before genesis - sleep until genesis. + await asyncio.sleep(-elapsed) + return + + # Current interval number. + current_interval = int(elapsed // int(SECONDS_PER_INTERVAL)) + + # Next interval boundary. + next_boundary = genesis + (current_interval + 1) * int(SECONDS_PER_INTERVAL) + + # Sleep until boundary. + sleep_time = max(0.0, next_boundary - now) + await asyncio.sleep(sleep_time) + + def stop(self) -> None: + """ + Stop the service. + + Sets the running flag to False, causing the run() loop to exit + after completing its current sleep cycle. + """ + self._running = False + + @property + def is_running(self) -> bool: + """Check if the service is currently running.""" + return self._running + + @property + def blocks_produced(self) -> int: + """Total blocks produced since creation.""" + return self._blocks_produced + + @property + def attestations_produced(self) -> int: + """Total attestations produced since creation.""" + return self._attestations_produced diff --git a/tests/lean_spec/subspecs/validator/__init__.py b/tests/lean_spec/subspecs/validator/__init__.py new file mode 100644 index 00000000..f526771e --- /dev/null +++ b/tests/lean_spec/subspecs/validator/__init__.py @@ -0,0 +1 @@ +"""Tests for validator module.""" diff --git a/tests/lean_spec/subspecs/validator/test_registry.py b/tests/lean_spec/subspecs/validator/test_registry.py new file mode 100644 index 00000000..4f2767bd --- /dev/null +++ b/tests/lean_spec/subspecs/validator/test_registry.py @@ -0,0 +1,179 @@ +"""Tests for ValidatorRegistry.""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from lean_spec.subspecs.validator import ValidatorRegistry +from lean_spec.subspecs.validator.registry import ValidatorEntry +from lean_spec.types import Uint64 + + +class TestValidatorEntry: + """Tests for ValidatorEntry.""" + + def test_entry_is_frozen(self) -> None: + """ValidatorEntry is immutable.""" + mock_key = MagicMock() + entry = ValidatorEntry(index=Uint64(0), secret_key=mock_key) + + with pytest.raises(AttributeError): + entry.index = Uint64(1) # type: ignore[misc] + + +class TestValidatorRegistry: + """Tests for ValidatorRegistry.""" + + def test_empty_registry(self) -> None: + """New registry is empty.""" + registry = ValidatorRegistry() + assert len(registry) == 0 + assert registry.indices() == [] + + def test_add_and_get(self) -> None: + """Entry can be added and retrieved.""" + registry = ValidatorRegistry() + mock_key = MagicMock() + entry = ValidatorEntry(index=Uint64(42), secret_key=mock_key) + + registry.add(entry) + + assert len(registry) == 1 + assert registry.get(Uint64(42)) is entry + assert registry.has(Uint64(42)) + + def test_get_nonexistent(self) -> None: + """Getting nonexistent entry returns None.""" + registry = ValidatorRegistry() + assert registry.get(Uint64(99)) is None + + def test_has_nonexistent(self) -> None: + """has() returns False for nonexistent entry.""" + registry = ValidatorRegistry() + assert not registry.has(Uint64(99)) + + def test_indices(self) -> None: + """indices() returns all validator indices.""" + registry = ValidatorRegistry() + for i in [3, 1, 4]: + mock_key = MagicMock() + registry.add(ValidatorEntry(index=Uint64(i), secret_key=mock_key)) + + indices = registry.indices() + assert set(indices) == {Uint64(1), Uint64(3), Uint64(4)} + + def test_from_secret_keys(self) -> None: + """Registry can be created from dict of secret keys.""" + mock_keys = {0: MagicMock(), 2: MagicMock()} + + registry = ValidatorRegistry.from_secret_keys(mock_keys) + + assert len(registry) == 2 + assert registry.has(Uint64(0)) + assert registry.has(Uint64(2)) + assert not registry.has(Uint64(1)) + + +class TestValidatorRegistryFromJson: + """Tests for JSON loading.""" + + def test_from_json_basic(self, tmp_path: Path) -> None: + """Registry loads from JSON files.""" + # Create validators.json + validators_file = tmp_path / "validators.json" + validators_file.write_text( + json.dumps( + { + "node_0": [0, 1], + "node_1": [2], + } + ) + ) + + # Create manifest.json + manifest_file = tmp_path / "manifest.json" + manifest_file.write_text( + json.dumps( + { + "validators": [ + {"index": 0, "privkey_file": "key_0.ssz"}, + {"index": 1, "privkey_file": "key_1.ssz"}, + {"index": 2, "privkey_file": "key_2.ssz"}, + ], + } + ) + ) + + # Create dummy key files + (tmp_path / "key_0.ssz").write_bytes(b"key0") + (tmp_path / "key_1.ssz").write_bytes(b"key1") + + # Mock SecretKey.decode_bytes + mock_key = MagicMock() + with patch( + "lean_spec.subspecs.xmss.SecretKey.decode_bytes", + return_value=mock_key, + ): + registry = ValidatorRegistry.from_json( + node_id="node_0", + validators_path=validators_file, + manifest_path=manifest_file, + ) + + assert len(registry) == 2 + assert registry.has(Uint64(0)) + assert registry.has(Uint64(1)) + assert not registry.has(Uint64(2)) + + def test_from_json_unknown_node(self, tmp_path: Path) -> None: + """Unknown node returns empty registry.""" + validators_file = tmp_path / "validators.json" + validators_file.write_text(json.dumps({"node_0": [0]})) + + manifest_file = tmp_path / "manifest.json" + manifest_file.write_text(json.dumps({"validators": []})) + + registry = ValidatorRegistry.from_json( + node_id="unknown_node", + validators_path=validators_file, + manifest_path=manifest_file, + ) + + assert len(registry) == 0 + + def test_from_json_missing_validator_in_manifest(self, tmp_path: Path) -> None: + """Missing validator in manifest is skipped.""" + validators_file = tmp_path / "validators.json" + validators_file.write_text(json.dumps({"node_0": [0, 99]})) + + manifest_file = tmp_path / "manifest.json" + manifest_file.write_text( + json.dumps( + { + "validators": [ + {"index": 0, "privkey_file": "key_0.ssz"}, + ], + } + ) + ) + + (tmp_path / "key_0.ssz").write_bytes(b"key0") + + mock_key = MagicMock() + with patch( + "lean_spec.subspecs.xmss.SecretKey.decode_bytes", + return_value=mock_key, + ): + registry = ValidatorRegistry.from_json( + node_id="node_0", + validators_path=validators_file, + manifest_path=manifest_file, + ) + + # Only index 0 should be loaded (99 is not in manifest) + assert len(registry) == 1 + assert registry.has(Uint64(0)) diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py new file mode 100644 index 00000000..e545e59d --- /dev/null +++ b/tests/lean_spec/subspecs/validator/test_service.py @@ -0,0 +1,276 @@ +"""Tests for ValidatorService.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import MagicMock, patch + +import pytest + +from lean_spec.subspecs.chain.clock import SlotClock +from lean_spec.subspecs.containers import ( + Block, + SignedAttestation, + SignedBlockWithAttestation, + State, +) +from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.forkchoice import Store +from lean_spec.subspecs.sync.backfill_sync import NetworkRequester +from lean_spec.subspecs.sync.block_cache import BlockCache +from lean_spec.subspecs.sync.peer_manager import PeerManager +from lean_spec.subspecs.sync.service import SyncService +from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService +from lean_spec.subspecs.validator.registry import ValidatorEntry +from lean_spec.types import Uint64 + + +class MockNetworkRequester(NetworkRequester): + """Mock network requester for testing.""" + + async def request_block_by_root( + self, + peer_id: str, # noqa: ARG002 + root: bytes, # noqa: ARG002 + ) -> SignedBlockWithAttestation | None: + """Return None - no blocks available.""" + return None + + +@pytest.fixture +def store(genesis_state: State, genesis_block: Block) -> Store: + """Forkchoice store initialized with genesis.""" + return Store.get_forkchoice_store(genesis_state, genesis_block) + + +@pytest.fixture +def sync_service(store: Store) -> SyncService: + """Sync service with store.""" + return SyncService( + store=store, + peer_manager=PeerManager(), + block_cache=BlockCache(), + clock=SlotClock(genesis_time=Uint64(0)), + network=MockNetworkRequester(), + ) + + +@pytest.fixture +def mock_registry() -> ValidatorRegistry: + """Registry with mock keys for validators 0 and 1.""" + registry = ValidatorRegistry() + for i in [0, 1]: + mock_key = MagicMock() + registry.add(ValidatorEntry(index=Uint64(i), secret_key=mock_key)) + return registry + + +class TestValidatorServiceBasic: + """Basic tests for ValidatorService.""" + + def test_service_starts_stopped( + self, + sync_service: SyncService, + mock_registry: ValidatorRegistry, + ) -> None: + """Service is not running before start.""" + clock = SlotClock(genesis_time=Uint64(0)) + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=mock_registry, + ) + + assert not service.is_running + assert service.blocks_produced == 0 + assert service.attestations_produced == 0 + + def test_stop_service( + self, + sync_service: SyncService, + mock_registry: ValidatorRegistry, + ) -> None: + """stop() sets running flag to False.""" + clock = SlotClock(genesis_time=Uint64(0)) + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=mock_registry, + ) + + service._running = True + service.stop() + assert not service.is_running + + +class TestValidatorServiceDuties: + """Tests for duty execution.""" + + def test_no_block_when_not_proposer( + self, + sync_service: SyncService, + ) -> None: + """No block produced when we're not the proposer.""" + clock = SlotClock(genesis_time=Uint64(0)) + + # Registry with validator 2 only + registry = ValidatorRegistry() + mock_key = MagicMock() + registry.add(ValidatorEntry(index=Uint64(2), secret_key=mock_key)) + + blocks_received: list[SignedBlockWithAttestation] = [] + + async def capture_block(block: SignedBlockWithAttestation) -> None: + blocks_received.append(block) + + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=registry, + on_block=capture_block, + ) + + async def check_no_blocks() -> None: + # Slot 0 proposer is validator 0, slot 1 is validator 1 + # Validator 2 is proposer for slot 2 + await service._maybe_produce_block(Slot(0)) + await service._maybe_produce_block(Slot(1)) + + asyncio.run(check_no_blocks()) + + assert len(blocks_received) == 0 + + def test_empty_registry_skips_duties( + self, + sync_service: SyncService, + ) -> None: + """Empty registry skips all duty execution.""" + clock = SlotClock(genesis_time=Uint64(0)) + registry = ValidatorRegistry() + + attestations_received: list[SignedAttestation] = [] + + async def capture_attestation(attestation: SignedAttestation) -> None: + attestations_received.append(attestation) + + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=registry, + on_attestation=capture_attestation, + ) + + async def produce() -> None: + await service._produce_attestations(Slot(0)) + + asyncio.run(produce()) + + assert len(attestations_received) == 0 + assert service.attestations_produced == 0 + + +class TestValidatorServiceRun: + """Tests for the main run loop.""" + + def test_run_loop_can_be_stopped( + self, + sync_service: SyncService, + ) -> None: + """run() loop exits when stop() is called.""" + clock = SlotClock(genesis_time=Uint64(0)) + + # Use empty registry to avoid attestation production + registry = ValidatorRegistry() + + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=registry, + ) + + call_count = 0 + + async def stop_on_second_call(_duration: float) -> None: + nonlocal call_count + call_count += 1 + if call_count >= 2: + service.stop() + + async def run_briefly() -> None: + with patch("asyncio.sleep", new=stop_on_second_call): + await service.run() + + asyncio.run(run_briefly()) + assert not service.is_running + + +class TestIntervalSleep: + """Tests for interval sleep calculation.""" + + def test_sleep_until_next_interval_mid_interval( + self, + sync_service: SyncService, + ) -> None: + """Sleep duration is calculated correctly mid-interval.""" + genesis = Uint64(1000) + current_time = 1000.5 # 0.5 seconds into first interval + + clock = SlotClock(genesis_time=genesis, _time_fn=lambda: current_time) + registry = ValidatorRegistry() + + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=registry, + ) + + captured_duration: float | None = None + + async def capture_sleep(duration: float) -> None: + nonlocal captured_duration + captured_duration = duration + + async def check_sleep() -> None: + with patch("asyncio.sleep", new=capture_sleep): + await service._sleep_until_next_interval() + + asyncio.run(check_sleep()) + + # Should sleep until next interval boundary (1001.0) + expected = 1001.0 - current_time # 0.5 seconds + assert captured_duration is not None + assert abs(captured_duration - expected) < 0.001 + + def test_sleep_before_genesis( + self, + sync_service: SyncService, + ) -> None: + """Sleeps until genesis when current time is before genesis.""" + genesis = Uint64(1000) + current_time = 900.0 # 100 seconds before genesis + + clock = SlotClock(genesis_time=genesis, _time_fn=lambda: current_time) + registry = ValidatorRegistry() + + service = ValidatorService( + sync_service=sync_service, + clock=clock, + registry=registry, + ) + + captured_duration: float | None = None + + async def capture_sleep(duration: float) -> None: + nonlocal captured_duration + captured_duration = duration + + async def check_sleep() -> None: + with patch("asyncio.sleep", new=capture_sleep): + await service._sleep_until_next_interval() + + asyncio.run(check_sleep()) + + # Should sleep until genesis + expected = float(genesis) - current_time # 100 seconds + assert captured_duration is not None + assert abs(captured_duration - expected) < 0.001