diff --git a/CHANGELOG.md b/CHANGELOG.md index f3bebeab..af84cc54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,18 +9,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)). + Four new kwargs on `Stream(...)` and `AsyncStream(...)`: + - `max_conns_per_host: int`: default `5` + - `idle_timeout: float` (seconds): default `55.0` + - `connect_timeout: float` (seconds): default `10.0` + - `request_timeout: float` (seconds): default `30.0` (was `6.0`; see Behavior changes) + + These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing `http_client=` and `transport=` kwargs continue to act as escape hatches; when `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. +- INFO log on client construction (logger `getstream`) lists the effective pool config and whether a user-supplied `http_client` is in use. + - Webhook handling spec helpers (CHA-2961): `UnknownEvent` dataclass for forward-compat; `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload` primitives; `parse_event` (returns typed event or `UnknownEvent` for unrecognized discriminators); `verify_signature` canonical alias of `verify_webhook_signature`; `verify_and_parse_webhook` HTTP composite (gunzip + verify + parse); `parse_sqs` and `parse_sns` queue composites - (no signature parameter — queue transports are authenticated by AWS IAM, + (no signature parameter: queue transports are authenticated by AWS IAM, so the backend emits no HMAC for queue messages today). Transparent gzip via magic-byte detection. - New instance methods on `Stream` and `AsyncStream`: `verify_signature(body, signature)` and - `verify_and_parse_webhook(body, signature)` — drop the api_secret parameter + `verify_and_parse_webhook(body, signature)` that drop the api_secret parameter in favor of the client's stored secret. Dual API: the module-level functions in `getstream.webhook` remain available for callers who want explicit secret control. @@ -35,10 +45,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected; `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly. +- Default HTTP transport now caps connections per host at `5` and closes idle sockets after `55.0s`. Previous default was httpx's `100` max-connections with `5.0s` keep-alive expiry. - No breaking changes. All existing webhook helpers (`verify_webhook_signature`, `parse_webhook_event`, `get_event_type`, event type constants) are preserved. -[Spec](https://www.notion.so/stream-wiki/Server-Side-SDK-Webhook-Handling-Spec-34b6a5d7f9f681e78003c443f227493c) +### Notes + +- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, `.post(...)`, etc., and pre-empts the client-level `request_timeout`. ## [3.0.0b1] - 2026-02-27 diff --git a/getstream/base.py b/getstream/base.py index 7f2843ff..27ea400a 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -1,4 +1,5 @@ import json +import logging import mimetypes import os import time @@ -27,6 +28,51 @@ import ijson +# ── Connection pool defaults (CHA-2956) ────────────────────────────── +# Kept in sync with getstream.stream constants; duplicated here so BaseClient/AsyncBaseClient can be instantiated standalone (e.g. by sub-clients constructed directly without going through Stream/AsyncStream). +DEFAULT_MAX_CONNS_PER_HOST = 5 +DEFAULT_IDLE_TIMEOUT = 55.0 +DEFAULT_CONNECT_TIMEOUT = 10.0 + + +logger = logging.getLogger("getstream") + + +def _resolve_pool_knobs(obj): + """Pull the 3 pool knobs off ``obj`` if BaseStream has set them, else fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets them on ``self`` before calling ``super().__init__()``, so a directly instantiated sub-client (or test fixture) still gets sane values. + + `is None` (not truthiness) so an explicit `0` / `0.0` from the caller is preserved rather than silently swapped for a default. + """ + max_conns_per_host = getattr(obj, "max_conns_per_host", None) + idle_timeout = getattr(obj, "idle_timeout", None) + connect_timeout = getattr(obj, "connect_timeout", None) + return ( + DEFAULT_MAX_CONNS_PER_HOST + if max_conns_per_host is None + else max_conns_per_host, + DEFAULT_IDLE_TIMEOUT if idle_timeout is None else idle_timeout, + DEFAULT_CONNECT_TIMEOUT if connect_timeout is None else connect_timeout, + ) + + +def _log_pool_config(cfg, *, user_http_client: bool) -> None: + if user_http_client: + logger.info( + "getstream connection pool: user_http_client=True (5 knobs not applied)" + ) + else: + logger.info( + "getstream connection pool: " + "max_conns_per_host=%s idle_timeout=%ss " + "connect_timeout=%ss request_timeout=%ss " + "user_http_client=False", + cfg.max_conns_per_host, + cfg.idle_timeout, + cfg.connect_timeout, + cfg.timeout, + ) + + def _read_file_bytes(file_path: str) -> bytes: with open(file_path, "rb") as f: return f.read() @@ -151,12 +197,17 @@ def __init__( timeout=None, user_agent=None, ): + # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) are set on ``self`` by BaseStream prior to this call when used via the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients constructed directly use the spec defaults via _resolve_pool_knobs. + max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self) super().__init__( api_key=api_key, base_url=base_url, token=token, timeout=timeout, user_agent=user_agent, + max_conns_per_host=max_conns_per_host, + idle_timeout=idle_timeout, + connect_timeout=connect_timeout, ) http_client = getattr(self, "_http_client", None) if http_client is not None: @@ -173,13 +224,25 @@ def __init__( self.client = http_client self._owns_http_client = False else: + limits = httpx.Limits( + max_connections=self.max_conns_per_host, + max_keepalive_connections=self.max_conns_per_host, + keepalive_expiry=self.idle_timeout, + ) + timeout_obj = httpx.Timeout( + connect=self.connect_timeout, + read=self.timeout, + write=self.timeout, + pool=self.timeout, + ) transport = getattr(self, "_transport", None) if transport is not None: self.client = httpx.Client( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, transport=transport, ) else: @@ -187,9 +250,13 @@ def __init__( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, ) self._owns_http_client = True + # The pool-config INFO line is emitted once by BaseStream after the + # top-level client is built, not here, to avoid one line per + # sub-client construction. def __enter__(self): return self @@ -376,7 +443,7 @@ def close(self): Close HTTPX client. If the client was provided externally via ``http_client``, this is a - no-op — the caller that created the client is responsible for closing + no-op; the caller that created the client is responsible for closing it. """ if getattr(self, "_owns_http_client", True): @@ -392,12 +459,17 @@ def __init__( timeout=None, user_agent=None, ): + # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) are set on ``self`` by BaseStream prior to this call when used via the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients constructed directly use the spec defaults via _resolve_pool_knobs. + max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self) super().__init__( api_key=api_key, base_url=base_url, token=token, timeout=timeout, user_agent=user_agent, + max_conns_per_host=max_conns_per_host, + idle_timeout=idle_timeout, + connect_timeout=connect_timeout, ) http_client = getattr(self, "_http_client", None) if http_client is not None: @@ -414,13 +486,25 @@ def __init__( self.client = http_client self._owns_http_client = False else: + limits = httpx.Limits( + max_connections=self.max_conns_per_host, + max_keepalive_connections=self.max_conns_per_host, + keepalive_expiry=self.idle_timeout, + ) + timeout_obj = httpx.Timeout( + connect=self.connect_timeout, + read=self.timeout, + write=self.timeout, + pool=self.timeout, + ) transport = getattr(self, "_transport", None) if transport is not None: self.client = httpx.AsyncClient( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, transport=transport, ) else: @@ -428,9 +512,13 @@ def __init__( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, ) self._owns_http_client = True + # The pool-config INFO line is emitted once by BaseStream after the + # top-level client is built, not here, to avoid one line per + # sub-client construction. async def __aenter__(self): return self @@ -442,7 +530,7 @@ async def aclose(self): """Close HTTPX async client (closes pools/keep-alives). If the client was provided externally via ``http_client``, this is a - no-op — the caller that created the client is responsible for closing + no-op; the caller that created the client is responsible for closing it. """ if getattr(self, "_owns_http_client", True): diff --git a/getstream/config.py b/getstream/config.py index c507d69a..8373cdba 100644 --- a/getstream/config.py +++ b/getstream/config.py @@ -11,9 +11,15 @@ def __init__( anonymous=False, timeout=None, user_agent=None, + max_conns_per_host=None, + idle_timeout=None, + connect_timeout=None, ): self.anonymous = anonymous self.timeout = timeout + self.max_conns_per_host = max_conns_per_host + self.idle_timeout = idle_timeout + self.connect_timeout = connect_timeout self.base_url = base_url self.params = {"api_key": api_key} diff --git a/getstream/stream.py b/getstream/stream.py index 09c32b1a..b1e0ff40 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -10,6 +10,7 @@ import jwt from pydantic_settings import BaseSettings, SettingsConfigDict +from getstream.base import _log_pool_config from getstream.common import telemetry from getstream.chat.client import ChatClient from getstream.chat.async_client import ChatClient as AsyncChatClient @@ -27,13 +28,49 @@ BASE_URL = "https://chat.stream-io-api.com/" +# ── Connection pool defaults (CHA-2956) ────────────────────────────── +# DEFAULT_REQUEST_TIMEOUT is the default per-request timeout (was 6.0 prior to 3.5.0). +DEFAULT_REQUEST_TIMEOUT = 30.0 +# DEFAULT_MAX_CONNS_PER_HOST caps concurrent TCP connections per host. +DEFAULT_MAX_CONNS_PER_HOST = 5 +# DEFAULT_IDLE_TIMEOUT sits below the typical 60s LB idle timeout with a 5s safety margin. +DEFAULT_IDLE_TIMEOUT = 55.0 +# DEFAULT_CONNECT_TIMEOUT caps TCP + TLS handshake duration. +DEFAULT_CONNECT_TIMEOUT = 10.0 + class Settings(BaseSettings): - # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, STREAM_TIMEOUT + # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, STREAM_TIMEOUT, STREAM_REQUEST_TIMEOUT, STREAM_MAX_CONNS_PER_HOST, STREAM_IDLE_TIMEOUT, STREAM_CONNECT_TIMEOUT api_key: str api_secret: Optional[str] = None base_url: Optional[str] = None - timeout: float = 6.0 + # `timeout` kept as alias for `request_timeout` for backward compat. + timeout: float = DEFAULT_REQUEST_TIMEOUT + request_timeout: Optional[float] = None + max_conns_per_host: int = DEFAULT_MAX_CONNS_PER_HOST + idle_timeout: float = DEFAULT_IDLE_TIMEOUT + connect_timeout: float = DEFAULT_CONNECT_TIMEOUT + + model_config = SettingsConfigDict( + env_prefix="STREAM_", + ) + + +class _PoolSettings(BaseSettings): + """Env-only view of the pool knobs and request timeout. + + Unlike ``Settings`` this has no required ``api_key`` field, so it can be + instantiated for knob resolution even when no ``STREAM_*`` credentials are + present in the environment (the normal case for callers that pass + ``api_key``/``api_secret`` explicitly). Reads the same ``STREAM_*`` env + vars with the same types as ``Settings``. + """ + + timeout: float = DEFAULT_REQUEST_TIMEOUT + request_timeout: Optional[float] = None + max_conns_per_host: int = DEFAULT_MAX_CONNS_PER_HOST + idle_timeout: float = DEFAULT_IDLE_TIMEOUT + connect_timeout: float = DEFAULT_CONNECT_TIMEOUT model_config = SettingsConfigDict( env_prefix="STREAM_", @@ -45,46 +82,41 @@ def __init__( self, api_key: Optional[str] = None, api_secret: Optional[str] = None, - timeout: Optional[float] = 6.0, + timeout: Optional[float] = None, base_url: Optional[str] = BASE_URL, user_agent: Optional[str] = None, transport=None, http_client=None, token: Optional[str] = None, + request_timeout: Optional[float] = None, + max_conns_per_host: Optional[int] = None, + idle_timeout: Optional[float] = None, + connect_timeout: Optional[float] = None, ): """Build a Stream client. Pass exactly one of ``api_secret`` or ``token``: - - ``api_secret`` enables a server-side client that can mint user tokens - and call protected admin endpoints. - - ``token`` enables a client-side client authenticated as a single - user. Token-only clients cannot mint tokens or call admin endpoints. + - ``api_secret`` enables a server-side client that can mint user tokens and call protected admin endpoints. + - ``token`` enables a client-side client authenticated as a single user. Token-only clients cannot mint tokens or call admin endpoints. - Any of ``api_key``, ``api_secret``, ``base_url`` left as ``None`` are - loaded from ``STREAM_*`` env vars; passing ``token`` skips the - ``api_secret`` env fallback. + Any of ``api_key``, ``api_secret``, ``base_url`` left as ``None`` are loaded from ``STREAM_*`` env vars; passing ``token`` skips the ``api_secret`` env fallback. Args: api_key: Project API key. Falls back to ``STREAM_API_KEY``. - api_secret: Project API secret. Mutually exclusive with ``token``. - Falls back to ``STREAM_API_SECRET`` only when ``token`` is also - ``None``. - timeout: HTTP request timeout in seconds; must be > 0. - base_url: API base URL. Falls back to ``STREAM_BASE_URL`` then to - the SDK default. + api_secret: Project API secret. Mutually exclusive with ``token``. Falls back to ``STREAM_API_SECRET`` only when ``token`` is also ``None``. + timeout: HTTP request timeout in seconds; must be > 0. Kept as a backward-compat alias for ``request_timeout``. + base_url: API base URL. Falls back to ``STREAM_BASE_URL`` then to the SDK default. user_agent: Optional custom ``User-Agent`` string. - transport: Optional ``httpx`` transport. Mutually exclusive with - ``http_client``. - http_client: Optional pre-built ``httpx`` client. Mutually - exclusive with ``transport``. When provided, sub-clients - (video/chat/moderation) reuse it instead of opening their own. + transport: Optional ``httpx`` transport. Mutually exclusive with ``http_client``. + http_client: Optional pre-built ``httpx`` client. Mutually exclusive with ``transport``. When provided, sub-clients (video/chat/moderation) reuse it instead of opening their own. token: Pre-minted user JWT. Mutually exclusive with ``api_secret``. + request_timeout: Default per-request timeout in seconds. Default 30.0. Replaces the older ``timeout`` kwarg; ``timeout`` is kept as an alias for backward compatibility. + max_conns_per_host: Max concurrent TCP connections per host. Default 5. Ignored when ``http_client`` is set. + idle_timeout: Idle connection lifetime in seconds. Default 55.0 (sits 5s under the typical 60s LB idle timeout). Ignored when ``http_client`` is set. + connect_timeout: TCP + TLS handshake timeout in seconds. Default 10.0. Ignored when ``http_client`` is set. Raises: - ValueError: If both ``transport`` and ``http_client`` are set; if - neither ``api_secret`` nor ``token`` can be resolved; if both - are provided; if either is the empty string; if ``api_key`` is - missing; or if ``timeout`` is not a positive number. + ValueError: If both ``transport`` and ``http_client`` are set; if neither ``api_secret`` nor ``token`` can be resolved; if both are provided; if either is the empty string; if ``api_key`` is missing; or if ``request_timeout`` is not a positive number. """ if transport is not None and http_client is not None: raise ValueError("Cannot specify both 'transport' and 'http_client'") @@ -103,6 +135,35 @@ def __init__( if token is None and api_secret is None: api_secret = s.api_secret + # Env fallback + defaults for the 4 pool knobs and request_timeout. `timeout` is a backward-compat alias for `request_timeout`. + # _PoolSettings (not Settings) is used here so missing STREAM_API_KEY + # in the environment does not crash construction when api_key was + # passed explicitly. It reads the same STREAM_* env vars. + s_for_pool: Optional[_PoolSettings] = None + + def _settings() -> _PoolSettings: + nonlocal s_for_pool + if s_for_pool is None: + s_for_pool = _PoolSettings() + return s_for_pool + + # request_timeout precedence: explicit kwarg > explicit timeout kwarg > STREAM_REQUEST_TIMEOUT > STREAM_TIMEOUT > DEFAULT_REQUEST_TIMEOUT. + if request_timeout is None: + if timeout is not None: + request_timeout = timeout + else: + s = _settings() + request_timeout = ( + s.request_timeout if s.request_timeout is not None else s.timeout + ) + + if max_conns_per_host is None: + max_conns_per_host = _settings().max_conns_per_host + if idle_timeout is None: + idle_timeout = _settings().idle_timeout + if connect_timeout is None: + connect_timeout = _settings().connect_timeout + if not api_key: raise ValueError("api_key is required") if api_secret and token: @@ -115,26 +176,36 @@ def __init__( self.api_key = api_key self._api_secret = api_secret or None - if timeout is not None: - if not isinstance(timeout, (int, float)) or timeout <= 0.0: - raise ValueError("timeout must be a number greater than zero") - self.timeout = timeout + if not isinstance(request_timeout, (int, float)) or request_timeout <= 0.0: + raise ValueError("request_timeout must be a number greater than zero") + self.timeout = request_timeout + self.request_timeout = request_timeout + self.max_conns_per_host = max_conns_per_host + self.idle_timeout = idle_timeout + self.connect_timeout = connect_timeout self.base_url = validate_and_clean_url(base_url) self.user_agent = user_agent self._transport = transport self._http_client = http_client self.token = token or self._create_token() + # Pool knobs are read by BaseClient via getattr(self, ...) since the intermediate generated REST clients (CommonRestClient etc.) do not forward these kwargs. self.max_conns_per_host / idle_timeout / connect_timeout were set above before super().__init__(). super().__init__( self.api_key, self.base_url, self.token, self.timeout, self.user_agent ) - # After super().__init__(), self.client is fully built and configured. - # When the user provided custom HTTP config, sub-clients share this - # client instead of each building their own. - if transport is not None or http_client is not None: - self._shared_client = self.client - else: - self._shared_client = None + # After super().__init__(), self.client is fully built and configured + # with the resolved pool knobs (max_conns_per_host/idle_timeout/ + # connect_timeout) and request timeout. Sub-clients (video/chat/ + # moderation/feeds) always share this single client so the pool config + # actually reaches the clients that issue requests. The intermediate + # generated REST clients do not forward the pool kwargs, so a + # per-sub-client client would silently fall back to defaults; sharing + # the parent's client avoids that and keeps one pool per Stream. + self._shared_client = self.client + + # Emit the pool-config INFO line exactly once per Stream, reflecting the + # resolved knobs on the top-level client. Sub-clients no longer log. + _log_pool_config(self, user_http_client=http_client is not None) @property def api_secret(self) -> str: @@ -208,7 +279,10 @@ def clone_for_token(self, token: str): api_key=self.api_key, token=token, base_url=self.base_url, - timeout=self.timeout, + request_timeout=self.request_timeout, + max_conns_per_host=self.max_conns_per_host, + idle_timeout=self.idle_timeout, + connect_timeout=self.connect_timeout, user_agent=self.user_agent, ) @@ -384,7 +458,7 @@ def verify_and_parse_webhook(self, body, signature): parse failures. Note: this is intentionally a synchronous ``def`` rather than ``async - def`` because it performs no I/O — it's CPU-bound (HMAC + gzip + JSON + def`` because it performs no I/O; it's CPU-bound (HMAC + gzip + JSON parsing). """ from .webhook import verify_and_parse_webhook as _verify_and_parse_webhook @@ -422,10 +496,14 @@ class Stream(BaseStream, CommonClient): @classmethod @deprecated("from_env is deprecated, use __init__ instead") - def from_env(cls, timeout: float = 6.0) -> Stream: + def from_env(cls, timeout: Optional[float] = None) -> Stream: """ Construct a StreamClient by loading its credentials and base_url from environment variables (via our pydantic Settings). + + ``timeout`` defaults to ``None`` so the client inherits the SDK default + request timeout (``DEFAULT_REQUEST_TIMEOUT``, 30.0s) rather than the + old hard-coded 6.0s. """ settings = Settings() @@ -441,7 +519,10 @@ def as_async(self) -> "AsyncStream": api_key=self.api_key, api_secret=self._api_secret, token=None if self.has_api_secret else self.token, - timeout=self.timeout, + request_timeout=self.request_timeout, + max_conns_per_host=self.max_conns_per_host, + idle_timeout=self.idle_timeout, + connect_timeout=self.connect_timeout, base_url=self.base_url, user_agent=self.user_agent, ) diff --git a/getstream/tests/test_webhook.py b/getstream/tests/test_webhook.py index 96f8fe22..3c653da0 100644 --- a/getstream/tests/test_webhook.py +++ b/getstream/tests/test_webhook.py @@ -1237,7 +1237,7 @@ def test_bad_base64(self): # Per CHA-3071 wire format: decode_sqs_payload falls back to raw bytes when # base64 decoding fails (uncompressed wire format). For input that is # neither valid base64 nor valid JSON nor gzip-prefixed, parse_sqs still - # raises InvalidWebhookError — just down the chain at JSON parsing. + # raises InvalidWebhookError, just down the chain at JSON parsing. msg = _read_str(self._neg("bad_base64") / "sqs_body.txt") with pytest.raises(InvalidWebhookError): parse_sqs(msg) diff --git a/getstream/webhook.py b/getstream/webhook.py index b3b3478d..3d1109e9 100644 --- a/getstream/webhook.py +++ b/getstream/webhook.py @@ -677,7 +677,7 @@ def decode_sqs_payload(message_body: str) -> bytes: magic-byte detection decide whether to decompress. parse_sqs sits on top of this and works transparently for both wire - formats — no caller code change, no flag, no header. + formats: no caller code change, no flag, no header. """ if not isinstance(message_body, str): raise InvalidWebhookError( @@ -686,7 +686,7 @@ def decode_sqs_payload(message_body: str) -> bytes: try: decoded = base64.b64decode(message_body, validate=True) except ValueError: - # Not base64 — treat input as raw bytes (uncompressed wire format). + # Not base64, so treat input as raw bytes (uncompressed wire format). # base64.binascii.Error is a subclass of ValueError so a single catch suffices. decoded = message_body.encode("utf-8") return gunzip_payload(decoded) @@ -697,7 +697,7 @@ def _unwrap_sns_notification_body(body: str) -> str: else return body unchanged so a pre-extracted Message string flows through. Heuristic: try to JSON-parse the input. If it yields a dict with a string - 'Message' field, that's the envelope shape — return the Message. Otherwise + 'Message' field, that's the envelope shape; return the Message. Otherwise the input is presumed to BE the pre-extracted Message (base64-encoded bytes are not valid JSON, so this falls through cleanly). """ diff --git a/pyproject.toml b/pyproject.toml index 1eeb6bc3..f8703fe1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "getstream" -version = "3.4.0" +version = "3.5.0" description = "GetStream Python SDK - Build scalable activity feeds, chat, and video calling applications" authors = [ { name = "sachaarbonel", email = "sacha.arbonel@hotmail.fr" }, diff --git a/tests/test_http_client.py b/tests/test_http_client.py index b4c30ce8..b440b4be 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -1,3 +1,6 @@ +import logging +import os + import httpx import pytest @@ -23,6 +26,169 @@ def handler(request: httpx.Request) -> httpx.Response: return httpx.MockTransport(handler), captured +# ── pool defaults ──────────────────────────────────────────────────── + + +class TestSyncPoolDefaults: + def test_default_limits_applied(self): + client = Stream(api_key="k", api_secret="s", base_url="http://test") + pool = client.client._transport._pool + assert pool._max_connections == 5 + assert pool._max_keepalive_connections == 5 + assert pool._keepalive_expiry == 55.0 + + def test_default_timeout_is_30s(self): + client = Stream(api_key="k", api_secret="s", base_url="http://test") + t = client.client.timeout + assert t.connect == 10.0 + assert t.read == 30.0 + assert t.write == 30.0 + assert t.pool == 30.0 + + +@pytest.mark.asyncio +class TestAsyncPoolDefaults: + async def test_default_limits_applied(self): + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + pool = client.client._transport._pool + assert pool._max_connections == 5 + assert pool._max_keepalive_connections == 5 + assert pool._keepalive_expiry == 55.0 + await client.aclose() + + async def test_default_timeout_is_30s(self): + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + t = client.client.timeout + assert t.connect == 10.0 + assert t.read == 30.0 + assert t.write == 30.0 + assert t.pool == 30.0 + await client.aclose() + + +class TestSyncPoolOverrides: + def _make(self, **kw): + return Stream(api_key="k", api_secret="s", base_url="http://test", **kw) + + def test_max_conns_per_host_override(self): + pool = self._make(max_conns_per_host=20).client._transport._pool + assert pool._max_connections == 20 + assert pool._max_keepalive_connections == 20 + + def test_idle_timeout_override(self): + pool = self._make(idle_timeout=120.0).client._transport._pool + assert pool._keepalive_expiry == 120.0 + + def test_connect_timeout_override(self): + assert self._make(connect_timeout=3.0).client.timeout.connect == 3.0 + + def test_request_timeout_override(self): + t = self._make(request_timeout=15.0).client.timeout + assert t.read == 15.0 and t.write == 15.0 and t.pool == 15.0 + + def test_legacy_timeout_alias_still_works(self): + c = self._make(timeout=12.0) + assert c.client.timeout.read == 12.0 + assert c.timeout == 12.0 and c.request_timeout == 12.0 + + +@pytest.mark.asyncio +class TestAsyncPoolOverrides: + async def _make(self, **kw): + return AsyncStream(api_key="k", api_secret="s", base_url="http://test", **kw) + + async def test_max_conns_per_host_override(self): + c = await self._make(max_conns_per_host=20) + pool = c.client._transport._pool + assert pool._max_connections == 20 + assert pool._max_keepalive_connections == 20 + await c.aclose() + + async def test_idle_timeout_override(self): + c = await self._make(idle_timeout=120.0) + assert c.client._transport._pool._keepalive_expiry == 120.0 + await c.aclose() + + async def test_connect_timeout_override(self): + c = await self._make(connect_timeout=3.0) + assert c.client.timeout.connect == 3.0 + await c.aclose() + + async def test_request_timeout_override(self): + c = await self._make(request_timeout=15.0) + t = c.client.timeout + assert t.read == 15.0 and t.write == 15.0 and t.pool == 15.0 + await c.aclose() + + async def test_legacy_timeout_alias_still_works(self): + c = await self._make(timeout=12.0) + assert c.client.timeout.read == 12.0 + assert c.timeout == 12.0 and c.request_timeout == 12.0 + await c.aclose() + + +# ── sub-clients inherit the configured pool (default path) ─────────── + + +class TestSubClientPoolPropagation: + """Regression for CHA-2956 BLOCKER 1: on the DEFAULT path (no transport / + no http_client) the sub-clients (video/chat/moderation/feeds) are what + issue the real API calls, so their httpx pool must reflect the configured + knobs — not the SDK defaults. They were silently falling back to 5/55 + because the intermediate generated REST clients don't forward the pool + kwargs and ``stream`` was assigned after ``super().__init__()``. + """ + + KNOBS = dict( + max_conns_per_host=99, + idle_timeout=11.0, + connect_timeout=3.0, + request_timeout=7.0, + ) + + def _assert_pool(self, sub_client, parent_client): + # Sub-clients share the parent's single configured client. + assert sub_client.client is parent_client + pool = sub_client.client._transport._pool + assert pool._max_connections == 99 + assert pool._max_keepalive_connections == 99 + assert pool._keepalive_expiry == 11.0 + t = sub_client.client.timeout + assert t.connect == 3.0 + assert t.read == 7.0 + + def test_sync_sub_client_pools_match_configured_knobs(self): + client = Stream( + api_key="k", api_secret="s", base_url="http://test", **self.KNOBS + ) + for name in ("video", "chat", "moderation", "feeds"): + self._assert_pool(getattr(client, name), client.client) + client.close() + + @pytest.mark.asyncio + async def test_async_sub_client_pools_match_configured_knobs(self): + client = AsyncStream( + api_key="k", api_secret="s", base_url="http://test", **self.KNOBS + ) + for name in ("video", "chat", "moderation"): + self._assert_pool(getattr(client, name), client.client) + await client.aclose() + + def test_sync_sub_client_pools_match_defaults(self): + # Even with no explicit knobs, sub-clients must carry the SDK defaults + # (5/55/10/30), not whatever a freshly-built sub-client would default to. + client = Stream(api_key="k", api_secret="s", base_url="http://test") + for name in ("video", "chat", "moderation", "feeds"): + sub = getattr(client, name) + assert sub.client is client.client + pool = sub.client._transport._pool + assert pool._max_connections == 5 + assert pool._keepalive_expiry == 55.0 + assert sub.client.timeout.connect == 10.0 + assert sub.client.timeout.read == 30.0 + client.close() + + # ── transport (primary API) ────────────────────────────────────────── @@ -86,11 +252,14 @@ def test_custom_limits_propagated(self): assert pool._max_connections == 42 assert pool._max_keepalive_connections == 10 - def test_default_path_unchanged(self): + def test_default_path_owns_and_shares_client(self): + # On the default path (no transport/http_client) the top-level client + # owns the single httpx client AND shares it with sub-clients, so the + # resolved pool config reaches the clients that issue requests. client = Stream(api_key="k", api_secret="s", base_url="http://test") assert client._owns_http_client is True assert isinstance(client.client, httpx.Client) - assert client._shared_client is None + assert client._shared_client is client.client @pytest.mark.asyncio @@ -186,9 +355,344 @@ async def test_aclose_does_not_close_user_provided_client(self): assert not custom.is_closed +class TestSyncHttpClientEscapeHatchKnobs: + def test_pool_knobs_ignored_when_http_client_provided(self): + custom_limits = httpx.Limits( + max_connections=99, + max_keepalive_connections=99, + keepalive_expiry=99.0, + ) + custom = httpx.Client(transport=httpx.HTTPTransport(limits=custom_limits)) + client = Stream( + api_key="k", + api_secret="s", + base_url="http://test", + http_client=custom, + max_conns_per_host=1, + idle_timeout=1.0, + connect_timeout=1.0, + request_timeout=1.0, + ) + assert client.client is custom + pool = client.client._transport._pool + assert pool._max_connections == 99 + assert pool._max_keepalive_connections == 99 + assert pool._keepalive_expiry == 99.0 + + +@pytest.mark.asyncio +class TestAsyncHttpClientEscapeHatchKnobs: + async def test_pool_knobs_ignored_when_http_client_provided(self): + custom_limits = httpx.Limits( + max_connections=99, + max_keepalive_connections=99, + keepalive_expiry=99.0, + ) + custom = httpx.AsyncClient( + transport=httpx.AsyncHTTPTransport(limits=custom_limits), + ) + client = AsyncStream( + api_key="k", + api_secret="s", + base_url="http://test", + http_client=custom, + max_conns_per_host=1, + idle_timeout=1.0, + connect_timeout=1.0, + request_timeout=1.0, + ) + assert client.client is custom + pool = client.client._transport._pool + assert pool._max_connections == 99 + assert pool._max_keepalive_connections == 99 + assert pool._keepalive_expiry == 99.0 + await client.aclose() + + +# ── INFO log on construction ───────────────────────────────────────── + + +class TestSyncInfoLog: + def test_info_log_emitted_with_defaults(self, caplog): + with caplog.at_level(logging.INFO, logger="getstream"): + Stream(api_key="k", api_secret="s", base_url="http://test") + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + msg = infos[0].getMessage() + assert "max_conns_per_host=5" in msg + assert "idle_timeout=55.0s" in msg + assert "connect_timeout=10.0s" in msg + assert "request_timeout=30.0s" in msg + assert "user_http_client=False" in msg + + def test_info_log_emitted_once_even_with_sub_clients(self, caplog): + # Regression for CHA-2956 MINOR: the pool-config INFO line must fire + # exactly once per Stream (at the top level), not once per sub-client. + with caplog.at_level(logging.INFO, logger="getstream"): + client = Stream(api_key="k", api_secret="s", base_url="http://test") + _ = (client.video, client.chat, client.moderation, client.feeds) + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + client.close() + + def test_info_log_reflects_resolved_knobs(self, caplog): + with caplog.at_level(logging.INFO, logger="getstream"): + Stream( + api_key="k", + api_secret="s", + base_url="http://test", + max_conns_per_host=33, + idle_timeout=12.0, + connect_timeout=4.0, + request_timeout=9.0, + ) + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + msg = infos[0].getMessage() + assert "max_conns_per_host=33" in msg + assert "idle_timeout=12.0s" in msg + assert "connect_timeout=4.0s" in msg + assert "request_timeout=9.0s" in msg + + def test_info_log_when_user_http_client_provided(self, caplog): + custom = httpx.Client(transport=_mock_transport()) + with caplog.at_level(logging.INFO, logger="getstream"): + Stream( + api_key="k", + api_secret="s", + base_url="http://test", + http_client=custom, + ) + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + assert "user_http_client=True" in infos[0].getMessage() + + +@pytest.mark.asyncio +class TestAsyncInfoLog: + async def test_info_log_emitted_with_defaults(self, caplog): + with caplog.at_level(logging.INFO, logger="getstream"): + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + await client.aclose() + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + assert "max_conns_per_host=5" in infos[0].getMessage() + assert "user_http_client=False" in infos[0].getMessage() + + +# ── sync/async parity ──────────────────────────────────────────────── + + +@pytest.mark.asyncio +class TestSyncAsyncParity: + async def test_same_kwargs_produce_same_pool_config(self): + kwargs = dict( + api_key="k", + api_secret="s", + base_url="http://test", + max_conns_per_host=7, + idle_timeout=42.0, + connect_timeout=3.0, + request_timeout=11.0, + ) + sync_c = Stream(**kwargs) + async_c = AsyncStream(**kwargs) + + sp = sync_c.client._transport._pool + ap = async_c.client._transport._pool + assert sp._max_connections == ap._max_connections == 7 + assert sp._max_keepalive_connections == ap._max_keepalive_connections == 7 + assert sp._keepalive_expiry == ap._keepalive_expiry == 42.0 + + st = sync_c.client.timeout + at = async_c.client.timeout + assert st.connect == at.connect == 3.0 + assert st.read == at.read == 11.0 + assert st.write == at.write == 11.0 + assert st.pool == at.pool == 11.0 + + sync_c.close() + await async_c.aclose() + + +# ── per-call timeout override ──────────────────────────────────────── + + +def _timeout_capture_transport(): + observed = {} + + def handler(request: httpx.Request) -> httpx.Response: + observed["timeout"] = request.extensions.get("timeout") + return httpx.Response(200, json={}, request=request) + + return httpx.MockTransport(handler), observed + + +class TestSyncPerCallTimeoutOverride: + def test_per_call_timeout_reaches_httpx(self): + transport, observed = _timeout_capture_transport() + client = Stream( + api_key="k", + api_secret="s", + base_url="http://test", + transport=transport, + request_timeout=30.0, + ) + client.get("/app", timeout=httpx.Timeout(2.5)) + assert observed["timeout"]["read"] == 2.5 + assert observed["timeout"]["connect"] == 2.5 + + +@pytest.mark.asyncio +class TestAsyncPerCallTimeoutOverride: + async def test_per_call_timeout_reaches_httpx(self): + transport, observed = _timeout_capture_transport() + client = AsyncStream( + api_key="k", + api_secret="s", + base_url="http://test", + transport=transport, + request_timeout=30.0, + ) + await client.get("/app", timeout=httpx.Timeout(2.5)) + assert observed["timeout"]["read"] == 2.5 + assert observed["timeout"]["connect"] == 2.5 + await client.aclose() + + # ── validation ─────────────────────────────────────────────────────── +class TestResolvePoolKnobsExplicitZero: + """`_resolve_pool_knobs` must use `is None`, not truthiness, so callers + who deliberately pass `0` / `0.0` get their value back unchanged. + Regression test for the falsy-fallback bug flagged in CHA-2956 review. + """ + + def test_explicit_zero_preserved(self): + from getstream.base import _resolve_pool_knobs + + class Obj: + max_conns_per_host = 0 + idle_timeout = 0.0 + connect_timeout = 0.0 + + assert _resolve_pool_knobs(Obj()) == (0, 0.0, 0.0) + + def test_missing_attrs_fall_back_to_defaults(self): + from getstream.base import ( + _resolve_pool_knobs, + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + class Obj: + pass + + assert _resolve_pool_knobs(Obj()) == ( + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + def test_none_attrs_fall_back_to_defaults(self): + from getstream.base import ( + _resolve_pool_knobs, + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + class Obj: + max_conns_per_host = None + idle_timeout = None + connect_timeout = None + + assert _resolve_pool_knobs(Obj()) == ( + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + +class TestPoolConfigForwardedOnClone: + """clone_for_token and as_async build fresh top-level clients; they must + carry the source client's pool config, not silently revert to defaults. + """ + + @pytest.fixture + def stream_client(self): + client = Stream( + api_key="k", + api_secret="s", + base_url="http://test", + max_conns_per_host=20, + idle_timeout=120.0, + connect_timeout=3.0, + request_timeout=15.0, + ) + yield client + client.close() + + def test_clone_for_token_preserves_pool_config(self, stream_client): + clone = stream_client.clone_for_token("user-token") + assert clone.max_conns_per_host == 20 + assert clone.idle_timeout == 120.0 + assert clone.connect_timeout == 3.0 + assert clone.request_timeout == 15.0 + pool = clone.client._transport._pool + assert pool._max_connections == 20 + assert pool._keepalive_expiry == 120.0 + + @pytest.mark.asyncio + async def test_as_async_preserves_pool_config(self, stream_client): + aclient = stream_client.as_async() + assert aclient.max_conns_per_host == 20 + assert aclient.idle_timeout == 120.0 + assert aclient.connect_timeout == 3.0 + assert aclient.request_timeout == 15.0 + assert aclient.client._transport._pool._max_connections == 20 + await aclient.aclose() + + +class TestConstructsWithoutStreamEnv: + """Regression for CHA-2956 BLOCKER 2: knob resolution must not require + STREAM_API_KEY in the environment. Passing api_key/api_secret explicitly + in a clean environment (no STREAM_* vars) must succeed and fall back to the + spec defaults, instead of crashing inside ``Settings()`` (api_key is a + required field). + """ + + @staticmethod + def _clear_stream_env(monkeypatch): + for key in list(os.environ): + if key.startswith("STREAM_"): + monkeypatch.delenv(key, raising=False) + + def test_sync_constructs_with_spec_defaults(self, monkeypatch): + self._clear_stream_env(monkeypatch) + client = Stream(api_key="k", api_secret="s", base_url="http://test") + assert client.max_conns_per_host == 5 + assert client.idle_timeout == 55.0 + assert client.connect_timeout == 10.0 + assert client.request_timeout == 30.0 + pool = client.client._transport._pool + assert pool._max_connections == 5 + assert pool._keepalive_expiry == 55.0 + client.close() + + @pytest.mark.asyncio + async def test_async_constructs_with_spec_defaults(self, monkeypatch): + self._clear_stream_env(monkeypatch) + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + assert client.max_conns_per_host == 5 + assert client.idle_timeout == 55.0 + assert client.connect_timeout == 10.0 + assert client.request_timeout == 30.0 + await client.aclose() + + class TestValidation: def test_transport_and_http_client_mutually_exclusive(self): with pytest.raises(ValueError, match="Cannot specify both"): diff --git a/tests/test_video_examples.py b/tests/test_video_examples.py index a07c25c3..6f7509e7 100644 --- a/tests/test_video_examples.py +++ b/tests/test_video_examples.py @@ -27,7 +27,7 @@ def test_setup_client(): assert isinstance(client, Stream) assert client.api_key == "your_api_key" assert client.api_secret == "your_api_secret" - assert client.timeout == 6.0 + assert client.timeout == 30.0 def test_create_user(client: Stream): @@ -493,7 +493,7 @@ async def test_async_client(): assert isinstance(client, AsyncStream) assert client.api_key == "your_api_key" assert client.api_secret == "your_api_secret" - assert client.timeout == 6.0 + assert client.timeout == 30.0 @pytest.mark.asyncio diff --git a/uv.lock b/uv.lock index 27a9d801..793e21d5 100644 --- a/uv.lock +++ b/uv.lock @@ -895,7 +895,7 @@ wheels = [ [[package]] name = "getstream" -version = "3.3.4" +version = "3.5.0" source = { editable = "." } dependencies = [ { name = "dataclasses-json" },