diff --git a/README.md b/README.md
index 30a668c..c5dac6c 100644
--- a/README.md
+++ b/README.md
@@ -60,9 +60,17 @@
 Flagsmith uses Prometheus to track performance metrics.
 
 The following default metrics are exposed:
 
+##### Common metrics
+
 - `flagsmith_build_info`: Has the labels `version` and `ci_commit_sha`.
 - `http_server_request_duration_seconds`: Histogram labeled with `method`, `route`, and `response_status`.
 - `http_server_requests_total`: Counter labeled with `method`, `route`, and `response_status`.
+- `task_processor_enqueued_tasks_total`: Counter labeled with `task_identifier`.
+
+##### Task Processor metrics
+
+- `task_processor_finished_tasks_total`: Counter labeled with `task_identifier` and `result` (`"success"`, `"failure"`).
+- `task_processor_task_duration_seconds`: Histogram labeled with `task_identifier` and `result` (`"success"`, `"failure"`).
 
 ##### Guidelines
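The README hunk above only names the new series; for orientation, this sketch dumps what a scrape of these metrics would return (assuming the default process-local registry; the identifier value is made up):

```python
from prometheus_client import REGISTRY, generate_latest

# Prints the exposition-format payload, containing lines such as:
#   task_processor_enqueued_tasks_total{task_identifier="myapp.tasks.send_email"} 3.0
#   task_processor_task_duration_seconds_count{result="success",task_identifier="..."} 2.0
print(generate_latest(REGISTRY).decode())
```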
+ """ + + registry = prometheus_client.REGISTRY + + for module_name in metric_module_names: + metrics_module = importlib.import_module(module_name) + + for module_attr in vars(metrics_module).values(): + if isinstance(module_attr, MetricWrapperBase): + # Unregister the collector from the registry + registry.unregister(module_attr) + + importlib.reload(metrics_module) diff --git a/src/task_processor/decorators.py b/src/task_processor/decorators.py index 8b74078..40c2b5f 100644 --- a/src/task_processor/decorators.py +++ b/src/task_processor/decorators.py @@ -1,5 +1,4 @@ import logging -import os import typing from datetime import datetime, time, timedelta from threading import Thread @@ -8,7 +7,7 @@ from django.db.transaction import on_commit from django.utils import timezone -from task_processor import task_registry +from task_processor import metrics, task_registry from task_processor.exceptions import InvalidArgumentsError, TaskQueueFullError from task_processor.models import RecurringTask, Task, TaskPriority from task_processor.task_run_method import TaskRunMethod @@ -69,7 +68,8 @@ def delay( args: tuple[typing.Any, ...] = (), kwargs: dict[str, typing.Any] | None = None, ) -> Task | None: - logger.debug("Request to run task '%s' asynchronously.", self.task_identifier) + task_identifier = self.task_identifier + logger.debug("Request to run task '%s' asynchronously.", task_identifier) kwargs = kwargs or {} @@ -84,13 +84,16 @@ def delay( _validate_inputs(*args, **kwargs) self.unwrapped(*args, **kwargs) elif settings.TASK_RUN_METHOD == TaskRunMethod.SEPARATE_THREAD: - logger.debug("Running task '%s' in separate thread", self.task_identifier) + logger.debug("Running task '%s' in separate thread", task_identifier) self.run_in_thread(args=args, kwargs=kwargs) else: - logger.debug("Creating task for function '%s'...", self.task_identifier) + logger.debug("Creating task for function '%s'...", task_identifier) + metrics.task_processor_enqueued_tasks_total.labels( + task_identifier=task_identifier + ).inc() try: task = Task.create( - task_identifier=self.task_identifier, + task_identifier=task_identifier, scheduled_for=delay_until or timezone.now(), priority=self.priority, queue_size=self.queue_size, @@ -174,7 +177,7 @@ def register_recurring_task( first_run_time: time | None = None, timeout: timedelta | None = timedelta(minutes=30), ) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]: - if not os.environ.get("RUN_BY_PROCESSOR"): + if not settings.TASK_PROCESSOR_MODE: # Do not register recurring tasks if not invoked by task processor return lambda f: f diff --git a/src/task_processor/metrics.py b/src/task_processor/metrics.py new file mode 100644 index 0000000..35e7662 --- /dev/null +++ b/src/task_processor/metrics.py @@ -0,0 +1,22 @@ +import prometheus_client +from django.conf import settings + +from common.prometheus import Histogram + +task_processor_enqueued_tasks_total = prometheus_client.Counter( + "task_processor_enqueued_tasks_total", + "Total number of enqueued tasks", + ["task_identifier"], +) + +if settings.TASK_PROCESSOR_MODE: + task_processor_finished_tasks_total = prometheus_client.Counter( + "task_processor_finished_tasks_total", + "Total number of finished tasks", + ["task_identifier", "result"], + ) + task_processor_task_duration_seconds = Histogram( + "task_processor_task_duration_seconds", + "Task processor task duration in seconds", + ["task_identifier", "result"], + ) diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py 
diff --git a/src/task_processor/decorators.py b/src/task_processor/decorators.py
index 8b74078..40c2b5f 100644
--- a/src/task_processor/decorators.py
+++ b/src/task_processor/decorators.py
@@ -1,5 +1,4 @@
 import logging
-import os
 import typing
 from datetime import datetime, time, timedelta
 from threading import Thread
@@ -8,7 +7,7 @@
 from django.db.transaction import on_commit
 from django.utils import timezone
 
-from task_processor import task_registry
+from task_processor import metrics, task_registry
 from task_processor.exceptions import InvalidArgumentsError, TaskQueueFullError
 from task_processor.models import RecurringTask, Task, TaskPriority
 from task_processor.task_run_method import TaskRunMethod
@@ -69,7 +68,8 @@ def delay(
         args: tuple[typing.Any, ...] = (),
         kwargs: dict[str, typing.Any] | None = None,
     ) -> Task | None:
-        logger.debug("Request to run task '%s' asynchronously.", self.task_identifier)
+        task_identifier = self.task_identifier
+        logger.debug("Request to run task '%s' asynchronously.", task_identifier)
 
         kwargs = kwargs or {}
 
@@ -84,13 +84,16 @@ def delay(
             _validate_inputs(*args, **kwargs)
             self.unwrapped(*args, **kwargs)
         elif settings.TASK_RUN_METHOD == TaskRunMethod.SEPARATE_THREAD:
-            logger.debug("Running task '%s' in separate thread", self.task_identifier)
+            logger.debug("Running task '%s' in separate thread", task_identifier)
             self.run_in_thread(args=args, kwargs=kwargs)
         else:
-            logger.debug("Creating task for function '%s'...", self.task_identifier)
+            logger.debug("Creating task for function '%s'...", task_identifier)
+            metrics.task_processor_enqueued_tasks_total.labels(
+                task_identifier=task_identifier
+            ).inc()
             try:
                 task = Task.create(
-                    task_identifier=self.task_identifier,
+                    task_identifier=task_identifier,
                     scheduled_for=delay_until or timezone.now(),
                     priority=self.priority,
                     queue_size=self.queue_size,
@@ -174,7 +177,7 @@ def register_recurring_task(
     first_run_time: time | None = None,
     timeout: timedelta | None = timedelta(minutes=30),
 ) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]:
-    if not os.environ.get("RUN_BY_PROCESSOR"):
+    if not settings.TASK_PROCESSOR_MODE:
         # Do not register recurring tasks if not invoked by task processor
         return lambda f: f
 
diff --git a/src/task_processor/metrics.py b/src/task_processor/metrics.py
new file mode 100644
index 0000000..35e7662
--- /dev/null
+++ b/src/task_processor/metrics.py
@@ -0,0 +1,22 @@
+import prometheus_client
+from django.conf import settings
+
+from common.prometheus import Histogram
+
+task_processor_enqueued_tasks_total = prometheus_client.Counter(
+    "task_processor_enqueued_tasks_total",
+    "Total number of enqueued tasks",
+    ["task_identifier"],
+)
+
+if settings.TASK_PROCESSOR_MODE:
+    task_processor_finished_tasks_total = prometheus_client.Counter(
+        "task_processor_finished_tasks_total",
+        "Total number of finished tasks",
+        ["task_identifier", "result"],
+    )
+    task_processor_task_duration_seconds = Histogram(
+        "task_processor_task_duration_seconds",
+        "Task processor task duration in seconds",
+        ["task_identifier", "result"],
+    )
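Because `task_processor_finished_tasks_total` and `task_processor_task_duration_seconds` are only created when `TASK_PROCESSOR_MODE` is truthy, non-processor processes carry just the enqueue counter. A rough sketch of observing it directly (the task identifier is hypothetical; the tests in this diff use the `assert_metric` fixture instead):

```python
from prometheus_client import REGISTRY

from task_processor import metrics

metrics.task_processor_enqueued_tasks_total.labels(
    task_identifier="myapp.tasks.example"  # hypothetical identifier
).inc()

# get_sample_value returns None until the first increment for a label set.
assert (
    REGISTRY.get_sample_value(
        "task_processor_enqueued_tasks_total",
        {"task_identifier": "myapp.tasks.example"},
    )
    == 1.0
)
```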
diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py
index 78aa820..9e9545d 100644
--- a/src/task_processor/processor.py
+++ b/src/task_processor/processor.py
@@ -2,10 +2,13 @@
 import traceback
 import typing
 from concurrent.futures import ThreadPoolExecutor
+from contextlib import ExitStack
 from datetime import timedelta
 
+from django.conf import settings
 from django.utils import timezone
 
+from task_processor import metrics
 from task_processor.models import (
     AbstractBaseTask,
     RecurringTask,
@@ -101,10 +104,21 @@ def run_recurring_tasks() -> list[RecurringTaskRun]:
 def _run_task(
     task: T,
 ) -> typing.Tuple[T, AnyTaskRun]:
+    assert settings.TASK_PROCESSOR_MODE, (
+        "Attempt to run tasks in a non-task-processor environment"
+    )
+
+    ctx = ExitStack()
+    timer = metrics.task_processor_task_duration_seconds.time()
+    ctx.enter_context(timer)
+
+    task_identifier = task.task_identifier
+
     logger.debug(
-        f"Running task {task.task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
+        f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
     )
     task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task)  # type: ignore[attr-defined]
+    result: str
 
     try:
         with ThreadPoolExecutor(max_workers=1) as executor:
@@ -112,27 +126,41 @@ def _run_task(
             timeout = task.timeout.total_seconds() if task.timeout else None
             future.result(timeout=timeout)  # Wait for completion or timeout
 
-        task_run.result = TaskResult.SUCCESS.value
+        task_run.result = result = TaskResult.SUCCESS.value
         task_run.finished_at = timezone.now()
         task.mark_success()
-        logger.debug(f"Task {task.task_identifier} id={task.pk} completed")
+
+        logger.debug(f"Task {task_identifier} id={task.pk} completed")
 
     except Exception as e:
         # For errors that don't include a default message (e.g., TimeoutError),
         # fall back to using repr.
         err_msg = str(e) or repr(e)
 
+        task.mark_failure()
+
+        task_run.result = result = TaskResult.FAILURE.value
+        task_run.error_details = str(traceback.format_exc())
+
         logger.error(
             "Failed to execute task '%s', with id %d. Exception: %s",
-            task.task_identifier,
+            task_identifier,
             task.pk,
             err_msg,
             exc_info=True,
         )
 
-        task.mark_failure()
+    result_label_value = result.lower()
 
-        task_run.result = TaskResult.FAILURE.value
-        task_run.error_details = str(traceback.format_exc())
+    timer.labels(
+        task_identifier=task_identifier,
+        result=result_label_value,
+    )  # type: ignore[no-untyped-call]
+    ctx.close()
+
+    metrics.task_processor_finished_tasks_total.labels(
+        task_identifier=task_identifier,
+        result=result_label_value,
+    ).inc()
 
     return task, task_run
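The `ExitStack`/`Timer` pairing in `_run_task` exists because the `result` label is unknown when timing starts; `prometheus_client`'s `Timer` allows binding labels after the fact. The same pattern in isolation (metric name illustrative):

```python
from contextlib import ExitStack

from prometheus_client import Histogram

demo_duration_seconds = Histogram(
    "demo_duration_seconds", "Illustrative duration metric", ["result"]
)


def do_timed_work() -> None:
    ctx = ExitStack()
    timer = demo_duration_seconds.time()  # starts the clock
    ctx.enter_context(timer)
    try:
        ...  # the work being measured
        result = "success"
    except Exception:
        result = "failure"
    # Bind labels only once the outcome is known; closing the stack exits
    # the Timer and records the observation against the labelled child.
    timer.labels(result=result)
    ctx.close()
```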
{"first_arg": "foo", "second_arg": "bar"} @@ -186,9 +182,8 @@ class NonSerializableObj: my_function.delay(args=(NonSerializableObj(),)) -def test_delay_returns_none_if_task_queue_is_full( - settings: SettingsWrapper, db: None -) -> None: +@pytest.mark.django_db +def test_delay_returns_none_if_task_queue_is_full(settings: SettingsWrapper) -> None: # Given settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR @@ -208,7 +203,31 @@ def my_function(*args: typing.Any, **kwargs: typing.Any) -> None: assert task is None -def test_can_create_task_with_priority(settings: SettingsWrapper, db: None) -> None: +@pytest.mark.django_db +def test_delay__expected_metrics( + settings: SettingsWrapper, + assert_metric: AssertMetricFixture, +) -> None: + # Given + settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR + + @register_task_handler(queue_size=1) + def my_function(*args: typing.Any, **kwargs: typing.Any) -> None: + pass + + # When + my_function.delay() + + # Then + assert_metric( + name="task_processor_enqueued_tasks_total", + value=1.0, + labels={"task_identifier": "test_unit_task_processor_decorators.my_function"}, + ) + + +@pytest.mark.django_db +def test_can_create_task_with_priority(settings: SettingsWrapper) -> None: # Given settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR diff --git a/tests/unit/task_processor/test_unit_task_processor_health.py b/tests/unit/task_processor/test_unit_task_processor_health.py index 21a76bf..86de292 100644 --- a/tests/unit/task_processor/test_unit_task_processor_health.py +++ b/tests/unit/task_processor/test_unit_task_processor_health.py @@ -1,3 +1,4 @@ +import pytest from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture @@ -25,8 +26,8 @@ def test_is_processor_healthy_returns_false_if_task_not_processed( assert result is False +@pytest.mark.django_db def test_is_processor_healthy_returns_true_if_task_processed( - db: None, settings: SettingsWrapper, ) -> None: # Given diff --git a/tests/unit/task_processor/test_unit_task_processor_monitoring.py b/tests/unit/task_processor/test_unit_task_processor_monitoring.py index ffe27f7..eae67a2 100644 --- a/tests/unit/task_processor/test_unit_task_processor_monitoring.py +++ b/tests/unit/task_processor/test_unit_task_processor_monitoring.py @@ -1,12 +1,14 @@ from datetime import timedelta +import pytest from django.utils import timezone from task_processor.models import Task from task_processor.monitoring import get_num_waiting_tasks -def test_get_num_waiting_tasks(db: None) -> None: +@pytest.mark.django_db +def test_get_num_waiting_tasks() -> None: # Given now = timezone.now() diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index 4edbce5..492f782 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -9,9 +9,9 @@ from django.core.cache import cache from django.utils import timezone from freezegun import freeze_time -from pytest import MonkeyPatch from pytest_mock import MockerFixture +from common.test_tools import AssertMetricFixture from task_processor.decorators import ( TaskHandler, register_recurring_task, @@ -32,11 +32,6 @@ ) from task_processor.task_registry import initialise, registered_tasks -if typing.TYPE_CHECKING: - # This import breaks private-package-test workflow in core - from tests.unit.task_processor.conftest import GetTaskProcessorCaplog - - DEFAULT_CACHE_KEY = "foo" 
diff --git a/tests/unit/task_processor/test_unit_task_processor_decorators.py b/tests/unit/task_processor/test_unit_task_processor_decorators.py
index 3609515..4f0c443 100644
--- a/tests/unit/task_processor/test_unit_task_processor_decorators.py
+++ b/tests/unit/task_processor/test_unit_task_processor_decorators.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import typing
 from datetime import timedelta
 from unittest.mock import MagicMock
@@ -8,6 +9,7 @@
 from pytest_django.fixtures import SettingsWrapper
 from pytest_mock import MockerFixture
 
+from common.test_tools import AssertMetricFixture
 from task_processor.decorators import (
     register_recurring_task,
     register_task_handler,
@@ -17,10 +19,6 @@
 from task_processor.task_registry import get_task, initialise
 from task_processor.task_run_method import TaskRunMethod
 
-if typing.TYPE_CHECKING:
-    # This import breaks private-package-test workflow in core
-    from tests.unit.task_processor.conftest import GetTaskProcessorCaplog
-
 
 @pytest.fixture
 def mock_thread_class(
@@ -35,12 +33,12 @@
 
 @pytest.mark.django_db
 def test_register_task_handler_run_in_thread__transaction_commit__true__default(
-    get_task_processor_caplog: "GetTaskProcessorCaplog",
+    caplog: pytest.LogCaptureFixture,
     mock_thread_class: MagicMock,
     django_capture_on_commit_callbacks: DjangoCaptureOnCommitCallbacks,
 ) -> None:
     # Given
-    caplog = get_task_processor_caplog()
+    caplog.set_level(logging.DEBUG)
 
     @register_task_handler()
     def my_function(*args: str, **kwargs: str) -> None:
@@ -68,11 +66,11 @@ def my_function(*args: str, **kwargs: str) -> None:
 
 
 def test_register_task_handler_run_in_thread__transaction_commit__false(
-    get_task_processor_caplog: "GetTaskProcessorCaplog",
+    caplog: pytest.LogCaptureFixture,
     mock_thread_class: MagicMock,
 ) -> None:
     # Given
-    caplog = get_task_processor_caplog()
+    caplog.set_level(logging.DEBUG)
 
     @register_task_handler(transaction_on_commit=False)
     def my_function(*args: typing.Any, **kwargs: typing.Any) -> None:
@@ -98,10 +96,10 @@ def my_function(*args: typing.Any, **kwargs: typing.Any) -> None:
     )
 
 
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
 def test_register_recurring_task(
     mocker: MockerFixture,
-    db: None,
-    run_by_processor: None,
 ) -> None:
     # Given
     mock = mocker.Mock()
@@ -127,10 +125,8 @@ def test_register_recurring_task(
     assert task.callable is mock
 
 
-def test_register_recurring_task_does_nothing_if_not_run_by_processor(
-    mocker: MockerFixture,
-    db: None,
-) -> None:
+@pytest.mark.django_db
+def test_register_recurring_task_does_nothing_if_not_run_by_processor() -> None:
     # Given
     task_kwargs = {"first_arg": "foo", "second_arg": "bar"}
 
@@ -186,9 +182,8 @@ class NonSerializableObj:
         my_function.delay(args=(NonSerializableObj(),))
 
 
-def test_delay_returns_none_if_task_queue_is_full(
-    settings: SettingsWrapper, db: None
-) -> None:
+@pytest.mark.django_db
+def test_delay_returns_none_if_task_queue_is_full(settings: SettingsWrapper) -> None:
     # Given
     settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR
 
@@ -208,7 +203,31 @@ def my_function(*args: typing.Any, **kwargs: typing.Any) -> None:
     assert task is None
 
 
-def test_can_create_task_with_priority(settings: SettingsWrapper, db: None) -> None:
+@pytest.mark.django_db
+def test_delay__expected_metrics(
+    settings: SettingsWrapper,
+    assert_metric: AssertMetricFixture,
+) -> None:
+    # Given
+    settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR
+
+    @register_task_handler(queue_size=1)
+    def my_function(*args: typing.Any, **kwargs: typing.Any) -> None:
+        pass
+
+    # When
+    my_function.delay()
+
+    # Then
+    assert_metric(
+        name="task_processor_enqueued_tasks_total",
+        value=1.0,
+        labels={"task_identifier": "test_unit_task_processor_decorators.my_function"},
+    )
+
+
+@pytest.mark.django_db
+def test_can_create_task_with_priority(settings: SettingsWrapper) -> None:
     # Given
     settings.TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR
diff --git a/tests/unit/task_processor/test_unit_task_processor_health.py b/tests/unit/task_processor/test_unit_task_processor_health.py
index 21a76bf..86de292 100644
--- a/tests/unit/task_processor/test_unit_task_processor_health.py
+++ b/tests/unit/task_processor/test_unit_task_processor_health.py
@@ -1,3 +1,4 @@
+import pytest
 from pytest_django.fixtures import SettingsWrapper
 from pytest_mock import MockerFixture
 
@@ -25,8 +26,8 @@ def test_is_processor_healthy_returns_false_if_task_not_processed(
     assert result is False
 
 
+@pytest.mark.django_db
 def test_is_processor_healthy_returns_true_if_task_processed(
-    db: None,
     settings: SettingsWrapper,
 ) -> None:
     # Given
diff --git a/tests/unit/task_processor/test_unit_task_processor_monitoring.py b/tests/unit/task_processor/test_unit_task_processor_monitoring.py
index ffe27f7..eae67a2 100644
--- a/tests/unit/task_processor/test_unit_task_processor_monitoring.py
+++ b/tests/unit/task_processor/test_unit_task_processor_monitoring.py
@@ -1,12 +1,14 @@
 from datetime import timedelta
 
+import pytest
 from django.utils import timezone
 
 from task_processor.models import Task
 from task_processor.monitoring import get_num_waiting_tasks
 
 
-def test_get_num_waiting_tasks(db: None) -> None:
+@pytest.mark.django_db
+def test_get_num_waiting_tasks() -> None:
     # Given
     now = timezone.now()
 
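`common.test_tools` is not part of this diff, so the exact shape of `AssertMetricFixture` is not shown. Judging by the call sites above and below, it behaves roughly like this hypothetical sketch (the names and the histogram `_count` heuristic are assumptions, not the real implementation):

```python
import typing

from prometheus_client import REGISTRY


class AssertMetricFixture(typing.Protocol):
    def __call__(
        self, *, name: str, value: float, labels: dict[str, str]
    ) -> None: ...


def assert_metric(*, name: str, value: float, labels: dict[str, str]) -> None:
    # Counters are matched by their sample name; for the duration histogram
    # a real implementation would need to pick a concrete sample (e.g. _count).
    sample_name = f"{name}_count" if name.endswith("_seconds") else name
    assert REGISTRY.get_sample_value(sample_name, labels) == value
```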
diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py
index 4edbce5..492f782 100644
--- a/tests/unit/task_processor/test_unit_task_processor_processor.py
+++ b/tests/unit/task_processor/test_unit_task_processor_processor.py
@@ -9,9 +9,9 @@
 from django.core.cache import cache
 from django.utils import timezone
 from freezegun import freeze_time
-from pytest import MonkeyPatch
 from pytest_mock import MockerFixture
 
+from common.test_tools import AssertMetricFixture
 from task_processor.decorators import (
     TaskHandler,
     register_recurring_task,
@@ -32,11 +32,6 @@
 )
 from task_processor.task_registry import initialise, registered_tasks
 
-if typing.TYPE_CHECKING:
-    # This import breaks private-package-test workflow in core
-    from tests.unit.task_processor.conftest import GetTaskProcessorCaplog
-
-
 DEFAULT_CACHE_KEY = "foo"
 DEFAULT_CACHE_VALUE = "bar"
 
@@ -78,6 +73,7 @@ def _sleep_task(seconds: int) -> None:
     return _sleep_task
 
 
+@pytest.mark.task_processor_mode
 def test_run_task_runs_task_and_creates_task_run_object_when_success(
     dummy_task: TaskHandler[[str, str]],
 ) -> None:
@@ -105,12 +101,12 @@ def test_run_task_runs_task_and_creates_task_run_object_when_success(
     assert task.completed
 
 
+@pytest.mark.task_processor_mode
 def test_run_task_kills_task_after_timeout(
     sleep_task: TaskHandler[[int]],
-    get_task_processor_caplog: "GetTaskProcessorCaplog",
+    caplog: pytest.LogCaptureFixture,
 ) -> None:
     # Given
-    caplog = get_task_processor_caplog(logging.ERROR)
     task = Task.create(
         sleep_task.task_identifier,
         scheduled_for=timezone.now(),
@@ -143,15 +139,12 @@
     )
 
 
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
 def test_run_recurring_task_kills_task_after_timeout(
-    db: None,
-    monkeypatch: MonkeyPatch,
-    get_task_processor_caplog: "GetTaskProcessorCaplog",
+    caplog: pytest.LogCaptureFixture,
 ) -> None:
     # Given
-    caplog = get_task_processor_caplog(logging.ERROR)
-    monkeypatch.setenv("RUN_BY_PROCESSOR", "True")
-
     @register_recurring_task(
         run_every=timedelta(seconds=1), timeout=timedelta(microseconds=1)
     )
@@ -186,13 +179,12 @@
     )
 
 
-def test_run_recurring_tasks_runs_task_and_creates_recurring_task_run_object_when_success(
-    db: None,
-    monkeypatch: MonkeyPatch,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_runs_task_and_creates_recurring_task_run_object_when_success() -> (
+    None
+):
     # Given
-    monkeypatch.setenv("RUN_BY_PROCESSOR", "True")
-
     @register_recurring_task(run_every=timedelta(seconds=1))
     def _dummy_recurring_task() -> None:
         cache.set(DEFAULT_CACHE_KEY, DEFAULT_CACHE_VALUE)
@@ -216,13 +208,10 @@ def _dummy_recurring_task() -> None:
     assert task_run.error_details is None
 
 
-def test_run_recurring_tasks_runs_locked_task_after_tiemout(
-    db: None,
-    monkeypatch: MonkeyPatch,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_runs_locked_task_after_timeout() -> None:
     # Given
-    monkeypatch.setenv("RUN_BY_PROCESSOR", "True")
-
     @register_recurring_task(run_every=timedelta(hours=1))
     def _dummy_recurring_task() -> None:
         cache.set(DEFAULT_CACHE_KEY, DEFAULT_CACHE_VALUE)
@@ -257,10 +246,8 @@ def _dummy_recurring_task() -> None:
 
 
 @pytest.mark.django_db(transaction=True)
-def test_run_recurring_tasks_multiple_runs(
-    db: None,
-    run_by_processor: None,
-) -> None:
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_multiple_runs() -> None:
     # Given
     @register_recurring_task(run_every=timedelta(milliseconds=200))
     def _dummy_recurring_task() -> None:
@@ -304,10 +291,8 @@ def _dummy_recurring_task() -> None:
 
 
 @pytest.mark.django_db(transaction=True)
-def test_run_recurring_tasks_loops_over_all_tasks(
-    db: None,
-    run_by_processor: None,
-) -> None:
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_loops_over_all_tasks() -> None:
     # Given, Three recurring tasks
     @register_recurring_task(run_every=timedelta(milliseconds=200))
     def _dummy_recurring_task_1() -> None:
@@ -336,10 +321,11 @@ def _dummy_recurring_task_3() -> None:
     assert RecurringTaskRun.objects.filter(task=task).count() == 1
 
 
-def test_run_recurring_tasks_only_executes_tasks_after_interval_set_by_run_every(
-    db: None,
-    run_by_processor: None,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_only_executes_tasks_after_interval_set_by_run_every() -> (
+    None
+):
     # Given
     @register_recurring_task(run_every=timedelta(milliseconds=200))
     def _dummy_recurring_task() -> None:
@@ -363,15 +349,10 @@ def _dummy_recurring_task() -> None:
     assert RecurringTaskRun.objects.filter(task=task).count() == 1
 
 
-def test_run_recurring_tasks_does_nothing_if_unregistered_task_is_new(
-    db: None,
-    run_by_processor: None,
-    caplog: pytest.LogCaptureFixture,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_does_nothing_if_unregistered_task_is_new() -> None:
     # Given
-    task_processor_logger = logging.getLogger("task_processor")
-    task_processor_logger.propagate = True
-
     task_identifier = "test_unit_task_processor_processor._a_task"
 
     @register_recurring_task(run_every=timedelta(milliseconds=100))
@@ -393,11 +374,9 @@ def _a_task() -> None:
     assert RecurringTask.objects.filter(task_identifier=task_identifier).exists()
 
 
-def test_run_recurring_tasks_deletes_the_task_if_unregistered_task_is_old(
-    db: None,
-    run_by_processor: None,
-    mocker: MockerFixture,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_run_recurring_tasks_deletes_the_task_if_unregistered_task_is_old() -> None:
     # Given
     task_processor_logger = logging.getLogger("task_processor")
     task_processor_logger.propagate = True
@@ -425,13 +404,13 @@
     )
 
+@pytest.mark.task_processor_mode
 def test_run_task_runs_task_and_creates_task_run_object_when_failure(
     raise_exception_task: TaskHandler[[str]],
-    get_task_processor_caplog: "GetTaskProcessorCaplog",
+    caplog: pytest.LogCaptureFixture,
 ) -> None:
     # Given
-    caplog = get_task_processor_caplog(logging.DEBUG)
-
+    caplog.set_level(logging.DEBUG)
     msg = "Error!"
     task = Task.create(
         raise_exception_task.task_identifier, args=(msg,), scheduled_for=timezone.now()
     )
@@ -470,6 +449,7 @@ def test_run_task_runs_task_and_creates_task_run_object_when_failure(
     ]
 
 
+@pytest.mark.task_processor_mode
 def test_run_task_runs_failed_task_again(
     raise_exception_task: TaskHandler[[str]],
 ) -> None:
@@ -501,10 +481,11 @@ def test_run_task_runs_failed_task_again(
     assert task.is_locked is False
 
 
-def test_run_recurring_task_runs_task_and_creates_recurring_task_run_object_when_failure(
-    db: None,
-    run_by_processor: None,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_run_recurring_task_runs_task_and_creates_recurring_task_run_object_when_failure() -> (
+    None
+):
     # Given
     task_identifier = "test_unit_task_processor_processor._raise_exception"
 
@@ -528,7 +509,8 @@ def _raise_exception(organisation_name: str) -> None:
     assert task_run.error_details is not None
 
 
-def test_run_task_does_nothing_if_no_tasks(db: None) -> None:
+@pytest.mark.django_db
+def test_run_task_does_nothing_if_no_tasks() -> None:
     # Given - no tasks
     # When
     result = run_tasks()
@@ -538,6 +520,7 @@
 
 
 @pytest.mark.django_db(transaction=True)
+@pytest.mark.task_processor_mode
 def test_run_task_runs_tasks_in_correct_priority(
     dummy_task: TaskHandler[[str, str]],
 ) -> None:
@@ -578,7 +561,83 @@
     assert task_runs_3[0].task == task_2
 
 
+def test_run_tasks__fails_if_not_in_task_processor_mode(
+    dummy_task: TaskHandler[[str, str]],
+) -> None:
+    # Given
+    Task.create(
+        dummy_task.task_identifier,
+        scheduled_for=timezone.now(),
+        args=("arg1", "arg2"),
+    ).save()
+
+    # When
+    with pytest.raises(AssertionError):
+        run_tasks()
+
+
 @pytest.mark.django_db(transaction=True)
+@pytest.mark.task_processor_mode
+def test_run_tasks__expected_metrics(
+    dummy_task: TaskHandler[[str, str]],
+    raise_exception_task: TaskHandler[[str]],
+    assert_metric: AssertMetricFixture,
+    mocker: MockerFixture,
+) -> None:
+    # Given
+    dummy_task_identifier = dummy_task.task_identifier
+    raise_exception_task_identifier = raise_exception_task.task_identifier
+    Task.create(
+        dummy_task_identifier,
+        scheduled_for=timezone.now(),
+        args=("arg1", "arg2"),
+    ).save()
+    Task.create(
+        raise_exception_task_identifier,
+        scheduled_for=timezone.now(),
+        args=("arg1",),
+    ).save()
+
+    # When
+    run_tasks(2)
+
+    # Then
+    assert_metric(
+        name="task_processor_finished_tasks_total",
+        value=1.0,
+        labels={
+            "task_identifier": dummy_task_identifier,
+            "result": "success",
+        },
+    )
+    assert_metric(
+        name="task_processor_finished_tasks_total",
+        value=1.0,
+        labels={
+            "task_identifier": raise_exception_task_identifier,
+            "result": "failure",
+        },
+    )
+    assert_metric(
+        name="task_processor_task_duration_seconds",
+        value=mocker.ANY,
+        labels={
+            "task_identifier": dummy_task_identifier,
+            "result": "success",
+        },
+    )
+    assert_metric(
+        name="task_processor_task_duration_seconds",
+        value=mocker.ANY,
+        labels={
+            "task_identifier": raise_exception_task_identifier,
+            "result": "failure",
+        },
+    )
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.task_processor_mode
 def test_run_tasks_skips_locked_tasks(
     dummy_task: TaskHandler[[str, str]],
     sleep_task: TaskHandler[[int]],
@@ -620,6 +679,7 @@
     task_runner_thread.join()
 
 
+@pytest.mark.task_processor_mode
 def test_run_more_than_one_task(dummy_task: TaskHandler[[str, str]]) -> None:
     # Given
     num_tasks = 5
@@ -653,10 +713,9 @@ def test_run_more_than_one_task(dummy_task: TaskHandler[[str, str]]) -> None:
         assert task.completed
 
 
-def test_recurring_tasks_are_unlocked_if_picked_up_but_not_executed(
-    db: None,
-    run_by_processor: None,
-) -> None:
+@pytest.mark.django_db
+@pytest.mark.task_processor_mode
+def test_recurring_tasks_are_unlocked_if_picked_up_but_not_executed() -> None:
     # Given
     @register_recurring_task(run_every=timedelta(days=1))
     def my_task() -> None:
diff --git a/tests/unit/task_processor/test_unit_task_processor_tasks.py b/tests/unit/task_processor/test_unit_task_processor_tasks.py
index cfd8e19..99df3e6 100644
--- a/tests/unit/task_processor/test_unit_task_processor_tasks.py
+++ b/tests/unit/task_processor/test_unit_task_processor_tasks.py
@@ -1,5 +1,6 @@
 from datetime import timedelta
 
+import pytest
 from django.utils import timezone
 from pytest_django.fixtures import DjangoAssertNumQueries, SettingsWrapper
 
@@ -16,7 +17,8 @@
 sixty_days_ago = now - timedelta(days=60)
 
 
-def test_clean_up_old_tasks_does_nothing_when_no_tasks(db: None) -> None:
+@pytest.mark.django_db
+def test_clean_up_old_tasks_does_nothing_when_no_tasks() -> None:
     # Given
     assert Task.objects.count() == 0
 
@@ -38,10 +40,10 @@ def test_clean_up_old_recurring_task_runs_does_nothing_when_no_runs(db: None) -
     assert RecurringTaskRun.objects.count() == 0
 
 
+@pytest.mark.django_db
 def test_clean_up_old_tasks(
     settings: SettingsWrapper,
     django_assert_num_queries: DjangoAssertNumQueries,
-    db: None,
 ) -> None:
     # Given
     settings.TASK_DELETE_RETENTION_DAYS = 2
@@ -89,10 +91,10 @@ def test_clean_up_old_tasks(
     ]
 
 
+@pytest.mark.django_db
 def test_clean_up_old_recurring_task_runs(
     settings: SettingsWrapper,
     django_assert_num_queries: DjangoAssertNumQueries,
-    db: None,
 ) -> None:
     # Given
     settings.RECURRING_TASK_RUN_RETENTION_DAYS = 2
@@ -125,11 +127,8 @@ def test_clean_up_old_recurring_task_runs(
     assert list(RecurringTaskRun.objects.all()) == [task_in_retention_period]
 
 
-def test_clean_up_old_tasks_include_failed_tasks(
-    settings: SettingsWrapper,
-    django_assert_num_queries: DjangoAssertNumQueries,
-    db: None,
-) -> None:
+@pytest.mark.django_db
+def test_clean_up_old_tasks_include_failed_tasks(settings: SettingsWrapper) -> None:
     # Given
     settings.TASK_DELETE_RETENTION_DAYS = 2
     settings.TASK_DELETE_INCLUDE_FAILED_TASKS = True
@@ -146,10 +145,10 @@ def test_clean_up_old_tasks_include_failed_tasks(
     assert not Task.objects.exists()
 
 
+@pytest.mark.django_db
 def test_clean_up_old_tasks_does_not_run_if_disabled(
     settings: SettingsWrapper,
     django_assert_num_queries: DjangoAssertNumQueries,
-    db: None,
 ) -> None:
     # Given
     settings.ENABLE_CLEAN_UP_OLD_TASKS = False
@@ -166,10 +165,10 @@ def test_clean_up_old_tasks_does_not_run_if_disabled(
     assert Task.objects.filter(id=task.id).exists()
 
 
+@pytest.mark.django_db
 def test_clean_up_old_recurring_task_runs_does_not_run_if_disabled(
     settings: SettingsWrapper,
     django_assert_num_queries: DjangoAssertNumQueries,
-    db: None,
 ) -> None:
     # Given
     settings.RECURRING_TASK_RUN_RETENTION_DAYS = 2
diff --git a/tests/unit/task_processor/test_unit_task_processor_threads.py b/tests/unit/task_processor/test_unit_task_processor_threads.py
index 3647d50..a2f5812 100644
--- a/tests/unit/task_processor/test_unit_task_processor_threads.py
+++ b/tests/unit/task_processor/test_unit_task_processor_threads.py
@@ -1,5 +1,4 @@
 import logging
-import typing
 from typing import Type
 
 import pytest
@@ -8,25 +7,19 @@
 from task_processor.threads import TaskRunner
 
 
-if typing.TYPE_CHECKING:
-    # This import breaks private-package-test workflow in core
-    from tests.unit.task_processor.conftest import GetTaskProcessorCaplog
-
 
 @pytest.mark.parametrize(
     "exception_class, exception_message",
     [(DatabaseError, "Database error"), (Exception, "Generic error")],
 )
+@pytest.mark.django_db
 def test_task_runner_is_resilient_to_errors(
-    db: None,
     mocker: MockerFixture,
-    get_task_processor_caplog: "GetTaskProcessorCaplog",
+    caplog: pytest.LogCaptureFixture,
     exception_class: Type[Exception],
     exception_message: str,
 ) -> None:
     # Given
-    caplog = get_task_processor_caplog(logging.DEBUG)
-
     task_runner = TaskRunner()
     mocker.patch(
         "task_processor.threads.run_tasks",