From 3a8ba996e31b534f7e1893f86643a5226833b9ff Mon Sep 17 00:00:00 2001 From: remimd Date: Thu, 26 Feb 2026 10:57:00 +0100 Subject: [PATCH] feat: Add fail_silently to handler decorators --- cq/_core/dispatcher/base.py | 10 +++- cq/_core/dispatcher/bus.py | 13 +++-- cq/_core/handler.py | 99 ++++++++++++++++++++++--------------- cq/middlewares/retry.py | 2 +- cq/middlewares/scope.py | 2 +- docs/guides/messages.md | 16 ++++++ tests/test_command_bus.py | 11 +++++ uv.lock | 26 +++++++--- 8 files changed, 126 insertions(+), 53 deletions(-) diff --git a/cq/_core/dispatcher/base.py b/cq/_core/dispatcher/base.py index 90c79a0..d6abb0a 100644 --- a/cq/_core/dispatcher/base.py +++ b/cq/_core/dispatcher/base.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from collections.abc import Awaitable, Callable +from contextlib import AsyncExitStack, suppress from typing import Protocol, Self, runtime_checkable from cq._core.middleware import Middleware, MiddlewareGroup @@ -40,5 +41,12 @@ async def _invoke_with_middlewares( handler: Callable[[I], Awaitable[O]], input_value: I, /, + fail_silently: bool = False, ) -> O: - return await self.__middleware_group.invoke(handler, input_value) + async with AsyncExitStack() as stack: + if fail_silently: + stack.enter_context(suppress(Exception)) + + return await self.__middleware_group.invoke(handler, input_value) + + return NotImplemented diff --git a/cq/_core/dispatcher/bus.py b/cq/_core/dispatcher/bus.py index bf3f2b3..ba70733 100644 --- a/cq/_core/dispatcher/bus.py +++ b/cq/_core/dispatcher/bus.py @@ -7,6 +7,7 @@ from cq._core.dispatcher.base import BaseDispatcher, Dispatcher from cq._core.handler import ( + HandleFunction, HandlerFactory, HandlerRegistry, MultipleHandlerRegistry, @@ -53,10 +54,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel self.__registry.subscribe(input_type, factory) return self - def _handlers_from( - self, - input_type: type[I], - ) -> Iterator[Callable[[I], Awaitable[O]]]: + def _handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]: return self.__registry.handlers_from(input_type) def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None: @@ -75,7 +73,11 @@ async def dispatch(self, input_value: I, /) -> O: self._trigger_listeners(input_value, task_group) for handler in self._handlers_from(type(input_value)): - return await self._invoke_with_middlewares(handler, input_value) + return await self._invoke_with_middlewares( + handler, + input_value, + handler.fail_silently, + ) return NotImplemented @@ -95,4 +97,5 @@ async def dispatch(self, input_value: I, /) -> None: self._invoke_with_middlewares, handler, input_value, + handler.fail_silently, ) diff --git a/cq/_core/handler.py b/cq/_core/handler.py index ce364a0..636cf08 100644 --- a/cq/_core/handler.py +++ b/cq/_core/handler.py @@ -21,74 +21,96 @@ class Handler[**P, T](Protocol): __slots__ = () @abstractmethod - async def handle(self, *args: P.args, **kwargs: P.kwargs) -> T: + async def handle(self, /, *args: P.args, **kwargs: P.kwargs) -> T: raise NotImplementedError +@dataclass(repr=False, eq=False, frozen=True, slots=True) +class HandleFunction[**P, T]: + handler_factory: HandlerFactory[P, T] + handler_type: HandlerType[P, T] | None = field(default=None) + fail_silently: bool = field(default=False) + + async def __call__(self, /, *args: P.args, **kwargs: P.kwargs) -> T: + handler = await self.handler_factory() + return await handler.handle(*args, **kwargs) + + @runtime_checkable class HandlerRegistry[I, O](Protocol): __slots__ = () @abstractmethod - def handlers_from( - self, - input_type: type[I], - ) -> Iterator[Callable[[I], Awaitable[O]]]: + def handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]: raise NotImplementedError @abstractmethod - def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self: + def subscribe( + self, + input_type: type[I], + handler_factory: HandlerFactory[[I], O], + handler_type: HandlerType[[I], O] | None = ..., + fail_silently: bool = ..., + ) -> Self: raise NotImplementedError @dataclass(repr=False, eq=False, frozen=True, slots=True) class MultipleHandlerRegistry[I, O](HandlerRegistry[I, O]): - __factories: dict[type[I], list[HandlerFactory[[I], O]]] = field( + __values: dict[type[I], list[HandleFunction[[I], O]]] = field( default_factory=partial(defaultdict, list), init=False, ) - def handlers_from( + def handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]: + for key_type in _iter_key_types(input_type): + yield from self.__values.get(key_type, ()) + + def subscribe( self, input_type: type[I], - ) -> Iterator[Callable[[I], Awaitable[O]]]: - for key_type in _iter_key_types(input_type): - for factory in self.__factories.get(key_type, ()): - yield _make_handle_function(factory) + handler_factory: HandlerFactory[[I], O], + handler_type: HandlerType[[I], O] | None = None, + fail_silently: bool = False, + ) -> Self: + function = HandleFunction(handler_factory, handler_type, fail_silently) - def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self: for key_type in _build_key_types(input_type): - self.__factories[key_type].append(factory) + self.__values[key_type].append(function) return self @dataclass(repr=False, eq=False, frozen=True, slots=True) class SingleHandlerRegistry[I, O](HandlerRegistry[I, O]): - __factories: dict[type[I], HandlerFactory[[I], O]] = field( + __values: dict[type[I], HandleFunction[[I], O]] = field( default_factory=dict, init=False, ) - def handlers_from( - self, - input_type: type[I], - ) -> Iterator[Callable[[I], Awaitable[O]]]: + def handlers_from(self, input_type: type[I]) -> Iterator[HandleFunction[[I], O]]: for key_type in _iter_key_types(input_type): - factory = self.__factories.get(key_type, None) - if factory is not None: - yield _make_handle_function(factory) + function = self.__values.get(key_type, None) + if function is not None: + yield function - def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self: - entries = {key_type: factory for key_type in _build_key_types(input_type)} + def subscribe( + self, + input_type: type[I], + handler_factory: HandlerFactory[[I], O], + handler_type: HandlerType[[I], O] | None = None, + fail_silently: bool = False, + ) -> Self: + function = HandleFunction(handler_factory, handler_type, fail_silently) + entries = {key_type: function for key_type in _build_key_types(input_type)} for key_type in entries: - if key_type in self.__factories: + if key_type in self.__values: raise RuntimeError( f"A handler is already registered for the input type: `{key_type}`." ) - self.__factories.update(entries) + self.__values.update(entries) return self @@ -105,6 +127,7 @@ def __call__( input_or_handler_type: type[I], /, *, + fail_silently: bool = ..., threadsafe: bool | None = ..., ) -> Decorator: ... @@ -114,6 +137,7 @@ def __call__[T]( input_or_handler_type: T, /, *, + fail_silently: bool = ..., threadsafe: bool | None = ..., ) -> T: ... @@ -123,6 +147,7 @@ def __call__( input_or_handler_type: None = ..., /, *, + fail_silently: bool = ..., threadsafe: bool | None = ..., ) -> Decorator: ... @@ -131,6 +156,7 @@ def __call__[T]( input_or_handler_type: type[I] | T | None = None, /, *, + fail_silently: bool = False, threadsafe: bool | None = None, ) -> Any: if ( @@ -138,11 +164,16 @@ def __call__[T]( and isclass(input_or_handler_type) and issubclass(input_or_handler_type, Handler) ): - return self.__decorator(input_or_handler_type, threadsafe=threadsafe) + return self.__decorator( + input_or_handler_type, + fail_silently=fail_silently, + threadsafe=threadsafe, + ) return partial( self.__decorator, input_type=input_or_handler_type, # type: ignore[arg-type] + fail_silently=fail_silently, threadsafe=threadsafe, ) @@ -152,11 +183,12 @@ def __decorator( /, *, input_type: type[I] | None = None, + fail_silently: bool = False, threadsafe: bool | None = None, ) -> HandlerType[[I], O]: factory = self.injection_module.make_async_factory(wrapped, threadsafe) input_type = input_type or _resolve_input_type(wrapped) - self.registry.subscribe(input_type, factory) + self.registry.subscribe(input_type, factory, wrapped, fail_silently) return wrapped @@ -190,14 +222,3 @@ def _resolve_input_type[I, O](handler_type: HandlerType[[I], O]) -> type[I]: f"Unable to resolve input type for handler `{handler_type}`, " "`handle` method must have a type annotation for its first parameter." ) - - -def _make_handle_function[I, O]( - factory: HandlerFactory[[I], O], -) -> Callable[[I], Awaitable[O]]: - return partial(__handle, factory=factory) - - -async def __handle[I, O](input_value: I, *, factory: HandlerFactory[[I], O]) -> O: - handler = await factory() - return await handler.handle(input_value) diff --git a/cq/middlewares/retry.py b/cq/middlewares/retry.py index 5da8884..4db313b 100644 --- a/cq/middlewares/retry.py +++ b/cq/middlewares/retry.py @@ -25,7 +25,7 @@ def __init__( self.__exceptions = tuple(exceptions) self.__retry = retry - async def __call__(self, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]: + async def __call__(self, /, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]: retry = self.__retry for attempt in range(1, retry + 1): diff --git a/cq/middlewares/scope.py b/cq/middlewares/scope.py index 37121e3..0d0e46f 100644 --- a/cq/middlewares/scope.py +++ b/cq/middlewares/scope.py @@ -19,7 +19,7 @@ class InjectionScopeMiddleware: exist_ok: bool = field(default=False, kw_only=True) threadsafe: bool | None = field(default=None, kw_only=True) - async def __call__(self, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]: + async def __call__(self, /, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]: async with AsyncExitStack() as stack: try: await stack.enter_async_context( diff --git a/docs/guides/messages.md b/docs/guides/messages.md index d5f6988..03a140f 100644 --- a/docs/guides/messages.md +++ b/docs/guides/messages.md @@ -142,3 +142,19 @@ class TrackUserCreatedHandler(NamedTuple): async def handle(self, event: UserCreatedEvent): ... ``` + +### fail_silently + +The `fail_silently` option suppresses any exception raised by the handler instead of propagating it to the caller. Exceptions can still be caught and handled by middlewares before being suppressed. + +This is particularly useful for non-critical event handlers where a failure should not affect the rest of the system: + +```python +@event_handler(fail_silently=True) +class TrackUserCreatedHandler(NamedTuple): + analytics: AnalyticsService + + async def handle(self, event: UserCreatedEvent): + # An exception here won't propagate to the caller + ... +``` diff --git a/tests/test_command_bus.py b/tests/test_command_bus.py index 7930554..d0825db 100644 --- a/tests/test_command_bus.py +++ b/tests/test_command_bus.py @@ -33,3 +33,14 @@ async def handle(self, command: _Command) -> None: assert len(history.records) == 2 assert isinstance(history.records[0].args[0], _Event) assert isinstance(history.records[1].args[0], _Command) + + async def test_dispatch_with_fail_silently(self) -> None: + class _Command: ... + + @command_handler(fail_silently=True) + class _CommandHandler: + async def handle(self, command: _Command) -> None: + raise ValueError + + command_bus = find_instance(AnyCommandBus) + assert await command_bus.dispatch(_Command()) is NotImplemented diff --git a/uv.lock b/uv.lock index 65daaa1..a3e2427 100644 --- a/uv.lock +++ b/uv.lock @@ -378,7 +378,7 @@ wheels = [ [[package]] name = "fastapi" -version = "0.133.0" +version = "0.133.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "annotated-doc" }, @@ -387,9 +387,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "typing-inspection" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c2/04/ab382c7c03dd545f2c964d06e87ad0d5faa944a2434186ad9c285f5d87e0/fastapi-0.133.0.tar.gz", hash = "sha256:b900a2bf5685cdb0647a41d5900bdeafc3a9e8a28ac08c6246b76699e164d60d", size = 373265, upload-time = "2026-02-24T09:53:40.143Z" } +sdist = { url = "https://files.pythonhosted.org/packages/22/6f/0eafed8349eea1fa462238b54a624c8b408cd1ba2795c8e64aa6c34f8ab7/fastapi-0.133.1.tar.gz", hash = "sha256:ed152a45912f102592976fde6cbce7dae1a8a1053da94202e51dd35d184fadd6", size = 378741, upload-time = "2026-02-25T18:18:17.398Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/bf/b4/023e75a2ec3f5440e380df6caf4d28edc0806d007193e6fb0707237886a4/fastapi-0.133.0-py3-none-any.whl", hash = "sha256:0a78878483d60702a1dde864c24ab349a1a53ef4db6b6f74f8cd4a2b2bc67d2f", size = 104787, upload-time = "2026-02-24T09:53:41.404Z" }, + { url = "https://files.pythonhosted.org/packages/d2/c9/a175a7779f3599dfa4adfc97a6ce0e157237b3d7941538604aadaf97bfb6/fastapi-0.133.1-py3-none-any.whl", hash = "sha256:658f34ba334605b1617a65adf2ea6461901bdb9af3a3080d63ff791ecf7dc2e2", size = 109029, upload-time = "2026-02-25T18:18:18.578Z" }, ] [[package]] @@ -1176,6 +1176,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" }, ] +[[package]] +name = "python-discovery" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "filelock" }, + { name = "platformdirs" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/82/bb/93a3e83bdf9322c7e21cafd092e56a4a17c4d8ef4277b6eb01af1a540a6f/python_discovery-1.1.0.tar.gz", hash = "sha256:447941ba1aed8cc2ab7ee3cb91be5fc137c5bdbb05b7e6ea62fbdcb66e50b268", size = 55674, upload-time = "2026-02-26T09:42:49.668Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/06/54/82a6e2ef37f0f23dccac604b9585bdcbd0698604feb64807dcb72853693e/python_discovery-1.1.0-py3-none-any.whl", hash = "sha256:a162893b8809727f54594a99ad2179d2ede4bf953e12d4c7abc3cc9cdbd1437b", size = 30687, upload-time = "2026-02-26T09:42:48.548Z" }, +] + [[package]] name = "python-injection" version = "0.25.15" @@ -1457,16 +1470,17 @@ wheels = [ [[package]] name = "virtualenv" -version = "20.39.0" +version = "21.0.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "distlib" }, { name = "filelock" }, { name = "platformdirs" }, + { name = "python-discovery" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ed/54/809199edc537dbace273495ac0884d13df26436e910a5ed4d0ec0a69806b/virtualenv-20.39.0.tar.gz", hash = "sha256:a15f0cebd00d50074fd336a169d53422436a12dfe15149efec7072cfe817df8b", size = 5869141, upload-time = "2026-02-23T18:09:13.349Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ce/4f/d6a5ff3b020c801c808b14e2d2330cdc8ebefe1cdfbc457ecc368e971fec/virtualenv-21.0.0.tar.gz", hash = "sha256:e8efe4271b4a5efe7a4dce9d60a05fd11859406c0d6aa8464f4cf451bc132889", size = 5836591, upload-time = "2026-02-25T20:21:07.691Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f7/b4/8268da45f26f4fe84f6eae80a6ca1485ffb490a926afecff75fc48f61979/virtualenv-20.39.0-py3-none-any.whl", hash = "sha256:44888bba3775990a152ea1f73f8e5f566d49f11bbd1de61d426fd7732770043e", size = 5839121, upload-time = "2026-02-23T18:09:11.173Z" }, + { url = "https://files.pythonhosted.org/packages/29/d1/3f62e4f9577b28c352c11623a03fb916096d5c131303d4861b4914481b6b/virtualenv-21.0.0-py3-none-any.whl", hash = "sha256:d44e70637402c7f4b10f48491c02a6397a3a187152a70cba0b6bc7642d69fb05", size = 5817167, upload-time = "2026-02-25T20:21:05.476Z" }, ] [[package]]