Skip to content

Real-Time WebSocket Event Broadcasting#77

Merged
markoceri merged 18 commits intodevfrom
feature/websocket-real-time-events
Apr 8, 2026
Merged

Real-Time WebSocket Event Broadcasting#77
markoceri merged 18 commits intodevfrom
feature/websocket-real-time-events

Conversation

@markoceri
Copy link
Copy Markdown
Collaborator

Adds a WebSocket infrastructure for pushing domain events to connected clients in real-time. The system hooks into the existing event bus and follows the same DDD/hexagonal architecture patterns used by the FastAPI routes — each subdomain owns its serialization logic while a central manager handles connection lifecycle and message delivery.

What's included

Core Infrastructure (adapters/infrastructure/websocket/)

  • WebSocketManager — aggregator that collects subdomain handlers, subscribes to the event bus, and broadcasts serialized payloads to clients matching their subscription patterns
  • WebSocketEventHandler ABC — pure serializer base class, each subdomain implements a registrations property returning WebSocketEventRegistration entries (event_type + topic + serialize)
  • WebSocketEventRegistration frozen dataclass — declarative binding of a domain event class to a topic string and a serialization function
  • WebSocketMessage NamedTuple — typed internal container pairing topic with payload
  • FastAPI endpoint at /ws/events with topic-based pub/sub using fnmatch glob patterns (energy.*, *, etc.)
  • Topic discovery: clients send {"get_topics": true} to receive the full list of subscribable topics

Subdomain Handlers

One handler per subdomain, each with its own Pydantic schema:

Topic Handler Schema
energy.state EnergyWebSocketHandler EnergyStateSnapshotUpdatedSchema
miner.state MinerWebSocketHandler MinerStateChangedSchema
rule.engaged OptimizationUnitWebSocketHandler RuleEngagedSchema
policy.context PolicyWebSocketHandler DecisionalContextUpdatedSchema
config.updated ConfigurationWebSocketHandler ConfigurationUpdatedSchema

Documentation

  • docs/architecture/websocket-design.md — technical architecture, component descriptions, file structure, and step-by-step guide for adding new events
  • docs/WEBSOCKET.md — client-facing guide with protocol reference, payload schemas, pattern matching, and JS/Python examples

How it connects to the event bus

The WebSocketManager receives an EventBusInterface at construction. It iterates all handler registrations and calls event_bus.subscribe(event_type, callback, blocking=False) for each one. When a domain event is published anywhere in the system, the corresponding callback serializes it via the handler's function and broadcasts the result to matching WebSocket clients. The handlers themselves have no dependency on the event bus or the manager — they are pure serializers.

How to extend

Adding a new event to an existing subdomain requires no changes to the manager. Add a WebSocketEventRegistration to the handler's registrations list with a topic string, create the Pydantic schema, and write the serialize method. The manager discovers it automatically and the new topic appears in available_topics.

Tests

WebSocket Manager (test_websocket_manager.py)

  • Event bus subscription — verifies all 5 event types are registered with blocking=False; available_topics returns the complete list
  • Connection lifecycle — connect, disconnect, disconnect unknown (no-op)
  • Subscription management — subscribe with patterns, unsubscribe, subscribe when not connected (no-op)
  • Topic matching — exact match, wildcard (energy.*), star-all (*)
  • Broadcasting — delivery to matching subscribers, skip non-matching, skip unsubscribed, dead connection cleanup, multiple clients with mixed subscriptions
  • Subdomain payloadsrule.engaged, miner.state, config.updated messages delivered with correct topic and payload
  • Topic discovery — client sends {"get_topics": true} and receives sorted list of all available topics

Event serialization (tests/unit/application/events/)

One test module per event type verifying DomainEvent inheritance, field initialization, event_id/occurred_at generation, and to_dict() serialization:

  • test_energy_events.pyEnergyStateSnapshotUpdatedEvent
  • test_miner_events.pyMinerStateChangedEvent
  • test_optimization_events.pyRuleEngagedEvent
  • test_policy_events.pyDecisionalContextUpdatedEvent
  • test_configuration_events.pyConfigurationUpdatedEvent

- Implemented WebSocketManager to manage connections and broadcast domain events.
- Added WebSocket routes for handling client subscriptions and event notifications.
- Introduced event classes for energy, miner, optimization, and policy events.
- Integrated WebSocket dependencies into the application startup process.
- Updated services to publish relevant events to the WebSocket manager.
- Created unit tests for WebSocketManager and event classes to ensure functionality.
…ish events for energy state and decisional context
- Refactored event classes to align with the new structure, moving events to their respective domain modules.
- Enhanced tests to validate the new WebSocket broadcasting functionality and ensure proper event handling.
@markoceri markoceri self-assigned this Apr 3, 2026
@markoceri markoceri added documentation Improvements or additions to documentation enhancement New feature or request labels Apr 3, 2026
@markoceri markoceri merged commit d645b16 into dev Apr 8, 2026
@markoceri markoceri deleted the feature/websocket-real-time-events branch April 8, 2026 09:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant