Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"aiohttp>=3.11.0,<4",
"cryptography>=46.0.0",
"numpy>=2.0.0,<3",
"prometheus-client>=0.21.0,<1",
]

[project.license]
Expand Down
14 changes: 13 additions & 1 deletion src/lean_spec/subspecs/api/server.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""
API server for checkpoint sync and node status endpoints.
API server for checkpoint sync, node status, and metrics endpoints.

Provides HTTP endpoints for:
- /lean/states/finalized - Serve finalized checkpoint state as SSZ
- /health - Health check endpoint
- /metrics - Prometheus metrics endpoint

This matches the checkpoint sync API implemented in zeam.
"""
Expand All @@ -18,6 +19,8 @@

from aiohttp import web

from lean_spec.subspecs.metrics import generate_metrics

if TYPE_CHECKING:
from lean_spec.subspecs.forkchoice import Store

Expand All @@ -34,6 +37,14 @@ async def _handle_health(_request: web.Request) -> web.Response:
return web.json_response({"status": "healthy", "service": "lean-spec-api"})


async def _handle_metrics(_request: web.Request) -> web.Response:
    """
    Handle the Prometheus metrics endpoint.

    Serializes the metrics registry and returns it in the Prometheus text
    exposition format (version 0.0.4).

    Args:
        _request: Incoming HTTP request; unused.

    Returns:
        Response whose body is the output of generate_metrics().
    """
    # aiohttp raises ValueError when the content_type argument contains
    # "charset", so the full media type is sent as an explicit header instead.
    return web.Response(
        body=generate_metrics(),
        headers={"Content-Type": "text/plain; version=0.0.4; charset=utf-8"},
    )


@dataclass(frozen=True, slots=True)
class ApiServerConfig:
"""Configuration for the API server."""
Expand Down Expand Up @@ -87,6 +98,7 @@ async def start(self) -> None:
app.add_routes(
[
web.get("/health", _handle_health),
web.get("/metrics", _handle_metrics),
web.get("/lean/states/finalized", self._handle_finalized_state),
]
)
Expand Down
44 changes: 44 additions & 0 deletions src/lean_spec/subspecs/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
Metrics module for observability.

Provides counters, gauges, and histograms for tracking consensus client behavior.
Exposes metrics in Prometheus text format.
"""

from .registry import (
REGISTRY,
attestations_invalid,
attestations_produced,
attestations_received,
attestations_valid,
block_processing_time,
blocks_processed,
blocks_proposed,
current_slot,
finalized_slot,
generate_metrics,
head_slot,
justified_slot,
peers_connected,
reorgs,
validators_count,
)

# Public API of the metrics package.
#
# Mirrors, name for name, the objects imported from .registry above.
# Keep both lists in sync when a metric is added or removed.
__all__ = [
"REGISTRY",
"attestations_invalid",
"attestations_produced",
"attestations_received",
"attestations_valid",
"block_processing_time",
"blocks_processed",
"blocks_proposed",
"current_slot",
"finalized_slot",
"generate_metrics",
"head_slot",
"justified_slot",
"peers_connected",
"reorgs",
"validators_count",
]
140 changes: 140 additions & 0 deletions src/lean_spec/subspecs/metrics/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
Metric registry using prometheus_client.

Provides pre-defined metrics for a consensus client.
Exposes metrics in Prometheus text format via the /metrics endpoint.
"""

from __future__ import annotations

from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
generate_latest,
)

# Create a dedicated registry for lean-spec metrics.
#
# Using a dedicated registry avoids pollution from default Python process metrics.
REGISTRY = CollectorRegistry()

# -----------------------------------------------------------------------------
# Node Information
# -----------------------------------------------------------------------------

# Gauges hold a single point-in-time value that is overwritten with .set().
#
# Set by the sync service from the head block's slot after each successfully processed block.
head_slot = Gauge(
"lean_head_slot",
"Current head slot",
registry=REGISTRY,
)

# NOTE(review): no code that sets this gauge is visible next to the other slot updates.
# Confirm it is wired up, otherwise it will be exported as a constant 0.
current_slot = Gauge(
"lean_current_slot",
"Current time slot",
registry=REGISTRY,
)

# Set by the sync service from the store's latest justified checkpoint slot.
justified_slot = Gauge(
"lean_justified_slot",
"Latest justified slot",
registry=REGISTRY,
)

# Set by the sync service from the store's latest finalized checkpoint slot.
finalized_slot = Gauge(
"lean_finalized_slot",
"Latest finalized slot",
registry=REGISTRY,
)

# Set by the sync service to the length of the head state's validator list,
# and only when a state for the head is present in the store.
#
# NOTE(review): the help text says "Active"; confirm that list holds only active validators.
validators_count = Gauge(
"lean_validators_count",
"Active validators",
registry=REGISTRY,
)

# -----------------------------------------------------------------------------
# Block Processing
# -----------------------------------------------------------------------------

# Incremented by the sync service once per block, after the block processor returns.
blocks_processed = Counter(
"lean_blocks_processed_total",
"Total blocks processed",
registry=REGISTRY,
)

# Observed through the .time() context manager wrapped around the block processor call.
#
# Bucket values are upper bounds in seconds.
# Slower observations fall only into the implicit +Inf bucket.
#
# NOTE(review): the timer presumably also records calls that raise,
# so its sample count may exceed lean_blocks_processed_total — confirm this is intended.
block_processing_time = Histogram(
"lean_block_processing_seconds",
"Block processing duration",
buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5),
registry=REGISTRY,
)

# -----------------------------------------------------------------------------
# Attestations
# -----------------------------------------------------------------------------

# Counters only ever increase; they are bumped with .inc().
#
# NOTE(review): no increment site for the three attestation counters below is visible
# alongside these definitions — confirm they are wired into attestation handling.
attestations_received = Counter(
"lean_attestations_received_total",
"Total attestations received",
registry=REGISTRY,
)

# Presumably the subset of received attestations that passed validation — TODO confirm.
attestations_valid = Counter(
"lean_attestations_valid_total",
"Valid attestations",
registry=REGISTRY,
)

# Presumably the subset of received attestations that failed validation — TODO confirm.
attestations_invalid = Counter(
"lean_attestations_invalid_total",
"Invalid attestations",
registry=REGISTRY,
)

# -----------------------------------------------------------------------------
# Network
# -----------------------------------------------------------------------------

# Gauge, so it can move both up and down as the value is updated.
#
# NOTE(review): no code that sets this gauge is visible alongside this definition.
# Confirm the networking layer updates it.
peers_connected = Gauge(
"lean_peers_connected",
"Connected peers",
registry=REGISTRY,
)

# -----------------------------------------------------------------------------
# Consensus Events
# -----------------------------------------------------------------------------

# NOTE(review): no increment site for this counter is visible alongside this definition.
# Confirm fork choice (or its caller) bumps it when the head switches branches.
reorgs = Counter(
"lean_reorgs_total",
"Chain reorganizations",
registry=REGISTRY,
)

# -----------------------------------------------------------------------------
# Validator Production
# -----------------------------------------------------------------------------

# Incremented by the validator service right after a block is signed.
#
# The increment happens before the publish callback is awaited,
# so a block is counted even if publishing subsequently fails.
blocks_proposed = Counter(
"lean_blocks_proposed_total",
"Blocks proposed by this node",
registry=REGISTRY,
)

# Incremented by the validator service right after an attestation is signed,
# likewise before the publish callback is awaited.
attestations_produced = Counter(
"lean_attestations_produced_total",
"Attestations produced by this node",
registry=REGISTRY,
)


def generate_metrics() -> bytes:
    """
    Serialize every metric registered in the lean-spec registry.

    Returns:
        Snapshot of the registry in Prometheus text format, as bytes.
    """
    exposition = generate_latest(REGISTRY)
    return exposition
15 changes: 14 additions & 1 deletion src/lean_spec/subspecs/sync/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from lean_spec.subspecs import metrics
from lean_spec.subspecs.chain.clock import SlotClock
from lean_spec.subspecs.containers import Block, SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAttestation
Expand Down Expand Up @@ -203,12 +204,24 @@ def _process_block_wrapper(
# Delegate to the actual block processor (typically Store.on_block).
#
# The processor validates the block and updates forkchoice state.
new_store = self.process_block(store, block)
with metrics.block_processing_time.time():
new_store = self.process_block(store, block)

# Track metrics after successful processing.
#
# We only count blocks that pass validation and update the store.
self._blocks_processed += 1
metrics.blocks_processed.inc()

# Update chain state metrics.
metrics.head_slot.set(float(new_store.blocks[new_store.head].slot))
metrics.justified_slot.set(float(new_store.latest_justified.slot))
metrics.finalized_slot.set(float(new_store.latest_finalized.slot))

# Update validator count from head state.
head_state = new_store.states.get(new_store.head)
if head_state is not None:
metrics.validators_count.set(float(len(head_state.validators)))

# Persist block and state to database if available.
#
Expand Down
3 changes: 3 additions & 0 deletions src/lean_spec/subspecs/validator/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from lean_spec.subspecs import metrics
from lean_spec.subspecs.chain.clock import SlotClock
from lean_spec.subspecs.chain.config import SECONDS_PER_INTERVAL
from lean_spec.subspecs.containers import (
Expand Down Expand Up @@ -184,6 +185,7 @@ async def _maybe_produce_block(self, slot: Slot) -> None:
# Create signed block wrapper for publishing.
signed_block = self._sign_block(block, validator_index, signatures)
self._blocks_produced += 1
metrics.blocks_proposed.inc()

# Emit the block for network propagation.
await self.on_block(signed_block)
Expand Down Expand Up @@ -217,6 +219,7 @@ async def _produce_attestations(self, slot: Slot) -> None:
# Sign the attestation using our secret key.
signed_attestation = self._sign_attestation(attestation_data, validator_index)
self._attestations_produced += 1
metrics.attestations_produced.inc()

# Emit the attestation for network propagation.
await self.on_attestation(signed_attestation)
Expand Down
Loading
Loading