Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion docs/reference/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,26 @@ components:
- name
title: Task
type: object
TaskError:
additionalProperties: false
description: Wrapper around an exception raised by a plan
properties:
message:
title: Message
type: string
outcome:
const: error
default: error
title: Outcome
type: string
type:
title: Type
type: string
required:
- type
- message
title: TaskError
type: object
TaskRequest:
additionalProperties: false
description: Request to run a task with related info
Expand Down Expand Up @@ -280,6 +300,31 @@ components:
- task_id
title: TaskResponse
type: object
TaskResult:
additionalProperties: false
description: 'Serializable wrapper around the result of a plan


If the result is not serializable, the result will be None but the type

will be the name of the type. If the result is actually None, the type will

be ''NoneType''.'
properties:
outcome:
const: success
default: success
title: Outcome
type: string
result:
title: Result
type:
title: Type
type: string
required:
- type
title: TaskResult
type: object
TasksListResponse:
additionalProperties: false
description: Diagnostic information on the tasks
Expand Down Expand Up @@ -311,6 +356,12 @@ components:
default: true
title: Is Pending
type: boolean
outcome:
anyOf:
- $ref: '#/components/schemas/TaskResult'
- $ref: '#/components/schemas/TaskError'
- type: 'null'
title: Outcome
request_id:
title: Request Id
type: string
Expand Down Expand Up @@ -377,7 +428,7 @@ info:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0.html
title: BlueAPI Control
version: 1.1.2
version: 1.2.0
openapi: 3.1.0
paths:
/config/oidc:
Expand Down
17 changes: 12 additions & 5 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from functools import wraps
from pathlib import Path
from pprint import pprint
from typing import ParamSpec, TypeVar
from typing import ParamSpec, TypeVar, cast

import click
from bluesky.callbacks.best_effort import BestEffortCallback
Expand Down Expand Up @@ -38,6 +38,7 @@
from blueapi.service.authentication import SessionCacheManager, SessionManager
from blueapi.service.model import SourceInfo, TaskRequest
from blueapi.worker import ProgressEvent, WorkerEvent
from blueapi.worker.event import TaskError, TaskResult

from .scratch import setup_scratch
from .updates import CliEventRenderer
Expand Down Expand Up @@ -290,7 +291,7 @@ def run_plan(
instrument_session: str,
) -> None:
"""Run a plan with parameters"""
client: BlueapiClient = obj["client"]
client: BlueapiClient = cast(BlueapiClient, obj["client"])

parameters = parameters or "{}"
try:
Expand Down Expand Up @@ -320,9 +321,15 @@ def on_event(event: AnyEvent) -> None:
callback(event.name, event.doc)

resp = client.run_task(task, on_event=on_event)

if resp.task_status is not None and not resp.task_status.task_failed:
print("Plan Succeeded")
match resp.result:
case TaskResult(result=None, type="NoneType"):
print("Plan succeeded")
case TaskResult(result=None, type=t):
print(f"Plan returned unserializable result of type '{t}'")
case TaskResult(result=r):
print(f"Plan succeeded: {r}")
case TaskError(type=exc, message=m):
print(f"Plan failed: {exc}: {m}")
else:
server_task = client.create_and_start_task(task)
click.echo(server_task.task_id)
Expand Down
22 changes: 11 additions & 11 deletions src/blueapi/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from blueapi.worker import TrackableTask, WorkerEvent, WorkerState
from blueapi.worker.event import ProgressEvent, TaskStatus

from .event_bus import AnyEvent, BlueskyStreamingError, EventBusClient, OnAnyEvent
from .event_bus import AnyEvent, EventBusClient, OnAnyEvent
from .rest import BlueapiRestClient, BlueskyRemoteControlError

TRACER = get_tracer("client")
Expand Down Expand Up @@ -201,7 +201,7 @@ def run_task(
task: TaskRequest,
on_event: OnAnyEvent | None = None,
timeout: float | None = None,
) -> WorkerEvent:
) -> TaskStatus:
"""
Synchronously run a task, requires a message bus connection

Expand All @@ -224,7 +224,7 @@ def run_task(
task_response = self.create_task(task)
task_id = task_response.task_id

complete: Future[WorkerEvent] = Future()
complete: Future[TaskStatus] = Future()

def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
match event:
Expand All @@ -239,19 +239,19 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
if relates_to_task:
if on_event is not None:
on_event(event)
if isinstance(event, WorkerEvent) and (
(event.is_complete()) and (ctx.correlation_id == task_id)
if (
isinstance(event, WorkerEvent)
and (event.is_complete())
and (ctx.correlation_id == task_id)
):
if event.task_status is not None and event.task_status.task_failed:
if event.task_status is None:
complete.set_exception(
BlueskyStreamingError(
"\n".join(event.errors)
if len(event.errors) > 0
else "Unknown error"
BlueskyRemoteControlError(
"Server completed without task status"
)
)
else:
complete.set_result(event)
complete.set_result(event.task_status)

with self._events:
self._events.subscribe_to_all_events(inner_on_event)
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class BlueskyContext:
configuration: InitVar[ApplicationConfig | None] = None

run_engine: RunEngine = field(
default_factory=lambda: RunEngine(context_managers=[])
default_factory=lambda: RunEngine(context_managers=[], call_returns_result=True)
)
tiled_conf: TiledConfig | None = field(default=None, init=False, repr=False)
numtracker: NumtrackerClient | None = field(default=None, init=False, repr=False)
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from .runner import WorkerDispatcher

#: API version to publish in OpenAPI schema
REST_API_VERSION = "1.1.2"
REST_API_VERSION = "1.2.0"

LICENSE_INFO: dict[str, str] = {
"name": "Apache 2.0",
Expand Down
8 changes: 7 additions & 1 deletion src/blueapi/utils/base_model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from pydantic import BaseModel, ConfigDict
import logging

from pydantic import (
BaseModel,
ConfigDict,
)

logger = logging.getLogger(__name__)
# Pydantic config for blueapi API models with common config.
BlueapiModelConfig = ConfigDict(
extra="forbid",
Expand Down
48 changes: 47 additions & 1 deletion src/blueapi/worker/event.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from collections.abc import Mapping
from enum import StrEnum
from typing import Any, Literal, Self

from bluesky.run_engine import RunEngineStateMachine
from pydantic import Field
from pydantic import Field, PydanticSchemaGenerationError, TypeAdapter
from super_state_machine.extras import PropertyMachine, ProxyString

from blueapi.utils import BlueapiBaseModel
Expand All @@ -11,6 +13,8 @@
# RawRunEngineState = type[PropertyMachine | ProxyString | str]
RawRunEngineState = PropertyMachine | ProxyString | str

log = logging.getLogger(__name__)


# NOTE this is interim until refactor
class TaskStatusEnum(StrEnum):
Expand Down Expand Up @@ -52,6 +56,47 @@ def from_bluesky_state(cls, bluesky_state: RawRunEngineState) -> "WorkerState":
return WorkerState(str(bluesky_state).upper())


class TaskResult(BlueapiBaseModel):
"""
Serializable wrapper around the result of a plan

If the result is not serializable, the result will be None but the type
will be the name of the type. If the result is actually None, the type will
be 'NoneType'.
"""

outcome: Literal["success"] = "success"
"""Discriminant for serialization"""
result: Any = Field(None)
"""The serialized result (or None if it is not serializable)"""
type: str
"""The type of the result"""

@classmethod
def from_result(cls, result: Any) -> Self:
type_str = type(result).__name__
try:
value = TypeAdapter(type(result)).dump_python(result)
except PydanticSchemaGenerationError:
value = None
return cls(result=value, type=type_str)


class TaskError(BlueapiBaseModel):
"""Wrapper around an exception raised by a plan"""

outcome: Literal["error"] = "error"
"""Discriminant for serialization"""
type: str
"""The class of exception"""
message: str
"""The message of the raised exception"""

@classmethod
def from_exception(cls, err: Exception) -> Self:
return cls(type=type(err).__name__, message=str(err))


class StatusView(BlueapiBaseModel):
"""
A snapshot of a Status of an operation, optionally representing progress
Expand Down Expand Up @@ -107,6 +152,7 @@ class TaskStatus(BlueapiBaseModel):
"""

task_id: str
result: TaskResult | TaskError | None = Field(None, discriminator="outcome")
task_complete: bool
task_failed: bool

Expand Down
6 changes: 5 additions & 1 deletion src/blueapi/worker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ def do_task(self, ctx: BlueskyContext) -> None:
func = ctx.plan_functions[self.name]
prepared_params = self.prepare_params(ctx)
ctx.run_engine.md.update(self.metadata)
ctx.run_engine(func(**prepared_params))
result = ctx.run_engine(func(**prepared_params))
if isinstance(result, tuple): # pragma: no cover
# this is never true if the run_engine is configured correctly
return None
return result.plan_result


def _lookup_params(ctx: BlueskyContext, task: Task) -> BaseModel:
Expand Down
17 changes: 16 additions & 1 deletion src/blueapi/worker/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
ProgressEvent,
RawRunEngineState,
StatusView,
TaskError,
TaskResult,
TaskStatus,
TaskStatusEnum,
WorkerEvent,
Expand Down Expand Up @@ -69,6 +71,13 @@ class TrackableTask(BlueapiBaseModel):
is_complete: bool = False
is_pending: bool = True
errors: list[str] = Field(default_factory=list)
outcome: TaskResult | TaskError | None = None

def set_result(self, result: Any):
self.outcome = TaskResult.from_result(result)

def set_exception(self, err: Exception):
self.outcome = TaskError.from_exception(err)


class TaskWorker:
Expand Down Expand Up @@ -427,7 +436,12 @@ def process_task():
LOGGER.info(f"Got new task: {next_task}")
self._current = next_task
self._current.is_pending = False
self._current.task.do_task(self._ctx)
try:
result = self._current.task.do_task(self._ctx)
self._current.set_result(result)
except Exception as e:
self._current.set_exception(e)
self._report_error(e)

with plan_tag_filter_context(next_task.task.name, LOGGER):
if self._current_task_otel_context is not None:
Expand Down Expand Up @@ -528,6 +542,7 @@ def _report_status(
task_id=self._current.task_id,
task_complete=self._current.is_complete,
task_failed=bool(self._current.errors),
result=self._current.outcome,
)
correlation_id = self._current.task_id
add_span_attributes(
Expand Down
Loading