diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 9473fa5..4dd5a2d 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -26,18 +26,15 @@
     "customizations": {
         "vscode": {
             "extensions": [
+                "charliermarsh.ruff",
                 "ms-azuretools.vscode-docker",
-                "ms-python.black-formatter",
-                "ms-python.flake8",
-                "ms-python.isort",
-                "ms-python.python",
                 "ms-python.mypy-type-checker",
                 "tamasfe.even-better-toml"
             ],
             "settings": {
                 "editor.formatOnSave": true,
                 "editor.rulers": [
-                    120
+                    130
                 ],
                 "editor.tabSize": 4,
                 "files.insertFinalNewline": true,
@@ -46,7 +43,7 @@
                 "editor.codeActionsOnSave": {
                     "source.organizeImports": "explicit"
                 },
-                "python.editor.defaultFormatter": "ms-python.black-formatter",
+                "python.editor.defaultFormatter": "charliermarsh.ruff",
                 "python.testing.unittestEnabled": false,
                 "python.testing.pytestArgs": [
                     "src/tests"
diff --git a/README.md b/README.md
index f9c8170..4461ff2 100755
--- a/README.md
+++ b/README.md
@@ -3,8 +3,8 @@
 ![python version](https://img.shields.io/badge/python-3.13-brightgreen)
 ![fastAPI version](https://img.shields.io/badge/fastapi-0.95.2-brightgreen)
-
 ## Components
+
 - Rest API built with FastAPI and SQLAlchemy
 - PostgreSQL database
@@ -40,7 +40,6 @@
 variable loaded, otherwise the dev container might fail or not work as expected.
 
 PROJECT_NAME=your-awesome-project code
 ```
-
 ## Migrations
 We use Alembic as our database migration tool. You can run migration commands directly inside the dev container or use the provided shortcut in the `exec.sh` script.
@@ -48,14 +47,15 @@ We use Alembic as our database migration tool. You can run migration commands directly
 
 - `migrate` – Runs all migrations.
 - `makemigrations` – Compares the current database state with the table metadata and generates the necessary migration files.
-
 ## Code tools
+
 Linters, formatters, etc.
 
 - **ruff**: Linter and formatter
 - **mypy**: Static type checker
 
 ### pre-commit
+
 `pre-commit` is part of the `dev` group in the `pyproject.toml` and is installed by default.
 
 Set up the `pre-commit` hooks, specified in `.pre-commit-config.yaml`:
@@ -77,9 +77,11 @@ There is a shortcut under the `/scripts` directory that runs all these tools for us:
 ![Screenshot](.docs/images/format.png)
 
 ## Tests
+
 We use FastAPI's `TestClient` and `pytest` for testing. The `./exec.sh test` shortcut can be used to run all tests, or just run `test` inside the dev container.
 
 ## Shell
+
 You can start an interactive Python shell inside the dev container in two ways:
 
 1. Simply run `shell` inside the container.
@@ -94,6 +96,7 @@ The shell provides some useful pre-imported objects:
 
 ![Screenshot](.docs/images/shell.png)
 
 ## Admin
+
 The template includes an admin interface via [SQLAdmin](https://github.com/aminalaee/sqladmin). It's a flexible admin that can be configured in many ways.
 
 *One note: be careful when adding relationships to the list or detail pages (especially large many-to-many / one-to-many relationships), because those pages are not optimized in terms of DB queries in that case (all the related objects are loaded into memory).*
@@ -101,10 +104,59 @@
 ![Screenshot](.docs/images/admin.png)
 
 ## Celery
+
 The template includes a simple example of distributed tasks using [Celery](https://docs.celeryq.dev/en/stable/).
 
 There's an example endpoint that sends a task to the queue, which the Celery worker then executes.
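+
+For illustration, a minimal task definition might look like the sketch below. This is a hypothetical example: it assumes the Celery app object defined in `src/task_queue/celery_worker.py` is named `celery_worker`, matching the `autodiscover_tasks` call mentioned below.
+
+```python
+# src/task_queue/task.py (hypothetical sketch, names are assumptions)
+from src.task_queue.celery_worker import celery_worker
+
+
+@celery_worker.task
+def add(x: int, y: int) -> int:
+    # Runs in the Celery worker process, not in the API process that enqueued it
+    return x + y
+```
+
+An endpoint would then enqueue it with `add.delay(2, 3)`, and the worker picks it up from the queue.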
 
 You can monitor the worker with [Flower](https://flower.readthedocs.io/en/latest/): first execute `poetry run celery -A src.task_queue.celery_worker flower --loglevel=info`, then go to `localhost:5555`.
 
 To implement a real-world task, put your logic in `src/task_queue/task.py` and then adapt `src/api/v1/routers/task.py`. Remember to always add your task modules to `src/task_queue/celery_worker.py` with `celery_worker.autodiscover_tasks(["path.to.your.task.module"])`.
 
 ## OpenTelemetry
+
 A simple OpenTelemetry example is included. It uses the native FastAPI instrumentor to collect basic request data, plus a custom instrumentor that collects data from the controllers.
 
 There is a simple implementation that monitors Celery and counts the total number of tasks executed.
 
 Given that OpenTelemetry does not have a frontend, to see what is going on you should run `docker logs -f <container_name>`.
+
+## Logging
+
+This project uses [structlog](https://www.structlog.org/en/stable/) for structured logging.
+
+Structured logs make it easier to parse, search, and analyze logs in production systems, especially when using centralized logging tools like Loki, ELK, or Datadog.
+
+### Examples
+
+1. Pass context information as keyword arguments
+
+```python
+import structlog
+
+log = structlog.get_logger()
+# Pass extra information as keyword arguments to log calls instead of formatting it into the log message itself
+log.info("user_logged_in", user_id="1234", method="password")
+```
+
+2. Bind additional context information to be used by any logger instance in scope
+
+```python
+import structlog
+from structlog.contextvars import bound_contextvars
+
+async def route_handler(user_id: str, session) -> User:
+    with bound_contextvars(user_id=user_id):
+        logger = structlog.get_logger(__name__)
+        logger.info("Handling request XYZ")  # Will include user_id in the log entry
+
+        user = await fetch_user(user_id, session)
+
+    return user
+
+
+async def fetch_user(user_id: str, session) -> User | None:
+    logger = structlog.get_logger(__name__)
+    log = logger.bind(class_name=User.__name__)
+    user = await User.objects(session).get(User.id == user_id)
+
+    if user is None:
+        # This will include class_name and also user_id in the logs, because this call
+        # happens inside the context bound by the previous method.
+        log.debug("Record not found")
+
+    return user
+```
diff --git a/poetry.lock b/poetry.lock
index e2102e8..0459d9b 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2372,6 +2372,24 @@ anyio = ">=3.6.2,<5"
 
 [package.extras]
 full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.18)", "pyyaml"]
 
+[[package]]
+name = "structlog"
+version = "25.3.0"
+description = "Structured Logging for Python"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "structlog-25.3.0-py3-none-any.whl", hash = "sha256:a341f5524004c158498c3127eecded091eb67d3a611e7a3093deca30db06e172"},
+    {file = "structlog-25.3.0.tar.gz", hash = "sha256:8dab497e6f6ca962abad0c283c46744185e0c9ba900db52a423cb6db99f7abeb"},
+]
+
+[package.extras]
+dev = ["freezegun (>=0.2.8)", "mypy (>=1.4)", "pretend", "pytest (>=6.0)", "pytest-asyncio (>=0.17)", "rich", "simplejson", "twisted"]
+docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-mermaid", "sphinxext-opengraph", "twisted"]
+tests = ["freezegun (>=0.2.8)", "pretend", "pytest (>=6.0)", "pytest-asyncio (>=0.17)", "simplejson"]
+typing = ["mypy (>=1.4)", "rich", "twisted"]
+
 [[package]]
 name = "tornado"
 version = "6.4.2"
@@ -2696,4 +2714,4 @@ type = ["pytest-mypy"]
 
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.13.0,<4.0.0"
-content-hash = "a48536026f55f4ea0c3bff006cefd23efa9159c25d797a47cfdcfb4be1d6b117"
+content-hash = "4295a745b43c0f89ad0b7ff0c6676c6eae423c1abcaceabdbd5dca763c545d49"
diff --git a/pyproject.toml b/pyproject.toml
index 5cc88a8..6cfde0d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,6 +38,7 @@ python-jose = "^3.4.0"
 redis = "^5.2.1"
 sqladmin = "^0.20.1"
 sqlalchemy = "^2.0.39"
+structlog = "^25.3.0"
 uvicorn = "^0.34.0"
diff --git a/src/api/v1/routers/user.py b/src/api/v1/routers/user.py
index 969ba58..d545e06 100755
--- a/src/api/v1/routers/user.py
+++ b/src/api/v1/routers/user.py
@@ -1,9 +1,11 @@
 from typing import Any
 from uuid import UUID
 
+import structlog
 from fastapi import APIRouter, Depends, Response
 from fastapi_pagination import Page
 from fastapi_pagination.ext.sqlalchemy import paginate
+from structlog.contextvars import bound_contextvars
 
 from src import models
 from src.api.dependencies import db_session, get_user
@@ -39,14 +41,22 @@ async def login(
 
 @router.get("/me", response_model=schemas.User)
 def me(user: models.User = Depends(get_user)) -> Any:
+    logger = structlog.get_logger(__name__)
+    logger.debug("Getting current user profile")
     return user
 
 
 @router.get("/{user_id}/items", response_model=Page[Item])
 async def get_public_items(user_id: UUID, session: AsyncSession = Depends(db_session)) -> Any:
-    # We can't use the @instrument decorator here because it will collide with the
-    # FastAPIinstrumentor and cause the span to be created twice.
-    # So we need to create the span manually.
-    with tracer_provider.get_tracer(__name__).start_as_current_span("get_public_items"):
-        user = await models.User.objects(session).get_or_404(models.User.id == user_id)
-        return await paginate(session, user.get_public_items())
+    # Bind the handler name to the logging context so every log entry in this scope carries it
+    with bound_contextvars(method_handler="get_public_items"):
+        logger = structlog.get_logger(__name__)
+        logger.debug("Getting user public items")
+
+        # We can't use the @instrument decorator here because it will collide with the
+        # FastAPIInstrumentor and cause the span to be created twice.
+        # So we need to create the span manually.
+        with tracer_provider.get_tracer(__name__).start_as_current_span("get_public_items"):
+            user = await models.User.objects(session).get_or_404(models.User.id == user_id)
+
+        return await paginate(session, user.get_public_items())
diff --git a/src/core/config.py b/src/core/config.py
index d9332ec..abd9f9e 100755
--- a/src/core/config.py
+++ b/src/core/config.py
@@ -1,18 +1,52 @@
-from enum import Enum
+import logging
+import sys
+from enum import IntEnum, StrEnum
+from typing import Any, Literal
 
-from pydantic import PostgresDsn
-from pydantic_settings import BaseSettings
+from pydantic import PostgresDsn, model_validator
+from pydantic_settings import BaseSettings, SettingsConfigDict
 
 
-class LogLevel(str, Enum):
-    critical = "CRITICAL"
-    error = "ERROR"
-    warning = "WARNING"
-    info = "INFO"
-    debug = "DEBUG"
+class Env(StrEnum):
+    dev = "dev"
+
+
+class LogLevel(IntEnum):
+    CRITICAL = logging.CRITICAL
+    ERROR = logging.ERROR
+    WARNING = logging.WARNING
+    INFO = logging.INFO
+    DEBUG = logging.DEBUG
+
+
+class LogSettings(BaseSettings):
+    # Makes the settings immutable and hashable (so instances can be used as dict keys)
+    model_config = SettingsConfigDict(frozen=True)
+
+    log_level: LogLevel = LogLevel.INFO
+    structured_log: bool | Literal["auto"] = "auto"
+    cache_loggers: bool = True
+
+    @property
+    def enable_structured_log(self) -> bool:
+        return not sys.stdout.isatty() if self.structured_log == "auto" else self.structured_log
+
+    @model_validator(mode="before")
+    @classmethod
+    def parse_log_level(cls, data: Any) -> Any:
+        # Map level names (e.g. "DEBUG") coming from the environment to their numeric values
+        if isinstance(data, dict) and isinstance(data.get("log_level"), str):
+            data["log_level"] = LogLevel[data["log_level"]]
+
+        return data
 
 
 class Settings(BaseSettings):
+    # Makes the settings immutable and hashable (so instances can be used as dict keys)
+    model_config = SettingsConfigDict(frozen=True)
+
+    env: str = "dev"
+    project_name: str
+
     # Database
     async_database_url: PostgresDsn
     database_pool_pre_ping: bool
@@ -21,8 +55,8 @@
     database_max_overflow: int
 
     # Logging
-    log_level: LogLevel = LogLevel.debug
     server_url: str
+    log_settings: LogSettings = LogSettings()
 
     # Auth
     access_token_expire_minutes: float
@@ -36,7 +70,6 @@
 
     # OpenTelemetry
     otel_exporter_otlp_endpoint: str
-    env: str = "dev"
 
 
 settings = Settings()
diff --git a/src/core/database.py b/src/core/database.py
index c2a0a45..3a635dd 100755
--- a/src/core/database.py
+++ b/src/core/database.py
@@ -1,8 +1,8 @@
-import logging
 import uuid
 from datetime import datetime, timezone
 from typing import Any, Dict, Generic, Sequence, Type, TypeVar
 
+import structlog
 from fastapi import HTTPException
 from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
 from sqlalchemy import ExceptionContext, func, select
@@ -27,6 +27,8 @@ from src.helpers.casing import snakecase
 from src.helpers.sql import random_uuid, utcnow
 
+logger = structlog.get_logger(__name__)
+
 # Async engine and session
 engine: AsyncEngine = create_async_engine(
     url=str(settings.async_database_url),
@@ -54,7 +56,7 @@ def _on_handle_error(context: ExceptionContext) -> None:
 
     Returns:
         None: this returns nothing.
""" - logging.warning(f"handle_error event triggered for PostgreSQL engine: {context.sqlalchemy_exception}") + logger.warning(f"handle_error event triggered for PostgreSQL engine: {context.sqlalchemy_exception}") if "Can't connect to PostgreSQL server on" in str(context.sqlalchemy_exception): # Setting is_disconnect to True should tell SQLAlchemy treat this as a connection error and retry context.is_disconnect = True # type: ignore diff --git a/src/core/security.py b/src/core/security.py index d8f9341..780a53e 100755 --- a/src/core/security.py +++ b/src/core/security.py @@ -58,16 +58,19 @@ def process_login(cls, user: User, response: Response) -> Token | None: return None async def get_user_from_token(self, token: str, session: AsyncSession) -> User: - try: - payload = jwt.decode(token=token, key=settings.jwt_signing_key, algorithms=self.algorithm) - token_data = TokenPayload(**payload) - except (JWTError, ValidationError): - raise self.credentials_exception + token_data = await self._validate_token(token=token) user = await User.objects(session).get(User.id == token_data.user_id) if not user: raise self.credentials_exception return user + async def _validate_token(self, token: str) -> TokenPayload: + try: + payload = jwt.decode(token=token, key=settings.jwt_signing_key, algorithms=self.algorithm) + return TokenPayload(**payload) + except (JWTError, ValidationError): + raise self.credentials_exception + def _get_token_from_cookie(self, request: Request) -> str | None: token = request.cookies.get(self.cookie_name) return token @@ -79,16 +82,29 @@ def _get_token_from_header(self, request: Request) -> str | None: return None return token - def _get_token(self, request: Request) -> str: + def _get_token(self, request: Request) -> str | None: token = None if self.accept_header: token = self._get_token_from_header(request) if not token and self.accept_cookie: token = self._get_token_from_cookie(request) - if not token: - raise self.credentials_exception + return token async def __call__(self, request: Request, session: AsyncSession) -> User: token = self._get_token(request) + if not token: + raise self.credentials_exception + return await self.get_user_from_token(token, session) + + async def get_token_payload(self, request: Request) -> TokenPayload | None: + """ + Get the user token payload from the request auth token if present. Otherwise, return None. + This method validates the token if present and raise a 401 error if invalid. 
+        """
+        token = self._get_token(request)
+        if not token:
+            return None
+
+        return await self._validate_token(token)
diff --git a/src/core/trace.py b/src/core/trace.py
index 1df2e28..f000125 100644
--- a/src/core/trace.py
+++ b/src/core/trace.py
@@ -10,7 +10,7 @@ from src.core.config import settings
 
 # Configure OpenTelemetry
-resource = Resource(attributes={"service.name": settings.env})
+resource = Resource(attributes={"service.name": settings.project_name, "env": settings.env})
 tracer_provider = TracerProvider(resource=resource)
 otlp_exporter = OTLPSpanExporter(endpoint=settings.otel_exporter_otlp_endpoint, insecure=True)
 otlp_metric_exporter = OTLPMetricExporter(endpoint=settings.otel_exporter_otlp_endpoint, insecure=True)
@@ -38,6 +38,3 @@ def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any:
         return wrapper
 
     return decorator
-
-
-#
diff --git a/src/logging.py b/src/logging.py
index d7dcb35..5da3f24 100755
--- a/src/logging.py
+++ b/src/logging.py
@@ -1,30 +1,157 @@
-from pydantic import BaseModel
-
-from src.core.config import settings
-
-
-class LogConfig(BaseModel):
-    LOGGER_NAME: str = "root"
-    LOG_FORMAT: str = "%(levelprefix)s | %(asctime)s | %(message)s"
-    LOG_LEVEL: str = settings.log_level.value
-
-    # Logging config
-    version: int = 1
-    disable_existing_loggers: bool = False
-    formatters: dict = {
-        "default": {
-            "()": "uvicorn.logging.DefaultFormatter",
-            "fmt": LOG_FORMAT,
-            "datefmt": "%Y-%m-%d %H:%M:%S",
-        },
-    }
-    handlers: dict = {
-        "default": {
-            "formatter": "default",
-            "class": "logging.StreamHandler",
-            "stream": "ext://sys.stderr",
-        },
-    }
-    loggers: dict = {
-        LOGGER_NAME: {"handlers": ["default"], "level": LOG_LEVEL},
-    }
+import logging.config
+import os
+import sys
+import time
+from functools import cache
+from typing import TYPE_CHECKING, Any
+
+import structlog
+from structlog import PrintLogger
+
+from src.core.config import LogLevel
+
+if TYPE_CHECKING:
+    from src.core.config import LogSettings
+    from structlog.types import EventDict
+    from structlog.typing import Processor
+
+
+class Logger:
+    """Minimal print logger that routes debug/info/warning to stdout and error-level messages to stderr."""
+
+    name: str
+
+    _stderr_logger: PrintLogger
+    _stdout_logger: PrintLogger
+
+    def __init__(self, name: str):
+        self.name = name
+
+        self._stderr_logger = PrintLogger(sys.stderr)
+        self._stdout_logger = PrintLogger(sys.stdout)
+
+    def _print_to_stderr(self, message: str) -> None:
+        self._stderr_logger.msg(message)
+
+    def _print_to_stdout(self, message: str) -> None:
+        self._stdout_logger.msg(message)
+
+    debug = _print_to_stdout
+    info = _print_to_stdout
+    msg = _print_to_stdout
+    warning = _print_to_stdout
+    error = _print_to_stderr
+    exception = _print_to_stderr
+    critical = _print_to_stderr
+
+
+def logger_factory(name: str | None, *args: Any) -> Logger:
+    """Create a logger instance."""
+    return Logger(name=name or "default")
+
+
+class StreamHandler(logging.Handler):
+    _loggers: dict[str, Logger]
+
+    def __init__(self) -> None:
+        self._loggers = {}
+        super().__init__(logging.DEBUG)
+
+    def _logging_method(self, level: int) -> str | None:
+        if level >= logging.CRITICAL:
+            return "critical"
+        if level >= logging.ERROR:
+            return "error"
+        if level >= logging.WARNING:
+            return "warning"
+        if level >= logging.INFO:
+            return "info"
+        if level >= logging.DEBUG:
+            return "debug"
+
+        return None
+
+    def _logger(self, name: str) -> Logger:
+        if not self._loggers.get(name, None):
+            self._loggers[name] = structlog.get_logger(name)
+
+        return self._loggers[name]
+
+    def emit(self, record: logging.LogRecord) -> None:
+        logging_func = self._logging_method(record.levelno)
+        if not logging_func:
+            return
+
+        logger = self._logger(record.name)
+        getattr(logger, logging_func)(record.getMessage())
+
+
+def default_logging_config(log_level: LogLevel = LogLevel.INFO) -> dict:
"""Python logging configuration.""" + + return { + "version": 1, + "disable_existing_loggers": False, + "formatters": {}, + "handlers": { + "stream": { + "level": log_level.value, + "class": "src.logging.StreamHandler", + }, }, - } - handlers: dict = { - "default": { - "formatter": "default", - "class": "logging.StreamHandler", - "stream": "ext://sys.stderr", + "loggers": { + "": {"handlers": ["stream"], "level": log_level.value, "propagate": True}, }, } - loggers: dict = { - LOGGER_NAME: {"handlers": ["default"], "level": LOG_LEVEL}, - } + + +def _timestamp_processor(logger: Logger, msg: str, event_dict: "EventDict") -> "EventDict": + """Add timestamp to the event dictionary.""" + event_dict["timestamp"] = time.time_ns() + return event_dict + + +def _pid_processor(logger: Logger, msg: str, event_dict: "EventDict") -> "EventDict": + """Add process ID to the event dictionary.""" + event_dict["pid"] = os.getpid() + return event_dict + + +@cache +def configure_logging(config: "LogSettings") -> None: + """Configure logging for the application.""" + logging.config.dictConfig(default_logging_config(config.log_level)) + + processors: list["Processor"] = [ + structlog.contextvars.merge_contextvars, + structlog.stdlib.add_log_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.PositionalArgumentsFormatter(), + ] + + if config.enable_structured_log: + processors.extend( + [ + _timestamp_processor, + _pid_processor, + structlog.processors.dict_tracebacks, + structlog.processors.ExceptionRenderer(), + structlog.processors.StackInfoRenderer(), + structlog.processors.JSONRenderer(), + ] + ) + else: + processors.extend( + [ + structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M.%S"), + structlog.dev.ConsoleRenderer(), + ] + ) + + structlog.configure( + processors=processors, + logger_factory=logger_factory, + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=config.cache_loggers, + ) diff --git a/src/main.py b/src/main.py index d45b8fe..e658511 100755 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,3 @@ -from logging.config import dictConfig - from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi_pagination import add_pagination @@ -10,16 +8,19 @@ from src.core.config import settings from src.core.database import engine from src.core.trace import tracer_provider -from src.logging import LogConfig +from src.logging import configure_logging, default_logging_config +from src.middlewares import LoggingMiddleware from src.urls import router -dictConfig(LogConfig().model_dump()) - +configure_logging(settings.log_settings) app = FastAPI() app.include_router(router) +# Add middleware to the app +app.add_middleware(LoggingMiddleware) + # Instrument FastAPI with OpenTelemetry FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider) @@ -48,4 +49,10 @@ if __name__ == "__main__": import uvicorn - uvicorn.run("src.main:app", host="0.0.0.0", port=8000, reload=True) + uvicorn.run( + "src.main:app", + host="0.0.0.0", + port=8000, + reload=True, + log_config=default_logging_config(settings.log_settings.log_level), + ) diff --git a/src/middlewares.py b/src/middlewares.py new file mode 100644 index 0000000..ac4b340 --- /dev/null +++ b/src/middlewares.py @@ -0,0 +1,49 @@ +from typing import Awaitable, Callable +from uuid import uuid4 + +import structlog +from fastapi import HTTPException, Request, Response +from starlette.middleware.base import BaseHTTPMiddleware +from structlog import contextvars +from structlog.contextvars import 
+
+from src.core.security import AuthManager
+
+logger = structlog.get_logger(__name__)
+
+
+class LoggingMiddleware(BaseHTTPMiddleware):
+    async def dispatch(
+        self,
+        request: Request,
+        call_next: Callable[[Request], Awaitable[Response]],
+    ) -> Response:
+        """
+        Middleware to add logging context to each request.
+        """
+        context = {
+            "request_id": str(uuid4()),
+            # You may need to sanitize or truncate user agent as it could be fingerprinted to help uniquely identify a
+            # user — not PII by itself, but becomes sensitive when paired with a user ID.
+            "user_agent": request.headers.get("user-agent"),
+            # request.url.path never includes the query string, so no query parameters leak into the context
+            "path": request.url.path,
+            "method": request.method,
+        }
+
+        try:
+            token_payload = await AuthManager().get_token_payload(request)
+            if token_payload:
+                context["user_id"] = str(token_payload.user_id)
+        except HTTPException as e:
+            if e.status_code == 401:
+                # If the token is invalid, we can just ignore it and continue
+                logger.bind(**context).debug("Invalid token")
+            else:
+                raise
+
+        contextvars.clear_contextvars()
+        with bound_contextvars(**context):
+            response = await call_next(request)
+
+        return response
diff --git a/src/tests/conftest.py b/src/tests/conftest.py
index 5fb4ae8..e82e959 100644
--- a/src/tests/conftest.py
+++ b/src/tests/conftest.py
@@ -1,3 +1,8 @@
+import os
+
+# Must be set before importing any src module: Settings is instantiated at import time,
+# so the shortened token lifetime has to be in the environment before the app loads.
+os.environ["ACCESS_TOKEN_EXPIRE_MINUTES"] = "0.02"
+
+
 import asyncio
 from typing import AsyncGenerator, Generator
diff --git a/src/tests/test_user.py b/src/tests/test_user.py
index 91a0259..ee6499f 100755
--- a/src/tests/test_user.py
+++ b/src/tests/test_user.py
@@ -129,7 +129,6 @@ async def test_me_bad_cookie(self, reset_database: AsyncGenerator, async_client:
         response = await async_client.get(self.ME_URL)
         assert response.status_code == 401
 
-    @patch.object(settings, "access_token_expire_minutes", 0.02)
     async def test_expired_token(self, reset_database: AsyncGenerator, async_client: AsyncClient) -> None:
         await async_client.post(self.SIGNUP_URL, json=self.TEST_PAYLOAD)
         time.sleep(3)