Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/queue-app/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.venv
storage
8 changes: 8 additions & 0 deletions example/queue-app/artisan
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python3

import sys
from bootstrap.application import app

if __name__ == "__main__":
status = app.handle_command()
sys.exit(status if isinstance(status, int) else 0)
18 changes: 18 additions & 0 deletions example/queue-app/bootstrap/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pathlib import Path

from fastapi_startkit import Application
from fastapi_startkit.logging import LogProvider
from config.fastapi import FastAPIConfig
from providers.fastapi_provider import FastAPIProvider
from providers.queue_provider import QueueProvider
from providers.console_provider import ConsoleProvider

app: Application = Application(
base_path=str(Path.cwd()),
providers=[
LogProvider,
QueueProvider,
(FastAPIProvider, FastAPIConfig),
ConsoleProvider,
],
)
38 changes: 38 additions & 0 deletions example/queue-app/commands/queue_work_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from cleo.helpers import option
from fastapi_startkit.console.command import Command


class QueueWorkCommand(Command):
"""
Start the Taskiq queue worker.

queue:work
"""

name = "queue:work"
description = "Start the Taskiq queue worker."

options = [
option(
"workers",
"w",
"Number of worker processes to spawn.",
flag=False,
default=1,
)
]

def handle(self):
from taskiq.cli.worker.args import WorkerArgs
from taskiq.cli.worker.run import run_worker

workers = int(self.option("workers"))

args = WorkerArgs(
broker="worker:broker",
modules=["tasks.example_task"],
workers=workers,
)

self.line(f"Starting queue worker with {workers} process(es)...")
run_worker(args)
18 changes: 18 additions & 0 deletions example/queue-app/config/fastapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import dataclasses

from fastapi_startkit.environment import env


@dataclasses.dataclass
class FastAPIConfig:
host: str = dataclasses.field(default_factory=lambda: env("APP_HOST", "127.0.0.1"))
port: int = dataclasses.field(default_factory=lambda: env("APP_PORT", 8000))
reload: bool = dataclasses.field(default_factory=lambda: env("APP_RELOAD", True))
reload_dirs: list | None = None
reload_excludes: list = dataclasses.field(
default_factory=lambda: [
"*.log",
"tests/*",
"node_modules/*",
]
)
11 changes: 11 additions & 0 deletions example/queue-app/config/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import dataclasses

from fastapi_startkit.environment import env


@dataclasses.dataclass
class LoggingConfig:
level: str = dataclasses.field(default_factory=lambda: env("LOG_LEVEL", "debug"))
channel: str = dataclasses.field(
default_factory=lambda: env("LOG_CHANNEL", "daily")
)
14 changes: 14 additions & 0 deletions example/queue-app/providers/console_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from fastapi_startkit.providers import Provider
from commands.queue_work_command import QueueWorkCommand


class ConsoleProvider(Provider):
def register(self) -> None:
pass

def boot(self) -> None:
self.app.add_commands(
[
QueueWorkCommand,
]
)
9 changes: 9 additions & 0 deletions example/queue-app/providers/fastapi_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from fastapi_startkit.fastapi import FastAPIProvider as BaseFastAPIProvider
from routes.api import public


class FastAPIProvider(BaseFastAPIProvider):
def boot(self) -> None:
super().boot()

self.app.include_router(public)
31 changes: 31 additions & 0 deletions example/queue-app/providers/queue_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from fastapi_startkit.environment import env
from fastapi_startkit.providers import Provider
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend


class QueueProvider(Provider):
def register(self) -> None:
redis_host = env("REDIS_HOST", "127.0.0.1")
redis_port = env("REDIS_PORT", "6379")
redis_url = f"redis://{redis_host}:{redis_port}"

broker = ListQueueBroker(redis_url).with_result_backend(
RedisAsyncResultBackend(redis_url)
)

self.app.bind("broker", broker)

def boot(self) -> None:
if not self.app.has("fastapi"):
return

fastapi_app = self.app.make("fastapi")
broker = self.app.make("broker")

@fastapi_app.on_event("startup")
async def startup_broker():
await broker.startup()

@fastapi_app.on_event("shutdown")
async def shutdown_broker():
await broker.shutdown()
14 changes: 14 additions & 0 deletions example/queue-app/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[project]
name = "queue-app"
version = "0.1.0"
description = "Queue application example using fastapi-startkit and taskiq"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"fastapi-startkit[fastapi]",
"taskiq>=0.11.0",
"taskiq-redis>=1.0.0",
]

[tool.uv.sources]
fastapi-startkit = { path = "../../fastapi_startkit", editable = true }
55 changes: 55 additions & 0 deletions example/queue-app/routes/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from fastapi import APIRouter
from fastapi_startkit.logging import Logger
from tasks.example_task import send_welcome_email, process_data

public = APIRouter()


@public.get("/")
async def index():
Logger.info("Welcome to FastAPI StartKit Queue Example!")
return {
"message": "Welcome to FastAPI StartKit Queue Example!",
"version": "1.0.0",
"docs": "/docs",
}


@public.get("/health")
async def health():
Logger.info("Health check passed")
return {"status": "healthy"}


@public.post("/jobs/email")
async def dispatch_email(email: str, name: str):
"""Dispatch a welcome email job to the queue."""
task = await send_welcome_email.kiq(email=email, name=name)
Logger.info(f"Dispatched send_welcome_email task: {task.task_id}")
return {"task_id": task.task_id, "status": "queued"}


@public.post("/jobs/process")
async def dispatch_process(payload: dict):
"""Dispatch a data processing job to the queue."""
task = await process_data.kiq(payload=payload)
Logger.info(f"Dispatched process_data task: {task.task_id}")
return {"task_id": task.task_id, "status": "queued"}


@public.get("/jobs/{task_id}/result")
async def get_result(task_id: str):
"""Poll a task result by its task ID."""
from bootstrap.application import app as startkit_app

broker = startkit_app.make("broker")
result = await broker.result_backend.get_result(task_id)

if result is None:
return {"task_id": task_id, "status": "pending"}

return {
"task_id": task_id,
"status": "completed" if not result.is_err else "failed",
"result": result.return_value,
}
18 changes: 18 additions & 0 deletions example/queue-app/tasks/example_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from bootstrap.application import app

broker = app.make("broker")


@broker.task
async def send_welcome_email(email: str, name: str) -> dict:
"""Simulate sending a welcome email asynchronously."""
print(f"Sending welcome email to {name} <{email}>")
return {"status": "sent", "email": email, "name": name}


@broker.task
async def process_data(payload: dict) -> dict:
"""Simulate processing a data payload asynchronously."""
print(f"Processing payload: {payload}")
result = {k: str(v).upper() for k, v in payload.items()}
return {"status": "processed", "result": result}
14 changes: 14 additions & 0 deletions example/queue-app/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Worker entrypoint — exposes `broker` for the Taskiq CLI.

Run with:
uv run python -m taskiq worker worker:broker tasks.example_task

Or via artisan:
uv run artisan queue:work
uv run artisan queue:work --workers 4
"""

from bootstrap.application import app

broker = app.make("broker")
Loading