Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Standardized error hierarchy ([CHA-2958](https://linear.app/stream/issue/CHA-2958)).
New exception classes importable from `getstream` (also re-exported from
`getstream.exceptions`):
- `StreamException`: abstract base for every SDK error.
- `StreamApiException`: any HTTP 4xx/5xx response. Carries `status_code`,
`code`, `message`, `exception_fields`, `unrecoverable`,
`raw_response_body`, `more_info`, `details`. The `unrecoverable` flag
from `APIError` is now surfaced (was previously dropped on most paths).
- `StreamRateLimitException`: subclass of `StreamApiException` raised on
HTTP 429. Adds `retry_after: datetime.timedelta | None`, parsed from
`Retry-After` per RFC 7231 (integer seconds or HTTP-date). Missing or
unparseable headers map to `None`; past HTTP-dates clamp to `0`.
- `StreamTransportException`: raised when a network-layer failure (no
HTTP response received) propagates out of `httpx` — connection reset,
timeout, TLS handshake failure, DNS failure. Carries `error_type`
enum (`connection_reset` / `timeout` / `dns_failure` /
`tls_handshake_failed` / `unknown`). The original `httpx` exception
is preserved as `__cause__`.
- `StreamTaskException`: raised by `wait_for_task` when the polled task
ends in `status='failed'`. Carries `task_id`, `error_type`,
`description`, `stack_trace`, `version`.
- `Stream.wait_for_task(task_id, *, poll_interval=1.0, timeout=60.0)` and
the matching async coroutine on `AsyncStream`. Polls `get_task` until the
task reaches a terminal state. On `completed` returns the
`StreamResponse[GetTaskResponse]`; on `failed` raises
`StreamTaskException` populated from `ErrorResult`; on timeout raises
`StreamTransportException(error_type='timeout')`.

- Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)).
Four new kwargs on `Stream(...)` and `AsyncStream(...)`:
- `max_conns_per_host: int`: default `5`
Expand Down Expand Up @@ -45,11 +73,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- HTTP request errors raised by `httpx` (the `httpx.RequestError` family —
`ConnectError`, `ReadTimeout`, etc.) are now wrapped at the SDK boundary
in `StreamTransportException` so callers handle one Stream error category
instead of catching `httpx.RequestError` separately. The original `httpx`
exception is preserved via `__cause__` (CHA-2958).
- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected; `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly.
- Default HTTP transport now caps connections per host at `5` and closes idle sockets after `55.0s`. Previous default was httpx's `100` max-connections with `5.0s` keep-alive expiry.
- No breaking changes. All existing webhook helpers (`verify_webhook_signature`,
`parse_webhook_event`, `get_event_type`, event type constants) are preserved.

### Deprecated

- `getstream.base.StreamAPIException` (capital `API`) is now an alias for
`getstream.exceptions.StreamApiException` (lowercase `Api`). Importing the
old name emits `DeprecationWarning`; existing `isinstance` / `except` /
`pytest.raises` checks continue to work because the alias resolves to the
same class. The legacy spelling will be removed one minor cycle after this
release (CHA-2958 §10).

### Notes

- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, `.post(...)`, etc., and pre-empts the client-level `request_timeout`.
Expand Down
7 changes: 7 additions & 0 deletions getstream/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
from getstream.exceptions import ( # noqa: F401
StreamApiException,
StreamException,
StreamRateLimitException,
StreamTaskException,
StreamTransportException,
)
from getstream.stream import Stream # noqa: F401
from getstream.stream import AsyncStream # noqa: F401
97 changes: 34 additions & 63 deletions getstream/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import os
import time
import uuid
import warnings
import asyncio
from typing import Any, Dict, List, Optional, Tuple, Type, cast, get_origin

from getstream.models import APIError
from getstream.rate_limit import extract_rate_limit
from getstream.exceptions import (
StreamApiException,
build_api_exception,
wrap_transport_error,
)
from getstream.stream_response import StreamResponse
from getstream.generic import T
import httpx
Expand Down Expand Up @@ -102,9 +106,7 @@ def _parse_response(
self, response: httpx.Response, data_type: Type[T]
) -> StreamResponse[T]:
if response.status_code >= 399:
raise StreamAPIException(
response=response,
)
raise build_api_exception(response)

try:
parsed_result = json.loads(response.text) if response.text else {}
Expand All @@ -118,10 +120,8 @@ def _parse_response(
else:
data = cast(T, parsed_result)

except (ValueError, AttributeError):
raise StreamAPIException(
response=response,
)
except (ValueError, AttributeError) as err:
raise StreamApiException(response=response) from err

return StreamResponse(response, data)

Expand Down Expand Up @@ -291,9 +291,12 @@ def _request_sync(
) as span:
call_kwargs = dict(kwargs)
call_kwargs.pop("path_params", None)
response = getattr(self.client, method.lower())(
url_path, params=query_params, *args, **call_kwargs
)
try:
response = getattr(self.client, method.lower())(
url_path, params=query_params, *args, **call_kwargs
)
except httpx.RequestError as err:
raise wrap_transport_error(err) from err
duration = parse_duration_from_body(response.content)
if duration:
span.set_attribute("http.server.duration", duration)
Expand Down Expand Up @@ -604,9 +607,12 @@ async def _request_async(
call_kwargs["headers"] = call_kwargs.get("headers", {})
call_kwargs["headers"]["Content-Type"] = "application/json"

response = await getattr(self.client, method.lower())(
url_path, params=query_params, *args, **call_kwargs
)
try:
response = await getattr(self.client, method.lower())(
url_path, params=query_params, *args, **call_kwargs
)
except httpx.RequestError as err:
raise wrap_transport_error(err) from err
duration = parse_duration_from_body(response.content)
if duration:
span.set_attribute("http.server.duration", duration)
Expand Down Expand Up @@ -721,54 +727,19 @@ async def delete(
)


class StreamAPIException(Exception):
"""
A custom exception for handling errors from a Stream API response.

This exception is raised when an API call encounters an issue, providing
detailed information from the HTTP response. It attempts to parse the response
content into a structured API error. If the response content is not JSON or
lacks the expected structure, it will simply report the HTTP status code.

Attributes:
api_error (Optional[APIError]): An optional APIError object that is
populated if the response content contains structured error information.
rate_limit_info (RateLimitInfo): Information about the API's rate limiting
controls extracted from the response headers.
http_response (httpx.Response): The full HTTP response object from httpx.
status_code (int): The HTTP status code from the response.

Args:
response (httpx.Response): The HTTP response received from the Stream API.

Raises:
ValueError: If the response content cannot be parsed into JSON, indicating
that the server's response was not in the expected format.
"""

def __init__(self, response: httpx.Response) -> None:
self.api_error: Optional[APIError] = None
self.rate_limit_info = extract_rate_limit(response)
self.http_response = response
self.status_code = response.status_code

try:
parsed_response: Dict = json.loads(response.content)
self.api_error = APIError.from_dict(parsed_response)
except ValueError:
pass

def __str__(self) -> str:
if self.api_error:
return f'Stream error code {self.api_error.code}: {self.api_error.message}"'
body_preview = ""
try:
text = self.http_response.text[:200] if self.http_response.text else ""
if text:
body_preview = f" body: {text}"
except Exception:
pass
return f"Stream error HTTP code: {self.status_code}{body_preview}"
def __getattr__(name: str):
"""StreamApiException is exported under its new name; resolve here lazily and warn once."""
if name == "StreamAPIException":
warnings.warn(
"getstream.base.StreamAPIException is deprecated; import "
"StreamApiException from getstream (or getstream.exceptions) "
"instead. The legacy alias will be removed one minor cycle after "
"this release.",
DeprecationWarning,
stacklevel=2,
)
return StreamApiException
raise AttributeError(f"module 'getstream.base' has no attribute {name!r}")


def parse_duration_from_body(body: bytes) -> Optional[str]:
Expand Down
Loading