8 changes: 8 additions & 0 deletions README.md
@@ -60,9 +60,17 @@ Flagsmith uses Prometheus to track performance metrics.

The following default metrics are exposed:

+##### Common metrics

- `flagsmith_build_info`: Has the labels `version` and `ci_commit_sha`.
- `http_server_request_duration_seconds`: Histogram labeled with `method`, `route`, and `response_status`.
- `http_server_requests_total`: Counter labeled with `method`, `route`, and `response_status`.
+- `task_processor_enqueued_tasks_total`: Counter labeled with `task_identifier`.

+##### Task Processor metrics

+- `task_processor_finished_tasks_total`: Counter labeled with `task_identifier` and `result` (`"success"`, `"failure"`).
+- `task_processor_task_duration_seconds`: Histogram labeled with `task_identifier` and `result` (`"success"`, `"failure"`).

##### Guidelines

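For orientation, here is a minimal sketch (illustrative, not part of this diff) of how metrics with the documented label scheme are emitted via `prometheus_client`; the `run_task` helper and its body are hypothetical:

```python
import time

from prometheus_client import Counter, Histogram

finished_tasks = Counter(
    "task_processor_finished_tasks_total",
    "Total number of finished tasks",
    ["task_identifier", "result"],
)
task_duration = Histogram(
    "task_processor_task_duration_seconds",
    "Task processor task duration in seconds",
    ["task_identifier", "result"],
)


def run_task(task_identifier: str) -> None:
    """Hypothetical task runner illustrating the label scheme."""
    started_at = time.monotonic()
    result = "success"
    try:
        ...  # run the task here
    except Exception:
        result = "failure"
    # Both metrics are recorded under the same label pair, so success and
    # failure rates and durations can be correlated per task.
    finished_tasks.labels(task_identifier=task_identifier, result=result).inc()
    task_duration.labels(task_identifier=task_identifier, result=result).observe(
        time.monotonic() - started_at
    )
```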
2 changes: 1 addition & 1 deletion poetry.lock


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -15,8 +15,8 @@ dependencies = [
"flagsmith-flag-engine",
"gunicorn (>=19.1)",
"prometheus-client (>=0.0.16)",
"simplejson (>=3,<4)",
"psycopg2-binary (>=2.9,<3)",
"simplejson (>=3,<4)",
]
authors = [
    { name = "Matthew Elwell" },
1 change: 1 addition & 0 deletions settings/dev.py
@@ -46,6 +46,7 @@
TASK_DELETE_RETENTION_DAYS = 15
TASK_DELETE_RUN_EVERY = timedelta(days=1)
TASK_DELETE_RUN_TIME = time(5, 0, 0)
+TASK_PROCESSOR_MODE = False
TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR

# Avoid models.W042 warnings
26 changes: 23 additions & 3 deletions src/common/prometheus/utils.py
@@ -1,12 +1,10 @@
-import typing
+import importlib

import prometheus_client
from django.conf import settings
from prometheus_client.metrics import MetricWrapperBase
from prometheus_client.multiprocess import MultiProcessCollector

-T = typing.TypeVar("T", bound=MetricWrapperBase)


class Histogram(prometheus_client.Histogram):
    DEFAULT_BUCKETS = settings.PROMETHEUS_HISTOGRAM_BUCKETS
@@ -16,3 +14,25 @@ def get_registry() -> prometheus_client.CollectorRegistry:
    registry = prometheus_client.CollectorRegistry()
    MultiProcessCollector(registry)  # type: ignore[no-untyped-call]
    return registry


def reload_metrics(*metric_module_names: str) -> None:
    """
    Clear the registry of all collectors from the given modules
    and reload the modules to register the collectors again.

    Used in tests to reset the state of the metrics module
    when needed.
    """

    registry = prometheus_client.REGISTRY

    for module_name in metric_module_names:
        metrics_module = importlib.import_module(module_name)

        for module_attr in vars(metrics_module).values():
            if isinstance(module_attr, MetricWrapperBase):
                # Unregister the collector from the registry
                registry.unregister(module_attr)

        importlib.reload(metrics_module)
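A sketch of how `reload_metrics` is meant to be used in tests, mirroring the conftest change later in this diff; the fixture name here is illustrative:

```python
import pytest
from pytest_django.fixtures import SettingsWrapper

from common.prometheus.utils import reload_metrics


@pytest.fixture()
def processor_metrics_enabled(settings: SettingsWrapper) -> None:
    # Hypothetical fixture: task_processor.metrics reads settings at import
    # time, so after flipping the flag we reload the module to re-register
    # its collectors with the default registry.
    settings.TASK_PROCESSOR_MODE = True
    reload_metrics("task_processor.metrics")
```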
17 changes: 10 additions & 7 deletions src/task_processor/decorators.py
@@ -1,5 +1,4 @@
import logging
-import os
import typing
from datetime import datetime, time, timedelta
from threading import Thread
@@ -8,7 +7,7 @@
from django.db.transaction import on_commit
from django.utils import timezone

-from task_processor import task_registry
+from task_processor import metrics, task_registry
from task_processor.exceptions import InvalidArgumentsError, TaskQueueFullError
from task_processor.models import RecurringTask, Task, TaskPriority
from task_processor.task_run_method import TaskRunMethod
@@ -69,7 +68,8 @@ def delay(
        args: tuple[typing.Any, ...] = (),
        kwargs: dict[str, typing.Any] | None = None,
    ) -> Task | None:
-        logger.debug("Request to run task '%s' asynchronously.", self.task_identifier)
+        task_identifier = self.task_identifier
+        logger.debug("Request to run task '%s' asynchronously.", task_identifier)

        kwargs = kwargs or {}

@@ -84,13 +84,16 @@
            _validate_inputs(*args, **kwargs)
            self.unwrapped(*args, **kwargs)
        elif settings.TASK_RUN_METHOD == TaskRunMethod.SEPARATE_THREAD:
-            logger.debug("Running task '%s' in separate thread", self.task_identifier)
+            logger.debug("Running task '%s' in separate thread", task_identifier)
            self.run_in_thread(args=args, kwargs=kwargs)
        else:
-            logger.debug("Creating task for function '%s'...", self.task_identifier)
+            logger.debug("Creating task for function '%s'...", task_identifier)
+            metrics.task_processor_enqueued_tasks_total.labels(
+                task_identifier=task_identifier
+            ).inc()
            try:
                task = Task.create(
-                    task_identifier=self.task_identifier,
+                    task_identifier=task_identifier,
                    scheduled_for=delay_until or timezone.now(),
                    priority=self.priority,
                    queue_size=self.queue_size,
@@ -174,7 +177,7 @@ def register_recurring_task(
    first_run_time: time | None = None,
    timeout: timedelta | None = timedelta(minutes=30),
) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]:
-    if not os.environ.get("RUN_BY_PROCESSOR"):
+    if not settings.TASK_PROCESSOR_MODE:
        # Do not register recurring tasks if not invoked by task processor
        return lambda f: f

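To illustrate the new gate: with `TASK_PROCESSOR_MODE` left `False` (as in `settings/dev.py` above), `register_recurring_task` is a no-op decorator. A hedged sketch, where the `run_every` parameter and the task body are assumptions, not taken from this diff:

```python
from datetime import timedelta

from task_processor.decorators import register_recurring_task


# With settings.TASK_PROCESSOR_MODE = False, the decorator returns the
# function unchanged and registers nothing; only a process started in
# task-processor mode records the recurring task.
@register_recurring_task(run_every=timedelta(hours=1))  # run_every is assumed
def clean_up_old_task_runs() -> None:
    ...  # hypothetical recurring job
```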
22 changes: 22 additions & 0 deletions src/task_processor/metrics.py
@@ -0,0 +1,22 @@
import prometheus_client
from django.conf import settings

from common.prometheus import Histogram

task_processor_enqueued_tasks_total = prometheus_client.Counter(
    "task_processor_enqueued_tasks_total",
    "Total number of enqueued tasks",
    ["task_identifier"],
)

if settings.TASK_PROCESSOR_MODE:
    task_processor_finished_tasks_total = prometheus_client.Counter(
        "task_processor_finished_tasks_total",
        "Total number of finished tasks",
        ["task_identifier", "result"],
    )
    task_processor_task_duration_seconds = Histogram(
        "task_processor_task_duration_seconds",
        "Task processor task duration in seconds",
        ["task_identifier", "result"],
    )
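Because the processor-only metrics are defined conditionally at import time, consumers see different module contents per process type. A sketch of the consequence (assuming the module was first imported under the current setting value):

```python
from django.conf import settings

from task_processor import metrics

# The enqueue counter is defined unconditionally: web workers and the
# processor both enqueue tasks, so both need it.
assert hasattr(metrics, "task_processor_enqueued_tasks_total")

# The finished-tasks counter and duration histogram exist only in a
# process started with TASK_PROCESSOR_MODE = True.
assert hasattr(metrics, "task_processor_finished_tasks_total") == bool(
    settings.TASK_PROCESSOR_MODE
)
```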
42 changes: 35 additions & 7 deletions src/task_processor/processor.py
@@ -2,10 +2,13 @@
import traceback
import typing
from concurrent.futures import ThreadPoolExecutor
+from contextlib import ExitStack
from datetime import timedelta

from django.conf import settings
from django.utils import timezone

+from task_processor import metrics
from task_processor.models import (
    AbstractBaseTask,
    RecurringTask,
@@ -101,38 +104,63 @@ def run_recurring_tasks() -> list[RecurringTaskRun]:
def _run_task(
    task: T,
) -> typing.Tuple[T, AnyTaskRun]:
+    assert settings.TASK_PROCESSOR_MODE, (
+        "Attempt to run tasks in a non-task-processor environment"
+    )

+    ctx = ExitStack()
+    timer = metrics.task_processor_task_duration_seconds.time()
+    ctx.enter_context(timer)

+    task_identifier = task.task_identifier

    logger.debug(
-        f"Running task {task.task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
+        f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
    )
    task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task)  # type: ignore[attr-defined]
+    result: str

    try:
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(task.run)
            timeout = task.timeout.total_seconds() if task.timeout else None
            future.result(timeout=timeout)  # Wait for completion or timeout

-        task_run.result = TaskResult.SUCCESS.value
+        task_run.result = result = TaskResult.SUCCESS.value
        task_run.finished_at = timezone.now()
        task.mark_success()
-        logger.debug(f"Task {task.task_identifier} id={task.pk} completed")

+        logger.debug(f"Task {task_identifier} id={task.pk} completed")

    except Exception as e:
        # For errors that don't include a default message (e.g., TimeoutError),
        # fall back to using repr.
        err_msg = str(e) or repr(e)

+        task.mark_failure()

+        task_run.result = result = TaskResult.FAILURE.value
+        task_run.error_details = str(traceback.format_exc())

        logger.error(
            "Failed to execute task '%s', with id %d. Exception: %s",
-            task.task_identifier,
+            task_identifier,
            task.pk,
            err_msg,
            exc_info=True,
        )

-        task.mark_failure()
+    result_label_value = result.lower()

-        task_run.result = TaskResult.FAILURE.value
-        task_run.error_details = str(traceback.format_exc())
+    timer.labels(
+        task_identifier=task_identifier,
+        result=result_label_value,
+    )  # type: ignore[no-untyped-call]
+    ctx.close()

+    metrics.task_processor_finished_tasks_total.labels(
+        task_identifier=task_identifier,
+        result=result_label_value,
+    ).inc()

    return task, task_run
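The deferred-labels pattern above is worth calling out: `Histogram.time()` returns a timer context manager whose labels can be set after entry, so the duration is recorded under a `result` label that is only known once the task finishes (the diff itself relies on `timer.labels(...)`). A minimal sketch of the same pattern; `do_work` is a hypothetical workload:

```python
import prometheus_client

operation_duration = prometheus_client.Histogram(
    "demo_operation_duration_seconds",
    "Demo operation duration in seconds",
    ["result"],
)


def do_work() -> None:
    ...  # hypothetical workload


# Entering the timer starts the clock; the observation is recorded on exit
# against whichever labelled child was selected in the meantime.
with operation_duration.time() as timer:
    try:
        do_work()
        timer.labels(result="success")
    except Exception:
        timer.labels(result="failure")
        raise
```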
41 changes: 13 additions & 28 deletions tests/unit/task_processor/conftest.py
@@ -1,40 +1,25 @@
-import logging
-import typing

import pytest
+from pytest_django.fixtures import SettingsWrapper

+from common.prometheus.utils import reload_metrics
from task_processor.task_registry import RegisteredTask


-@pytest.fixture
-def run_by_processor(monkeypatch: pytest.MonkeyPatch) -> None:
-    monkeypatch.setenv("RUN_BY_PROCESSOR", "True")
+@pytest.fixture()
+def task_processor_mode(settings: SettingsWrapper) -> None:
+    settings.TASK_PROCESSOR_MODE = True
+    # The setting is supposed to be set before the metrics module is imported,
+    # so reload it
+    reload_metrics("task_processor.metrics")


-class GetTaskProcessorCaplog(typing.Protocol):
-    def __call__(
-        self, log_level: str | int = logging.INFO
-    ) -> pytest.LogCaptureFixture: ...


-@pytest.fixture
-def get_task_processor_caplog(
-    caplog: pytest.LogCaptureFixture,
-) -> GetTaskProcessorCaplog:
-    # caplog doesn't allow you to capture logging outputs from loggers that don't
-    # propagate to root. Quick hack here to get the task_processor logger to
-    # propagate.
-    # TODO: look into using loguru.

-    def _inner(log_level: str | int = logging.INFO) -> pytest.LogCaptureFixture:
-        task_processor_logger = logging.getLogger("task_processor")
-        task_processor_logger.propagate = True
-        # Assume required level for the logger.
-        task_processor_logger.setLevel(log_level)
-        caplog.set_level(log_level)
-        return caplog

-    return _inner
+@pytest.fixture(autouse=True)
+def task_processor_mode_marked(request: pytest.FixtureRequest) -> None:
+    for marker in request.node.iter_markers():
+        if marker.name == "task_processor_mode":
+            request.getfixturevalue("task_processor_mode")


@pytest.fixture(autouse=True)
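Finally, a sketch of how a test opts in to processor mode via the new marker; the marker name comes from the autouse fixture above, while the test body is illustrative:

```python
import pytest

from task_processor import metrics


@pytest.mark.task_processor_mode
def test_processor_metrics_are_registered() -> None:
    # The autouse fixture sees the marker, flips TASK_PROCESSOR_MODE, and
    # reloads task_processor.metrics (importlib.reload mutates the module
    # in place), so the processor-only collectors become available here.
    assert metrics.task_processor_finished_tasks_total is not None
```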