Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ name = "smooai-observability"
version = "0.1.0"
description = "SmooAI Observability SDK — error capture, PII scrubbing, batched webhook transport, OTLP traces + metrics, and GenAI semantic conventions."
readme = "README.md"
requires-python = ">=3.10"
# Bumped from >=3.10 to >=3.12: the webhook transport now depends on
# smooai-fetch, which requires Python >=3.12 (SMOODEV-2026).
requires-python = ">=3.12"
license = "MIT"
authors = [{ name = "SmooAI" }]
keywords = ["observability", "opentelemetry", "telemetry", "error-tracking", "genai", "smooai"]
Expand All @@ -16,6 +18,11 @@ classifiers = [
"Topic :: System :: Monitoring",
]
dependencies = [
# Webhook event delivery goes through smooai-fetch (retries/timeouts/circuit
# breaking) — SMOODEV-2026.
"smooai-fetch>=2.1",
# httpx is still used directly by the OAuth token provider (auth/token_provider.py),
# which backs the OTLP exporter path. It is also smooai-fetch's underlying client.
"httpx>=0.27",
"opentelemetry-api>=1.27",
"opentelemetry-sdk>=1.27",
Expand Down Expand Up @@ -85,7 +92,7 @@ asyncio_mode = "auto"

[tool.ruff]
line-length = 160
target-version = "py310"
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP"]
2 changes: 1 addition & 1 deletion python/src/smooai_observability/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def with_scope() -> Iterator[Scope]:
_current_scope.reset(token)


def run_with_scope(fn: Callable[[Scope], T]) -> T:
def run_with_scope[T](fn: Callable[[Scope], T]) -> T:
"""Functional form of ``with_scope`` — mirrors the TS ``withScope(fn)``
signature for callers that prefer a callback."""
with with_scope() as scope:
Expand Down
110 changes: 80 additions & 30 deletions python/src/smooai_observability/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,50 @@
batch back to the front for the next attempt. POSTs the same
``{type: 'error', events: [...]}`` envelope the TS SDK sends to the DSN webhook.

The TS version runs on the JS event loop (``setTimeout`` + ``fetch``). Python
has no implicit loop, and the SDK must work in plain sync hosts (Lambda
handlers, scripts) as well as async ones, so this uses a daemon worker thread
with a flush interval — no asyncio dependency, never blocks the host. Errors are
The webhook POST goes through ``smooai-fetch`` (timeouts + retries + circuit
breaking) rather than raw ``httpx`` (SMOODEV-2026). smooai-fetch already retries
429/5xx and network/timeout errors internally; the queue requeue here covers the
post-retry surface so a permanently-failing endpoint still re-tries on the next
flush tick.

The TS version runs on the JS event loop (``setTimeout`` + ``fetch``). Python has
no implicit loop, and the SDK must work in plain sync hosts (Lambda handlers,
scripts) as well as async ones, so this uses a daemon worker thread with a flush
interval — no asyncio dependency on the host. ``smooai-fetch`` is async-only, so
the worker thread owns a private event loop and drives ``fetch()`` via
``run_until_complete``; the host's loop (if any) is never touched. Errors are
swallowed: observability must never throw into user code.
"""

from __future__ import annotations

import asyncio
import threading
from collections.abc import Awaitable, Callable
from typing import Any

import httpx
from smooai_fetch import FetchOptions, fetch
from smooai_fetch._types import RetryOptions, TimeoutOptions

from .types import IngestPayload, ObservabilityEvent

DEFAULT_FLUSH_INTERVAL_MS = 1000
DEFAULT_BATCH_SIZE = 30
DEFAULT_QUEUE_MAX = 250

# Type of the injectable fetch callable. Mirrors the subset of ``smooai_fetch.fetch``
# the transport uses: ``await fetch_fn(url, options)``. Tests substitute a stub.
FetchFn = Callable[[str, FetchOptions], Awaitable[Any]]


class Transport:
"""Thread-backed batched transport."""
"""Thread-backed batched transport.

The webhook POST is delegated to ``smooai-fetch`` (``fetch_fn``), which owns
retries/timeouts/circuit-breaking. ``fetch_fn`` is injectable so tests can
substitute a stub — ``smooai-fetch`` constructs its own
``httpx.AsyncClient`` internally and exposes no client-injection seam.
"""

def __init__(
self,
Expand All @@ -35,22 +57,24 @@ def __init__(
flush_interval_ms: int = DEFAULT_FLUSH_INTERVAL_MS,
max_batch_size: int = DEFAULT_BATCH_SIZE,
max_queue_size: int = DEFAULT_QUEUE_MAX,
client: httpx.Client | None = None,
fetch_fn: FetchFn | None = None,
timeout_s: float = 5.0,
) -> None:
self._dsn = dsn
self._flush_interval_s = max(flush_interval_ms, 0) / 1000.0
self._max_batch_size = max(max_batch_size, 1)
self._max_queue_size = max(max_queue_size, 1)
self._timeout_s = timeout_s
# Caller may inject a client (tests / connection reuse); otherwise own one.
self._owns_client = client is None
self._client = client or httpx.Client(timeout=timeout_s)
# Caller may inject a fetch fn (tests); otherwise use smooai-fetch.
self._fetch_fn: FetchFn = fetch_fn or fetch

self._queue: list[ObservabilityEvent] = []
self._lock = threading.Lock()
self._wake = threading.Event()
self._stopped = False
# Private loop owned by the worker thread; smooai-fetch needs to run on
# an event loop and the host's loop must never be touched.
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._run,
name="smooai-observability-transport",
Expand All @@ -74,7 +98,8 @@ def enqueue(self, event: ObservabilityEvent) -> None:

def flush(self, timeout_s: float | None = None) -> None:
"""Synchronously drain the queue. Used at shutdown / SIGTERM so buffered
events aren't lost when the process exits."""
events aren't lost when the process exits. Runs the async send on the
worker loop so there is a single owner for the smooai-fetch event loop."""
deadline_iter = True
while deadline_iter:
with self._lock:
Expand All @@ -86,12 +111,19 @@ def flush(self, timeout_s: float | None = None) -> None:
self._send(batch)

def _run(self) -> None:
while True:
# Wake early when a full batch lands; otherwise tick on the interval.
self._wake.wait(self._flush_interval_s)
self._wake.clear()
if self._drain_once() == "stop":
return
asyncio.set_event_loop(self._loop)
try:
while True:
# Wake early when a full batch lands; otherwise tick on the interval.
self._wake.wait(self._flush_interval_s)
self._wake.clear()
if self._drain_once() == "stop":
return
finally:
try:
self._loop.close()
except Exception:
pass

def _drain_once(self) -> str:
with self._lock:
Expand All @@ -108,15 +140,8 @@ def _drain_once(self) -> str:
return "continue"

def _send(self, batch: list[ObservabilityEvent]) -> None:
payload = IngestPayload(events=batch)
try:
resp = self._client.post(
self._dsn,
json=payload.to_wire(),
headers={"content-type": "application/json"},
)
if resp.status_code >= 400:
raise httpx.HTTPStatusError("ingest rejected", request=resp.request, response=resp)
self._run_async(self._send_async(batch))
except Exception:
# Best-effort: push the batch back to the front for a retry. Never
# raise — observability must not crash the host.
Expand All @@ -128,6 +153,36 @@ def _send(self, batch: list[ObservabilityEvent]) -> None:
overflow = len(self._queue) - self._max_queue_size
del self._queue[:overflow]

def _run_async(self, coro: Awaitable[Any]) -> None:
"""Drive a coroutine to completion on the right loop.

The worker thread owns ``self._loop`` and reuses it (cheap, keeps a
single owner for the smooai-fetch loop). ``flush()`` may also be called
from the host thread at shutdown — a loop must not be driven from two
threads, so off-worker callers get a transient loop via ``asyncio.run``."""
if threading.current_thread() is self._thread:
self._loop.run_until_complete(coro)
else:
asyncio.run(coro)

async def _send_async(self, batch: list[ObservabilityEvent]) -> None:
"""POST one batch via smooai-fetch. smooai-fetch raises on non-2xx (after
its own 429/5xx retries) and on transport/timeout errors — exactly the
cases the caller requeues."""
payload = IngestPayload(events=batch)
await self._fetch_fn(
self._dsn,
FetchOptions(
method="POST",
headers={"content-type": "application/json"},
body=payload.to_wire(),
# smooai-fetch's default retry covers 429/5xx + network errors;
# cap the per-request timeout so a flush can't hang the worker.
retry=RetryOptions(),
timeout=TimeoutOptions(timeout_ms=self._timeout_s * 1000.0),
),
)

def queue_size(self) -> int:
with self._lock:
return len(self._queue)
Expand All @@ -139,8 +194,3 @@ def shutdown(self, timeout_s: float = 2.0) -> None:
self.flush()
self._wake.set()
self._thread.join(timeout=timeout_s)
if self._owns_client:
try:
self._client.close()
except Exception:
pass
72 changes: 43 additions & 29 deletions python/tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import time

import httpx
from smooai_fetch import FetchOptions, FetchResponse
from smooai_fetch._errors import HTTPResponseError

from smooai_observability.transport import Transport
from smooai_observability.types import ObservabilityEvent, Sdk
Expand All @@ -17,30 +19,36 @@ def _event(i: int) -> ObservabilityEvent:
)


class _RecordingTransport:
"""Captures POSTed payloads via an httpx MockTransport."""
def _ok_response(url: str) -> FetchResponse:
"""A minimal successful FetchResponse, as smooai-fetch would return."""
return FetchResponse(response=httpx.Response(200, request=httpx.Request("POST", url)))


class _RecordingFetch:
"""Captures the bodies POSTed through an injected smooai-fetch stub.

smooai-fetch constructs its own ``httpx.AsyncClient`` internally and exposes
no client-injection seam, so the transport takes an injectable ``fetch_fn``
and tests substitute this stub.
"""

def __init__(self):
self.payloads: list[dict] = []
self.bodies: list[dict] = []
self.lock = threading.Lock()
self.event = threading.Event()

def handler(request: httpx.Request) -> httpx.Response:
with self.lock:
import json

self.payloads.append(json.loads(request.content))
self.event.set()
return httpx.Response(200, json={"ok": True})

self.client = httpx.Client(transport=httpx.MockTransport(handler))
async def __call__(self, url: str, options: FetchOptions) -> FetchResponse:
with self.lock:
self.bodies.append(options.body)
self.event.set()
return _ok_response(url)


def test_transport_batches_and_posts():
rec = _RecordingTransport()
rec = _RecordingFetch()
t = Transport(
"https://example.test/webhook",
client=rec.client,
fetch_fn=rec,
flush_interval_ms=50,
max_batch_size=5,
)
Expand All @@ -51,18 +59,18 @@ def test_transport_batches_and_posts():
assert rec.event.wait(2.0), "transport never flushed"
time.sleep(0.1)
with rec.lock:
all_events = [e for p in rec.payloads for e in p["events"]]
all_events = [e for body in rec.bodies for e in body["events"]]
assert len(all_events) == 3
assert rec.payloads[0]["type"] == "error"
assert rec.bodies[0]["type"] == "error"
finally:
t.shutdown()


def test_transport_flush_on_full_batch():
rec = _RecordingTransport()
rec = _RecordingFetch()
t = Transport(
"https://example.test/webhook",
client=rec.client,
fetch_fn=rec,
flush_interval_ms=10_000, # long, so only batch-size triggers flush
max_batch_size=3,
)
Expand All @@ -75,10 +83,10 @@ def test_transport_flush_on_full_batch():


def test_transport_drops_oldest_on_overflow():
rec = _RecordingTransport()
rec = _RecordingFetch()
t = Transport(
"https://example.test/webhook",
client=rec.client,
fetch_fn=rec,
flush_interval_ms=10_000,
max_batch_size=1000,
max_queue_size=5,
Expand All @@ -92,21 +100,27 @@ def test_transport_drops_oldest_on_overflow():


def test_transport_retries_on_failure():
"""smooai-fetch raises on non-2xx (after its own retries); the transport
requeues the failed batch and re-sends it on the next flush tick."""
state = {"calls": 0}
flushed = threading.Event()

def handler(request: httpx.Request) -> httpx.Response:
state["calls"] += 1
if state["calls"] == 1:
return httpx.Response(500)
lock = threading.Lock()

async def fetch_fn(url: str, options: FetchOptions) -> FetchResponse:
with lock:
state["calls"] += 1
n = state["calls"]
if n == 1:
# Mirror smooai-fetch surfacing a non-2xx after exhausting retries.
raise HTTPResponseError(httpx.Response(500, request=httpx.Request("POST", url)))
flushed.set()
return httpx.Response(200)
return _ok_response(url)

client = httpx.Client(transport=httpx.MockTransport(handler))
t = Transport("https://example.test/webhook", client=client, flush_interval_ms=50, max_batch_size=100)
t = Transport("https://example.test/webhook", fetch_fn=fetch_fn, flush_interval_ms=50, max_batch_size=100)
try:
t.enqueue(_event(0))
assert flushed.wait(3.0), "retry never succeeded"
assert state["calls"] >= 2
with lock:
assert state["calls"] >= 2
finally:
t.shutdown()
Loading
Loading