Draft
47 commits
1bf89c0
rename
Oct 8, 2025
63f092e
added redis
Oct 8, 2025
17fbd62
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Oct 9, 2025
47c138e
added lifespan
Oct 10, 2025
ca0de8a
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Oct 10, 2025
e6c4eb4
refactored redis and utils
Oct 10, 2025
5a5f03b
added empty template
Oct 10, 2025
1b9c29c
docstring and exposing exposing error
Oct 10, 2025
7daf536
fixed redis tests
Oct 13, 2025
10f8c85
fixed tests
Oct 13, 2025
773c8db
exposed
Oct 13, 2025
1c14af2
extended
Oct 13, 2025
80e7f5c
added delte
Oct 13, 2025
6401948
expand tests
Oct 13, 2025
ed27670
added common stesps
Oct 13, 2025
f3d2050
rename to current start and stop data
Oct 13, 2025
bfa0d25
defined neforce
Oct 13, 2025
e494751
defined registry
Oct 13, 2025
eb55012
added manager and opration names
Oct 13, 2025
b243887
added current state
Oct 13, 2025
99459f4
added base oprations
Oct 13, 2025
e05a8ce
added baseic opration registration
Oct 13, 2025
e0e6f7a
pylint
Oct 13, 2025
0c91fa2
unused
Oct 13, 2025
8830434
ensure switching does not raise
Oct 13, 2025
bd374f2
refactor
Oct 13, 2025
2ab04b1
refactor
Oct 14, 2025
abd23f4
added constants
Oct 14, 2025
49e32cf
reduced repeat wait interval
Oct 14, 2025
9e75f2f
operations refactor
Oct 14, 2025
cb24ab3
moved test utils
Oct 14, 2025
480deea
allowed to return None on some keys
Oct 14, 2025
cb8280f
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Oct 14, 2025
d9aeeec
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Oct 15, 2025
77ebace
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Nov 27, 2025
6eba20d
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate…
Nov 27, 2025
3225c59
refactor
Nov 28, 2025
51cec9d
refactor
Nov 28, 2025
d46e947
redefined operations bases
Nov 28, 2025
723c130
changed repeating logic
Nov 28, 2025
84a309b
refactored
Nov 28, 2025
d521e4c
alligning names
Nov 28, 2025
cad1b8f
refactor
Nov 28, 2025
5685eea
added required
Nov 28, 2025
0c28675
updated generic_scheduler's interface
Nov 28, 2025
e4f1cf2
exposed proper interface
Nov 28, 2025
fbaa339
added base scheduler test that works
Nov 28, 2025
@@ -29,6 +29,7 @@
from ..services.notifier import get_notifier_lifespans
from ..services.rabbitmq import rabbitmq_lifespan
from ..services.redis import redis_lifespan
from ..services.scheduler import scheduler_lifespan
from ..services.service_tracker import service_tracker_lifespan
from ..services.status_monitor import status_monitor_lifespan
from .settings import ApplicationSettings
@@ -84,6 +85,7 @@ def create_app_lifespan(
app_lifespan.add(lifespan)

app_lifespan.add(generic_scheduler_lifespan)
app_lifespan.add(scheduler_lifespan)

app_lifespan.add(service_tracker_lifespan)
app_lifespan.add(deferred_manager_lifespan)
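For orientation, the hunk above only wires the new scheduler_lifespan into the existing lifespan chain. Below is a minimal sketch of what such a lifespan could look like, assuming lifespans here are async generators over the FastAPI app; the body is illustrative and not the actual ..services.scheduler implementation.

from collections.abc import AsyncIterator

from fastapi import FastAPI


async def scheduler_lifespan(app: FastAPI) -> AsyncIterator[None]:
    # startup: attach a (placeholder) scheduler instance to the application state
    app.state.scheduler = object()
    try:
        yield
    finally:
        # shutdown: drop the reference held by the scheduler
        app.state.scheduler = None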
@@ -17,10 +17,12 @@
)
from ._lifespan import generic_scheduler_lifespan
from ._models import (
OperationContext,
OperationName,
OperationToStart,
ProvidedOperationContext,
RequiredOperationContext,
ReservedContextKeys,
ScheduleId,
)
from ._operation import (
@@ -48,6 +50,7 @@
"get_step_store_proxy",
"NoDataFoundError",
"Operation",
"OperationContext",
"OperationContextProxy",
"OperationName",
"OperationRegistry",
@@ -57,6 +60,7 @@
"register_to_start_after_on_executed_completed",
"register_to_start_after_on_reverted_completed",
"RequiredOperationContext",
"ReservedContextKeys",
"restart_operation_step_stuck_during_revert",
"restart_operation_step_stuck_in_manual_intervention_during_execute",
"ScheduleId",
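The newly exported OperationContext and ReservedContextKeys round out the public interface used below. A hedged caller-side sketch based only on the signatures visible in this diff; the import path, the operation name, and keyword construction of OperationToStart are assumptions.

from fastapi import FastAPI

# import path assumed; the symbols are the ones exported above
from services.generic_scheduler import OperationToStart, ScheduleId, start_operation


async def schedule_something(app: FastAPI) -> ScheduleId:
    to_start = OperationToStart(
        operation_name="example_operation",  # must be registered in the OperationRegistry
        initial_context={"some_required_key": "value"},  # must cover the operation's required keys
    )
    # returns the ScheduleId under which the scheduler tracks this run
    return await start_operation(app, to_start)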
@@ -1,4 +1,3 @@
import asyncio
import logging
from collections.abc import Iterable
from contextlib import suppress
@@ -48,7 +47,6 @@
)
from ._models import (
EventType,
OperationContext,
OperationErrorType,
OperationName,
OperationToStart,
@@ -94,38 +92,40 @@ def __init__(

async def start_operation(
self,
operation_name: OperationName,
initial_operation_context: OperationContext,
operation_to_start: OperationToStart,
on_execute_completed: OperationToStart | None,
on_revert_completed: OperationToStart | None,
) -> ScheduleId:
"""start an operation by it's given name and providing an initial context"""
schedule_id: ScheduleId = f"{uuid4()}"

initial_operation_context[ReservedContextKeys.SCHEDULE_ID] = schedule_id
operation_to_start.initial_context[ReservedContextKeys.SCHEDULE_ID] = (
schedule_id
)

# check if operation is registered
operation = OperationRegistry.get_operation(operation_name)
operation = OperationRegistry.get_operation(operation_to_start.operation_name)

for required_key in operation.initial_context_required_keys:
if required_key not in initial_operation_context:
if required_key not in operation_to_start.initial_context:
raise OperationInitialContextKeyNotFoundError(
operation_name=operation_name, required_key=required_key
operation_name=operation_to_start.operation_name,
required_key=required_key,
)

# NOTE: to ensure reproducibility of operations, the
# operation steps cannot overwrite keys in the
# initial context with their results
raise_if_overwrites_any_operation_provided_key(
operation, initial_operation_context
operation, operation_to_start.initial_context
)

schedule_data_proxy = ScheduleDataStoreProxy(
store=self._store, schedule_id=schedule_id
)
await schedule_data_proxy.create_or_update_multiple(
{
"operation_name": operation_name,
"operation_name": operation_to_start.operation_name,
"group_index": 0,
"is_executing": True,
}
@@ -134,9 +134,11 @@ async def start_operation(
operation_content_proxy = OperationContextProxy(
store=self._store,
schedule_id=schedule_id,
operation_name=operation_name,
operation_name=operation_to_start.operation_name,
)
await operation_content_proxy.create_or_update(
operation_to_start.initial_context
)
await operation_content_proxy.create_or_update(initial_operation_context)

if on_execute_completed:
await register_to_start_after_on_executed_completed(
@@ -188,7 +190,7 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
operation_name=operation_name,
group_index=group_index,
step_group=step_group,
is_executing=is_executing,
is_executing=True,
)

# not allowed to cancel while waiting for manual intervention
@@ -210,13 +212,16 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
with log_context(
_logger,
logging.DEBUG,
f"Cancelling step {step_name=} of {operation_name=} for {schedule_id=}",
f"cancelling step {step_name=} of {operation_name=} for {schedule_id=}",
):
with suppress(NoDataFoundError):
deferred_task_uid = await step_proxy.read("deferred_task_uid")
# the deferred task might not be running when this is called
# e.g. cancelling a repeating operation
await DeferredRunner.cancel(deferred_task_uid)
with log_context(
_logger, logging.DEBUG, f"cancelling {deferred_task_uid=}"
):
await DeferredRunner.cancel(deferred_task_uid)

await step_proxy.create_or_update("status", StepStatus.CANCELLED)

@@ -226,12 +231,16 @@ async def _cancel_step(step_name: StepName, step_proxy: StepStoreProxy) -> None:
schedule_id=schedule_id,
operation_name=operation_name,
step_group_name=step_group_name,
is_executing=is_executing,
is_executing=True,
)
if (
await group_proxy.increment_and_get_done_steps_count()
== expected_steps_count
):
# all steps cancelled, move to revert
await schedule_data_proxy.create_or_update(
"is_executing", value=False
)
await enqueue_schedule_event(self.app, schedule_id)

await limited_gather(
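The block above lets whichever step pushes increment_and_get_done_steps_count up to expected_steps_count flip is_executing and enqueue the next schedule event. A standalone sketch of this "last finisher triggers the follow-up" pattern, with a plain in-memory counter standing in for the store-backed group proxy:

import asyncio


class GroupProgress:
    """Counts finished steps; only the call that completes the group reports True."""

    def __init__(self, expected_steps_count: int) -> None:
        self._expected = expected_steps_count
        self._done = 0
        self._lock = asyncio.Lock()

    async def step_done(self) -> bool:
        # increment atomically; exactly one caller sees the count reach the target
        async with self._lock:
            self._done += 1
            return self._done == self._expected


async def _demo() -> None:
    progress = GroupProgress(expected_steps_count=3)
    results = await asyncio.gather(*(progress.step_done() for _ in range(3)))
    assert results.count(True) == 1  # only the last finisher triggers the follow-up


asyncio.run(_demo())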
@@ -443,29 +452,29 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:
step_group_name = step_group.get_step_group_name(index=group_index)
base_message = f"{step_group_name=} in {operation_name=} for {schedule_id=}"

if step_group.repeat_steps is True and is_executing:
with log_context(_logger, logging.DEBUG, f"REPEATING {base_message}"):
await self._advance_as_repeating(
schedule_data_proxy,
schedule_id,
operation_name,
group_index,
step_group,
group_step_proxies,
)

elif is_executing:
with log_context(_logger, logging.DEBUG, f"CREATING {base_message}"):
await self._advance_as_creating(
steps_statuses,
schedule_data_proxy,
schedule_id,
operation_name,
group_index,
step_group,
operation,
)

if is_executing:
if step_group.repeat_steps:
with log_context(_logger, logging.DEBUG, f"REPEATING {base_message}"):
await self._advance_as_repeating(
steps_statuses,
schedule_data_proxy,
schedule_id,
operation_name,
group_index,
step_group,
group_step_proxies,
)
else:
with log_context(_logger, logging.DEBUG, f"CREATING {base_message}"):
await self._advance_as_creating(
steps_statuses,
schedule_data_proxy,
schedule_id,
operation_name,
group_index,
step_group,
operation,
)
else:
with log_context(_logger, logging.DEBUG, f"REVERTING {base_message}"):
await self._advance_as_reverting(
@@ -479,6 +488,7 @@ async def _on_schedule_event(self, schedule_id: ScheduleId) -> None:

async def _advance_as_repeating(
self,
steps_statuses: dict[StepName, StepStatus],
schedule_data_proxy: ScheduleDataStoreProxy,
schedule_id: ScheduleId,
operation_name: OperationName,
@@ -487,27 +497,19 @@ async def _advance_as_repeating(
group_step_proxies: dict[StepName, StepStoreProxy],
) -> None:
# REPEATING logic:
# 1) sleep before repeating
# 2) if any of the repeating steps was cancelled -> move to revert
# 3) -> restart all steps in the group
# 1) if any of the repeating steps was cancelled -> move to revert
# 2) -> restart all steps in the group

step_proxies: Iterable[StepStoreProxy] = group_step_proxies.values()

# 1) sleep before repeating
await asyncio.sleep(current_step_group.wait_before_repeat.total_seconds())

# 2) if any of the repeating steps was cancelled -> move to revert

# since some time passed, query all steps statuses again,
# a cancellation request might have been requested
steps_stauses = await get_steps_statuses(step_proxies)
if any(status == StepStatus.CANCELLED for status in steps_stauses.values()):
# 1) if any of the repeating steps was cancelled -> move to revert
if any(status == StepStatus.CANCELLED for status in steps_statuses.values()):
# NOTE: a cancellation was requested, stop repeating and move to revert
await schedule_data_proxy.create_or_update("is_executing", value=False)
await enqueue_schedule_event(self.app, schedule_id)
return

# 3) -> restart all steps in the group
# 2) -> restart all steps in the group
await limited_gather(
*(x.delete() for x in step_proxies), limit=PARALLEL_REQUESTS
)
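limited_gather above runs the per-step deletes with a concurrency cap (PARALLEL_REQUESTS). A minimal sketch of how such a helper can be built with a semaphore, assuming it only bounds how many awaitables run at once and keeps results in input order; the real helper may add error handling this sketch omits.

import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def limited_gather_sketch(*awaitables: Awaitable[T], limit: int) -> list[T]:
    # a semaphore caps how many of the awaitables run concurrently
    semaphore = asyncio.Semaphore(limit)

    async def _run(awaitable: Awaitable[T]) -> T:
        async with semaphore:
            return await awaitable

    return await asyncio.gather(*(_run(a) for a in awaitables))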
@@ -554,30 +556,30 @@ async def _advance_as_creating(

# 1b) if reached the end of the EXECUTE operation -> remove all created data [EMIT execute complete event]
on_executed_proxy = OperationEventsProxy(
self._store, schedule_id, EventType.ON_EXECUTEDD_COMPLETED
self._store, schedule_id, EventType.ON_EXECUTED_COMPLETED
)
on_executed_operation_name: OperationName | None = None
on_executed_initial_context: OperationContext | None = None
on_executed_to_start: OperationToStart | None = None
on_executed_on_execute_completed: OperationToStart | None = None
on_executed_on_revert_completed: OperationToStart | None = None
if await on_executed_proxy.exists():
on_executed_operation_name = await on_executed_proxy.read(
"operation_name"
on_executed_to_start = await on_executed_proxy.read("to_start")
on_executed_on_execute_completed = await on_executed_proxy.read(
"on_execute_completed"
)
on_executed_initial_context = await on_executed_proxy.read(
"initial_context"
on_executed_on_revert_completed = await on_executed_proxy.read(
"on_revert_completed"
)

await cleanup_after_finishing(
self._store, schedule_id=schedule_id, is_executing=True
)
if (
on_executed_operation_name is not None
and on_executed_initial_context is not None
):
if on_executed_to_start is not None:
await enqueue_execute_completed_event(
self.app,
schedule_id,
on_executed_operation_name,
on_executed_initial_context,
on_executed_to_start,
on_execute_completed=on_executed_on_execute_completed,
on_revert_completed=on_executed_on_revert_completed,
)

return
@@ -657,28 +659,28 @@ async def _advance_as_reverting(
on_reverted_proxy = OperationEventsProxy(
self._store, schedule_id, EventType.ON_REVERT_COMPLETED
)
on_reverted_operation_name: OperationName | None = None
on_reverted_initial_context: OperationContext | None = None
on_reverted_to_start: OperationToStart | None = None
on_reverted_on_execute_completed: OperationToStart | None = None
on_reverted_on_revert_completed: OperationToStart | None = None
if await on_reverted_proxy.exists():
on_reverted_operation_name = await on_reverted_proxy.read(
"operation_name"
on_reverted_to_start = await on_reverted_proxy.read("to_start")
on_reverted_on_execute_completed = await on_reverted_proxy.read(
"on_execute_completed"
)
on_reverted_initial_context = await on_reverted_proxy.read(
"initial_context"
on_reverted_on_revert_completed = await on_reverted_proxy.read(
"on_revert_completed"
)

await cleanup_after_finishing(
self._store, schedule_id=schedule_id, is_executing=False
)
if (
on_reverted_operation_name is not None
and on_reverted_initial_context is not None
):
if on_reverted_to_start is not None:
await enqueue_revert_completed_event(
self.app,
schedule_id,
on_reverted_operation_name,
on_reverted_initial_context,
on_reverted_to_start,
on_execute_completed=on_reverted_on_execute_completed,
on_revert_completed=on_reverted_on_revert_completed,
)
return

@@ -747,23 +749,21 @@

async def start_operation(
app: FastAPI,
operation_name: OperationName,
initial_operation_context: OperationContext,
operation_to_start: OperationToStart,
*,
on_execute_completed: OperationToStart | None = None,
on_revert_completed: OperationToStart | None = None,
) -> ScheduleId:
return await Core.get_from_app_state(app).start_operation(
operation_name,
initial_operation_context,
operation_to_start,
on_execute_completed,
on_revert_completed,
)


async def cancel_operation(app: FastAPI, schedule_id: ScheduleId) -> None:
"""
Unstruct scheduler to revert all steps completed until
Instruct scheduler to revert all steps completed until
now for the running operation.

`reverting` refers to the act of reverting the effects of a step
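Putting the reworked interface together, a hedged end-to-end sketch: start an operation with follow-up operations for both completion paths, then cancel it so the scheduler reverts the completed steps. Operation names and the import path are illustrative; keyword construction of OperationToStart is assumed.

from fastapi import FastAPI

# import path assumed; symbols match the module-level interface shown in this diff
from services.generic_scheduler import OperationToStart, cancel_operation, start_operation


async def run_and_cancel(app: FastAPI) -> None:
    schedule_id = await start_operation(
        app,
        OperationToStart(operation_name="provision", initial_context={"node_id": "123"}),
        # started after the operation finishes executing successfully
        on_execute_completed=OperationToStart(operation_name="notify_done", initial_context={}),
        # started after the operation finishes reverting
        on_revert_completed=OperationToStart(operation_name="notify_failed", initial_context={}),
    )
    # instruct the scheduler to revert all steps completed so far for this schedule
    await cancel_operation(app, schedule_id)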