Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"postgres",
"rabbitmq",
"redis",
"celery_worker"
"celery_worker",
"otel-collector"
],
"workspaceFolder": "/opt/app/${localEnv:PROJECT_NAME:python-template}",
"containerEnv": {
Expand Down Expand Up @@ -72,7 +73,10 @@
"redis:6379",
// RabbitMQ:
// localhost:5672 for accessing rabbitmq via local rabbitmq-cli
"rabbitmq:5672"
"rabbitmq:5672",
// OpenTelemetry Collector:
// localhost:4317 for accessing opentelemetry collector via local otel-cli
"otel-collector:4317"
],
"portsAttributes": {
"5432": {
Expand All @@ -90,6 +94,10 @@
"5672": {
"label": "RabbitMQ",
"onAutoForward": "silent"
},
"4317": {
"label": "OpenTelemetry Collector",
"onAutoForward": "silent"
}
}
}
5 changes: 5 additions & 0 deletions .devcontainer/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ services:
- rabbitmq
- redis

otel-collector:
image: otel/opentelemetry-collector:latest
ports:
- "4317:4317"

volumes:
postgres: {}
cache: {}
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ DATABASE_POOL_SIZE=5
DATABASE_POOL_RECYCLE=3600
DATABASE_MAX_OVERFLOW=10

# OpenTelemetry configuration
OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
ENV="dev"

# Server configuration
LOG_LEVEL=DEBUG
SERVER_URL=example.com
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
run: |
pip install poetry
poetry install --no-root --with dev

- name: Install pre-commit
run: pip install pre-commit
- name: Run linters and formatters
Expand Down Expand Up @@ -64,6 +64,7 @@ jobs:
JWT_SIGNING_KEY: your-signing-key
CELERY_BROKER_URL: amqp://guest:guest@rabbitmq:5672
CELERY_RESULT_BACKEND: redis://redis:6379/0
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,6 @@ The template includes a simple example of distributed tasks using [Celery](https

In case you want to implement some real-world task you should modify the `src/task_queue/task.py` with your logic and then modify `src/api/v1/routers/task.py`.
Remember to always add all your tasks modules to the `src/task_queue/celery_worker.py` with `celery_worker.autodiscover_tasks(["path.to.your.task.module"])`.

## OpenTelemetry
A simple example of OpenTelemetry is included using the native FastAPI instrumentor to collect basic data of requests, also there is a custom instrument to collect data from the controllers. There is a simple implementation to monitor Celery to count the total tasks executed. Given that OpenTelemetry do not have a frontend, to see what is going on you should run `docker logs -f <otel-collector-container-id>`.
690 changes: 667 additions & 23 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ fastapi = "^0.115.11"
fastapi-pagination = "^0.12.26"
httpx = "^0.28.1"
itsdangerous = "^2.2.0"
opentelemetry-api = "^1.31.1"
opentelemetry-sdk = "^1.31.1"
opentelemetry-instrumentation-fastapi = "^0.52b1"
opentelemetry-instrumentation-sqlalchemy = "^0.52b1"
opentelemetry-exporter-otlp = "^1.31.1"
passlib = "^1.7.4"
psycopg2 = "^2.9.9"
ptpython = "^3.0.29"
Expand All @@ -35,6 +40,7 @@ sqladmin = "^0.20.1"
sqlalchemy = "^2.0.39"
uvicorn = "^0.34.0"


[tool.poetry.group.dev.dependencies]
coverage = "^7.7.1"
flower = "^2.0.1"
Expand Down
7 changes: 6 additions & 1 deletion src/api/v1/routers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from src.controllers import UserController
from src.core.database import AsyncSession
from src.core.security import AuthManager
from src.core.trace import tracer_provider

router = APIRouter()

Expand Down Expand Up @@ -43,5 +44,9 @@ def me(user: models.User = Depends(get_user)) -> Any:

@router.get("/{user_id}/items", response_model=Page[Item])
async def get_public_items(user_id: UUID, session: AsyncSession = Depends(db_session)) -> Any:
user = await models.User.objects(session).get_or_404(models.User.id == user_id)
# We can't use the @instrument decorator here because it will collide with the
# FastAPIinstrumentor and cause the span to be created twice.
# So we need to create the span manually.
with tracer_provider.get_tracer(__name__).start_as_current_span("get_public_items"):
user = await models.User.objects(session).get_or_404(models.User.id == user_id)
return await paginate(session, user.get_public_items())
3 changes: 3 additions & 0 deletions src/controllers/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@
from src import models
from src.api.v1 import schemas
from src.core.database import AsyncSession
from src.core.trace import instrument


class ItemController:
@staticmethod
@instrument(name="calling_item_controller_create")
async def create(item_data: schemas.ItemCreate, owner_id: UUID, session: AsyncSession) -> models.Item:
item_data = schemas.Item(owner_id=owner_id, **item_data.model_dump())
item = await models.Item.objects(session).create(item_data.model_dump())
await session.refresh(item)
return item

@staticmethod
@instrument(name="calling_item_controller_bulk_create")
async def bulk_create(
items_data: Sequence[schemas.ItemCreate],
owner_id: UUID,
Expand Down
3 changes: 3 additions & 0 deletions src/controllers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from src.api.v1.schemas import UserCreate
from src.core.database import AsyncSession
from src.core.security import PasswordManager
from src.core.trace import instrument
from src.models import User


class UserController:
@staticmethod
@instrument(name="calling_user_controller_create")
async def create(user_data: UserCreate, session: AsyncSession) -> User:
user = await User.objects(session).get(User.email == user_data.email)
if user:
Expand All @@ -19,6 +21,7 @@ async def create(user_data: UserCreate, session: AsyncSession) -> User:
return user

@staticmethod
@instrument(name="calling_user_controller_login")
async def login(user_data: UserCreate, session: AsyncSession) -> User:
login_exception = HTTPException(status_code=401, detail="Invalid email or password")
user = await User.objects(session).get(User.email == user_data.email)
Expand Down
4 changes: 4 additions & 0 deletions src/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,9 @@ class Settings(BaseSettings):
celery_broker_url: str
celery_result_backend: str

# OpenTelemetry
otel_exporter_otlp_endpoint: str
env: str = "dev"


settings = Settings()
4 changes: 4 additions & 0 deletions src/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, Generic, Sequence, Type, TypeVar

from fastapi import HTTPException
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import ExceptionContext, func, select
from sqlalchemy.event import listens_for
from sqlalchemy.ext.asyncio import (
Expand Down Expand Up @@ -35,6 +36,9 @@
max_overflow=settings.database_max_overflow,
)

# Instrument SQLAlchemy with OpenTelemetry
SQLAlchemyInstrumentor().instrument()


@listens_for(engine.sync_engine, "handle_error")
def _on_handle_error(context: ExceptionContext) -> None:
Expand Down
43 changes: 43 additions & 0 deletions src/core/trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import functools
from typing import Any, Callable

from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from src.core.config import settings

# Configure OpenTelemetry
resource = Resource(attributes={"service.name": settings.env})
tracer_provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint=settings.otel_exporter_otlp_endpoint, insecure=True)
otlp_metric_exporter = OTLPMetricExporter(endpoint=settings.otel_exporter_otlp_endpoint, insecure=True)
span_processor = BatchSpanProcessor(otlp_exporter)
tracer_provider.add_span_processor(span_processor)


def instrument(name: str = "request") -> Callable:
"""
Decorator to instrument a method with OpenTelemetry tracing.
Args:
name (str): The name of the span.
Returns:
Callable: The decorated method.
"""

def decorator(method: Callable) -> Callable:
@functools.wraps(method)
def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any:
tracer = tracer_provider.get_tracer(__name__)
with tracer.start_as_current_span(name=name) as span: # noqa: F841
response = method(*args, **kwargs)
return response

return wrapper

return decorator


#
7 changes: 7 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi_pagination import add_pagination
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from sqladmin import Admin

from src.admin import AdminAuth, ItemAdmin, UserAdmin
from src.core.config import settings
from src.core.database import engine
from src.core.trace import tracer_provider
from src.logging import LogConfig
from src.urls import router

dictConfig(LogConfig().model_dump())


app = FastAPI()

app.include_router(router)

# Instrument FastAPI with OpenTelemetry
FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)

origins = [
settings.server_url,
]
Expand All @@ -31,6 +37,7 @@

add_pagination(app)


authentication_backend = AdminAuth(secret_key="")
admin = Admin(app=app, engine=engine, authentication_backend=authentication_backend)

Expand Down
25 changes: 25 additions & 0 deletions src/task_queue/celery_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from typing import Any

from celery import Celery
from celery.signals import task_postrun
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

from src.core.config import settings
from src.core.trace import otlp_metric_exporter

# Create the celery worker
celery_worker = Celery(
Expand All @@ -12,3 +19,21 @@
# The autodiscover_tasks method will search for tasks in the specified modules
# In real-world schenarios, you should add more modules to this list
celery_worker.autodiscover_tasks(["src.task_queue.task"])

# OpenTelemetry setup
meter_provider = MeterProvider(metric_readers=[PeriodicExportingMetricReader(otlp_metric_exporter)])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("celery_worker")

# Define a custom metric for task execution count
task_execution_counter = meter.create_counter(
name="celery_task_executions",
description="Counts the number of tasks executed by the Celery worker",
unit="1",
)


# Signal handler to increment the counter after task execution
@task_postrun.connect
def task_postrun_handler(task: Any = None, **extras: Any) -> None:
task_execution_counter.add(1, {"task_name": task.name})