Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/source/public_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,17 @@ This page summarises the parts of the LabThings API that should be most frequent
.. automethod:: labthings_fastapi.thing_server_interface.ThingServerInterface.get_thing_states
:no-index:

.. py:property:: global_lock
:type GlobalLock | None:

A global lock object that is used to restrict concurrent execution of actions and setting of properties.

.. py:method:: hold_global_lock(enabled: bool | None = True)

A context manager that holds the global lock. The `enabled` parameter sets
whether the lock is held. `False` ignores the lock, `None` uses the lock if
available, and `True` uses the lock or raises an error if it is missing.


.. py:class:: ThingConfig(/, **data: Any)

Expand Down
50 changes: 47 additions & 3 deletions src/labthings_fastapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import datetime
import logging
from collections import deque
from functools import partial
from functools import partial, wraps
import inspect
from threading import Thread, Lock
import uuid
Expand All @@ -29,6 +29,7 @@
Callable,
Concatenate,
Generic,
Literal,
Optional,
ParamSpec,
TypeVar,
Expand All @@ -39,6 +40,7 @@
from fastapi import APIRouter, FastAPI, HTTPException, Request, Body, BackgroundTasks
from pydantic import BaseModel, create_model


from .middleware.url_for import URLFor
from .base_descriptor import (
BaseDescriptor,
Expand Down Expand Up @@ -505,8 +507,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 511 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

510-511 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand All @@ -519,7 +521,7 @@
invocation.output.response
):
# TODO: honour "accept" header
return invocation.output.response()

Check warning on line 524 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

524 line is not covered with tests
return invocation.output

@router.delete(
Expand All @@ -544,8 +546,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 550 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

549-550 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand Down Expand Up @@ -665,6 +667,7 @@
func: Callable[Concatenate[OwnerT, ActionParams], ActionReturn],
response_timeout: float = 1,
retention_time: float = 300,
use_global_lock: Literal[False] | None = None,
) -> None:
"""Create a new action descriptor.

Expand All @@ -683,6 +686,17 @@
of the action.
:param retention_time: how long, in seconds, the action should be kept
for after it has completed.
:param use_global_lock: If the global lock is enabled in `lt.FEATURE_FLAGS`
this parameter may be used to opt out. When the global lock is enabled,
by default all actions acquire the global lock before starting, and
release it after they finish. That means only one action thread may
run at a time. The same lock is used to set properties.

If this parameter is `False` then the lock will not be acquired, even
if global locking is enabled. That is appropriate if the action does
not have side effects that would cause problems for other actions, or
if more nuanced locking behaviour is required meaning the lock is
acquired directly in the action code.
"""
super().__init__()
self.func = func
Expand All @@ -692,6 +706,7 @@
name = func.__name__ # this is checked in __set_name__
self.response_timeout = response_timeout
self.retention_time = retention_time
self.use_global_lock = use_global_lock
self.dependency_params = fastapi_dependency_params(func)
self.input_model = input_model_from_signature(
func,
Expand Down Expand Up @@ -720,24 +735,53 @@
"""
super().__set_name__(owner, name)
if self.name != self.func.__name__:
raise ValueError(

Check warning on line 738 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

738 line is not covered with tests
f"Action name '{self.name}' does not match function name "
f"'{self.func.__name__}'",
)

def wrapped_func(
self, obj: OwnerT
) -> Callable[Concatenate[OwnerT, ActionParams], ActionReturn]:
"""Wrap the action function if necessary, so that it holds the global lock.

If global locking is enabled and this action hasn't opted out, this function
will wrap `func` such that it holds the global lock while it is running.

:param obj: The object on which the method is being called.
:return: the function, wrapped if necessary.
"""
# hold_global_lock returns a context manager. It won't hold the lock
# until we enter the context in `wrapped` (defined below).
lock_context_manager = obj._thing_server_interface.hold_global_lock(
self.use_global_lock
)
func = self.func

@wraps(func)
def wrapped(*args: Any, **kwargs: Any) -> Any: # noqa: DOC
"""Acquire the lock then run `func` with supplied arguments."""
with lock_context_manager:
return func(*args, **kwargs)

return wrapped

def instance_get(self, obj: OwnerT) -> Callable[ActionParams, ActionReturn]:
"""Return the function, bound to an object as for a normal method.

This currently doesn't validate the arguments, though it may do so
in future. In its present form, this is equivalent to a regular
in future. If locking is disabled this is equivalent to a regular
Python method, i.e. all we do is supply the first argument, `self`.

If locking is enabled, we return a wrapped function that holds the
global lock while the action runs.

:param obj: the `~lt.Thing` to which we are attached. This will be
the first argument supplied to the function wrapped by this
descriptor.
:return: the action function, bound to ``obj``.
"""
return partial(self.func, obj)
return partial(self.wrapped_func(obj), obj)

def _observers_set(self, obj: Thing) -> WeakSet:
"""Return a set used to notify changes.
Expand Down Expand Up @@ -862,14 +906,14 @@
try:
responses[200]["model"] = self.output_model
pass
except AttributeError:
print(f"Failed to generate response model for action {self.name}")

Check warning on line 910 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

909-910 lines are not covered with tests
# Add an additional media type if we may return a file
if hasattr(self.output_model, "media_type"):
responses[200]["content"][self.output_model.media_type] = {}

Check warning on line 913 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

913 line is not covered with tests
# Now we can add the endpoint to the app.
if thing.path is None:
raise NotConnectedToServerError(

Check warning on line 916 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

916 line is not covered with tests
"Can't add the endpoint without thing.path!"
)
app.post(
Expand Down Expand Up @@ -917,7 +961,7 @@
"""
path = path or thing.path
if path is None:
raise NotConnectedToServerError("Can't generate forms without a path!")

Check warning on line 964 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

964 line is not covered with tests
forms = [
Form[ActionOp](href=path + self.name, op=[ActionOp.invokeaction]),
]
Expand Down
18 changes: 18 additions & 0 deletions src/labthings_fastapi/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,28 @@ class FeatureNotAvailableError(NotImplementedError):
"""


class FeatureNotEnabledError(RuntimeError):
"""A feature is being used that is currently disabled.

Some new or optional features are only available if the relevant feature flag
is set. See `lt.FEATURE_FLAGS` for a list of features that may be enabled.
This error is raised if a feature is used when it is not enabled.
"""


class PropertyRedefinitionError(AttributeError):
"""A property is being incorrectly redefined.

This method is raised if a property is at risk of being redefined. This usually
happens when a decorator is applied to a function with the same name as the
property. The solution is usually to rename the function.
"""


class GlobalLockBusyError(TimeoutError):
"""The global lock is already in use.

This exception is raised when code needs the global lock but cannot acquire
it. It indicates that the LabThings server is busy running another action or
property setter.
"""
79 changes: 79 additions & 0 deletions src/labthings_fastapi/global_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Global locking.

If the feature is enabled, a global lock is used to restrict running actions
and setting properties. This module defines a wrapper for `threading.RLock`
with a context manager that acquires the lock using a short timeout.
"""

from threading import RLock
from types import EllipsisType, TracebackType

from .exceptions import GlobalLockBusyError


class GlobalLock:
"""An RLock wrapper and work-a-like with a default timeout."""

def __init__(self) -> None:
"""Initialise the global lock."""
self._lock = RLock()

default_timeout: float = 0.05

def acquire(
self, blocking: bool = True, timeout: float | EllipsisType = ...
) -> bool:
"""Acquire the lock.

This wraps the underlying `threading.RLock.acquire` but will by default
block with a short timeout.

:param blocking: whether to wait for the lock to become free. `True` (the
default) will block until the lock is available or we time out. `False`
will always return immediately.
:param timeout: the length of time to wait for the lock, if ``blocking`` is
`True` - or `-1` to specify waiting forever.

:return: whether the lock was successfully acquired.
"""
if blocking is False:
return self._lock.acquire(blocking=False)
if timeout is ...:
timeout = self.default_timeout
return self._lock.acquire(blocking=blocking, timeout=timeout)

def release(self) -> None:
"""Release the lock.

This wraps `threading.RLock.release` without modification.
"""
self._lock.release()

def __enter__(self) -> None:
"""Allow the lock to be used as a context manager.

The behaviour when used as a context manager is different from a regular
`threading.RLock` because it will use the default timeout rather than
blocking forever.

:raises GlobalLockBusyError: if the lock is in use by another thread.
"""
result = self.acquire(blocking=True, timeout=self.default_timeout)
if not result:
raise GlobalLockBusyError("The global lock could not be acquired.")

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""Allow the lock to be used as a context manager.

The lock is released when the context ends. No error handling is done.

:param exc_type: the exception type, if one was raised (ignored).
:param exc_value: the exception, if one was raised (ignored).
:param traceback: the traceback, if an error was raised (ignored).
"""
self.release()
Loading
Loading