diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 9fc7ff1a..befff6f1 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -1194,11 +1194,16 @@ async def reboot( (self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001 ) - await asyncio.gather( + results = await asyncio.gather( *[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners], *[listener(EventMigratingData()) for listener in migrating_listeners], + return_exceptions=True, ) + for result in results: + if isinstance(result, Exception): + self.log.exception('A pre-reboot event listener failed', exc_info=result) + if not self.configuration.actor_run_id: raise RuntimeError('actor_run_id cannot be None when running on the Apify platform.') diff --git a/tests/unit/actor/test_actor_helpers.py b/tests/unit/actor/test_actor_helpers.py index de2ff920..2cf84c51 100644 --- a/tests/unit/actor/test_actor_helpers.py +++ b/tests/unit/actor/test_actor_helpers.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging import warnings from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING @@ -344,3 +345,57 @@ async def test_get_remaining_time_returns_positive_when_timeout_in_future() -> N assert result is not None assert result > timedelta(0) assert result <= timedelta(minutes=5) + + +async def test_reboot_runs_all_listeners_even_when_one_fails( + apify_client_async_patcher: ApifyClientAsyncPatcher, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test that a failing pre-reboot event listener does not prevent other listeners from running. + + Directly injects raw async callables into the event manager's _listeners_to_wrappers + to simulate exceptions escaping the wrapper layer (the scenario return_exceptions=True guards against). + """ + apify_client_async_patcher.patch('run', 'reboot', return_value=None) + + persist_state_called = False + migrating_called = False + + async def failing_listener(*_args: object) -> None: + raise RuntimeError('persist_state listener error') + + async def successful_persist_state_listener(*_args: object) -> None: + nonlocal persist_state_called + persist_state_called = True + + async def successful_migrating_listener(*_args: object) -> None: + nonlocal migrating_called + migrating_called = True + + async with Actor: + Actor.configuration.is_at_home = True + Actor.configuration.actor_run_id = 'some-run-id' + + # Inject raw listeners directly into the event manager's internal structure, + # bypassing crawlee's wrapper that would catch exceptions on its own. + listeners_map = Actor.event_manager._listeners_to_wrappers + listeners_map[Event.PERSIST_STATE] = { + failing_listener: [failing_listener], + successful_persist_state_listener: [successful_persist_state_listener], + } + listeners_map[Event.MIGRATING] = { + successful_migrating_listener: [successful_migrating_listener], + } + + with caplog.at_level(logging.ERROR): + await Actor.reboot(custom_after_sleep=timedelta(milliseconds=1)) + + # All listeners ran despite the failure in one of them. + assert persist_state_called + assert migrating_called + + # The exception was logged. + assert any('A pre-reboot event listener failed' in r.message for r in caplog.records) + + # The reboot API call was still made. + assert len(apify_client_async_patcher.calls['run']['reboot']) == 1