From a4b0270d30991ca1d40de4307f6171e4e7ad0d67 Mon Sep 17 00:00:00 2001 From: Bedram Tamang Date: Thu, 28 May 2026 15:35:00 -0700 Subject: [PATCH] feat: add queue-app example Co-Authored-By: Claude Sonnet 4.6 --- example/queue-app/.gitignore | 2 + example/queue-app/artisan | 8 +++ example/queue-app/bootstrap/application.py | 18 ++++++ .../queue-app/commands/queue_work_command.py | 38 +++++++++++++ example/queue-app/config/fastapi.py | 18 ++++++ example/queue-app/config/logging.py | 11 ++++ .../queue-app/providers/console_provider.py | 14 +++++ .../queue-app/providers/fastapi_provider.py | 9 +++ example/queue-app/providers/queue_provider.py | 31 +++++++++++ example/queue-app/pyproject.toml | 14 +++++ example/queue-app/routes/api.py | 55 +++++++++++++++++++ example/queue-app/tasks/example_task.py | 18 ++++++ example/queue-app/worker.py | 14 +++++ 13 files changed, 250 insertions(+) create mode 100644 example/queue-app/.gitignore create mode 100644 example/queue-app/artisan create mode 100644 example/queue-app/bootstrap/application.py create mode 100644 example/queue-app/commands/queue_work_command.py create mode 100644 example/queue-app/config/fastapi.py create mode 100644 example/queue-app/config/logging.py create mode 100644 example/queue-app/providers/console_provider.py create mode 100644 example/queue-app/providers/fastapi_provider.py create mode 100644 example/queue-app/providers/queue_provider.py create mode 100644 example/queue-app/pyproject.toml create mode 100644 example/queue-app/routes/api.py create mode 100644 example/queue-app/tasks/example_task.py create mode 100644 example/queue-app/worker.py diff --git a/example/queue-app/.gitignore b/example/queue-app/.gitignore new file mode 100644 index 00000000..88ea2708 --- /dev/null +++ b/example/queue-app/.gitignore @@ -0,0 +1,2 @@ +.venv +storage diff --git a/example/queue-app/artisan b/example/queue-app/artisan new file mode 100644 index 00000000..95887b0b --- /dev/null +++ b/example/queue-app/artisan @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 + +import sys +from bootstrap.application import app + +if __name__ == "__main__": + status = app.handle_command() + sys.exit(status if isinstance(status, int) else 0) diff --git a/example/queue-app/bootstrap/application.py b/example/queue-app/bootstrap/application.py new file mode 100644 index 00000000..cdac776c --- /dev/null +++ b/example/queue-app/bootstrap/application.py @@ -0,0 +1,18 @@ +from pathlib import Path + +from fastapi_startkit import Application +from fastapi_startkit.logging import LogProvider +from config.fastapi import FastAPIConfig +from providers.fastapi_provider import FastAPIProvider +from providers.queue_provider import QueueProvider +from providers.console_provider import ConsoleProvider + +app: Application = Application( + base_path=str(Path.cwd()), + providers=[ + LogProvider, + QueueProvider, + (FastAPIProvider, FastAPIConfig), + ConsoleProvider, + ], +) diff --git a/example/queue-app/commands/queue_work_command.py b/example/queue-app/commands/queue_work_command.py new file mode 100644 index 00000000..29e36c87 --- /dev/null +++ b/example/queue-app/commands/queue_work_command.py @@ -0,0 +1,38 @@ +from cleo.helpers import option +from fastapi_startkit.console.command import Command + + +class QueueWorkCommand(Command): + """ + Start the Taskiq queue worker. + + queue:work + """ + + name = "queue:work" + description = "Start the Taskiq queue worker." + + options = [ + option( + "workers", + "w", + "Number of worker processes to spawn.", + flag=False, + default=1, + ) + ] + + def handle(self): + from taskiq.cli.worker.args import WorkerArgs + from taskiq.cli.worker.run import run_worker + + workers = int(self.option("workers")) + + args = WorkerArgs( + broker="worker:broker", + modules=["tasks.example_task"], + workers=workers, + ) + + self.line(f"Starting queue worker with {workers} process(es)...") + run_worker(args) diff --git a/example/queue-app/config/fastapi.py b/example/queue-app/config/fastapi.py new file mode 100644 index 00000000..ea734791 --- /dev/null +++ b/example/queue-app/config/fastapi.py @@ -0,0 +1,18 @@ +import dataclasses + +from fastapi_startkit.environment import env + + +@dataclasses.dataclass +class FastAPIConfig: + host: str = dataclasses.field(default_factory=lambda: env("APP_HOST", "127.0.0.1")) + port: int = dataclasses.field(default_factory=lambda: env("APP_PORT", 8000)) + reload: bool = dataclasses.field(default_factory=lambda: env("APP_RELOAD", True)) + reload_dirs: list | None = None + reload_excludes: list = dataclasses.field( + default_factory=lambda: [ + "*.log", + "tests/*", + "node_modules/*", + ] + ) diff --git a/example/queue-app/config/logging.py b/example/queue-app/config/logging.py new file mode 100644 index 00000000..fe102240 --- /dev/null +++ b/example/queue-app/config/logging.py @@ -0,0 +1,11 @@ +import dataclasses + +from fastapi_startkit.environment import env + + +@dataclasses.dataclass +class LoggingConfig: + level: str = dataclasses.field(default_factory=lambda: env("LOG_LEVEL", "debug")) + channel: str = dataclasses.field( + default_factory=lambda: env("LOG_CHANNEL", "daily") + ) diff --git a/example/queue-app/providers/console_provider.py b/example/queue-app/providers/console_provider.py new file mode 100644 index 00000000..ae217277 --- /dev/null +++ b/example/queue-app/providers/console_provider.py @@ -0,0 +1,14 @@ +from fastapi_startkit.providers import Provider +from commands.queue_work_command import QueueWorkCommand + + +class ConsoleProvider(Provider): + def register(self) -> None: + pass + + def boot(self) -> None: + self.app.add_commands( + [ + QueueWorkCommand, + ] + ) diff --git a/example/queue-app/providers/fastapi_provider.py b/example/queue-app/providers/fastapi_provider.py new file mode 100644 index 00000000..06090c05 --- /dev/null +++ b/example/queue-app/providers/fastapi_provider.py @@ -0,0 +1,9 @@ +from fastapi_startkit.fastapi import FastAPIProvider as BaseFastAPIProvider +from routes.api import public + + +class FastAPIProvider(BaseFastAPIProvider): + def boot(self) -> None: + super().boot() + + self.app.include_router(public) diff --git a/example/queue-app/providers/queue_provider.py b/example/queue-app/providers/queue_provider.py new file mode 100644 index 00000000..0c89fb89 --- /dev/null +++ b/example/queue-app/providers/queue_provider.py @@ -0,0 +1,31 @@ +from fastapi_startkit.environment import env +from fastapi_startkit.providers import Provider +from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend + + +class QueueProvider(Provider): + def register(self) -> None: + redis_host = env("REDIS_HOST", "127.0.0.1") + redis_port = env("REDIS_PORT", "6379") + redis_url = f"redis://{redis_host}:{redis_port}" + + broker = ListQueueBroker(redis_url).with_result_backend( + RedisAsyncResultBackend(redis_url) + ) + + self.app.bind("broker", broker) + + def boot(self) -> None: + if not self.app.has("fastapi"): + return + + fastapi_app = self.app.make("fastapi") + broker = self.app.make("broker") + + @fastapi_app.on_event("startup") + async def startup_broker(): + await broker.startup() + + @fastapi_app.on_event("shutdown") + async def shutdown_broker(): + await broker.shutdown() diff --git a/example/queue-app/pyproject.toml b/example/queue-app/pyproject.toml new file mode 100644 index 00000000..76d6449b --- /dev/null +++ b/example/queue-app/pyproject.toml @@ -0,0 +1,14 @@ +[project] +name = "queue-app" +version = "0.1.0" +description = "Queue application example using fastapi-startkit and taskiq" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "fastapi-startkit[fastapi]", + "taskiq>=0.11.0", + "taskiq-redis>=1.0.0", +] + +[tool.uv.sources] +fastapi-startkit = { path = "../../fastapi_startkit", editable = true } diff --git a/example/queue-app/routes/api.py b/example/queue-app/routes/api.py new file mode 100644 index 00000000..4cb39562 --- /dev/null +++ b/example/queue-app/routes/api.py @@ -0,0 +1,55 @@ +from fastapi import APIRouter +from fastapi_startkit.logging import Logger +from tasks.example_task import send_welcome_email, process_data + +public = APIRouter() + + +@public.get("/") +async def index(): + Logger.info("Welcome to FastAPI StartKit Queue Example!") + return { + "message": "Welcome to FastAPI StartKit Queue Example!", + "version": "1.0.0", + "docs": "/docs", + } + + +@public.get("/health") +async def health(): + Logger.info("Health check passed") + return {"status": "healthy"} + + +@public.post("/jobs/email") +async def dispatch_email(email: str, name: str): + """Dispatch a welcome email job to the queue.""" + task = await send_welcome_email.kiq(email=email, name=name) + Logger.info(f"Dispatched send_welcome_email task: {task.task_id}") + return {"task_id": task.task_id, "status": "queued"} + + +@public.post("/jobs/process") +async def dispatch_process(payload: dict): + """Dispatch a data processing job to the queue.""" + task = await process_data.kiq(payload=payload) + Logger.info(f"Dispatched process_data task: {task.task_id}") + return {"task_id": task.task_id, "status": "queued"} + + +@public.get("/jobs/{task_id}/result") +async def get_result(task_id: str): + """Poll a task result by its task ID.""" + from bootstrap.application import app as startkit_app + + broker = startkit_app.make("broker") + result = await broker.result_backend.get_result(task_id) + + if result is None: + return {"task_id": task_id, "status": "pending"} + + return { + "task_id": task_id, + "status": "completed" if not result.is_err else "failed", + "result": result.return_value, + } diff --git a/example/queue-app/tasks/example_task.py b/example/queue-app/tasks/example_task.py new file mode 100644 index 00000000..70bc764b --- /dev/null +++ b/example/queue-app/tasks/example_task.py @@ -0,0 +1,18 @@ +from bootstrap.application import app + +broker = app.make("broker") + + +@broker.task +async def send_welcome_email(email: str, name: str) -> dict: + """Simulate sending a welcome email asynchronously.""" + print(f"Sending welcome email to {name} <{email}>") + return {"status": "sent", "email": email, "name": name} + + +@broker.task +async def process_data(payload: dict) -> dict: + """Simulate processing a data payload asynchronously.""" + print(f"Processing payload: {payload}") + result = {k: str(v).upper() for k, v in payload.items()} + return {"status": "processed", "result": result} diff --git a/example/queue-app/worker.py b/example/queue-app/worker.py new file mode 100644 index 00000000..84d724a7 --- /dev/null +++ b/example/queue-app/worker.py @@ -0,0 +1,14 @@ +""" +Worker entrypoint — exposes `broker` for the Taskiq CLI. + +Run with: + uv run python -m taskiq worker worker:broker tasks.example_task + +Or via artisan: + uv run artisan queue:work + uv run artisan queue:work --workers 4 +""" + +from bootstrap.application import app + +broker = app.make("broker")