diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/utils.py b/packages/pytest-simcore/src/pytest_simcore/helpers/dynamic_scheduler.py similarity index 100% rename from services/dynamic-scheduler/tests/unit/services/generic_scheduler/utils.py rename to packages/pytest-simcore/src/pytest_simcore/helpers/dynamic_scheduler.py diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py index e2895ab01f17..f0c276b567c5 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/events.py @@ -29,6 +29,7 @@ from ..services.notifier import get_notifier_lifespans from ..services.rabbitmq import rabbitmq_lifespan from ..services.redis import redis_lifespan +from ..services.scheduler import scheduler_lifespan from ..services.service_tracker import service_tracker_lifespan from ..services.status_monitor import status_monitor_lifespan from .settings import ApplicationSettings @@ -84,6 +85,7 @@ def create_app_lifespan( app_lifespan.add(lifespan) app_lifespan.add(generic_scheduler_lifespan) + app_lifespan.add(scheduler_lifespan) app_lifespan.add(service_tracker_lifespan) app_lifespan.add(deferred_manager_lifespan) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py index 5af60ab2ead6..9aed03a902b1 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/__init__.py @@ -17,10 +17,12 @@ ) from ._lifespan import generic_scheduler_lifespan from ._models import ( + OperationContext, OperationName, OperationToStart, 
ProvidedOperationContext, RequiredOperationContext, + ReservedContextKeys, ScheduleId, ) from ._operation import ( @@ -48,6 +50,7 @@ "get_step_store_proxy", "NoDataFoundError", "Operation", + "OperationContext", "OperationContextProxy", "OperationName", "OperationRegistry", @@ -57,6 +60,7 @@ "register_to_start_after_on_executed_completed", "register_to_start_after_on_reverted_completed", "RequiredOperationContext", + "ReservedContextKeys", "restart_operation_step_stuck_during_revert", "restart_operation_step_stuck_in_manual_intervention_during_execute", "ScheduleId", diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py index 7f405413a0fa..da4b49c37160 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_core.py @@ -1,4 +1,3 @@ -import asyncio import logging from collections.abc import Iterable from contextlib import suppress @@ -48,7 +47,6 @@ ) from ._models import ( EventType, - OperationContext, OperationErrorType, OperationName, OperationToStart, @@ -94,30 +92,32 @@ def __init__( async def start_operation( self, - operation_name: OperationName, - initial_operation_context: OperationContext, + operation_to_start: OperationToStart, on_execute_completed: OperationToStart | None, on_revert_completed: OperationToStart | None, ) -> ScheduleId: """start an operation by it's given name and providing an initial context""" schedule_id: ScheduleId = f"{uuid4()}" - initial_operation_context[ReservedContextKeys.SCHEDULE_ID] = schedule_id + operation_to_start.initial_context[ReservedContextKeys.SCHEDULE_ID] = ( + schedule_id + ) # check if operation is registered - operation = OperationRegistry.get_operation(operation_name) + operation = 
OperationRegistry.get_operation(operation_to_start.operation_name) for required_key in operation.initial_context_required_keys: - if required_key not in initial_operation_context: + if required_key not in operation_to_start.initial_context: raise OperationInitialContextKeyNotFoundError( - operation_name=operation_name, required_key=required_key + operation_name=operation_to_start.operation_name, + required_key=required_key, ) # NOTE: to ensure reproducibility of operations, the # operation steps cannot overwrite keys in the # initial context with their results raise_if_overwrites_any_operation_provided_key( - operation, initial_operation_context + operation, operation_to_start.initial_context ) schedule_data_proxy = ScheduleDataStoreProxy( @@ -125,7 +125,7 @@ async def start_operation( ) await schedule_data_proxy.create_or_update_multiple( { - "operation_name": operation_name, + "operation_name": operation_to_start.operation_name, "group_index": 0, "is_executing": True, } @@ -134,9 +134,11 @@ async def start_operation( operation_content_proxy = OperationContextProxy( store=self._store, schedule_id=schedule_id, - operation_name=operation_name, + operation_name=operation_to_start.operation_name, + ) + await operation_content_proxy.create_or_update( + operation_to_start.initial_context ) - await operation_content_proxy.create_or_update(initial_operation_context) if on_execute_completed: await register_to_start_after_on_executed_completed( @@ -188,7 +190,7 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None: operation_name=operation_name, group_index=group_index, step_group=step_group, - is_executing=is_executing, + is_executing=True, ) # not allowed to cancel while waiting for manual intervention @@ -210,13 +212,16 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None: with log_context( _logger, logging.DEBUG, - f"Cancelling step {step_name=} of {operation_name=} for {schedule_id=}", + f"cancelling step {step_name=} of 
{operation_name=} for {schedule_id=}", ): with suppress(NoDataFoundError): deferred_task_uid = await step_proxy.read("deferred_task_uid") # the deferred task might not be running when this is called # e.g. cancelling a repeating operation - await DeferredRunner.cancel(deferred_task_uid) + with log_context( + _logger, logging.DEBUG, f"cancelling {deferred_task_uid=}" + ): + await DeferredRunner.cancel(deferred_task_uid) await step_proxy.create_or_update("status", StepStatus.CANCELLED) @@ -226,12 +231,16 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None: schedule_id=schedule_id, operation_name=operation_name, step_group_name=step_group_name, - is_executing=is_executing, + is_executing=True, ) if ( await group_proxy.increment_and_get_done_steps_count() == expected_steps_count ): + # all steps cancelled, move to revert + await schedule_data_proxy.create_or_update( + "is_executing", value=False + ) await enqueue_schedule_event(self.app, schedule_id) await limited_gather( @@ -443,29 +452,29 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None: step_group_name = step_group.get_step_group_name(index=group_index) base_message = f"{step_group_name=} in {operation_name=} for {schedule_id=}" - if step_group.repeat_steps is True and is_executing: - with log_context(_logger, logging.DEBUG, f"REPEATING {base_message}"): - await self._advance_as_repeating( - schedule_data_proxy, - schedule_id, - operation_name, - group_index, - step_group, - group_step_proxies, - ) - - elif is_executing: - with log_context(_logger, logging.DEBUG, f"CREATING {base_message}"): - await self._advance_as_creating( - steps_statuses, - schedule_data_proxy, - schedule_id, - operation_name, - group_index, - step_group, - operation, - ) - + if is_executing: + if step_group.repeat_steps: + with log_context(_logger, logging.DEBUG, f"REPEATING {base_message}"): + await self._advance_as_repeating( + steps_statuses, + schedule_data_proxy, + schedule_id, + 
operation_name, + group_index, + step_group, + group_step_proxies, + ) + else: + with log_context(_logger, logging.DEBUG, f"CREATING {base_message}"): + await self._advance_as_creating( + steps_statuses, + schedule_data_proxy, + schedule_id, + operation_name, + group_index, + step_group, + operation, + ) else: with log_context(_logger, logging.DEBUG, f"REVERTING {base_message}"): await self._advance_as_reverting( @@ -479,6 +488,7 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None: async def _advance_as_repeating( self, + steps_statuses: dict[StepName, StepStatus], schedule_data_proxy: ScheduleDataStoreProxy, schedule_id: ScheduleId, operation_name: OperationName, @@ -487,27 +497,19 @@ async def _advance_as_repeating( group_step_proxies: dict[StepName, StepStoreProxy], ) -> None: # REPEATING logic: - # 1) sleep before repeating - # 2) if any of the repeating steps was cancelled -> move to revert - # 3) -> restart all steps in the group + # 1) if any of the repeating steps was cancelled -> move to revert + # 2) -> restart all steps in the group step_proxies: Iterable[StepStoreProxy] = group_step_proxies.values() - # 1) sleep before repeating - await asyncio.sleep(current_step_group.wait_before_repeat.total_seconds()) - - # 2) if any of the repeating steps was cancelled -> move to revert - - # since some time passed, query all steps statuses again, - # a cancellation request might have been requested - steps_stauses = await get_steps_statuses(step_proxies) - if any(status == StepStatus.CANCELLED for status in steps_stauses.values()): + # 1) if any of the repeating steps was cancelled -> move to revert + if any(status == StepStatus.CANCELLED for status in steps_statuses.values()): # NOTE: await schedule_data_proxy.create_or_update("is_executing", value=False) await enqueue_schedule_event(self.app, schedule_id) return - # 3) -> restart all steps in the group + # 2) -> restart all steps in the group await limited_gather( *(x.delete() for x in 
step_proxies), limit=PARALLEL_REQUESTS ) @@ -554,30 +556,30 @@ async def _advance_as_creating( # 1b) if reached the end of the EXECUTE operation -> remove all created data [EMIT execute complete event] on_executed_proxy = OperationEventsProxy( - self._store, schedule_id, EventType.ON_EXECUTEDD_COMPLETED + self._store, schedule_id, EventType.ON_EXECUTED_COMPLETED ) - on_executed_operation_name: OperationName | None = None - on_executed_initial_context: OperationContext | None = None + on_executed_to_start: OperationToStart | None = None + on_executed_on_execute_completed: OperationToStart | None = None + on_executed_on_revert_completed: OperationToStart | None = None if await on_executed_proxy.exists(): - on_executed_operation_name = await on_executed_proxy.read( - "operation_name" + on_executed_to_start = await on_executed_proxy.read("to_start") + on_executed_on_execute_completed = await on_executed_proxy.read( + "on_execute_completed" ) - on_executed_initial_context = await on_executed_proxy.read( - "initial_context" + on_executed_on_revert_completed = await on_executed_proxy.read( + "on_revert_completed" ) await cleanup_after_finishing( self._store, schedule_id=schedule_id, is_executing=True ) - if ( - on_executed_operation_name is not None - and on_executed_initial_context is not None - ): + if on_executed_to_start is not None: await enqueue_execute_completed_event( self.app, schedule_id, - on_executed_operation_name, - on_executed_initial_context, + on_executed_to_start, + on_execute_completed=on_executed_on_execute_completed, + on_revert_completed=on_executed_on_revert_completed, ) return @@ -657,28 +659,28 @@ async def _advance_as_reverting( on_reverted_proxy = OperationEventsProxy( self._store, schedule_id, EventType.ON_REVERT_COMPLETED ) - on_reverted_operation_name: OperationName | None = None - on_reverted_initial_context: OperationContext | None = None + on_reverted_to_start: OperationToStart | None = None + on_reverted_on_execute_completed: 
OperationToStart | None = None + on_reverted_on_revert_completed: OperationToStart | None = None if await on_reverted_proxy.exists(): - on_reverted_operation_name = await on_reverted_proxy.read( - "operation_name" + on_reverted_to_start = await on_reverted_proxy.read("to_start") + on_reverted_on_execute_completed = await on_reverted_proxy.read( + "on_execute_completed" ) - on_reverted_initial_context = await on_reverted_proxy.read( - "initial_context" + on_reverted_on_revert_completed = await on_reverted_proxy.read( + "on_revert_completed" ) await cleanup_after_finishing( self._store, schedule_id=schedule_id, is_executing=False ) - if ( - on_reverted_operation_name is not None - and on_reverted_initial_context is not None - ): + if on_reverted_to_start is not None: await enqueue_revert_completed_event( self.app, schedule_id, - on_reverted_operation_name, - on_reverted_initial_context, + on_reverted_to_start, + on_execute_completed=on_reverted_on_execute_completed, + on_revert_completed=on_reverted_on_revert_completed, ) return @@ -747,15 +749,13 @@ async def _advance_as_reverting( async def start_operation( app: FastAPI, - operation_name: OperationName, - initial_operation_context: OperationContext, + operation_to_start: OperationToStart, *, on_execute_completed: OperationToStart | None = None, on_revert_completed: OperationToStart | None = None, ) -> ScheduleId: return await Core.get_from_app_state(app).start_operation( - operation_name, - initial_operation_context, + operation_to_start, on_execute_completed, on_revert_completed, ) @@ -763,7 +763,7 @@ async def start_operation( async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None: """ - Unstruct scheduler to revert all steps completed until + Instruct scheduler to revert all steps completed until now for the running operation. 
`reverting` refers to the act of reverting the effects of a step diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py index 5fb19777554f..50c0f152d3e3 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_deferred_runner.py @@ -1,3 +1,4 @@ +import asyncio from datetime import timedelta from fastapi import FastAPI @@ -5,13 +6,9 @@ from servicelib.deferred_tasks import BaseDeferredHandler, DeferredContext, TaskUID from servicelib.deferred_tasks._models import TaskResultError -from ._errors import ( - OperationContextValueIsNoneError, - ProvidedOperationContextKeysAreMissingError, -) +from ._errors import ProvidedOperationContextKeysAreMissingError from ._event import enqueue_schedule_event from ._models import ( - OperationContext, OperationName, ProvidedOperationContext, ScheduleId, @@ -93,14 +90,7 @@ async def _enqueue_schedule_event_if_group_is_done(context: DeferredContext) -> await enqueue_schedule_event(app, schedule_id) -def _raise_if_any_context_value_is_none( - operation_context: OperationContext, -) -> None: - if any(value is None for value in operation_context.values()): - raise OperationContextValueIsNoneError(operation_context=operation_context) - - -def _raise_if_provided_context_keys_are_missing_or_none( +def _raise_if_provided_context_keys_are_missing( provided_context: ProvidedOperationContext, expected_keys: set[str], ) -> None: @@ -112,8 +102,6 @@ def _raise_if_provided_context_keys_are_missing_or_none( expected_keys=expected_keys, ) - _raise_if_any_context_value_is_none(provided_context) - class DeferredRunner(BaseDeferredHandler[None]): @classmethod @@ -179,26 +167,30 @@ async def run(cls, 
context: DeferredContext) -> None: required_context = await operation_context_proxy.read( *step.get_execute_requires_context_keys() ) - _raise_if_any_context_value_is_none(required_context) + await asyncio.sleep(step.get_sleep_before_execute().total_seconds()) step_provided_operation_context = await step.execute(app, required_context) + await asyncio.sleep(step.get_sleep_after_execute().total_seconds()) + provided_operation_context = step_provided_operation_context or {} execute_provides_keys = step.get_execute_provides_context_keys() - _raise_if_provided_context_keys_are_missing_or_none( + _raise_if_provided_context_keys_are_missing( provided_operation_context, execute_provides_keys ) else: required_context = await operation_context_proxy.read( *step.get_revert_requires_context_keys() ) - _raise_if_any_context_value_is_none(required_context) + await asyncio.sleep(step.get_sleep_before_revert().total_seconds()) step_provided_operation_context = await step.revert(app, required_context) + await asyncio.sleep(step.get_sleep_after_revert().total_seconds()) + provided_operation_context = step_provided_operation_context or {} revert_provides_keys = step.get_revert_provides_context_keys() - _raise_if_provided_context_keys_are_missing_or_none( + _raise_if_provided_context_keys_are_missing( provided_operation_context, revert_provides_keys ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py index 32357a413b27..c299dcc679d6 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_errors.py @@ -38,10 +38,6 @@ class UnexpectedStepHandlingError(BaseGenericSchedulerError): ) -class OperationContextValueIsNoneError(BaseGenericSchedulerError): - msg_template: 
str = "Values of context cannot be None: {operation_context}" - - class ProvidedOperationContextKeysAreMissingError(BaseGenericSchedulerError): msg_template: str = ( "Provided context {provided_context} is missing keys {missing_keys}, was expecting {expected_keys}" diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event.py index 5476ff6bdf08..9f3397a8fcd1 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event.py @@ -3,7 +3,7 @@ from ._dependencies import get_event_scheduler from ._event_base_queue import OperationToStartEvent from ._event_queues import ExecuteCompletedQueue, RevertCompletedQueue, ScheduleQueue -from ._models import OperationContext, OperationName, ScheduleId +from ._models import OperationToStart, ScheduleId async def enqueue_schedule_event(app: FastAPI, schedule_id: ScheduleId) -> None: @@ -13,15 +13,18 @@ async def enqueue_schedule_event(app: FastAPI, schedule_id: ScheduleId) -> None: async def enqueue_execute_completed_event( app: FastAPI, schedule_id: ScheduleId, - operation_name: OperationName, - initial_context: OperationContext, + to_start: OperationToStart, + *, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: await get_event_scheduler(app).enqueue_message_for( ExecuteCompletedQueue, OperationToStartEvent( schedule_id=schedule_id, - operation_name=operation_name, - initial_context=initial_context, + to_start=to_start, + on_execute_completed=on_execute_completed, + on_revert_completed=on_revert_completed, ), ) @@ -29,14 +32,17 @@ async def enqueue_execute_completed_event( async def enqueue_revert_completed_event( app: FastAPI, schedule_id: ScheduleId, 
- operation_name: OperationName, - initial_context: OperationContext, + to_start: OperationToStart, + *, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: await get_event_scheduler(app).enqueue_message_for( RevertCompletedQueue, OperationToStartEvent( schedule_id=schedule_id, - operation_name=operation_name, - initial_context=initial_context, + to_start=to_start, + on_execute_completed=on_execute_completed, + on_revert_completed=on_revert_completed, ), ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py index 9194c11e1cc0..9f068679dc87 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after.py @@ -6,13 +6,7 @@ from ._core import start_operation from ._errors import OperationInitialContextKeyNotFoundError -from ._models import ( - EventType, - OperationContext, - OperationName, - OperationToStart, - ScheduleId, -) +from ._models import EventType, OperationToStart, ScheduleId from ._operation import OperationRegistry from ._store import OperationEventsProxy, Store @@ -37,6 +31,8 @@ async def register_to_start_after( event_type: EventType, *, to_start: OperationToStart | None, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: events_proxy = OperationEventsProxy(self._store, schedule_id, event_type) @@ -60,40 +56,47 @@ async def register_to_start_after( await events_proxy.create_or_update_multiple( { - "initial_context": to_start.initial_context, - "operation_name": to_start.operation_name, + "to_start": to_start, + "on_execute_completed": on_execute_completed, + 
"on_revert_completed": on_revert_completed, } ) _logger.debug( - "Registered event_type='%s' to_start='%s' for schedule_id='%s'", + "Registered event_type='%s' for schedule_id='%s' to_start='%s'", event_type, - to_start, schedule_id, + to_start, ) async def safe_on_event_type( self, event_type: EventType, schedule_id: ScheduleId, - operation_name: OperationName, - initial_context: OperationContext, + to_start: OperationToStart, + *, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: with log_context( _logger, logging.DEBUG, - f"processing {event_type=} for {schedule_id=} {operation_name=} {initial_context=}", + f"processing {event_type=} for {schedule_id=} {to_start=} {on_execute_completed=} {on_revert_completed=}", log_duration=True, ): new_schedule_id = await start_operation( - self.app, operation_name, initial_context + self.app, + to_start, + on_execute_completed=on_execute_completed, + on_revert_completed=on_revert_completed, ) _logger.debug( "Finished execution of event_type='%s' for schedule_id='%s'. 
" - "Started new_schedule_id='%s' from operation_name='%s' with initial_context='%s'", + "Started new_schedule_id='%s' from to_start='%s' on_execute_completed='%s' on_revert_completed='%s'", event_type, schedule_id, new_schedule_id, - operation_name, - initial_context, + to_start, + on_execute_completed, + on_revert_completed, ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after_registration.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after_registration.py index bec166efabb5..7f61d78563e9 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after_registration.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_after_registration.py @@ -16,16 +16,36 @@ def _get_after_event_manager(app: FastAPI) -> "AfterEventManager": async def register_to_start_after_on_executed_completed( - app: FastAPI, schedule_id: ScheduleId, *, to_start: OperationToStart | None + app: FastAPI, + schedule_id: ScheduleId, + *, + to_start: OperationToStart | None, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: + """raises raises NoDataFoundError""" await _get_after_event_manager(app).register_to_start_after( - schedule_id, EventType.ON_EXECUTEDD_COMPLETED, to_start=to_start + schedule_id, + EventType.ON_EXECUTED_COMPLETED, + to_start=to_start, + on_execute_completed=on_execute_completed, + on_revert_completed=on_revert_completed, ) async def register_to_start_after_on_reverted_completed( - app: FastAPI, schedule_id: ScheduleId, *, to_start: OperationToStart | None + app: FastAPI, + schedule_id: ScheduleId, + *, + to_start: OperationToStart | None, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: + """raises raises 
NoDataFoundError""" await _get_after_event_manager(app).register_to_start_after( - schedule_id, EventType.ON_REVERT_COMPLETED, to_start=to_start + schedule_id, + EventType.ON_REVERT_COMPLETED, + to_start=to_start, + on_execute_completed=on_execute_completed, + on_revert_completed=on_revert_completed, ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_base_queue.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_base_queue.py index a6eb2924fea4..896e8751dd79 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_base_queue.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_base_queue.py @@ -9,7 +9,7 @@ from faststream.rabbit import RabbitExchange, RabbitQueue, RabbitRouter from faststream.rabbit.schemas.queue import QueueType, QuorumQueueArgs -from ._models import OperationContext, OperationName, ScheduleId +from ._models import OperationToStart, ScheduleId _logger = logging.getLogger(__name__) @@ -61,8 +61,9 @@ async def wrapper(*args, **kwargs): @dataclass class OperationToStartEvent: schedule_id: ScheduleId - operation_name: OperationName - initial_context: OperationContext + to_start: OperationToStart + on_execute_completed: OperationToStart | None = None + on_revert_completed: OperationToStart | None = None @dataclass diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_queues.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_queues.py index 4fbd43ab7f9b..43074ad63e65 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_queues.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_event_queues.py @@ -15,10 +15,11 @@ 
async def handler( # type:ignore[override] # pylint:disable=arguments-differ self, event: OperationToStartEvent ) -> None: await get_after_event_manager(self.app).safe_on_event_type( - EventType.ON_EXECUTEDD_COMPLETED, + EventType.ON_EXECUTED_COMPLETED, event.schedule_id, - event.operation_name, - event.initial_context, + event.to_start, + on_execute_completed=event.on_execute_completed, + on_revert_completed=event.on_revert_completed, ) @@ -29,6 +30,7 @@ async def handler( # type:ignore[override] # pylint:disable=arguments-differ await get_after_event_manager(self.app).safe_on_event_type( EventType.ON_REVERT_COMPLETED, event.schedule_id, - event.operation_name, - event.initial_context, + event.to_start, + on_execute_completed=event.on_execute_completed, + on_revert_completed=event.on_revert_completed, ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_models.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_models.py index 5810ed7b125d..39b72e34d96d 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_models.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_models.py @@ -43,7 +43,7 @@ class OperationErrorType(StrAutoEnum): class EventType(StrAutoEnum): - ON_EXECUTEDD_COMPLETED = auto() + ON_EXECUTED_COMPLETED = auto() ON_REVERT_COMPLETED = auto() diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py index f9c5165ad234..513cfb4f9021 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_operation.py @@ -1,6 +1,6 @@ from abc 
import ABC, abstractmethod from datetime import timedelta -from typing import Annotated, ClassVar, Final, TypeAlias, TypedDict +from typing import Annotated, ClassVar, Final, TypedDict from fastapi import FastAPI from pydantic import Field, NonNegativeInt, TypeAdapter, validate_call @@ -23,6 +23,7 @@ _DEFAULT_STEP_RETRIES: Final[NonNegativeInt] = 0 _DEFAULT_STEP_TIMEOUT: Final[timedelta] = timedelta(seconds=5) _DEFAULT_WAIT_FOR_MANUAL_INTERVENTION: Final[bool] = False +_DEFAULT_SLEEP_EXECUTE_REVERST: Final[timedelta] = timedelta(seconds=0) class BaseStep(ABC): @@ -32,6 +33,13 @@ def get_step_name(cls) -> StepName: ### EXECUTE + @classmethod + def get_sleep_before_execute(cls) -> timedelta: + """ + [optional] wait time before executing the step + """ + return _DEFAULT_SLEEP_EXECUTE_REVERST + @classmethod @abstractmethod async def execute( @@ -43,6 +51,13 @@ async def execute( - `execute` is called multiple times and does not cause duplicate resources """ + @classmethod + def get_sleep_after_execute(cls) -> timedelta: + """ + [optional] wait time after executing the step + """ + return _DEFAULT_SLEEP_EXECUTE_REVERST + @classmethod def get_execute_requires_context_keys(cls) -> set[str]: """ @@ -88,6 +103,13 @@ def wait_for_manual_intervention(cls) -> bool: ### REVERT + @classmethod + def get_sleep_before_revert(cls) -> timedelta: + """ + [optional] wait time before reverting the step + """ + return _DEFAULT_SLEEP_EXECUTE_REVERST + @classmethod async def revert( cls, app: FastAPI, required_context: RequiredOperationContext @@ -103,6 +125,13 @@ async def revert( _ = app return {} + @classmethod + def get_sleep_after_revert(cls) -> timedelta: + """ + [optional] wait time after reverting the step + """ + return _DEFAULT_SLEEP_EXECUTE_REVERST + @classmethod def get_revert_requires_context_keys(cls) -> set[str]: """ @@ -140,16 +169,15 @@ async def get_revert_wait_between_attempts( return _DEFAULT_STEP_TIMEOUT -StepsSubGroup: TypeAlias = Annotated[tuple[type[BaseStep], ...], 
Field(min_length=1)] +type StepsSubGroup = Annotated[tuple[type[BaseStep], ...], Field(min_length=1)] class BaseStepGroup(ABC): - def __init__(self, *, repeat_steps: bool, wait_before_repeat: timedelta) -> None: + def __init__(self, *, repeat_steps: bool) -> None: """ if repeat_steps is True, the steps in this group will be repeated forever """ self.repeat_steps = repeat_steps - self.wait_before_repeat = wait_before_repeat @abstractmethod def __len__(self) -> int: @@ -169,21 +197,14 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup: _DEFAULT_REPEAT_STEPS: Final[bool] = False -_DEFAULT_WAIT_BEFORE_REPEAT: Final[timedelta] = timedelta(seconds=5) class SingleStepGroup(BaseStepGroup): def __init__( - self, - step: type[BaseStep], - *, - repeat_steps: bool = _DEFAULT_REPEAT_STEPS, - wait_before_repeat: timedelta = _DEFAULT_WAIT_BEFORE_REPEAT, + self, step: type[BaseStep], *, repeat_steps: bool = _DEFAULT_REPEAT_STEPS ) -> None: self._step: type[BaseStep] = step - super().__init__( - repeat_steps=repeat_steps, wait_before_repeat=wait_before_repeat - ) + super().__init__(repeat_steps=repeat_steps) def __len__(self) -> int: return 1 @@ -203,15 +224,10 @@ def get_step_subgroup_to_run(self) -> StepsSubGroup: class ParallelStepGroup(BaseStepGroup): def __init__( - self, - *steps: type[BaseStep], - repeat_steps: bool = _DEFAULT_REPEAT_STEPS, - wait_before_repeat: timedelta = _DEFAULT_WAIT_BEFORE_REPEAT, + self, *steps: type[BaseStep], repeat_steps: bool = _DEFAULT_REPEAT_STEPS ) -> None: self._steps: list[type[BaseStep]] = list(steps) - super().__init__( - repeat_steps=repeat_steps, wait_before_repeat=wait_before_repeat - ) + super().__init__(repeat_steps=repeat_steps) def __len__(self) -> int: return len(self._steps) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_store.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_store.py index 87c7f9517f29..6e99c9644c33 100644 
--- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_store.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/generic_scheduler/_store.py @@ -13,9 +13,9 @@ from ._lifecycle_protocol import SupportsLifecycle from ._models import ( EventType, - OperationContext, OperationErrorType, OperationName, + OperationToStart, ProvidedOperationContext, RequiredOperationContext, ScheduleId, @@ -442,9 +442,10 @@ async def delete(self) -> None: await self._store.delete(self._get_hash_key()) -class _EventDict(TypedDict): - operation_name: NotRequired[OperationName] - initial_context: NotRequired[OperationContext] +class _OperationEventsDict(TypedDict): + to_start: NotRequired[OperationToStart] + on_execute_completed: NotRequired[OperationToStart | None] + on_revert_completed: NotRequired[OperationToStart | None] class OperationEventsProxy: @@ -462,29 +463,42 @@ def _get_hash_key(self) -> str: @overload async def create_or_update( - self, key: Literal["initial_context"], value: OperationContext + self, key: Literal["to_start"], value: OperationToStart ) -> None: ... @overload async def create_or_update( - self, key: Literal["operation_name"], value: OperationName + self, key: Literal["on_execute_completed"], value: OperationToStart | None + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["on_revert_completed"], value: OperationToStart | None ) -> None: ... async def create_or_update(self, key: str, value: Any) -> None: await self._store.set_key_in_hash(self._get_hash_key(), key, value) - async def create_or_update_multiple(self, updates: _EventDict) -> None: + async def create_or_update_multiple(self, updates: _OperationEventsDict) -> None: await self._store.set_keys_in_hash(self._get_hash_key(), updates=updates) # type: ignore[arg-type] @overload - async def read(self, key: Literal["operation_name"]) -> OperationName: ... 
+ async def read(self, key: Literal["to_start"]) -> OperationToStart: ... + @overload + async def read( + self, key: Literal["on_execute_completed"] + ) -> OperationToStart | None: ... @overload - async def read(self, key: Literal["initial_context"]) -> OperationContext: ... + async def read( + self, key: Literal["on_revert_completed"] + ) -> OperationToStart | None: ... async def read(self, key: str) -> Any: """raises NoDataFoundError if the key is not present in the hash""" hash_key = self._get_hash_key() (result,) = await self._store.get_keys_from_hash(hash_key, key) - if result is None: - raise NoDataFoundError(key=key, hash_key=hash_key) - return result + + match key: + case "to_start" | "on_execute_completed" | "on_revert_completed": + return OperationToStart(**result) if result else None + + raise NoDataFoundError(key=key, hash_key=hash_key) async def delete(self) -> None: await self._store.delete(self._get_hash_key()) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/__init__.py new file mode 100644 index 000000000000..7f8c1e60d72c --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/__init__.py @@ -0,0 +1,8 @@ +from ._lifespan import scheduler_lifespan +from ._manager import start_service, stop_service + +__all__: tuple[str, ...] 
= ( + "scheduler_lifespan", + "start_service", + "stop_service", +) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_errors.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_errors.py new file mode 100644 index 000000000000..c3bba02f277a --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_errors.py @@ -0,0 +1,19 @@ +from common_library.errors_classes import OsparcErrorMixin + + +class BaseSchedulerError(OsparcErrorMixin, Exception): + """base exception for this module""" + + +class UnexpectedCouldNotFindCurrentScheduledIdError(BaseSchedulerError): + msg_template: str = "Could not find current_schedule_id, this is unexpected" + + +class UnexpectedCouldNotFindOperationNameError(BaseSchedulerError): + msg_template: str = "Could not find operation name for schedule_id '{schedule_id}'" + + +class UnexpectedCouldNotDetermineOperationTypeError(BaseSchedulerError): + msg_template: str = ( + "Could not determine operation type from '{operation_name}'. 
Supported types are {supported_types}" + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_lifespan.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_lifespan.py new file mode 100644 index 000000000000..d9e9b4750602 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_lifespan.py @@ -0,0 +1,26 @@ +from collections.abc import AsyncIterator + +from fastapi import FastAPI +from fastapi_lifespan_manager import State + +from ...core.settings import ApplicationSettings +from ._operations import profiles, registry +from ._redis import RedisStore + + +async def scheduler_lifespan(app: FastAPI) -> AsyncIterator[State]: + settings: ApplicationSettings = app.state.settings + + store = RedisStore(settings.DYNAMIC_SCHEDULER_REDIS) + store.set_to_app_state(app) + + profiles.register_profiles() + registry.register_operataions() + + await store.setup() + + yield {} + + await store.shutdown() + registry.unregister_operations() + profiles.unregister_profiles() diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_manager.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_manager.py new file mode 100644 index 000000000000..737d1b695b57 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_manager.py @@ -0,0 +1,207 @@ +import logging +from datetime import timedelta +from typing import Final + +from fastapi import FastAPI +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.projects_nodes_io import NodeID +from pydantic import NonNegativeFloat, TypeAdapter +from servicelib.logging_utils import log_context +from tenacity import ( + AsyncRetrying, + retry_if_exception_type, + stop_after_delay, + wait_fixed, +) + +from 
..generic_scheduler import ( + NoDataFoundError, + OperationName, + OperationToStart, + ScheduleId, + cancel_operation, + get_operation_name_or_none, + register_to_start_after_on_executed_completed, + register_to_start_after_on_reverted_completed, + start_operation, +) +from . import _opration_names +from ._errors import ( + UnexpectedCouldNotFindCurrentScheduledIdError, + UnexpectedCouldNotFindOperationNameError, +) +from ._models import OperationType, UserRequestedState +from ._redis import RedisServiceStateManager +from ._utils import get_scheduler_operation_type_or_raise + +_logger = logging.getLogger(__name__) + +_WAIT_BETWEEN_RETRIES: Final[NonNegativeFloat] = 0.1 +_MAX_WAIT_TIME_FOR_SCHEDULE_ID: Final[NonNegativeFloat] = timedelta( + seconds=5 +).total_seconds() + + +async def _get_schedule_id_and_opration_type( + app: FastAPI, service_state_manager: RedisServiceStateManager +) -> tuple[ScheduleId, OperationType]: + + # NOTE: current_schedule_id is expected to be invalid, + # while operations are switching. + # Waiting a very short time should usually fix the issue.
+ async for attempt in AsyncRetrying( + wait=wait_fixed(_WAIT_BETWEEN_RETRIES), + stop=stop_after_delay(_MAX_WAIT_TIME_FOR_SCHEDULE_ID), + reraise=True, + retry=retry_if_exception_type(UnexpectedCouldNotFindOperationNameError), + ): + with attempt: + current_schedule_id = await service_state_manager.read( + "current_schedule_id" + ) + if current_schedule_id is None: + raise UnexpectedCouldNotFindCurrentScheduledIdError + + opration_name = await get_operation_name_or_none(app, current_schedule_id) + + if opration_name is None: + raise UnexpectedCouldNotFindOperationNameError( + schedule_id=current_schedule_id + ) + + assert TypeAdapter(ScheduleId).validate_python(current_schedule_id) # nosec + assert TypeAdapter(OperationName).validate_python(opration_name) # nosec + + operation_type = get_scheduler_operation_type_or_raise(name=opration_name) + + return current_schedule_id, operation_type + + +async def _switch_to_enforce( + app: FastAPI, schedule_id: ScheduleId, node_id: NodeID +) -> None: + try: + enforce_operation = OperationToStart( + _opration_names.ENFORCE, {"node_id": node_id} + ) + await register_to_start_after_on_executed_completed( + app, schedule_id, to_start=enforce_operation + ) + await register_to_start_after_on_reverted_completed( + app, schedule_id, to_start=enforce_operation + ) + await cancel_operation(app, schedule_id) + _logger.debug("Switched schedule_id='%s' to ENFORCE.", schedule_id) + except NoDataFoundError: + _logger.debug("Could not switch schedule_id='%s' to ENFORCE.", schedule_id) + + +async def _set_desired_started( + service_state_manager: RedisServiceStateManager, start_data: DynamicServiceStart +) -> None: + await service_state_manager.create_or_update_multiple( + { + "desired_state": UserRequestedState.RUNNING, + "desired_start_data": start_data, + } + ) + + +async def start_service(app: FastAPI, start_data: DynamicServiceStart) -> None: + node_id = start_data.node_uuid + service_state_manager = RedisServiceStateManager(app=app, 
node_id=node_id) + + if not await service_state_manager.exists(): + # service is not tracked + with log_context( + _logger, logging.DEBUG, f"startup of untracked service for {node_id=}" + ): + await _set_desired_started(service_state_manager, start_data) + enforce_operation = OperationToStart( + _opration_names.ENFORCE, {"node_id": node_id} + ) + await start_operation( + app, + OperationToStart(_opration_names.ENFORCE, {"node_id": node_id}), + on_execute_completed=enforce_operation, + on_revert_completed=enforce_operation, + ) + _logger.debug("node_di='%s' added to tracking", node_id) + return + + current_schedule_id, operation_type = await _get_schedule_id_and_opration_type( + app, service_state_manager + ) + + _logger.debug( + "Starting node_id='%s' with current operation '%s'", node_id, operation_type + ) + + match operation_type: + # NOTE: STOP opreration cannot be cancelled + case OperationType.ENFORCE | OperationType.START: + if await service_state_manager.read("current_start_data") != start_data: + await _set_desired_started(service_state_manager, start_data) + await _switch_to_enforce(app, current_schedule_id, node_id) + case OperationType.MONITOR: + await _set_desired_started(service_state_manager, start_data) + await _switch_to_enforce(app, current_schedule_id, node_id) + case OperationType.STOP: + _logger.info("Cannot start while stopping node_id='%s'", node_id) + + +async def _set_desired_stopped( + service_state_manager: RedisServiceStateManager, stop_data: DynamicServiceStop +) -> None: + await service_state_manager.create_or_update_multiple( + { + "desired_state": UserRequestedState.STOPPED, + "desired_stop_data": stop_data, + } + ) + + +async def stop_service(app: FastAPI, stop_data: DynamicServiceStop) -> None: + node_id = stop_data.node_id + service_state_manager = RedisServiceStateManager(app=app, node_id=node_id) + + if not await service_state_manager.exists(): + # it is always possible to schedule the service for a stop, + # primary use case is 
platform cleanup + with log_context( + _logger, logging.DEBUG, f"shutdown of untracked service for {node_id=}" + ): + await _set_desired_stopped(service_state_manager, stop_data) + enforce_operation = OperationToStart( + _opration_names.ENFORCE, {"node_id": node_id} + ) + await start_operation( + app, + OperationToStart(_opration_names.ENFORCE, {"node_id": node_id}), + on_execute_completed=enforce_operation, + on_revert_completed=enforce_operation, + ) + return + + current_schedule_id, operation_type = await _get_schedule_id_and_opration_type( + app, service_state_manager + ) + + _logger.debug( + "Stopping node_id='%s' with current operation '%s'", node_id, operation_type + ) + + match operation_type: + # NOTE: STOP opreration cannot be cancelled + case OperationType.ENFORCE: + if await service_state_manager.read("current_stop_data") != stop_data: + await _set_desired_stopped(service_state_manager, stop_data) + await _switch_to_enforce(app, current_schedule_id, node_id) + case OperationType.START | OperationType.MONITOR: + await _set_desired_stopped(service_state_manager, stop_data) + await _switch_to_enforce(app, current_schedule_id, node_id) + case OperationType.STOP: + _logger.info("Already stopping node_id='%s'", node_id) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_models.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_models.py new file mode 100644 index 000000000000..00dd796c2a05 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_models.py @@ -0,0 +1,22 @@ +from enum import auto + +from models_library.utils.enums import StrAutoEnum + +type SchedulerOperationName = str + + +class UserRequestedState(StrAutoEnum): + RUNNING = auto() + STOPPED = auto() + + +class OperationType(StrAutoEnum): + ENFORCE = auto() + START = auto() + MONITOR = auto() + STOP = auto() + + +class SchedulingProfileType(StrAutoEnum): + LEGACY 
= auto() + NEW_STYLE = auto() diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/_common_steps.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/_common_steps.py new file mode 100644 index 000000000000..614e5facb69c --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/_common_steps.py @@ -0,0 +1,83 @@ +import logging + +from fastapi import FastAPI +from models_library.projects_nodes_io import NodeID + +from ...generic_scheduler import ( + BaseStep, + ProvidedOperationContext, + RequiredOperationContext, + ReservedContextKeys, + ScheduleId, +) +from .._models import UserRequestedState +from .._redis import RedisServiceStateManager + +_logger = logging.getLogger(__name__) + + +class SetCurrentScheduleId(BaseStep): + @classmethod + def get_execute_requires_context_keys(cls) -> set[str]: + return {ReservedContextKeys.SCHEDULE_ID, "node_id"} + + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + schedule_id: ScheduleId = required_context[ReservedContextKeys.SCHEDULE_ID] + node_id: NodeID = required_context["node_id"] + + service_state_manager = RedisServiceStateManager(app=app, node_id=node_id) + await service_state_manager.create_or_update("current_schedule_id", schedule_id) + + return None + + +class SetCurrentStateRunning(BaseStep): + @classmethod + def get_execute_requires_context_keys(cls) -> set[str]: + return {"node_id"} + + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> 
ProvidedOperationContext | None: + node_id: NodeID = required_context["node_id"] + + service_state_manager = RedisServiceStateManager(app=app, node_id=node_id) + await service_state_manager.create_or_update( + "current_state", UserRequestedState.RUNNING + ) + return None + + +class SetCurrentStateStopped(BaseStep): + @classmethod + def get_execute_requires_context_keys(cls) -> set[str]: + return {"node_id"} + + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + node_id: NodeID = required_context["node_id"] + + service_state_manager = RedisServiceStateManager(app=app, node_id=node_id) + await service_state_manager.create_or_update( + "current_state", UserRequestedState.STOPPED + ) + return None + + +class DoNothing(BaseStep): + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + _ = app + _ = required_context + + _logger.debug("doing nothing, just a placeholder") + + return None diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/enforce.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/enforce.py new file mode 100644 index 000000000000..ee7787a73062 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/enforce.py @@ -0,0 +1,144 @@ +import logging + +from fastapi import FastAPI +from models_library.projects_nodes_io import NodeID + +from ...generic_scheduler import ( + BaseStep, + Operation, + OperationContext, + OperationToStart, + ProvidedOperationContext, + RequiredOperationContext, + SingleStepGroup, + register_to_start_after_on_executed_completed, +) +from .. 
import _opration_names +from .._models import SchedulingProfileType, UserRequestedState +from .._redis import RedisServiceStateManager +from ._common_steps import SetCurrentScheduleId +from .profiles import RegsteredSchedulingProfiles + +_logger = logging.getLogger(__name__) + + +class _CacheSchedulingProfileType(BaseStep): + """ + Computes and stores the scheduling profile to be used when enforcing + """ + + @classmethod + def get_execute_requires_context_keys(cls) -> set[str]: + return {"node_id", "scheduling_profile_type"} + + @classmethod + def get_execute_provides_context_keys(cls) -> set[str]: + return {"scheduling_profile_type"} + + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + node_id: NodeID = required_context["node_id"] + scheduling_profile_type: SchedulingProfileType | None = required_context[ + "scheduling_profile_type" + ] + + # allows to skip lengthy check + if scheduling_profile_type is not None: + return {"scheduling_profile_type": scheduling_profile_type} + + # TODO: this will be done in a future PR, for now it stays mocked + _ = app + _ = node_id + scheduling_profile_type = SchedulingProfileType.LEGACY + + return {"scheduling_profile_type": scheduling_profile_type} + + +def _get_start_monitor_stop_initial_context(node_id: NodeID) -> OperationContext: + return {"node_id": node_id} + + +class _Enforce(BaseStep): + @classmethod + def get_execute_requires_context_keys(cls) -> set[str]: + return {"node_id", "scheduling_profile_type"} + + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + node_id: NodeID = required_context["node_id"] + scheduling_profile_type: SchedulingProfileType = required_context[ + "scheduling_profile_type" + ] + + service_state_manager = RedisServiceStateManager(app=app, node_id=node_id) + + desired_state = await 
service_state_manager.read("desired_state") + assert desired_state is not None # nosec + current_state = await service_state_manager.read("current_state") + current_schedule_id = await service_state_manager.read("current_schedule_id") + assert current_schedule_id is not None # nosec + + profile = RegsteredSchedulingProfiles.get_profile(scheduling_profile_type) + + initial_context = _get_start_monitor_stop_initial_context(node_id) + enforce_operation = OperationToStart( + _opration_names.ENFORCE, initial_context=initial_context + ) + + _logger.debug( + "Deciding for current_schedule_id='%s' based on current='%s' and desired='%s', with profile='%s'", + current_schedule_id, + current_state, + desired_state, + profile, + ) + if current_state == desired_state == UserRequestedState.RUNNING: + await register_to_start_after_on_executed_completed( + app, + current_schedule_id, + to_start=OperationToStart(profile.monitor_name, initial_context), + on_execute_completed=enforce_operation, + on_revert_completed=enforce_operation, + ) + _logger.debug("selected operation: monitor") + return None + + if current_state == desired_state == UserRequestedState.STOPPED: + # do nothing reached the end of everything just remove + await service_state_manager.delete() + _logger.debug("node_di='%s' removed from tracking", node_id) + return None + + match desired_state: + case UserRequestedState.RUNNING: + await register_to_start_after_on_executed_completed( + app, + current_schedule_id, + to_start=OperationToStart(profile.start_name, initial_context), + on_execute_completed=enforce_operation, + on_revert_completed=enforce_operation, + ) + _logger.debug("selected operation: start") + case UserRequestedState.STOPPED: + await register_to_start_after_on_executed_completed( + app, + current_schedule_id, + to_start=OperationToStart(profile.stop_name, initial_context), + on_execute_completed=enforce_operation, + on_revert_completed=enforce_operation, + ) + _logger.debug("selected operation: stop") + 
+ return None + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + SingleStepGroup(_CacheSchedulingProfileType), + SingleStepGroup(_Enforce), + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/__init__.py new file mode 100644 index 000000000000..9945d10ce65d --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/__init__.py @@ -0,0 +1,7 @@ +from . import monitor, start, stop + +__all__: tuple[str, ...] = ( + "monitor", + "start", + "stop", +) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/_steps/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/_steps/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/monitor.py new file mode 100644 index 000000000000..77d880aa1cf6 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/monitor.py @@ -0,0 +1,13 @@ +from ....generic_scheduler import BaseStepGroup, Operation, SingleStepGroup +from .._common_steps import DoNothing, SetCurrentScheduleId + +_steps: list[BaseStepGroup] = [ + SingleStepGroup(DoNothing, repeat_steps=True), +] + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + *_steps, + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/start.py 
b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/start.py new file mode 100644 index 000000000000..25afe506e633 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/start.py @@ -0,0 +1,12 @@ +from ....generic_scheduler import BaseStepGroup, Operation, SingleStepGroup +from .._common_steps import SetCurrentScheduleId, SetCurrentStateRunning + +_steps: list[BaseStepGroup] = [] + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + *_steps, + SingleStepGroup(SetCurrentStateRunning), + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/stop.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/stop.py new file mode 100644 index 000000000000..dcd941e5b264 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/legacy/stop.py @@ -0,0 +1,13 @@ +from ....generic_scheduler import BaseStepGroup, Operation, SingleStepGroup +from .._common_steps import SetCurrentScheduleId, SetCurrentStateStopped + +_steps: list[BaseStepGroup] = [] + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + *_steps, + SingleStepGroup(SetCurrentStateStopped), + is_cancellable=False, + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/__init__.py new file mode 100644 index 000000000000..9945d10ce65d --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/__init__.py @@ -0,0 +1,7 @@ +from . import monitor, start, stop + +__all__: tuple[str, ...] 
= ( + "monitor", + "start", + "stop", +) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/_steps/__init__.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/_steps/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/monitor.py new file mode 100644 index 000000000000..77d880aa1cf6 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/monitor.py @@ -0,0 +1,13 @@ +from ....generic_scheduler import BaseStepGroup, Operation, SingleStepGroup +from .._common_steps import DoNothing, SetCurrentScheduleId + +_steps: list[BaseStepGroup] = [ + SingleStepGroup(DoNothing, repeat_steps=True), +] + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + *_steps, + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/start.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/start.py new file mode 100644 index 000000000000..25afe506e633 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/start.py @@ -0,0 +1,12 @@ +from ....generic_scheduler import BaseStepGroup, Operation, SingleStepGroup +from .._common_steps import SetCurrentScheduleId, SetCurrentStateRunning + +_steps: list[BaseStepGroup] = [] + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + *_steps, + SingleStepGroup(SetCurrentStateRunning), + ) diff --git 
a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/stop.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/stop.py new file mode 100644 index 000000000000..dcd941e5b264 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/new_style/stop.py @@ -0,0 +1,13 @@ +from ....generic_scheduler import BaseStepGroup, Operation, SingleStepGroup +from .._common_steps import SetCurrentScheduleId, SetCurrentStateStopped + +_steps: list[BaseStepGroup] = [] + + +def get_operation() -> Operation: + return Operation( + SingleStepGroup(SetCurrentScheduleId), + *_steps, + SingleStepGroup(SetCurrentStateStopped), + is_cancellable=False, + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/profiles.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/profiles.py new file mode 100644 index 000000000000..8d49b0862380 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/profiles.py @@ -0,0 +1,85 @@ +from collections.abc import Iterable +from dataclasses import dataclass +from typing import ClassVar + +from ...generic_scheduler import Operation +from .._models import OperationType, SchedulerOperationName, SchedulingProfileType +from .._utils import get_scheduler_oepration_name +from . 
import legacy, new_style + + +@dataclass +class SchedulingProfile: + start_name: SchedulerOperationName + start_operation: Operation + + monitor_name: SchedulerOperationName + monitor_operation: Operation + + stop_name: SchedulerOperationName + stop_operation: Operation + + +class RegsteredSchedulingProfiles: + _REGISTEERED_MODES: ClassVar[dict[SchedulingProfileType, SchedulingProfile]] = {} + + @classmethod + def register( + cls, + scheduling_profile_type: SchedulingProfileType, + *, + start_operation: Operation, + monitor_operation: Operation, + stop_operation: Operation, + ) -> None: + cls._REGISTEERED_MODES[scheduling_profile_type] = SchedulingProfile( + start_name=get_scheduler_oepration_name( + OperationType.START, + scheduling_profile_type, + ), + start_operation=start_operation, + monitor_name=get_scheduler_oepration_name( + OperationType.MONITOR, + scheduling_profile_type, + ), + monitor_operation=monitor_operation, + stop_name=get_scheduler_oepration_name( + OperationType.STOP, + scheduling_profile_type, + ), + stop_operation=stop_operation, + ) + + @classmethod + def unregister(cls, scheduling_profile_type: SchedulingProfileType) -> None: + cls._REGISTEERED_MODES.pop(scheduling_profile_type, None) + + @classmethod + def get_profile( + cls, scheduling_profile_type: SchedulingProfileType + ) -> SchedulingProfile: + return cls._REGISTEERED_MODES[scheduling_profile_type] + + @classmethod + def iter_profiles(cls) -> Iterable[SchedulingProfile]: + yield from cls._REGISTEERED_MODES.values() + + +def register_profiles() -> None: + RegsteredSchedulingProfiles.register( + SchedulingProfileType.LEGACY, + start_operation=legacy.start.get_operation(), + monitor_operation=legacy.monitor.get_operation(), + stop_operation=legacy.stop.get_operation(), + ) + RegsteredSchedulingProfiles.register( + SchedulingProfileType.NEW_STYLE, + start_operation=new_style.start.get_operation(), + monitor_operation=new_style.monitor.get_operation(), + 
stop_operation=new_style.stop.get_operation(), + ) + + +def unregister_profiles() -> None: + RegsteredSchedulingProfiles.unregister(SchedulingProfileType.LEGACY) + RegsteredSchedulingProfiles.unregister(SchedulingProfileType.NEW_STYLE) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/registry.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/registry.py new file mode 100644 index 000000000000..dfdbcae638f1 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_operations/registry.py @@ -0,0 +1,65 @@ +from typing import Final + +from pydantic import NonNegativeInt + +from ...generic_scheduler import Operation, OperationRegistry, SingleStepGroup +from .. import _opration_names +from . import enforce +from ._common_steps import SetCurrentScheduleId +from .profiles import RegsteredSchedulingProfiles + +_MIN_STEPS_IN_OPERATION: Final[NonNegativeInt] = 2 + + +def _validate_operation(operation: Operation) -> None: + if len(operation.step_groups) < _MIN_STEPS_IN_OPERATION: + msg = ( + f"Operation must have at least {_MIN_STEPS_IN_OPERATION} " + f"startign with {SetCurrentScheduleId.__name__} " + f"got: {operation.step_groups}" + ) + raise ValueError(msg) + first_step_group = operation.step_groups[0] + + if ( + isinstance(first_step_group, SingleStepGroup) + and first_step_group.get_step_subgroup_to_run()[0] is not SetCurrentScheduleId + ): + msg = ( + f"First step group must be {SetCurrentScheduleId.__name__}, " + f"got: {first_step_group}" + ) + raise ValueError(msg) + + +def register_operataions() -> None: + # register utility operations + for opration_name, operation in ( + (_opration_names.ENFORCE, enforce.get_operation()), + ): + _validate_operation(operation) + OperationRegistry.register(opration_name, operation) + + # register scheduling profiles operations + for profile in 
RegsteredSchedulingProfiles.iter_profiles(): + for opration_name, operation in ( + (profile.start_name, profile.start_operation), + (profile.monitor_name, profile.monitor_operation), + (profile.stop_name, profile.stop_operation), + ): + _validate_operation(operation) + OperationRegistry.register(opration_name, operation) + + +def unregister_operations() -> None: + # unregister utility operations + OperationRegistry.unregister(_opration_names.ENFORCE) + + # unregister scheduling profiles operations + for profile in RegsteredSchedulingProfiles.iter_profiles(): + for opration_name in ( + profile.start_name, + profile.monitor_name, + profile.stop_name, + ): + OperationRegistry.unregister(opration_name) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_opration_names.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_opration_names.py new file mode 100644 index 000000000000..da806aa5b92b --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_opration_names.py @@ -0,0 +1,6 @@ +from ._models import OperationType +from ._utils import get_scheduler_oepration_name + +# SHARED + +ENFORCE = get_scheduler_oepration_name(OperationType.ENFORCE, "shared") diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_redis.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_redis.py new file mode 100644 index 000000000000..130345bc2090 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_redis.py @@ -0,0 +1,178 @@ +from functools import cached_property +from typing import Any, Final, Literal, NotRequired, TypedDict, overload + +import redis.asyncio as aioredis +from common_library.json_serialization import json_dumps, json_loads +from fastapi import FastAPI +from models_library.api_schemas_dynamic_scheduler.dynamic_services 
import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.projects_nodes_io import NodeID +from servicelib.fastapi.app_state import SingletonInAppStateMixin +from servicelib.redis._client import RedisClientSDK +from servicelib.redis._utils import handle_redis_returns_union_types +from settings_library.redis import RedisDatabase, RedisSettings + +from ..generic_scheduler import ScheduleId +from ._models import UserRequestedState + +_SERVICE_STATE_NAMESPACE: Final[str] = "SS" + + +def _get_service_state_key(*, node_id: NodeID) -> str: + # SERVICE_STATE_NAMESPACE:NODE_ID + # - SERVICE_STATE_NAMESPACE: namespace prefix + # - NODE_ID: the unique node_id of the service + # Example: + # - SCH:00000000-0000-0000-0000-000000000000 + return f"{_SERVICE_STATE_NAMESPACE}:{node_id}" + + +class RedisStore(SingletonInAppStateMixin): + app_state_name: str = "scheduler_redis_store" + + def __init__(self, redis_settings: RedisSettings) -> None: + self.redis_settings = redis_settings + + self._client: RedisClientSDK | None = None + + async def setup(self) -> None: + self._client = RedisClientSDK( + self.redis_settings.build_redis_dsn(RedisDatabase.DYNAMIC_SERVICES), + client_name=__name__, + ) + await self._client.setup() + + async def shutdown(self) -> None: + if self._client: + await self._client.shutdown() + + @property + def redis(self) -> aioredis.Redis: + assert self._client # nosec + return self._client.redis + + +class _UpdateServiceStateDict(TypedDict): + desired_state: NotRequired[UserRequestedState] + desired_start_data: NotRequired[DynamicServiceStart] + desired_stop_data: NotRequired[DynamicServiceStop] + + current_state: NotRequired[UserRequestedState] + current_start_data: NotRequired[DynamicServiceStart] + current_stop_data: NotRequired[DynamicServiceStop] + + current_schedule_id: NotRequired[ScheduleId] + + +_AllowedDeleteKeys = Literal["current_schedule_id"] + + +class RedisServiceStateManager: + def __init__(self, *, app: FastAPI, node_id: 
NodeID) -> None: + self.resis_store = RedisStore.get_from_app_state(app) + self.node_id = node_id + + @cached_property + def redis(self) -> aioredis.Redis: + return self.resis_store.redis + + @cached_property + def redis_key(self) -> str: + return _get_service_state_key(node_id=self.node_id) + + @overload + async def create_or_update( + self, key: Literal["desired_state"], value: UserRequestedState + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["desired_start_data"], value: DynamicServiceStart + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["desired_stop_data"], value: DynamicServiceStop + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["current_state"], value: UserRequestedState + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["current_start_data"], value: DynamicServiceStart + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["current_stop_data"], value: DynamicServiceStop + ) -> None: ... + @overload + async def create_or_update( + self, key: Literal["current_schedule_id"], value: ScheduleId + ) -> None: ... + async def create_or_update(self, key: str, value: Any) -> None: + await handle_redis_returns_union_types( + self.redis.hset(self.redis_key, mapping={key: json_dumps(value)}) + ) + + async def create_or_update_multiple(self, updates: _UpdateServiceStateDict) -> None: + await handle_redis_returns_union_types( + self.redis.hset( + self.redis_key, mapping={k: json_dumps(v) for k, v in updates.items()} + ) + ) + + @overload + async def read( + self, key: Literal["desired_state"] + ) -> UserRequestedState | None: ... + @overload + async def read( + self, key: Literal["desired_start_data"] + ) -> DynamicServiceStart | None: ... + @overload + async def read( + self, key: Literal["desired_stop_data"] + ) -> DynamicServiceStop | None: ... 
+ @overload + async def read( + self, key: Literal["current_state"] + ) -> UserRequestedState | None: ... + @overload + async def read( + self, key: Literal["current_start_data"] + ) -> DynamicServiceStart | None: ... + @overload + async def read( + self, key: Literal["current_stop_data"] + ) -> DynamicServiceStop | None: ... + @overload + async def read(self, key: Literal["current_schedule_id"]) -> ScheduleId | None: ... + async def read(self, key: str) -> Any: + list_result: list[str | None] = await handle_redis_returns_union_types( + self.redis.hmget(self.redis_key, [key]) + ) + serialised_result = list_result[0] + if serialised_result is None: + return None + result = json_loads(serialised_result) + + match key: + case "current_start_data" | "desired_start_data": + return DynamicServiceStart.model_validate(result) + case "current_stop_data" | "desired_stop_data": + return DynamicServiceStop.model_validate(result) + case _: + return result + + async def delete_key(self, key: _AllowedDeleteKeys) -> None: + await handle_redis_returns_union_types(self.redis.hdel(self.redis_key, key)) + + async def exists(self) -> bool: + result: int = await handle_redis_returns_union_types( + self.redis.exists(self.redis_key) + ) + return result == 1 + + async def delete(self) -> None: + await handle_redis_returns_union_types(self.redis.delete(self.redis_key)) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_utils.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_utils.py new file mode 100644 index 000000000000..6eb10e521294 --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler/_utils.py @@ -0,0 +1,28 @@ +from typing import Final + +from pydantic import NonNegativeInt + +from ._errors import UnexpectedCouldNotDetermineOperationTypeError +from ._models import OperationType, SchedulerOperationName + +_MIN_STEPS_IN_OPERATION: Final[NonNegativeInt] 
= 3 + + +def get_scheduler_oepration_name( + operation_type: OperationType, suffix: str +) -> SchedulerOperationName: + return f"{operation_type.value}_{suffix}" + + +def get_scheduler_operation_type_or_raise( + *, + name: SchedulerOperationName, +) -> OperationType: + operation_type = name.split("_") + try: + return OperationType(operation_type[:1][0]) + except ValueError as exc: + # NOTE: if this is raised there is an actual issue with the operation name + raise UnexpectedCouldNotDetermineOperationTypeError( + operation_name=name, supported_types={x.value for x in OperationType} + ) from exc diff --git a/services/dynamic-scheduler/tests/conftest.py b/services/dynamic-scheduler/tests/conftest.py index 6a313ec2b4d5..667ace33526e 100644 --- a/services/dynamic-scheduler/tests/conftest.py +++ b/services/dynamic-scheduler/tests/conftest.py @@ -126,6 +126,11 @@ def disable_generic_scheduler_lifespan(mocker: MockerFixture) -> None: mocker.patch(f"{_EVENTS_MODULE}.generic_scheduler_lifespan") +@pytest.fixture +def disable_scheduler_lifespan(mocker: MockerFixture) -> None: + mocker.patch(f"{_EVENTS_MODULE}.scheduler_lifespan") + + @pytest.fixture def disable_postgres_lifespan( mocker: MockerFixture, monkeypatch: pytest.MonkeyPatch diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__core.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__core.py index ce2836cb9ccc..1a31cd712dce 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__core.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__core.py @@ -16,6 +16,17 @@ from asyncpg import NoDataFoundError from fastapi import FastAPI from pydantic import NonNegativeFloat, NonNegativeInt, TypeAdapter +from pytest_simcore.helpers.dynamic_scheduler import ( + EXECUTED, + REVERTED, + BaseExpectedStepOrder, + ExecuteRandom, + ExecuteSequence, + RevertRandom, + RevertSequence, + ensure_expected_order, + 
ensure_keys_in_store, +) from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.utils import limited_gather from settings_library.rabbit import RabbitSettings @@ -25,6 +36,7 @@ BaseStep, Operation, OperationName, + OperationToStart, ParallelStepGroup, ProvidedOperationContext, RequiredOperationContext, @@ -47,7 +59,6 @@ from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import ( CannotCancelWhileWaitingForManualInterventionError, InitialOperationContextKeyNotAllowedError, - OperationContextValueIsNoneError, OperationNotCancellableError, ProvidedOperationContextKeysAreMissingError, StepNameNotInCurrentGroupError, @@ -63,17 +74,6 @@ stop_after_delay, wait_fixed, ) -from utils import ( - EXECUTED, - REVERTED, - BaseExpectedStepOrder, - ExecuteRandom, - ExecuteSequence, - RevertRandom, - RevertSequence, - ensure_expected_order, - ensure_keys_in_store, -) pytest_simcore_core_services_selection = [ "rabbit", @@ -98,6 +98,7 @@ @pytest.fixture def app_environment( + disable_scheduler_lifespan: None, disable_postgres_lifespan: None, disable_service_tracker_lifespan: None, disable_notifier_lifespan: None, @@ -739,7 +740,9 @@ async def test_execute_revert_order( ): register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) await ensure_expected_order(steps_call_order, expected_order) @@ -858,7 +861,9 @@ async def test_fails_during_revert_is_in_error_state( ): register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) await ensure_expected_order(steps_call_order, expected_order) @@ -925,7 +930,9 @@ async def 
test_cancelled_finishes_nicely( ): register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) await ensure_expected_order(steps_call_order, expected_before_cancel_order) @@ -944,65 +951,83 @@ async def test_cancelled_finishes_nicely( _REPAT_COUNT: Final[NonNegativeInt] = 10 +class _S1R(_S1): + @classmethod + def get_sleep_after_execute(cls) -> timedelta: + return _FAST_REPEAT_INTERVAL + + +class _S2R(_S2): + @classmethod + def get_sleep_after_execute(cls) -> timedelta: + return _FAST_REPEAT_INTERVAL + + +class _RS1R(_RS1): + @classmethod + def get_sleep_after_execute(cls) -> timedelta: + return _FAST_REPEAT_INTERVAL + + @classmethod + def get_sleep_after_revert(cls) -> timedelta: + return _FAST_REPEAT_INTERVAL + + +class _RS2R(_RS2): + @classmethod + def get_sleep_after_execute(cls) -> timedelta: + return _FAST_REPEAT_INTERVAL + + @classmethod + def get_sleep_after_revert(cls) -> timedelta: + return _FAST_REPEAT_INTERVAL + + @pytest.mark.parametrize("app_count", [10]) @pytest.mark.parametrize( "operation, expected_before_cancel_order, expected_order", [ pytest.param( Operation( - SingleStepGroup( - _S1, repeat_steps=True, wait_before_repeat=_FAST_REPEAT_INTERVAL - ), + SingleStepGroup(_S1R, repeat_steps=True), ), - [ExecuteSequence(_S1) for _ in range(_REPAT_COUNT)], + [ExecuteSequence(_S1R) for _ in range(_REPAT_COUNT)], [ - *[ExecuteSequence(_S1) for _ in range(_REPAT_COUNT)], - RevertSequence(_S1), + *[ExecuteSequence(_S1R) for _ in range(_REPAT_COUNT)], + RevertSequence(_S1R), ], id="s1(r)", ), pytest.param( Operation( - ParallelStepGroup( - _S1, - _S2, - repeat_steps=True, - wait_before_repeat=_FAST_REPEAT_INTERVAL, - ), + ParallelStepGroup(_S1R, _S2R, repeat_steps=True), ), - [ExecuteRandom(_S1, _S2) for _ in range(_REPAT_COUNT)], + 
[ExecuteRandom(_S1R, _S2R) for _ in range(_REPAT_COUNT)], [ - *[ExecuteRandom(_S1, _S2) for _ in range(_REPAT_COUNT)], - RevertRandom(_S1, _S2), + *[ExecuteRandom(_S1R, _S2R) for _ in range(_REPAT_COUNT)], + RevertRandom(_S1R, _S2R), ], id="p2(r)", ), pytest.param( Operation( - SingleStepGroup( - _RS1, repeat_steps=True, wait_before_repeat=_FAST_REPEAT_INTERVAL - ), + SingleStepGroup(_RS1R, repeat_steps=True), ), - [ExecuteSequence(_RS1) for _ in range(_REPAT_COUNT)], + [ExecuteSequence(_RS1R) for _ in range(_REPAT_COUNT)], [ - *[ExecuteSequence(_RS1) for _ in range(_REPAT_COUNT)], - RevertSequence(_RS1), + *[ExecuteSequence(_RS1R) for _ in range(_REPAT_COUNT)], + RevertSequence(_RS1R), ], id="s1(rf)", ), pytest.param( Operation( - ParallelStepGroup( - _RS1, - _RS2, - repeat_steps=True, - wait_before_repeat=_FAST_REPEAT_INTERVAL, - ), + ParallelStepGroup(_RS1R, _RS2R, repeat_steps=True), ), - [ExecuteRandom(_RS1, _RS2) for _ in range(_REPAT_COUNT)], + [ExecuteRandom(_RS1R, _RS2R) for _ in range(_REPAT_COUNT)], [ - *[ExecuteRandom(_RS1, _RS2) for _ in range(_REPAT_COUNT)], - RevertRandom(_RS1, _RS2), + *[ExecuteRandom(_RS1R, _RS2R) for _ in range(_REPAT_COUNT)], + RevertRandom(_RS1R, _RS2R), ], id="p2(rf)", ), @@ -1020,7 +1045,9 @@ async def test_repeating_step( ): register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) await ensure_expected_order( @@ -1139,7 +1166,9 @@ async def test_wait_for_manual_intervention( ): register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) formatted_expected_keys = {k.format(schedule_id=schedule_id) for k 
in expected_keys} @@ -1204,7 +1233,9 @@ async def test_operation_is_not_cancellable( operation = Operation(SingleStepGroup(_S1), is_cancellable=False) register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) # even if cancelled, state of waiting for manual intervention remains the same with pytest.raises(OperationNotCancellableError): @@ -1321,7 +1352,9 @@ async def test_restart_revert_operation_step_in_error( ): register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) formatted_expected_keys = {k.format(schedule_id=schedule_id) for k in expected_keys} @@ -1377,7 +1410,9 @@ async def test_errors_with_restart_operation_step_in_error( ) register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) await ensure_expected_order( @@ -1389,6 +1424,7 @@ async def test_errors_with_restart_operation_step_in_error( ], ) + assert isinstance(operation.step_groups[-1], ParallelStepGroup) await _esnure_steps_have_status( selected_app, schedule_id, @@ -1506,7 +1542,9 @@ async def test_operation_context_usage( register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, initial_context) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, initial_context) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) # NOTE: might fail because it raised ProvidedOperationContextKeysAreMissingError check logs @@ -1514,7 
+1552,6 @@ async def test_operation_context_usage( await ensure_keys_in_store(selected_app, expected_keys=set()) - assert f"{OperationContextValueIsNoneError.__name__}" not in caplog.text assert f"{ProvidedOperationContextKeysAreMissingError.__name__}" not in caplog.text @@ -1562,63 +1599,9 @@ async def test_operation_initial_context_using_key_provided_by_step( register_operation(operation_name, operation) with pytest.raises(InitialOperationContextKeyNotAllowedError): - await start_operation(selected_app, operation_name, initial_context) - - await ensure_keys_in_store(selected_app, expected_keys=set()) - - -@pytest.mark.parametrize("app_count", [10]) -@pytest.mark.parametrize( - "operation, initial_context, expected_order", - [ - pytest.param( - Operation( - SingleStepGroup(RPCtxS1), - ), - { - # `bs__c_req_1` is missing - }, - [ - RevertSequence(RPCtxS1), - ], - id="missing_context_key", - ), - pytest.param( - Operation( - SingleStepGroup(RPCtxS1), - ), - { - "bs__e_req_1": None, - }, - [ - RevertSequence(RPCtxS1), - ], - id="context_key_is_none", - ), - ], -) -async def test_step_does_not_receive_context_key_or_is_none( - preserve_caplog_for_async_logging: None, - caplog: pytest.LogCaptureFixture, - steps_call_order: list[tuple[str, str]], - selected_app: FastAPI, - register_operation: Callable[[OperationName, Operation], None], - operation: Operation, - operation_name: OperationName, - initial_context: OperationContext, - expected_order: list[BaseExpectedStepOrder], -): - caplog.at_level(logging.DEBUG) - caplog.clear() - - register_operation(operation_name, operation) - - schedule_id = await start_operation(selected_app, operation_name, initial_context) - assert TypeAdapter(ScheduleId).validate_python(schedule_id) - - await _ensure_log_mesage(caplog, message=OperationContextValueIsNoneError.__name__) - - await ensure_expected_order(steps_call_order, expected_order) + await start_operation( + selected_app, OperationToStart(operation_name, initial_context) + ) 
await ensure_keys_in_store(selected_app, expected_keys=set()) @@ -1685,32 +1668,6 @@ async def revert( @pytest.mark.parametrize( "operation, initial_context, expected_error_str, expected_order, expected_keys", [ - pytest.param( - Operation( - SingleStepGroup(_BadImplementedStep), - ), - { - "trigger_revert": False, - "to_return": { - "add_to_return": True, - "keys": {"a_key": None}, - }, - }, - f"{OperationContextValueIsNoneError.__name__}: Values of context cannot be None: {{'a_key'", - [ - ExecuteSequence(_BadImplementedStep), - RevertSequence(_BadImplementedStep), - ], - { - "SCH:{schedule_id}", - "SCH:{schedule_id}:GROUPS:test_op:0S:E", - "SCH:{schedule_id}:GROUPS:test_op:0S:R", - "SCH:{schedule_id}:OP_CTX:test_op", - "SCH:{schedule_id}:STEPS:test_op:0S:E:_BadImplementedStep", - "SCH:{schedule_id}:STEPS:test_op:0S:R:_BadImplementedStep", - }, - id="execute-returns-key-set-to-None", - ), pytest.param( Operation( SingleStepGroup(_BadImplementedStep), @@ -1736,32 +1693,6 @@ async def revert( }, id="execute-does-not-set-the-key-to-return", ), - pytest.param( - Operation( - SingleStepGroup(_BadImplementedStep), - ), - { - "trigger_revert": True, - "to_return": { - "add_to_return": True, - "keys": {"a_key": None}, - }, - }, - f"{OperationContextValueIsNoneError.__name__}: Values of context cannot be None: {{'a_key'", - [ - ExecuteSequence(_BadImplementedStep), - RevertSequence(_BadImplementedStep), - ], - { - "SCH:{schedule_id}", - "SCH:{schedule_id}:GROUPS:test_op:0S:E", - "SCH:{schedule_id}:GROUPS:test_op:0S:R", - "SCH:{schedule_id}:OP_CTX:test_op", - "SCH:{schedule_id}:STEPS:test_op:0S:E:_BadImplementedStep", - "SCH:{schedule_id}:STEPS:test_op:0S:R:_BadImplementedStep", - }, - id="revert-returns-key-set-to-None", - ), pytest.param( Operation( SingleStepGroup(_BadImplementedStep), @@ -1807,7 +1738,9 @@ async def test_step_does_not_provide_declared_key_or_is_none( register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, 
operation_name, initial_context) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, initial_context) + ) assert TypeAdapter(ScheduleId).validate_python(schedule_id) await _ensure_log_mesage(caplog, message=expected_error_str) @@ -1833,6 +1766,8 @@ async def test_get_operation_name_or_none( operation = Operation(SingleStepGroup(_S1)) register_operation(operation_name, operation) - schedule_id = await start_operation(selected_app, operation_name, {}) + schedule_id = await start_operation( + selected_app, OperationToStart(operation_name, {}) + ) assert await get_operation_name_or_none(selected_app, schedule_id) == operation_name diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__deferred_runner.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__deferred_runner.py index d0c9f8853b08..2bd5bc59a00d 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__deferred_runner.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__deferred_runner.py @@ -59,6 +59,7 @@ @pytest.fixture def app_environment( + disable_scheduler_lifespan: None, disable_postgres_lifespan: None, disable_service_tracker_lifespan: None, disable_notifier_lifespan: None, diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_after.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_after.py index 33fdc7ddf373..a9476bc5be75 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_after.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_after.py @@ -11,6 +11,7 @@ from fastapi import FastAPI from pydantic import TypeAdapter from pytest_mock import MockerFixture +from pytest_simcore.helpers.dynamic_scheduler import ensure_keys_in_store from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.rabbit import 
RabbitSettings from settings_library.redis import RedisSettings @@ -39,7 +40,6 @@ OperationEventsProxy, Store, ) -from utils import ensure_keys_in_store pytest_simcore_core_services_selection = [ "rabbit", @@ -52,6 +52,7 @@ @pytest.fixture def app_environment( + disable_scheduler_lifespan: None, disable_postgres_lifespan: None, disable_service_tracker_lifespan: None, disable_notifier_lifespan: None, @@ -171,9 +172,14 @@ async def test_workflow( await ensure_keys_in_store(after_event_manager.app, expected_keys=set()) await after_event_manager.safe_on_event_type( - event_type, schedule_id, operation_name, initial_context + event_type, schedule_id, OperationToStart(operation_name, initial_context) ) assert mock_start_operation.call_args_list == [ - call(after_event_manager.app, operation_name, initial_context) + call( + after_event_manager.app, + OperationToStart(operation_name, initial_context), + on_execute_completed=None, + on_revert_completed=None, + ) ] diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_scheduler.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_scheduler.py index 3851e1877f05..5a75819f5a1b 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_scheduler.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__event_scheduler.py @@ -13,6 +13,9 @@ from pytest_mock import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.rabbit import RabbitSettings +from simcore_service_dynamic_scheduler.services.generic_scheduler import ( + OperationToStart, +) from simcore_service_dynamic_scheduler.services.generic_scheduler._event import ( enqueue_execute_completed_event, enqueue_revert_completed_event, @@ -20,8 +23,6 @@ ) from simcore_service_dynamic_scheduler.services.generic_scheduler._models import ( EventType, - OperationContext, - OperationName, ScheduleId, ) from tenacity import ( @@ 
-58,6 +59,7 @@ def disable_other_generic_scheduler_modules(mocker: MockerFixture) -> None: @pytest.fixture def app_environment( + disable_scheduler_lifespan: None, disable_other_generic_scheduler_modules: None, disable_redis_lifespan: None, disable_postgres_lifespan: None, @@ -151,17 +153,12 @@ async def _side_effect_raise_error(schedule_id: ScheduleId) -> None: def get_mock_safe_on_event_type( mocker: MockerFixture, ) -> Callable[ - [ - Callable[ - [EventType, ScheduleId, OperationName, OperationContext], Awaitable[None] - ] - ], - Mock, + [Callable[[EventType, ScheduleId, OperationToStart], Awaitable[None]]], Mock ]: def _( side_effect: Callable[ - [EventType, ScheduleId, OperationName, OperationContext], Awaitable[None] + [EventType, ScheduleId, OperationToStart], Awaitable[None] ], ) -> Mock: another_mock = Mock() @@ -169,11 +166,13 @@ def _( async def _mock( event_type: EventType, schedule_id: ScheduleId, - operation_name: OperationName, - initial_context: OperationContext, + to_start: OperationToStart, + *, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: - await side_effect(event_type, schedule_id, operation_name, initial_context) - another_mock(event_type, schedule_id, operation_name, initial_context) + await side_effect(event_type, schedule_id, to_start) + another_mock(event_type, schedule_id, to_start) core_mock = Mock() core_mock.safe_on_event_type = _mock @@ -189,13 +188,7 @@ async def _mock( @pytest.mark.parametrize("expected_event_type", EventType) async def test_enqueue_event_type( get_mock_safe_on_event_type: Callable[ - [ - Callable[ - [EventType, ScheduleId, OperationName, OperationContext], - Awaitable[None], - ] - ], - Mock, + [Callable[[EventType, ScheduleId, OperationToStart], Awaitable[None]]], Mock ], app: FastAPI, expected_event_type: EventType, @@ -204,8 +197,10 @@ async def test_enqueue_event_type( async def _side_effect_nothing( event_type: EventType, schedule_id: 
ScheduleId, - operation_name: OperationName, - initial_context: OperationContext, + to_start: OperationToStart, + *, + on_execute_completed: OperationToStart | None = None, + on_revert_completed: OperationToStart | None = None, ) -> None: pass @@ -213,10 +208,14 @@ async def _side_effect_nothing( schedule_id = TypeAdapter(ScheduleId).validate_python(f"{uuid4()}") match expected_event_type: - case EventType.ON_EXECUTEDD_COMPLETED: - await enqueue_execute_completed_event(app, schedule_id, "op1", {}) + case EventType.ON_EXECUTED_COMPLETED: + await enqueue_execute_completed_event( + app, schedule_id, OperationToStart("op1", {}) + ) case EventType.ON_REVERT_COMPLETED: - await enqueue_revert_completed_event(app, schedule_id, "op1", {}) + await enqueue_revert_completed_event( + app, schedule_id, OperationToStart("op1", {}) + ) case _: pytest.fail("unsupported case") @@ -224,20 +223,14 @@ async def _side_effect_nothing( with attempt: await asyncio.sleep(0) # wait for event to trigger assert mock.call_args_list == [ - call(expected_event_type, schedule_id, "op1", {}) + call(expected_event_type, schedule_id, OperationToStart("op1", {})) ] @pytest.mark.parametrize("expected_event_type", EventType) async def test_enqueue_event_type_raises_error( get_mock_safe_on_event_type: Callable[ - [ - Callable[ - [EventType, ScheduleId, OperationName, OperationContext], - Awaitable[None], - ] - ], - Mock, + [Callable[[EventType, ScheduleId, OperationToStart], Awaitable[None]]], Mock ], app: FastAPI, caplog: pytest.LogCaptureFixture, @@ -248,8 +241,7 @@ async def test_enqueue_event_type_raises_error( async def _side_effect_raise_error( event_type: EventType, schedule_id: ScheduleId, - operation_name: OperationName, - initial_context: OperationContext, + to_start: OperationToStart, ) -> None: msg = "always failing here as requested" raise RuntimeError(msg) @@ -259,10 +251,14 @@ async def _side_effect_raise_error( schedule_id = TypeAdapter(ScheduleId).validate_python(f"{uuid4()}") match 
expected_event_type: - case EventType.ON_EXECUTEDD_COMPLETED: - await enqueue_execute_completed_event(app, schedule_id, "op1", {}) + case EventType.ON_EXECUTED_COMPLETED: + await enqueue_execute_completed_event( + app, schedule_id, OperationToStart("op1", {}) + ) case EventType.ON_REVERT_COMPLETED: - await enqueue_revert_completed_event(app, schedule_id, "op1", {}) + await enqueue_revert_completed_event( + app, schedule_id, OperationToStart("op1", {}) + ) case _: pytest.fail("unsupported case") diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__store.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__store.py index c22ced8f5f5d..15beb75a4471 100644 --- a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__store.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test__store.py @@ -18,10 +18,10 @@ ) from simcore_service_dynamic_scheduler.services.generic_scheduler._store import ( EventType, - OperationContext, OperationContextProxy, OperationEventsProxy, OperationRemovalProxy, + OperationToStart, ScheduleDataStoreProxy, StepGroupProxy, StepStoreProxy, @@ -355,6 +355,9 @@ async def test_operation_context_proxy( assert await proxy.read(*provided_context.keys()) == provided_context + # if a keys is missing the value is always None + assert await proxy.read("missing-key") == {"missing-key": None} + async def test_operation_removal_proxy(store: Store, schedule_id: ScheduleId): await _assert_keys(store, set()) @@ -420,11 +423,34 @@ async def test_operation_removal_proxy(store: Store, schedule_id: ScheduleId): await proxy.delete() -async def test_operation_events_proxy(store: Store, schedule_id: ScheduleId): - operation_name = "op1" - initial_context: OperationContext = {"k1": "v1", "k2": 2} - - event_type = EventType.ON_EXECUTEDD_COMPLETED +@pytest.mark.parametrize( + "to_start, on_execute_completed, on_revert_completed", + [ + ( + OperationToStart("op1", {"k1": "v1"}), + 
None, + None, + ), + ( + OperationToStart("op2", {"k2": 2}), + OperationToStart("op3", {"k3": True}), + None, + ), + ( + OperationToStart("op4", {"k4": 3.14}), + OperationToStart("op5", {"k5": [1, 2, 3]}), + OperationToStart("op6", {"k6": {"a": "b"}}), + ), + ], +) +async def test_operation_events_proxy( + store: Store, + schedule_id: ScheduleId, + to_start: OperationToStart, + on_execute_completed: OperationToStart | None, + on_revert_completed: OperationToStart | None, +): + event_type = EventType.ON_EXECUTED_COMPLETED proxy = OperationEventsProxy(store, schedule_id, event_type) hash_key = f"SCH:{schedule_id}:EVENTS:{event_type}" @@ -433,15 +459,22 @@ async def test_operation_events_proxy(store: Store, schedule_id: ScheduleId): await _assert_keys_in_hash(store, hash_key, set()) await proxy.create_or_update_multiple( - {"operation_name": operation_name, "initial_context": initial_context} + { + "to_start": to_start, + "on_execute_completed": on_execute_completed, + "on_revert_completed": on_revert_completed, + } ) assert await proxy.exists() is True await _assert_keys(store, {hash_key}) - await _assert_keys_in_hash(store, hash_key, {"operation_name", "initial_context"}) + await _assert_keys_in_hash( + store, hash_key, {"to_start", "on_execute_completed", "on_revert_completed"} + ) - assert await proxy.read("operation_name") == operation_name - assert await proxy.read("initial_context") == initial_context + assert await proxy.read("to_start") == to_start + assert await proxy.read("on_execute_completed") == on_execute_completed + assert await proxy.read("on_revert_completed") == on_revert_completed await proxy.delete() assert await proxy.exists() is False diff --git a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py index f22dee36bbfb..9dd00014fa0d 100644 --- 
a/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py +++ b/services/dynamic-scheduler/tests/unit/services/generic_scheduler/test_generic_scheduler.py @@ -19,6 +19,14 @@ from common_library.async_tools import cancel_wait_task from fastapi import FastAPI from pydantic import NonNegativeFloat, NonNegativeInt +from pytest_simcore.helpers.dynamic_scheduler import ( + BaseExpectedStepOrder, + ExecuteRandom, + ExecuteSequence, + RevertSequence, + ensure_expected_order, + ensure_keys_in_store, +) from pytest_simcore.helpers.paused_container import pause_rabbit, pause_redis from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.deferred_tasks import DeferredContext @@ -46,14 +54,6 @@ from simcore_service_dynamic_scheduler.services.generic_scheduler._errors import ( OperationInitialContextKeyNotFoundError, ) -from utils import ( - BaseExpectedStepOrder, - ExecuteRandom, - ExecuteSequence, - RevertSequence, - ensure_expected_order, - ensure_keys_in_store, -) pytest_simcore_core_services_selection = [ "rabbit", @@ -80,6 +80,7 @@ def _get_random_interruption_duration() -> NonNegativeFloat: @pytest.fixture def app_environment( + disable_scheduler_lifespan: None, disable_postgres_lifespan: None, disable_service_tracker_lifespan: None, disable_notifier_lifespan: None, @@ -177,7 +178,7 @@ def __init__(self, multiprocessing_queue: _AsyncMultiprocessingQueue) -> None: async def _async_worker(self, operation_name: OperationName) -> None: async with _get_app(self.multiprocessing_queue) as app: - await start_operation(app, operation_name, {}) + await start_operation(app, OperationToStart(operation_name, {})) while True: # noqa: ASYNC110 await asyncio.sleep(1) @@ -469,8 +470,7 @@ async def test_run_operation_after( schedule_id = await start_operation( app, - _INITIAL_OP_NAME, - {}, + OperationToStart(_INITIAL_OP_NAME, {}), on_execute_completed=on_execute_completed, on_revert_completed=on_revert_completed, ) @@ -515,17 +515,20 @@ 
async def test_missing_initial_context_key_from_operation( ) # 1. check it works - await start_operation(app, bad_operation_name, good_initial_context) + await start_operation( + app, OperationToStart(bad_operation_name, good_initial_context) + ) # 2. check it raises with a bad context with pytest.raises(OperationInitialContextKeyNotFoundError): - await start_operation(app, bad_operation_name, bad_initial_context) + await start_operation( + app, OperationToStart(bad_operation_name, bad_initial_context) + ) with pytest.raises(OperationInitialContextKeyNotFoundError): await start_operation( app, - good_operation_name, - good_initial_context, + OperationToStart(good_operation_name, good_initial_context), on_execute_completed=bad_operation_to_start, on_revert_completed=None, ) @@ -533,14 +536,15 @@ async def test_missing_initial_context_key_from_operation( with pytest.raises(OperationInitialContextKeyNotFoundError): await start_operation( app, - good_operation_name, - good_initial_context, + OperationToStart(good_operation_name, good_initial_context), on_execute_completed=None, on_revert_completed=bad_operation_to_start, ) # 3. register_to_start_after... 
raises with a bad context - schedule_id = await start_operation(app, bad_operation_name, good_initial_context) + schedule_id = await start_operation( + app, OperationToStart(bad_operation_name, good_initial_context) + ) with pytest.raises(OperationInitialContextKeyNotFoundError): await register_to_start_after_on_executed_completed( diff --git a/services/dynamic-scheduler/tests/unit/services/scheduler/_oprations/test_registry.py b/services/dynamic-scheduler/tests/unit/services/scheduler/_oprations/test_registry.py new file mode 100644 index 000000000000..cb51f89bea48 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/services/scheduler/_oprations/test_registry.py @@ -0,0 +1,22 @@ +# pylint:disable=protected-access + +from pydantic import NonNegativeInt +from simcore_service_dynamic_scheduler.services.generic_scheduler import ( + OperationRegistry, +) +from simcore_service_dynamic_scheduler.services.scheduler._operations.registry import ( + register_operataions, + unregister_operations, +) + + +def _ensure_registered_operations(*, count: NonNegativeInt) -> None: + assert len(OperationRegistry._OPERATIONS) == count # noqa: SLF001 + + +def test_register_unregister_operations() -> None: + _ensure_registered_operations(count=0) + register_operataions() + _ensure_registered_operations(count=7) + unregister_operations() + _ensure_registered_operations(count=0) diff --git a/services/dynamic-scheduler/tests/unit/services/scheduler/test__redis.py b/services/dynamic-scheduler/tests/unit/services/scheduler/test__redis.py new file mode 100644 index 000000000000..be304f924415 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/services/scheduler/test__redis.py @@ -0,0 +1,152 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from 
models_library.projects_nodes_io import NodeID +from pydantic import TypeAdapter +from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.redis import RedisSettings +from simcore_service_dynamic_scheduler.services.generic_scheduler import ScheduleId +from simcore_service_dynamic_scheduler.services.scheduler._models import ( + UserRequestedState, +) +from simcore_service_dynamic_scheduler.services.scheduler._redis import ( + RedisServiceStateManager, +) + +pytest_simcore_core_services_selection = [ + "redis", +] +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +@pytest.fixture +def app_environment( + disable_deferred_manager_lifespan: None, + disable_rabbitmq_lifespan: None, + disable_generic_scheduler_lifespan: None, + disable_postgres_lifespan: None, + disable_service_tracker_lifespan: None, + disable_notifier_lifespan: None, + disable_status_monitor_lifespan: None, + app_environment: EnvVarsDict, + redis_service: RedisSettings, + remove_redis_data: None, +) -> EnvVarsDict: + return app_environment + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +@pytest.fixture +def schedule_id(faker: Faker) -> ScheduleId: + return faker.uuid4() + + +@pytest.fixture +def dynamic_service_start() -> DynamicServiceStart: + return TypeAdapter(DynamicServiceStart).validate_python( + DynamicServiceStart.model_json_schema()["example"] + ) + + +@pytest.fixture +def dynamic_service_stop() -> DynamicServiceStop: + return TypeAdapter(DynamicServiceStop).validate_python( + DynamicServiceStop.model_json_schema()["example"] + ) + + +async def test_redis_service_state( + app: FastAPI, + node_id: NodeID, + schedule_id: ScheduleId, + dynamic_service_start: DynamicServiceStart, + dynamic_service_stop: DynamicServiceStop, +): + state_manager = RedisServiceStateManager(app=app, node_id=node_id) + + # 1. 
check nothing present + assert await state_manager.exists() is False + assert await state_manager.read("desired_state") is None + assert await state_manager.read("desired_start_data") is None + assert await state_manager.read("desired_stop_data") is None + assert await state_manager.read("current_start_data") is None + assert await state_manager.read("current_stop_data") is None + assert await state_manager.read("current_schedule_id") is None + # reading does not create items + assert await state_manager.exists() is False + + # 2. create some entries + await state_manager.create_or_update("desired_state", UserRequestedState.RUNNING) + # already works with one entry regardless of which one it is + assert await state_manager.exists() is True + assert await state_manager.read("desired_state") == UserRequestedState.RUNNING + + await state_manager.create_or_update("current_state", UserRequestedState.STOPPED) + assert await state_manager.read("current_state") == UserRequestedState.STOPPED + + await state_manager.create_or_update("current_schedule_id", schedule_id) + assert await state_manager.read("current_schedule_id") == schedule_id + + await state_manager.create_or_update("current_start_data", dynamic_service_start) + assert await state_manager.read("current_start_data") == dynamic_service_start + await state_manager.create_or_update("desired_start_data", dynamic_service_start) + assert await state_manager.read("desired_start_data") == dynamic_service_start + + await state_manager.create_or_update("current_stop_data", dynamic_service_stop) + assert await state_manager.read("current_stop_data") == dynamic_service_stop + await state_manager.create_or_update("desired_stop_data", dynamic_service_stop) + assert await state_manager.read("desired_stop_data") == dynamic_service_stop + # still true regardless of how many entries + assert await state_manager.exists() is True + + # 3.
remove nothing is present any longer + await state_manager.delete() + assert await state_manager.exists() is False + + # 4. setting multiple is the same + + await state_manager.create_or_update_multiple( + { + "desired_state": UserRequestedState.STOPPED, + "desired_stop_data": dynamic_service_stop, + "desired_start_data": dynamic_service_start, + "current_schedule_id": schedule_id, + "current_state": UserRequestedState.STOPPED, + "current_start_data": dynamic_service_start, + "current_stop_data": dynamic_service_stop, + } + ) + assert await state_manager.exists() is True + assert await state_manager.read("desired_state") == UserRequestedState.STOPPED + assert await state_manager.read("desired_start_data") == dynamic_service_start + assert await state_manager.read("desired_stop_data") == dynamic_service_stop + assert await state_manager.read("current_schedule_id") == schedule_id + assert await state_manager.read("current_state") == UserRequestedState.STOPPED + assert await state_manager.read("current_start_data") == dynamic_service_start + assert await state_manager.read("current_stop_data") == dynamic_service_stop + + # 5. remove nothing is present any longer + await state_manager.delete() + assert await state_manager.exists() is False + + # 6.
deleting a key + await state_manager.create_or_update("current_schedule_id", schedule_id) + assert await state_manager.read("current_schedule_id") == schedule_id + await state_manager.delete_key("current_schedule_id") + assert await state_manager.read("current_schedule_id") is None + # can also delete a nonexistent key without errors + await state_manager.delete_key("current_schedule_id") diff --git a/services/dynamic-scheduler/tests/unit/services/scheduler/test__utils.py b/services/dynamic-scheduler/tests/unit/services/scheduler/test__utils.py new file mode 100644 index 000000000000..a9b478fb61aa --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/services/scheduler/test__utils.py @@ -0,0 +1,37 @@ +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + + +import pytest +from simcore_service_dynamic_scheduler.services.scheduler._errors import ( + UnexpectedCouldNotDetermineOperationTypeError, +) +from simcore_service_dynamic_scheduler.services.scheduler._models import OperationType +from simcore_service_dynamic_scheduler.services.scheduler._utils import ( + get_scheduler_oepration_name, + get_scheduler_operation_type_or_raise, +) + + +@pytest.mark.parametrize( + "suffix", + [ + "1234", + "abcd", + "some_suffix", + "_", + "____", + "___asd___Asd", + "", + ], +) +@pytest.mark.parametrize("operation_type", OperationType) +def test_operation_names(operation_type: OperationType, suffix: str) -> None: + operation_name = get_scheduler_oepration_name(operation_type, suffix) + assert operation_name == f"{operation_type}_{suffix}" + assert get_scheduler_operation_type_or_raise(name=operation_name) == operation_type + + +def test_raise_on_invalid_operation_name() -> None: + with pytest.raises(UnexpectedCouldNotDetermineOperationTypeError): + get_scheduler_operation_type_or_raise(name="invalid_operation_name") diff --git a/services/dynamic-scheduler/tests/unit/services/scheduler/test_scheduler.py
b/services/dynamic-scheduler/tests/unit/services/scheduler/test_scheduler.py new file mode 100644 index 000000000000..a8d57f754e17 --- /dev/null +++ b/services/dynamic-scheduler/tests/unit/services/scheduler/test_scheduler.py @@ -0,0 +1,455 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-instance-attributes +# pylint: disable=unused-argument +import asyncio +import logging +from asyncio import Event +from collections.abc import Iterable +from copy import deepcopy +from dataclasses import dataclass +from datetime import timedelta +from types import ModuleType +from typing import ClassVar, Final +from uuid import uuid4 + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.projects_nodes_io import NodeID +from pydantic import NonNegativeFloat, TypeAdapter +from pytest_mock import MockerFixture +from pytest_simcore.helpers.dynamic_scheduler import ( + EXECUTED, + REVERTED, + BaseExpectedStepOrder, + ExecuteSequence, + RevertSequence, + ensure_expected_order, +) +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.deferred_tasks import DeferredContext +from servicelib.logging_utils import log_context +from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings +from simcore_service_dynamic_scheduler.services.generic_scheduler import ( + BaseStep, + ProvidedOperationContext, + RequiredOperationContext, + SingleStepGroup, +) +from simcore_service_dynamic_scheduler.services.generic_scheduler._models import ( + OperationContext, +) +from simcore_service_dynamic_scheduler.services.scheduler import ( + start_service, + stop_service, +) +from simcore_service_dynamic_scheduler.services.scheduler._operations import ( + legacy, + new_style, +) + +_logger = logging.getLogger(__name__) + +pytest_simcore_core_services_selection = [ + 
"rabbit", + "redis", +] +pytest_simcore_ops_services_selection = [ + "redis-commander", +] + + +# UTILS + +type _EventID = str + + +class _HaltEventTracker: + """cannot store Event instances in Redis, keeping track in meory""" + + _EVENTS: ClassVar[dict[_EventID, Event]] = {} + + @classmethod + def create_event(cls) -> _EventID: + event_id = f"{uuid4()}" + cls._EVENTS[event_id] = Event() + return event_id + + @classmethod + def get_event(cls, event_id: _EventID) -> Event: + return cls._EVENTS[event_id] + + +def _get_key_execute(cls: type[BaseStep]) -> str: + return f"{cls.__name__}_execute" + + +def _get_key_revert(cls: type[BaseStep]) -> str: + return f"{cls.__name__}_revert" + + +@dataclass +class _Action: + halt_event_id: _EventID | None = None + fail: bool = False + + +_SLEEP_FOREVER: Final[NonNegativeFloat] = 1e6 +_STEPS_CALL_ORDER: list[tuple[str, str]] = [] + + +@pytest.fixture +def steps_call_order() -> Iterable[list[tuple[str, str]]]: + _STEPS_CALL_ORDER.clear() + yield _STEPS_CALL_ORDER + _STEPS_CALL_ORDER.clear() + + +class _CoreBaseStep(BaseStep): + @classmethod + async def get_execute_wait_between_attempts( + cls, context: DeferredContext + ) -> timedelta: + _ = context + return timedelta(seconds=_SLEEP_FOREVER) + + @classmethod + async def get_revert_wait_between_attempts( + cls, context: DeferredContext + ) -> timedelta: + _ = context + return timedelta(seconds=_SLEEP_FOREVER) + + @classmethod + async def _execute_action(cls, app: FastAPI, action: _Action) -> None: + _ = app + with log_context( + _logger, logging.DEBUG, f"step {cls.__name__} executing action {action=}" + ): + if action.fail: + msg = f"Step {cls.__name__} failed as requested" + raise RuntimeError(msg) + + if action.halt_event_id is not None: + # allows to halt execution here and intercept when it ararives + # at this point to trigger further actions + event = _HaltEventTracker.get_event(action.halt_event_id) + + event.set() + await asyncio.sleep(_SLEEP_FOREVER) + + @classmethod + def 
get_execute_requires_context_keys(cls) -> set[str]: + return {_get_key_execute(cls)} + + @classmethod + async def execute( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + _STEPS_CALL_ORDER.append((cls.__name__, EXECUTED)) + + action: dict = required_context[_get_key_execute(cls)] + await cls._execute_action(app, _Action(**action)) + return None + + @classmethod + def get_revert_requires_context_keys(cls) -> set[str]: + return {_get_key_revert(cls)} + + @classmethod + async def revert( + cls, app: FastAPI, required_context: RequiredOperationContext + ) -> ProvidedOperationContext | None: + _STEPS_CALL_ORDER.append((cls.__name__, REVERTED)) + + action: dict = required_context[_get_key_revert(cls)] + await cls._execute_action(app, _Action(**action)) + return None + + +_DEFAULT_SLEEP_BEFORE_REPEAT: Final[timedelta] = timedelta(seconds=0.1) + + +class _LegacyStart(_CoreBaseStep): ... + + +class _LegacyMonitor(_CoreBaseStep): + @classmethod + def get_sleep_before_execute(cls) -> timedelta: + """ + [optional] wait time before executing the step + """ + return _DEFAULT_SLEEP_BEFORE_REPEAT + + +class _NewStyleStop(_CoreBaseStep): ... + + +class _NewStyleStart(_CoreBaseStep): ... + + +class _NewStyleMonitor(_CoreBaseStep): + @classmethod + def get_sleep_before_execute(cls) -> timedelta: + """ + [optional] wait time before executing the step + """ + return _DEFAULT_SLEEP_BEFORE_REPEAT + + +class _LegacyStop(_CoreBaseStep): ... 
+ + +@dataclass +class _ProfileConfig: + is_legacy: bool = False + + # START + start_execute_halt: bool = False + start_execute_fails: bool = False + start_revert_halt: bool = False + start_revert_fails: bool = False + + # MONITOR + monitor_execute_halt: bool = False + monitor_execute_fails: bool = False + monitor_revert_halt: bool = False + monitor_revert_fails: bool = False + + # STOP + stop_execute_halt: bool = False + stop_execute_fails: bool = False + stop_revert_halt: bool = False + stop_revert_fails: bool = False + + +@dataclass +class _HaltEvents: + legacy_start_execute: Event + legacy_start_revert: Event + + legacy_monitor_execute: Event + legacy_monitor_revert: Event + + legacy_stop_execute: Event + legacy_stop_revert: Event + + new_style_start_execute: Event + new_style_start_revert: Event + + new_style_monitor_execute: Event + new_style_monitor_revert: Event + + new_style_stop_execute: Event + new_style_stop_revert: Event + + +def _create_action_data(*, register_event: bool, fail: bool) -> tuple[_Action, Event]: + halt_event_id = _HaltEventTracker.create_event() + action = _Action(halt_event_id=halt_event_id if register_event else None, fail=fail) + + halt_event = _HaltEventTracker.get_event(halt_event_id) + return action, halt_event + + +def _get_context_and_events( + config: _ProfileConfig, +) -> tuple[OperationContext, _HaltEvents]: + initial_context: OperationContext = {} + + start = _LegacyStart if config.is_legacy else _NewStyleStart + monitor = _LegacyMonitor if config.is_legacy else _NewStyleMonitor + stop = _LegacyStop if config.is_legacy else _NewStyleStop + + # START + action_start_execute, event_start_execute = _create_action_data( + register_event=config.start_execute_halt, fail=config.start_execute_fails + ) + initial_context[_get_key_execute(start)] = action_start_execute + + action_start_revert, event_start_revert = _create_action_data( + register_event=config.start_revert_halt, fail=config.start_revert_fails + ) + 
initial_context[_get_key_revert(start)] = action_start_revert + + # MONITOR + action_monitor_execute, event_monitor_execute = _create_action_data( + register_event=config.monitor_execute_halt, + fail=config.monitor_execute_fails, + ) + initial_context[_get_key_execute(monitor)] = action_monitor_execute + action_monitor_revert, event_monitor_revert = _create_action_data( + register_event=config.monitor_revert_halt, + fail=config.monitor_revert_fails, + ) + initial_context[_get_key_revert(monitor)] = action_monitor_revert + + # STOP + action_stop_execute, event_stop_execute = _create_action_data( + register_event=config.stop_execute_halt, fail=config.stop_execute_fails + ) + initial_context[_get_key_execute(stop)] = action_stop_execute + action_stop_revert, event_stop_revert = _create_action_data( + register_event=config.stop_revert_halt, fail=config.stop_revert_fails + ) + initial_context[_get_key_revert(stop)] = action_stop_revert + + halt_events = _HaltEvents( + legacy_start_execute=event_start_execute, + legacy_start_revert=event_start_revert, + legacy_monitor_execute=event_monitor_execute, + legacy_monitor_revert=event_monitor_revert, + legacy_stop_execute=event_stop_execute, + legacy_stop_revert=event_stop_revert, + new_style_start_execute=event_start_execute, + new_style_start_revert=event_start_revert, + new_style_monitor_execute=event_monitor_execute, + new_style_monitor_revert=event_monitor_revert, + new_style_stop_execute=event_stop_execute, + new_style_stop_revert=event_stop_revert, + ) + + return initial_context, halt_events + + +# MOCKS + + +@pytest.fixture +def mock_operations(mocker: MockerFixture) -> None: + # replace all steps that do something in the operations with mocked ones + # allows to simulate various testing scenarios while not impacting functionality + + def _replace_steps_in_operation( + module: ModuleType, step_cls: type[BaseStep], *, is_monitor: bool = False + ) -> None: + mocker.patch.object( + module, "_steps",
new=[SingleStepGroup(step_cls, repeat_steps=is_monitor)] + ) + + _replace_steps_in_operation(legacy.start, _LegacyStart) + _replace_steps_in_operation(legacy.monitor, _LegacyMonitor, is_monitor=True) + _replace_steps_in_operation(legacy.stop, _LegacyStop) + + _replace_steps_in_operation(new_style.start, _NewStyleStart) + _replace_steps_in_operation(new_style.monitor, _NewStyleMonitor, is_monitor=True) + _replace_steps_in_operation(new_style.stop, _NewStyleStop) + + +@pytest.fixture +def inject_context_and_events( + mocker: MockerFixture, profile_config: _ProfileConfig +) -> _HaltEvents: + initial_context, halt_events = _get_context_and_events(profile_config) + + def _get_initial_context(node_id: NodeID) -> OperationContext: + context = deepcopy(initial_context) + context["node_id"] = node_id + return context + + mocker.patch( + "simcore_service_dynamic_scheduler.services.scheduler._operations.enforce._get_start_monitor_stop_initial_context", + _get_initial_context, + ) + + return halt_events + + +@pytest.fixture +def halt_events(inject_context_and_events: _HaltEvents) -> _HaltEvents: + return inject_context_and_events + + +@pytest.fixture +def app_environment( + disable_postgres_lifespan: None, + disable_service_tracker_lifespan: None, + disable_notifier_lifespan: None, + disable_status_monitor_lifespan: None, + app_environment: EnvVarsDict, + rabbit_service: RabbitSettings, + redis_service: RedisSettings, + remove_redis_data: None, +) -> EnvVarsDict: + return app_environment + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +@pytest.fixture +def start_data(node_id: NodeID) -> DynamicServiceStart: + data = deepcopy(DynamicServiceStart.model_json_schema()["example"]) + data["service_uuid"] = node_id + return TypeAdapter(DynamicServiceStart).validate_python(data) + + +@pytest.fixture +def stop_data(node_id: NodeID) -> DynamicServiceStop: + data = deepcopy(DynamicServiceStop.model_json_schema()["example"]) + 
data["node_id"] = node_id + return TypeAdapter(DynamicServiceStop).validate_python(data) + + +# TESTS + + +@pytest.mark.parametrize( + "profile_config, expected_order_after_start, expected_order_after_stop", + [ + pytest.param( + _ProfileConfig(is_legacy=True), + [ + ExecuteSequence( + _LegacyStart, + _LegacyMonitor, + ) + ], + [ + ExecuteSequence(_LegacyMonitor), + RevertSequence(_LegacyMonitor), + ExecuteSequence(_LegacyStop), + ], + ), + ], +) +async def test_something( + preserve_caplog_for_async_logging: None, + mock_operations: None, + halt_events: _HaltEvents, + steps_call_order: list[tuple[str, str]], + app: FastAPI, + start_data: DynamicServiceStart, + stop_data: DynamicServiceStop, + expected_order_after_start: list[BaseExpectedStepOrder], + expected_order_after_stop: list[BaseExpectedStepOrder], +) -> None: + await start_service(app, start_data) + await ensure_expected_order( + steps_call_order, expected_order_after_start, use_only_first_entries=True + ) + + await stop_service(app, stop_data) + await ensure_expected_order( + steps_call_order, expected_order_after_stop, use_only_last_entries=True + ) + + +# - mock with something that can cause errors and problems +# - we want very faulty steps and we need to generate different configuration of these +# - we simulate it being stuck in MONITOR, STOP, START +# - try to emulate all possible combinations so that we can see if there are any errors + + +# TODO: try to recover form error states as well +# mock with event wait and set & wait forever if enabled + + +# TODO: cancelling this is hard, should work faster, why not have a step that waits instead of the _core?