From 648d11ea19350e0adb6b2eb804804ae43560ecb8 Mon Sep 17 00:00:00 2001
From: Thomas Coratger
Date: Fri, 16 Jan 2026 00:21:33 +0100
Subject: [PATCH 1/2] core: add some very basic metrics

---
 pyproject.toml                              |   1 +
 src/lean_spec/subspecs/api/server.py        |  14 +-
 src/lean_spec/subspecs/metrics/__init__.py  |  44 +++++
 src/lean_spec/subspecs/metrics/registry.py  | 140 +++++++++++++++
 src/lean_spec/subspecs/sync/service.py      |  15 +-
 src/lean_spec/subspecs/validator/service.py |   3 +
 .../subspecs/metrics/test_registry.py       | 166 ++++++++++++++++++
 uv.lock                                     |  13 +-
 8 files changed, 393 insertions(+), 3 deletions(-)
 create mode 100644 src/lean_spec/subspecs/metrics/__init__.py
 create mode 100644 src/lean_spec/subspecs/metrics/registry.py
 create mode 100644 tests/lean_spec/subspecs/metrics/test_registry.py

diff --git a/pyproject.toml b/pyproject.toml
index 66694d84..34160f57 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,7 @@ dependencies = [
     "aiohttp>=3.11.0,<4",
     "cryptography>=46.0.0",
     "numpy>=2.0.0,<3",
+    "prometheus-client>=0.21.0,<1",
 ]
 
 [project.license]
diff --git a/src/lean_spec/subspecs/api/server.py b/src/lean_spec/subspecs/api/server.py
index b0d69be2..e9288710 100644
--- a/src/lean_spec/subspecs/api/server.py
+++ b/src/lean_spec/subspecs/api/server.py
@@ -1,9 +1,10 @@
 """
-API server for checkpoint sync and node status endpoints.
+API server for checkpoint sync, node status, and metrics endpoints.
 
 Provides HTTP endpoints for:
 - /lean/states/finalized - Serve finalized checkpoint state as SSZ
 - /health - Health check endpoint
+- /metrics - Prometheus metrics endpoint
 
 This matches the checkpoint sync API implemented in zeam.
 """
@@ -18,6 +19,8 @@
 
 from aiohttp import web
 
+from lean_spec.subspecs.metrics import generate_metrics
+
 if TYPE_CHECKING:
     from lean_spec.subspecs.forkchoice import Store
 
@@ -34,6 +37,14 @@ async def _handle_health(_request: web.Request) -> web.Response:
     return web.json_response({"status": "healthy", "service": "lean-spec-api"})
 
 
+async def _handle_metrics(_request: web.Request) -> web.Response:
+    """Handle Prometheus metrics endpoint."""
+    return web.Response(
+        body=generate_metrics(),
+        headers={"Content-Type": "text/plain; version=0.0.4; charset=utf-8"},
+    )
+
+
 @dataclass(frozen=True, slots=True)
 class ApiServerConfig:
     """Configuration for the API server."""
@@ -87,6 +98,7 @@ async def start(self) -> None:
         app.add_routes(
             [
                 web.get("/health", _handle_health),
+                web.get("/metrics", _handle_metrics),
                 web.get("/lean/states/finalized", self._handle_finalized_state),
             ]
         )
diff --git a/src/lean_spec/subspecs/metrics/__init__.py b/src/lean_spec/subspecs/metrics/__init__.py
new file mode 100644
index 00000000..c6f23172
--- /dev/null
+++ b/src/lean_spec/subspecs/metrics/__init__.py
@@ -0,0 +1,44 @@
+"""
+Metrics module for observability.
+
+Provides counters, gauges, and histograms for tracking consensus client behavior.
+Exposes metrics in Prometheus text format.
+"""
+
+from .registry import (
+    REGISTRY,
+    attestations_invalid,
+    attestations_produced,
+    attestations_received,
+    attestations_valid,
+    block_processing_time,
+    blocks_processed,
+    blocks_proposed,
+    current_slot,
+    finalized_slot,
+    generate_metrics,
+    head_slot,
+    justified_slot,
+    peers_connected,
+    reorgs,
+    validators_count,
+)
+
+__all__ = [
+    "REGISTRY",
+    "attestations_invalid",
+    "attestations_produced",
+    "attestations_received",
+    "attestations_valid",
+    "block_processing_time",
+    "blocks_processed",
+    "blocks_proposed",
+    "current_slot",
+    "finalized_slot",
+    "generate_metrics",
+    "head_slot",
+    "justified_slot",
+    "peers_connected",
+    "reorgs",
+    "validators_count",
+]
diff --git a/src/lean_spec/subspecs/metrics/registry.py b/src/lean_spec/subspecs/metrics/registry.py
new file mode 100644
index 00000000..7996270e
--- /dev/null
+++ b/src/lean_spec/subspecs/metrics/registry.py
@@ -0,0 +1,140 @@
+"""
+Metric registry using prometheus_client.
+
+Provides pre-defined metrics for a consensus client.
+Exposes metrics in Prometheus text format via the /metrics endpoint.
+"""
+
+from __future__ import annotations
+
+from prometheus_client import (
+    CollectorRegistry,
+    Counter,
+    Gauge,
+    Histogram,
+    generate_latest,
+)
+
+# Create a dedicated registry for lean-spec metrics.
+#
+# Using a dedicated registry avoids pollution from default Python process metrics.
+REGISTRY = CollectorRegistry()
+
+# -----------------------------------------------------------------------------
+# Node Information
+# -----------------------------------------------------------------------------
+
+head_slot = Gauge(
+    "lean_head_slot",
+    "Current head slot",
+    registry=REGISTRY,
+)
+
+current_slot = Gauge(
+    "lean_current_slot",
+    "Current time slot",
+    registry=REGISTRY,
+)
+
+justified_slot = Gauge(
+    "lean_justified_slot",
+    "Latest justified slot",
+    registry=REGISTRY,
+)
+
+finalized_slot = Gauge(
+    "lean_finalized_slot",
+    "Latest finalized slot",
+    registry=REGISTRY,
+)
+
+validators_count = Gauge(
+    "lean_validators_count",
+    "Active validators",
+    registry=REGISTRY,
+)
+
+# -----------------------------------------------------------------------------
+# Block Processing
+# -----------------------------------------------------------------------------
+
+blocks_processed = Counter(
+    "lean_blocks_processed_total",
+    "Total blocks processed",
+    registry=REGISTRY,
+)
+
+block_processing_time = Histogram(
+    "lean_block_processing_seconds",
+    "Block processing duration",
+    buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5),
+    registry=REGISTRY,
+)
+
+# -----------------------------------------------------------------------------
+# Attestations
+# -----------------------------------------------------------------------------
+
+attestations_received = Counter(
+    "lean_attestations_received_total",
+    "Total attestations received",
+    registry=REGISTRY,
+)
+
+attestations_valid = Counter(
+    "lean_attestations_valid_total",
+    "Valid attestations",
+    registry=REGISTRY,
+)
+
+attestations_invalid = Counter(
+    "lean_attestations_invalid_total",
+    "Invalid attestations",
+    registry=REGISTRY,
+)
+
+# -----------------------------------------------------------------------------
+# Network
+# -----------------------------------------------------------------------------
+
+peers_connected = Gauge(
+    "lean_peers_connected",
+    "Connected peers",
+    registry=REGISTRY,
+)
+
+# -----------------------------------------------------------------------------
+# Consensus Events
+# -----------------------------------------------------------------------------
+
+reorgs = Counter(
+    "lean_reorgs_total",
+    "Chain reorganizations",
+    registry=REGISTRY,
+)
+
+# -----------------------------------------------------------------------------
+# Validator Production
+# -----------------------------------------------------------------------------
+
+blocks_proposed = Counter(
+    "lean_blocks_proposed_total",
+    "Blocks proposed by this node",
+    registry=REGISTRY,
+)
+
+attestations_produced = Counter(
+    "lean_attestations_produced_total",
+    "Attestations produced by this node",
+    registry=REGISTRY,
+)
+
+
+def generate_metrics() -> bytes:
+    """
+    Generate Prometheus metrics output.
+
+    Returns:
+        Prometheus text format output as bytes.
+    """
+    return generate_latest(REGISTRY)
diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py
index 364e54a0..67173aeb 100644
--- a/src/lean_spec/subspecs/sync/service.py
+++ b/src/lean_spec/subspecs/sync/service.py
@@ -41,6 +41,7 @@
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING
 
+from lean_spec.subspecs import metrics
 from lean_spec.subspecs.chain.clock import SlotClock
 from lean_spec.subspecs.containers import Block, SignedBlockWithAttestation
 from lean_spec.subspecs.containers.attestation import SignedAttestation
@@ -203,12 +204,24 @@ def _process_block_wrapper(
         # Delegate to the actual block processor (typically Store.on_block).
         #
         # The processor validates the block and updates forkchoice state.
-        new_store = self.process_block(store, block)
+        with metrics.block_processing_time.time():
+            new_store = self.process_block(store, block)
 
         # Track metrics after successful processing.
         #
         # We only count blocks that pass validation and update the store.
         self._blocks_processed += 1
+        metrics.blocks_processed.inc()
+
+        # Update chain state metrics.
+        metrics.head_slot.set(float(new_store.blocks[new_store.head].slot))
+        metrics.justified_slot.set(float(new_store.latest_justified.slot))
+        metrics.finalized_slot.set(float(new_store.latest_finalized.slot))
+
+        # Update validator count from head state.
+        head_state = new_store.states.get(new_store.head)
+        if head_state is not None:
+            metrics.validators_count.set(float(len(head_state.validators)))
 
         # Persist block and state to database if available.
         #
diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py
index 2d8e277b..c4c6734f 100644
--- a/src/lean_spec/subspecs/validator/service.py
+++ b/src/lean_spec/subspecs/validator/service.py
@@ -29,6 +29,7 @@
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING
 
+from lean_spec.subspecs import metrics
 from lean_spec.subspecs.chain.clock import SlotClock
 from lean_spec.subspecs.chain.config import SECONDS_PER_INTERVAL
 from lean_spec.subspecs.containers import (
@@ -184,6 +185,7 @@ async def _maybe_produce_block(self, slot: Slot) -> None:
         # Create signed block wrapper for publishing.
         signed_block = self._sign_block(block, validator_index, signatures)
         self._blocks_produced += 1
+        metrics.blocks_proposed.inc()
 
         # Emit the block for network propagation.
         await self.on_block(signed_block)
@@ -217,6 +219,7 @@ async def _produce_attestations(self, slot: Slot) -> None:
             # Sign the attestation using our secret key.
             signed_attestation = self._sign_attestation(attestation_data, validator_index)
             self._attestations_produced += 1
+            metrics.attestations_produced.inc()
 
             # Emit the attestation for network propagation.
             await self.on_attestation(signed_attestation)
diff --git a/tests/lean_spec/subspecs/metrics/test_registry.py b/tests/lean_spec/subspecs/metrics/test_registry.py
new file mode 100644
index 00000000..1b90b6bc
--- /dev/null
+++ b/tests/lean_spec/subspecs/metrics/test_registry.py
@@ -0,0 +1,166 @@
+"""Tests for the Prometheus metrics registry."""
+
+from __future__ import annotations
+
+from lean_spec.subspecs.metrics import (
+    REGISTRY,
+    attestations_invalid,
+    attestations_produced,
+    attestations_received,
+    attestations_valid,
+    block_processing_time,
+    blocks_processed,
+    blocks_proposed,
+    current_slot,
+    finalized_slot,
+    generate_metrics,
+    head_slot,
+    justified_slot,
+    peers_connected,
+    reorgs,
+    validators_count,
+)
+
+
+class TestMetricTypes:
+    """Tests for metric type behavior."""
+
+    def test_counter_increments_correctly(self) -> None:
+        """Counter metrics increment by one on each call."""
+        initial = blocks_processed._value.get()
+        blocks_processed.inc()
+        assert blocks_processed._value.get() == initial + 1.0
+
+    def test_gauge_sets_value_correctly(self) -> None:
+        """Gauge metrics can be set to arbitrary values."""
+        head_slot.set(42.0)
+        assert head_slot._value.get() == 42.0
+
+        head_slot.set(100.0)
+        assert head_slot._value.get() == 100.0
+
+    def test_histogram_observes_values(self) -> None:
+        """Histogram metrics record observations."""
+        # Get initial sample count from the histogram
+        initial_samples = list(block_processing_time.collect())[0].samples
+        initial_count = next(s.value for s in initial_samples if s.name.endswith("_count"))
+
+        block_processing_time.observe(0.05)
+
+        # Verify count increased
+        new_samples = list(block_processing_time.collect())[0].samples
+        new_count = next(s.value for s in new_samples if s.name.endswith("_count"))
+        assert new_count == initial_count + 1
+
+
+class TestMetricDefinitions:
+    """Tests for pre-defined metric definitions."""
+
+    def test_node_information_gauges_exist(self) -> None:
+        """Node information gauges are defined."""
+        assert head_slot is not None
+        assert current_slot is not None
+        assert justified_slot is not None
+        assert finalized_slot is not None
+        assert validators_count is not None
+
+    def test_block_processing_metrics_exist(self) -> None:
+        """Block processing metrics are defined."""
+        assert blocks_processed is not None
+        assert block_processing_time is not None
+
+    def test_attestation_metrics_exist(self) -> None:
+        """Attestation metrics are defined."""
+        assert attestations_received is not None
+        assert attestations_valid is not None
+        assert attestations_invalid is not None
+
+    def test_network_metrics_exist(self) -> None:
+        """Network metrics are defined."""
+        assert peers_connected is not None
+
+    def test_consensus_event_metrics_exist(self) -> None:
+        """Consensus event metrics are defined."""
+        assert reorgs is not None
+
+    def test_validator_production_metrics_exist(self) -> None:
+        """Validator production metrics are defined."""
+        assert blocks_proposed is not None
+        assert attestations_produced is not None
+
+
+class TestPrometheusOutput:
+    """Tests for Prometheus text format output."""
+
+    def test_generate_metrics_returns_bytes(self) -> None:
+        """Generate metrics returns bytes in Prometheus format."""
+        output = generate_metrics()
+        assert isinstance(output, bytes)
+
+    def test_output_contains_metric_names(self) -> None:
+        """Output contains expected metric names."""
+        output = generate_metrics().decode("utf-8")
+
+        assert "lean_head_slot" in output
+        assert "lean_blocks_processed_total" in output
+        assert "lean_block_processing_seconds" in output
+        assert "lean_attestations_received_total" in output
+        assert "lean_peers_connected" in output
+
+    def test_output_contains_help_text(self) -> None:
+        """Output contains HELP lines for metrics."""
+        output = generate_metrics().decode("utf-8")
+
+        assert "# HELP lean_head_slot" in output
+        assert "# TYPE lean_head_slot gauge" in output
+
+    def test_output_contains_histogram_buckets(self) -> None:
+        """Output contains histogram bucket values."""
+        output = generate_metrics().decode("utf-8")
+
+        # Histogram exports include _bucket, _count, _sum
+        assert "lean_block_processing_seconds_bucket" in output
+        assert "lean_block_processing_seconds_count" in output
+        assert "lean_block_processing_seconds_sum" in output
+
+
+class TestRegistryIsolation:
+    """Tests for registry isolation from default metrics."""
+
+    def test_registry_is_dedicated(self) -> None:
+        """Our registry is separate from default prometheus registry."""
+        from prometheus_client import REGISTRY as DEFAULT_REGISTRY
+
+        assert REGISTRY is not DEFAULT_REGISTRY
+
+    def test_metrics_registered_to_custom_registry(self) -> None:
+        """All metrics are registered to our custom registry."""
+        # Verify a metric is in our registry by generating output
+        output = generate_metrics().decode("utf-8")
+
+        # If head_slot is in our registry, it should appear in output
+        assert "lean_head_slot" in output
+
+
+class TestHistogramTiming:
+    """Tests for histogram timing context manager."""
+
+    def test_time_context_manager_records_duration(self) -> None:
+        """Histogram time() context manager records duration."""
+        import time
+
+        # Get initial values from samples
+        initial_samples = list(block_processing_time.collect())[0].samples
+        initial_count = next(s.value for s in initial_samples if s.name.endswith("_count"))
+        initial_sum = next(s.value for s in initial_samples if s.name.endswith("_sum"))
+
+        with block_processing_time.time():
+            time.sleep(0.01)
+
+        # Get new values
+        new_samples = list(block_processing_time.collect())[0].samples
+        new_count = next(s.value for s in new_samples if s.name.endswith("_count"))
+        new_sum = next(s.value for s in new_samples if s.name.endswith("_sum"))
+
+        assert new_count == initial_count + 1
+        assert new_sum > initial_sum
diff --git a/uv.lock b/uv.lock
index 17a767a4..1c3bf4e9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.12"
 
 [manifest]
@@ -853,6 +853,7 @@ dependencies = [
     { name = "httpx" },
     { name = "lean-multisig-py" },
     { name = "numpy" },
+    { name = "prometheus-client" },
     { name = "pydantic" },
     { name = "typing-extensions" },
 ]
@@ -905,6 +906,7 @@ requires-dist = [
     { name = "httpx", specifier = ">=0.28.0,<1" },
     { name = "lean-multisig-py", git = "https://github.com/anshalshukla/leanMultisig-py?branch=main" },
     { name = "numpy", specifier = ">=2.0.0,<3" },
+    { name = "prometheus-client", specifier = ">=0.21.0,<1" },
     { name = "pydantic", specifier = ">=2.12.0,<3" },
     { name = "typing-extensions", specifier = ">=4.4" },
 ]
@@ -1472,6 +1474,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
 ]
 
+[[package]]
+name = "prometheus-client"
+version = "0.24.1"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/f0/58/a794d23feb6b00fc0c72787d7e87d872a6730dd9ed7c7b3e954637d8f280/prometheus_client-0.24.1.tar.gz", hash = "sha256:7e0ced7fbbd40f7b84962d5d2ab6f17ef88a72504dcf7c0b40737b43b2a461f9", size = 85616, upload-time = "2026-01-14T15:26:26.965Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/74/c3/24a2f845e3917201628ecaba4f18bab4d18a337834c1df2a159ee9d22a42/prometheus_client-0.24.1-py3-none-any.whl", hash = "sha256:150db128af71a5c2482b36e588fc8a6b95e498750da4b17065947c16070f4055", size = 64057, upload-time = "2026-01-14T15:26:24.42Z" },
+]
+
 [[package]]
 name = "prompt-toolkit"
 version = "3.0.52"

From 48f4040f2760f8d3fa2ef480a47282b9697d559f Mon Sep 17 00:00:00 2001
From: Thomas Coratger
Date: Fri, 16 Jan 2026 00:29:22 +0100
Subject: [PATCH 2/2] fix tests

---
 .../lean_spec/subspecs/networking/test_network_service.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/tests/lean_spec/subspecs/networking/test_network_service.py b/tests/lean_spec/subspecs/networking/test_network_service.py
index f662ae0e..f31070a7 100644
--- a/tests/lean_spec/subspecs/networking/test_network_service.py
+++ b/tests/lean_spec/subspecs/networking/test_network_service.py
@@ -85,9 +85,14 @@ def __init__(self, head_slot: int = 0) -> None:
         self._head_slot = head_slot
         self.head = Bytes32.zero()
         self.blocks: dict[Bytes32, Any] = {}
+        self.states: dict[Bytes32, Any] = {}
         self._attestations_received: list[SignedAttestation] = []
         self._setup_genesis()
 
+        # Required by metrics in the sync service
+        self.latest_justified = Checkpoint(root=Bytes32.zero(), slot=Slot(0))
+        self.latest_finalized = Checkpoint(root=Bytes32.zero(), slot=Slot(0))
+
     def _setup_genesis(self) -> None:
         """Set up genesis block in the store."""
         from unittest.mock import MagicMock
@@ -100,6 +105,7 @@ def on_block(self, block: SignedBlockWithAttestation) -> "MockStore":
         """Process a block: add to blocks dict and update head."""
         new_store = MockStore(int(block.message.block.slot))
         new_store.blocks = dict(self.blocks)
+        new_store.states = dict(self.states)
         new_store._attestations_received = list(self._attestations_received)
         root = hash_tree_root(block.message.block)
         new_store.blocks[root] = block.message.block
@@ -110,6 +116,7 @@ def on_gossip_attestation(self, attestation: SignedAttestation) -> "MockStore":
         """Process an attestation: track it for verification."""
         new_store = MockStore(self._head_slot)
         new_store.blocks = dict(self.blocks)
+        new_store.states = dict(self.states)
         new_store.head = self.head
         new_store._attestations_received = list(self._attestations_received)
         new_store._attestations_received.append(attestation)