From 1289ee8cefb4683c89feaff0a9b433baf2bef883 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:52:25 +0200 Subject: [PATCH 01/23] chore: bootstrap CHA-2956 connection pooling branch From 4be6ce80ea48a6fc7b5bd0c9a3d35d81cb828f0c Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:53:45 +0200 Subject: [PATCH 02/23] feat: add 5-knob HTTP transport config (Stream/AsyncStream constructor + BaseConfig) --- getstream/config.py | 6 +++ getstream/stream.py | 90 ++++++++++++++++++++++++++++++++++----- tests/test_http_client.py | 22 ++++++++++ 3 files changed, 108 insertions(+), 10 deletions(-) diff --git a/getstream/config.py b/getstream/config.py index c507d69a..8373cdba 100644 --- a/getstream/config.py +++ b/getstream/config.py @@ -11,9 +11,15 @@ def __init__( anonymous=False, timeout=None, user_agent=None, + max_conns_per_host=None, + idle_timeout=None, + connect_timeout=None, ): self.anonymous = anonymous self.timeout = timeout + self.max_conns_per_host = max_conns_per_host + self.idle_timeout = idle_timeout + self.connect_timeout = connect_timeout self.base_url = base_url self.params = {"api_key": api_key} diff --git a/getstream/stream.py b/getstream/stream.py index 09c32b1a..bebf76a4 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -27,13 +27,30 @@ BASE_URL = "https://chat.stream-io-api.com/" +# ── Connection pool defaults (spec §4, CHA-2956) ───────────────────── +# DEFAULT_REQUEST_TIMEOUT is the default per-request timeout (was 6.0 prior to 3.5.0). +DEFAULT_REQUEST_TIMEOUT = 30.0 +# DEFAULT_MAX_CONNS_PER_HOST caps concurrent TCP connections per host. +DEFAULT_MAX_CONNS_PER_HOST = 5 +# DEFAULT_IDLE_TIMEOUT sits below the typical 60s LB idle timeout with a 5s safety margin. +DEFAULT_IDLE_TIMEOUT = 55.0 +# DEFAULT_CONNECT_TIMEOUT caps TCP + TLS handshake duration. +DEFAULT_CONNECT_TIMEOUT = 10.0 + class Settings(BaseSettings): - # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, STREAM_TIMEOUT + # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, + # STREAM_TIMEOUT, STREAM_REQUEST_TIMEOUT, STREAM_MAX_CONNS_PER_HOST, + # STREAM_IDLE_TIMEOUT, STREAM_CONNECT_TIMEOUT api_key: str api_secret: Optional[str] = None base_url: Optional[str] = None - timeout: float = 6.0 + # `timeout` kept as alias for `request_timeout` for backward compat. + timeout: float = DEFAULT_REQUEST_TIMEOUT + request_timeout: Optional[float] = None + max_conns_per_host: int = DEFAULT_MAX_CONNS_PER_HOST + idle_timeout: float = DEFAULT_IDLE_TIMEOUT + connect_timeout: float = DEFAULT_CONNECT_TIMEOUT model_config = SettingsConfigDict( env_prefix="STREAM_", @@ -45,12 +62,16 @@ def __init__( self, api_key: Optional[str] = None, api_secret: Optional[str] = None, - timeout: Optional[float] = 6.0, + timeout: Optional[float] = None, base_url: Optional[str] = BASE_URL, user_agent: Optional[str] = None, transport=None, http_client=None, token: Optional[str] = None, + request_timeout: Optional[float] = None, + max_conns_per_host: Optional[int] = None, + idle_timeout: Optional[float] = None, + connect_timeout: Optional[float] = None, ): """Build a Stream client. @@ -69,7 +90,8 @@ def __init__( api_secret: Project API secret. Mutually exclusive with ``token``. Falls back to ``STREAM_API_SECRET`` only when ``token`` is also ``None``. - timeout: HTTP request timeout in seconds; must be > 0. + timeout: HTTP request timeout in seconds; must be > 0. Kept as a + backward-compat alias for ``request_timeout``. base_url: API base URL. Falls back to ``STREAM_BASE_URL`` then to the SDK default. user_agent: Optional custom ``User-Agent`` string. @@ -79,12 +101,22 @@ def __init__( exclusive with ``transport``. When provided, sub-clients (video/chat/moderation) reuse it instead of opening their own. token: Pre-minted user JWT. Mutually exclusive with ``api_secret``. + request_timeout: Default per-request timeout in seconds. Default + 30.0. Replaces the older ``timeout`` kwarg; ``timeout`` is + kept as an alias for backward compatibility. + max_conns_per_host: Max concurrent TCP connections per host. + Default 5. Ignored when ``http_client`` is set. + idle_timeout: Idle connection lifetime in seconds. Default 55.0 + (sits 5s under the typical 60s LB idle timeout). Ignored + when ``http_client`` is set. + connect_timeout: TCP + TLS handshake timeout in seconds. + Default 10.0. Ignored when ``http_client`` is set. Raises: ValueError: If both ``transport`` and ``http_client`` are set; if neither ``api_secret`` nor ``token`` can be resolved; if both are provided; if either is the empty string; if ``api_key`` is - missing; or if ``timeout`` is not a positive number. + missing; or if ``request_timeout`` is not a positive number. """ if transport is not None and http_client is not None: raise ValueError("Cannot specify both 'transport' and 'http_client'") @@ -103,6 +135,34 @@ def __init__( if token is None and api_secret is None: api_secret = s.api_secret + # Env fallback + defaults for the 4 pool knobs and request_timeout. + # `timeout` is a backward-compat alias for `request_timeout`. + s_for_pool: Optional[Settings] = None + + def _settings() -> Settings: + nonlocal s_for_pool + if s_for_pool is None: + s_for_pool = Settings() + return s_for_pool + + # request_timeout precedence: explicit kwarg > explicit timeout kwarg + # > STREAM_REQUEST_TIMEOUT > STREAM_TIMEOUT > DEFAULT_REQUEST_TIMEOUT. + if request_timeout is None: + if timeout is not None: + request_timeout = timeout + else: + s = _settings() + request_timeout = ( + s.request_timeout if s.request_timeout is not None else s.timeout + ) + + if max_conns_per_host is None: + max_conns_per_host = _settings().max_conns_per_host + if idle_timeout is None: + idle_timeout = _settings().idle_timeout + if connect_timeout is None: + connect_timeout = _settings().connect_timeout + if not api_key: raise ValueError("api_key is required") if api_secret and token: @@ -115,10 +175,13 @@ def __init__( self.api_key = api_key self._api_secret = api_secret or None - if timeout is not None: - if not isinstance(timeout, (int, float)) or timeout <= 0.0: - raise ValueError("timeout must be a number greater than zero") - self.timeout = timeout + if not isinstance(request_timeout, (int, float)) or request_timeout <= 0.0: + raise ValueError("request_timeout must be a number greater than zero") + self.timeout = request_timeout + self.request_timeout = request_timeout + self.max_conns_per_host = max_conns_per_host + self.idle_timeout = idle_timeout + self.connect_timeout = connect_timeout self.base_url = validate_and_clean_url(base_url) self.user_agent = user_agent @@ -126,7 +189,14 @@ def __init__( self._http_client = http_client self.token = token or self._create_token() super().__init__( - self.api_key, self.base_url, self.token, self.timeout, self.user_agent + self.api_key, + self.base_url, + self.token, + self.timeout, + self.user_agent, + max_conns_per_host=self.max_conns_per_host, + idle_timeout=self.idle_timeout, + connect_timeout=self.connect_timeout, ) # After super().__init__(), self.client is fully built and configured. # When the user provided custom HTTP config, sub-clients share this diff --git a/tests/test_http_client.py b/tests/test_http_client.py index b4c30ce8..551d15eb 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -1,3 +1,5 @@ +import logging + import httpx import pytest @@ -23,6 +25,26 @@ def handler(request: httpx.Request) -> httpx.Response: return httpx.MockTransport(handler), captured +# ── pool defaults (spec §4) ────────────────────────────────────────── + + +class TestSyncPoolDefaults: + def test_default_limits_applied(self): + client = Stream(api_key="k", api_secret="s", base_url="http://test") + pool = client.client._transport._pool + assert pool._max_connections == 5 + assert pool._max_keepalive_connections == 5 + assert pool._keepalive_expiry == 55.0 + + def test_default_timeout_is_30s(self): + client = Stream(api_key="k", api_secret="s", base_url="http://test") + t = client.client.timeout + assert t.connect == 10.0 + assert t.read == 30.0 + assert t.write == 30.0 + assert t.pool == 30.0 + + # ── transport (primary API) ────────────────────────────────────────── From 6315a387cfc8bfe1665e8128ec4843df9fe3db35 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:56:33 +0200 Subject: [PATCH 03/23] feat: wire 5 pool knobs into httpx.Client/AsyncClient (limits + timeout) --- getstream/base.py | 72 ++++++++++++++++++++++++++++++++-- getstream/stream.py | 13 +++---- tests/test_http_client.py | 81 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 12 deletions(-) diff --git a/getstream/base.py b/getstream/base.py index 7f2843ff..55d06be1 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -27,6 +27,28 @@ import ijson +# ── Connection pool defaults (spec §4, CHA-2956) ───────────────────── +# Kept in sync with getstream.stream constants; duplicated here so +# BaseClient/AsyncBaseClient can be instantiated standalone (e.g. by +# sub-clients constructed directly without going through Stream/AsyncStream). +DEFAULT_MAX_CONNS_PER_HOST = 5 +DEFAULT_IDLE_TIMEOUT = 55.0 +DEFAULT_CONNECT_TIMEOUT = 10.0 + + +def _resolve_pool_knobs(obj): + """Pull the 3 pool knobs off ``obj`` if BaseStream has set them, else + fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets + them on ``self`` before calling ``super().__init__()``, so a directly + instantiated sub-client (or test fixture) still gets sane values. + """ + return ( + getattr(obj, "max_conns_per_host", None) or DEFAULT_MAX_CONNS_PER_HOST, + getattr(obj, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT, + getattr(obj, "connect_timeout", None) or DEFAULT_CONNECT_TIMEOUT, + ) + + def _read_file_bytes(file_path: str) -> bytes: with open(file_path, "rb") as f: return f.read() @@ -151,12 +173,20 @@ def __init__( timeout=None, user_agent=None, ): + # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) + # are set on ``self`` by BaseStream prior to this call when used via + # the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients + # constructed directly use the spec defaults via _resolve_pool_knobs. + max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self) super().__init__( api_key=api_key, base_url=base_url, token=token, timeout=timeout, user_agent=user_agent, + max_conns_per_host=max_conns_per_host, + idle_timeout=idle_timeout, + connect_timeout=connect_timeout, ) http_client = getattr(self, "_http_client", None) if http_client is not None: @@ -173,13 +203,25 @@ def __init__( self.client = http_client self._owns_http_client = False else: + limits = httpx.Limits( + max_connections=self.max_conns_per_host, + max_keepalive_connections=self.max_conns_per_host, + keepalive_expiry=self.idle_timeout, + ) + timeout_obj = httpx.Timeout( + connect=self.connect_timeout, + read=self.timeout, + write=self.timeout, + pool=self.timeout, + ) transport = getattr(self, "_transport", None) if transport is not None: self.client = httpx.Client( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, transport=transport, ) else: @@ -187,7 +229,8 @@ def __init__( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, ) self._owns_http_client = True @@ -392,12 +435,20 @@ def __init__( timeout=None, user_agent=None, ): + # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) + # are set on ``self`` by BaseStream prior to this call when used via + # the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients + # constructed directly use the spec defaults via _resolve_pool_knobs. + max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self) super().__init__( api_key=api_key, base_url=base_url, token=token, timeout=timeout, user_agent=user_agent, + max_conns_per_host=max_conns_per_host, + idle_timeout=idle_timeout, + connect_timeout=connect_timeout, ) http_client = getattr(self, "_http_client", None) if http_client is not None: @@ -414,13 +465,25 @@ def __init__( self.client = http_client self._owns_http_client = False else: + limits = httpx.Limits( + max_connections=self.max_conns_per_host, + max_keepalive_connections=self.max_conns_per_host, + keepalive_expiry=self.idle_timeout, + ) + timeout_obj = httpx.Timeout( + connect=self.connect_timeout, + read=self.timeout, + write=self.timeout, + pool=self.timeout, + ) transport = getattr(self, "_transport", None) if transport is not None: self.client = httpx.AsyncClient( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, transport=transport, ) else: @@ -428,7 +491,8 @@ def __init__( base_url=self.base_url or "", headers={**self.headers, "Accept-Encoding": "gzip"}, params=self.params, - timeout=httpx.Timeout(self.timeout), + timeout=timeout_obj, + limits=limits, ) self._owns_http_client = True diff --git a/getstream/stream.py b/getstream/stream.py index bebf76a4..94d2310c 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -188,15 +188,12 @@ def _settings() -> Settings: self._transport = transport self._http_client = http_client self.token = token or self._create_token() + # Pool knobs are read by BaseClient via getattr(self, ...) since the + # intermediate generated REST clients (CommonRestClient etc.) do not + # forward these kwargs. self.max_conns_per_host / idle_timeout / + # connect_timeout were set above before super().__init__(). super().__init__( - self.api_key, - self.base_url, - self.token, - self.timeout, - self.user_agent, - max_conns_per_host=self.max_conns_per_host, - idle_timeout=self.idle_timeout, - connect_timeout=self.connect_timeout, + self.api_key, self.base_url, self.token, self.timeout, self.user_agent ) # After super().__init__(), self.client is fully built and configured. # When the user provided custom HTTP config, sub-clients share this diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 551d15eb..42136f38 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -45,6 +45,87 @@ def test_default_timeout_is_30s(self): assert t.pool == 30.0 +@pytest.mark.asyncio +class TestAsyncPoolDefaults: + async def test_default_limits_applied(self): + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + pool = client.client._transport._pool + assert pool._max_connections == 5 + assert pool._max_keepalive_connections == 5 + assert pool._keepalive_expiry == 55.0 + await client.aclose() + + async def test_default_timeout_is_30s(self): + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + t = client.client.timeout + assert t.connect == 10.0 + assert t.read == 30.0 + assert t.write == 30.0 + assert t.pool == 30.0 + await client.aclose() + + +class TestSyncPoolOverrides: + def _make(self, **kw): + return Stream(api_key="k", api_secret="s", base_url="http://test", **kw) + + def test_max_conns_per_host_override(self): + pool = self._make(max_conns_per_host=20).client._transport._pool + assert pool._max_connections == 20 + assert pool._max_keepalive_connections == 20 + + def test_idle_timeout_override(self): + pool = self._make(idle_timeout=120.0).client._transport._pool + assert pool._keepalive_expiry == 120.0 + + def test_connect_timeout_override(self): + assert self._make(connect_timeout=3.0).client.timeout.connect == 3.0 + + def test_request_timeout_override(self): + t = self._make(request_timeout=15.0).client.timeout + assert t.read == 15.0 and t.write == 15.0 and t.pool == 15.0 + + def test_legacy_timeout_alias_still_works(self): + c = self._make(timeout=12.0) + assert c.client.timeout.read == 12.0 + assert c.timeout == 12.0 and c.request_timeout == 12.0 + + +@pytest.mark.asyncio +class TestAsyncPoolOverrides: + async def _make(self, **kw): + return AsyncStream(api_key="k", api_secret="s", base_url="http://test", **kw) + + async def test_max_conns_per_host_override(self): + c = await self._make(max_conns_per_host=20) + pool = c.client._transport._pool + assert pool._max_connections == 20 + assert pool._max_keepalive_connections == 20 + await c.aclose() + + async def test_idle_timeout_override(self): + c = await self._make(idle_timeout=120.0) + assert c.client._transport._pool._keepalive_expiry == 120.0 + await c.aclose() + + async def test_connect_timeout_override(self): + c = await self._make(connect_timeout=3.0) + assert c.client.timeout.connect == 3.0 + await c.aclose() + + async def test_request_timeout_override(self): + c = await self._make(request_timeout=15.0) + t = c.client.timeout + assert t.read == 15.0 and t.write == 15.0 and t.pool == 15.0 + await c.aclose() + + async def test_legacy_timeout_alias_still_works(self): + c = await self._make(timeout=12.0) + assert c.client.timeout.read == 12.0 + assert c.timeout == 12.0 and c.request_timeout == 12.0 + await c.aclose() + + # ── transport (primary API) ────────────────────────────────────────── From 2209a8c85628fa263e1455243b5e4e8d3d2dc4ec Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:56:55 +0200 Subject: [PATCH 04/23] test: verify http_client escape hatch bypasses pool knob application --- tests/test_http_client.py | 54 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 42136f38..c87272d2 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -289,6 +289,60 @@ async def test_aclose_does_not_close_user_provided_client(self): assert not custom.is_closed +class TestSyncHttpClientEscapeHatchKnobs: + def test_pool_knobs_ignored_when_http_client_provided(self): + custom_limits = httpx.Limits( + max_connections=99, + max_keepalive_connections=99, + keepalive_expiry=99.0, + ) + custom = httpx.Client(transport=httpx.HTTPTransport(limits=custom_limits)) + client = Stream( + api_key="k", + api_secret="s", + base_url="http://test", + http_client=custom, + max_conns_per_host=1, + idle_timeout=1.0, + connect_timeout=1.0, + request_timeout=1.0, + ) + assert client.client is custom + pool = client.client._transport._pool + assert pool._max_connections == 99 + assert pool._max_keepalive_connections == 99 + assert pool._keepalive_expiry == 99.0 + + +@pytest.mark.asyncio +class TestAsyncHttpClientEscapeHatchKnobs: + async def test_pool_knobs_ignored_when_http_client_provided(self): + custom_limits = httpx.Limits( + max_connections=99, + max_keepalive_connections=99, + keepalive_expiry=99.0, + ) + custom = httpx.AsyncClient( + transport=httpx.AsyncHTTPTransport(limits=custom_limits), + ) + client = AsyncStream( + api_key="k", + api_secret="s", + base_url="http://test", + http_client=custom, + max_conns_per_host=1, + idle_timeout=1.0, + connect_timeout=1.0, + request_timeout=1.0, + ) + assert client.client is custom + pool = client.client._transport._pool + assert pool._max_connections == 99 + assert pool._max_keepalive_connections == 99 + assert pool._keepalive_expiry == 99.0 + await client.aclose() + + # ── validation ─────────────────────────────────────────────────────── From 7a1e18355894c80e0952bc73af66815dab819322 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:57:10 +0200 Subject: [PATCH 05/23] test: verify per-call timeout override reaches httpx for sync and async --- tests/test_http_client.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index c87272d2..0d1516b3 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -343,6 +343,51 @@ async def test_pool_knobs_ignored_when_http_client_provided(self): await client.aclose() +# ── per-call timeout override (spec §5.2) ──────────────────────────── + + +def _timeout_capture_transport(): + observed = {} + + def handler(request: httpx.Request) -> httpx.Response: + observed["timeout"] = request.extensions.get("timeout") + return httpx.Response(200, json={}, request=request) + + return httpx.MockTransport(handler), observed + + +class TestSyncPerCallTimeoutOverride: + def test_per_call_timeout_reaches_httpx(self): + transport, observed = _timeout_capture_transport() + client = Stream( + api_key="k", + api_secret="s", + base_url="http://test", + transport=transport, + request_timeout=30.0, + ) + client.get("/app", timeout=httpx.Timeout(2.5)) + assert observed["timeout"]["read"] == 2.5 + assert observed["timeout"]["connect"] == 2.5 + + +@pytest.mark.asyncio +class TestAsyncPerCallTimeoutOverride: + async def test_per_call_timeout_reaches_httpx(self): + transport, observed = _timeout_capture_transport() + client = AsyncStream( + api_key="k", + api_secret="s", + base_url="http://test", + transport=transport, + request_timeout=30.0, + ) + await client.get("/app", timeout=httpx.Timeout(2.5)) + assert observed["timeout"]["read"] == 2.5 + assert observed["timeout"]["connect"] == 2.5 + await client.aclose() + + # ── validation ─────────────────────────────────────────────────────── From 3efc07491f42e2a47e53a079224347cc5c56e623 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:58:01 +0200 Subject: [PATCH 06/23] =?UTF-8?q?feat:=20log=20effective=20transport=20con?= =?UTF-8?q?fig=20on=20client=20construction=20(spec=20=C2=A78)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- getstream/base.py | 24 ++++++++++++++++++++++ tests/test_http_client.py | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/getstream/base.py b/getstream/base.py index 55d06be1..0c88e891 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -1,4 +1,5 @@ import json +import logging import mimetypes import os import time @@ -36,6 +37,9 @@ DEFAULT_CONNECT_TIMEOUT = 10.0 +logger = logging.getLogger("getstream") + + def _resolve_pool_knobs(obj): """Pull the 3 pool knobs off ``obj`` if BaseStream has set them, else fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets @@ -49,6 +53,24 @@ def _resolve_pool_knobs(obj): ) +def _log_pool_config(cfg, *, user_http_client: bool) -> None: + if user_http_client: + logger.info( + "getstream connection pool: user_http_client=True (5 knobs not applied)" + ) + else: + logger.info( + "getstream connection pool: " + "max_conns_per_host=%s idle_timeout=%ss " + "connect_timeout=%ss request_timeout=%ss " + "user_http_client=False", + cfg.max_conns_per_host, + cfg.idle_timeout, + cfg.connect_timeout, + cfg.timeout, + ) + + def _read_file_bytes(file_path: str) -> bytes: with open(file_path, "rb") as f: return f.read() @@ -233,6 +255,7 @@ def __init__( limits=limits, ) self._owns_http_client = True + _log_pool_config(self, user_http_client=http_client is not None) def __enter__(self): return self @@ -495,6 +518,7 @@ def __init__( limits=limits, ) self._owns_http_client = True + _log_pool_config(self, user_http_client=http_client is not None) async def __aenter__(self): return self diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 0d1516b3..c649402e 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -343,6 +343,48 @@ async def test_pool_knobs_ignored_when_http_client_provided(self): await client.aclose() +# ── INFO log on construction (spec §8) ─────────────────────────────── + + +class TestSyncInfoLog: + def test_info_log_emitted_with_defaults(self, caplog): + with caplog.at_level(logging.INFO, logger="getstream"): + Stream(api_key="k", api_secret="s", base_url="http://test") + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + msg = infos[0].getMessage() + assert "max_conns_per_host=5" in msg + assert "idle_timeout=55.0s" in msg + assert "connect_timeout=10.0s" in msg + assert "request_timeout=30.0s" in msg + assert "user_http_client=False" in msg + + def test_info_log_when_user_http_client_provided(self, caplog): + custom = httpx.Client(transport=_mock_transport()) + with caplog.at_level(logging.INFO, logger="getstream"): + Stream( + api_key="k", + api_secret="s", + base_url="http://test", + http_client=custom, + ) + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + assert "user_http_client=True" in infos[0].getMessage() + + +@pytest.mark.asyncio +class TestAsyncInfoLog: + async def test_info_log_emitted_with_defaults(self, caplog): + with caplog.at_level(logging.INFO, logger="getstream"): + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + await client.aclose() + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + assert "max_conns_per_host=5" in infos[0].getMessage() + assert "user_http_client=False" in infos[0].getMessage() + + # ── per-call timeout override (spec §5.2) ──────────────────────────── From 56518c3c6f358ff7dde2e907dbfe7a3fa98dbafc Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:58:15 +0200 Subject: [PATCH 07/23] =?UTF-8?q?test:=20lock=20sync/async=20pool=20config?= =?UTF-8?q?=20parity=20(spec=20=C2=A75.3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_http_client.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index c649402e..95852c60 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -385,6 +385,41 @@ async def test_info_log_emitted_with_defaults(self, caplog): assert "user_http_client=False" in infos[0].getMessage() +# ── sync/async parity (spec §5.3) ──────────────────────────────────── + + +@pytest.mark.asyncio +class TestSyncAsyncParity: + async def test_same_kwargs_produce_same_pool_config(self): + kwargs = dict( + api_key="k", + api_secret="s", + base_url="http://test", + max_conns_per_host=7, + idle_timeout=42.0, + connect_timeout=3.0, + request_timeout=11.0, + ) + sync_c = Stream(**kwargs) + async_c = AsyncStream(**kwargs) + + sp = sync_c.client._transport._pool + ap = async_c.client._transport._pool + assert sp._max_connections == ap._max_connections == 7 + assert sp._max_keepalive_connections == ap._max_keepalive_connections == 7 + assert sp._keepalive_expiry == ap._keepalive_expiry == 42.0 + + st = sync_c.client.timeout + at = async_c.client.timeout + assert st.connect == at.connect == 3.0 + assert st.read == at.read == 11.0 + assert st.write == at.write == 11.0 + assert st.pool == at.pool == 11.0 + + sync_c.close() + await async_c.aclose() + + # ── per-call timeout override (spec §5.2) ──────────────────────────── From b87c9fba9a5a46a2e1643e30d45814dd021e66ac Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 18:59:18 +0200 Subject: [PATCH 08/23] docs: changelog for 3.5.0 connection pooling; bump version --- CHANGELOG.md | 29 +++++++++++++++++++++++++++++ pyproject.toml | 2 +- uv.lock | 2 +- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f3bebeab..fae77db7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)). + Four new kwargs on `Stream(...)` and `AsyncStream(...)`: + - `max_conns_per_host: int` — default `5` + - `idle_timeout: float` (seconds) — default `55.0` + - `connect_timeout: float` (seconds) — default `10.0` + - `request_timeout: float` (seconds) — default `30.0` (was `6.0`; see Behavior changes) + + These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing + `http_client=` and `transport=` kwargs continue to act as escape hatches — when + `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for + the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, + `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. + See the [Connection Pooling Spec](https://www.notion.so/stream-wiki/Server-Side-SDK-Connection-Pooling-Spec-3496a5d7f9f680749b8be9ee238ae108). +- INFO log on client construction (logger `getstream`) lists the effective pool + config and whether a user-supplied `http_client` is in use (spec §8). + - Webhook handling spec helpers (CHA-2961): `UnknownEvent` dataclass for forward-compat; `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload` primitives; `parse_event` (returns typed event or `UnknownEvent` for @@ -35,9 +51,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns + stream-py with the cross-SDK contract in CHA-2956. Existing callers using + `timeout=` are unaffected — `timeout` is kept as an alias for `request_timeout`. + Callers relying on the 6s ceiling for fail-fast behavior should pass + `request_timeout=6.0` (or `timeout=6.0`) explicitly. +- Default HTTP transport now caps connections per host at `5` and closes idle + sockets after `55.0s`. Previous default was httpx's `100` max-connections + with `5.0s` keep-alive expiry. - No breaking changes. All existing webhook helpers (`verify_webhook_signature`, `parse_webhook_event`, `get_event_type`, event type constants) are preserved. +### Notes + +- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, + `.post(...)`, etc., and pre-empts the client-level `request_timeout`. + [Spec](https://www.notion.so/stream-wiki/Server-Side-SDK-Webhook-Handling-Spec-34b6a5d7f9f681e78003c443f227493c) ## [3.0.0b1] - 2026-02-27 diff --git a/pyproject.toml b/pyproject.toml index 1eeb6bc3..f8703fe1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "getstream" -version = "3.4.0" +version = "3.5.0" description = "GetStream Python SDK - Build scalable activity feeds, chat, and video calling applications" authors = [ { name = "sachaarbonel", email = "sacha.arbonel@hotmail.fr" }, diff --git a/uv.lock b/uv.lock index 27a9d801..793e21d5 100644 --- a/uv.lock +++ b/uv.lock @@ -895,7 +895,7 @@ wheels = [ [[package]] name = "getstream" -version = "3.3.4" +version = "3.5.0" source = { editable = "." } dependencies = [ { name = "dataclasses-json" }, From 7fbcf683480ec8f49404794c86eedc70fb90be57 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Tue, 26 May 2026 19:31:17 +0200 Subject: [PATCH 09/23] fix: _resolve_pool_knobs preserves explicit 0/0.0 via is-None checks Caller-provided falsy values (max_conns_per_host=0, idle_timeout=0.0) were silently replaced with spec defaults because of the 'getattr(...) or DEFAULT_*' pattern. Switch to 'is None' so the resolver only substitutes a default when the attribute is missing or explicitly None. Adds TestResolvePoolKnobsExplicitZero with three cases: explicit 0 preserved, missing attrs fall back, explicit None falls back. --- getstream/base.py | 12 ++++++--- tests/test_http_client.py | 53 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/getstream/base.py b/getstream/base.py index 0c88e891..aa5812bd 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -45,11 +45,17 @@ def _resolve_pool_knobs(obj): fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets them on ``self`` before calling ``super().__init__()``, so a directly instantiated sub-client (or test fixture) still gets sane values. + + `is None` (not truthiness) so an explicit `0` / `0.0` from the caller is + preserved rather than silently swapped for a default. """ + max_conns_per_host = getattr(obj, "max_conns_per_host", None) + idle_timeout = getattr(obj, "idle_timeout", None) + connect_timeout = getattr(obj, "connect_timeout", None) return ( - getattr(obj, "max_conns_per_host", None) or DEFAULT_MAX_CONNS_PER_HOST, - getattr(obj, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT, - getattr(obj, "connect_timeout", None) or DEFAULT_CONNECT_TIMEOUT, + DEFAULT_MAX_CONNS_PER_HOST if max_conns_per_host is None else max_conns_per_host, + DEFAULT_IDLE_TIMEOUT if idle_timeout is None else idle_timeout, + DEFAULT_CONNECT_TIMEOUT if connect_timeout is None else connect_timeout, ) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 95852c60..727ffd9b 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -468,6 +468,59 @@ async def test_per_call_timeout_reaches_httpx(self): # ── validation ─────────────────────────────────────────────────────── +class TestResolvePoolKnobsExplicitZero: + """`_resolve_pool_knobs` must use `is None`, not truthiness, so callers + who deliberately pass `0` / `0.0` get their value back unchanged. + Regression test for the falsy-fallback bug flagged in CHA-2956 review. + """ + + def test_explicit_zero_preserved(self): + from getstream.base import _resolve_pool_knobs + + class Obj: + max_conns_per_host = 0 + idle_timeout = 0.0 + connect_timeout = 0.0 + + assert _resolve_pool_knobs(Obj()) == (0, 0.0, 0.0) + + def test_missing_attrs_fall_back_to_defaults(self): + from getstream.base import ( + _resolve_pool_knobs, + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + class Obj: + pass + + assert _resolve_pool_knobs(Obj()) == ( + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + def test_none_attrs_fall_back_to_defaults(self): + from getstream.base import ( + _resolve_pool_knobs, + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + class Obj: + max_conns_per_host = None + idle_timeout = None + connect_timeout = None + + assert _resolve_pool_knobs(Obj()) == ( + DEFAULT_MAX_CONNS_PER_HOST, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_CONNECT_TIMEOUT, + ) + + class TestValidation: def test_transport_and_http_client_mutually_exclusive(self): with pytest.raises(ValueError, match="Cannot specify both"): From 3c5f546a5b8c3179dddd61e41921b2c3f044cd09 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 10:34:43 +0200 Subject: [PATCH 10/23] style: wrap long ternary in _resolve_pool_knobs for ruff format --- getstream/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/getstream/base.py b/getstream/base.py index aa5812bd..6653d3a6 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -53,7 +53,9 @@ def _resolve_pool_knobs(obj): idle_timeout = getattr(obj, "idle_timeout", None) connect_timeout = getattr(obj, "connect_timeout", None) return ( - DEFAULT_MAX_CONNS_PER_HOST if max_conns_per_host is None else max_conns_per_host, + DEFAULT_MAX_CONNS_PER_HOST + if max_conns_per_host is None + else max_conns_per_host, DEFAULT_IDLE_TIMEOUT if idle_timeout is None else idle_timeout, DEFAULT_CONNECT_TIMEOUT if connect_timeout is None else connect_timeout, ) From 9ce1772a046b1c0a3cddb973a3864ffcdf445ce3 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 10:59:01 +0200 Subject: [PATCH 11/23] test: update video example default-timeout assertions to 30.0 test_setup_client and test_async_client asserted the old 6.0s default timeout; CHA-2956 moved the default request timeout to 30s. The http-client suite was updated in the original commit but these two construction smoke tests were missed. --- tests/test_video_examples.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_video_examples.py b/tests/test_video_examples.py index a07c25c3..6f7509e7 100644 --- a/tests/test_video_examples.py +++ b/tests/test_video_examples.py @@ -27,7 +27,7 @@ def test_setup_client(): assert isinstance(client, Stream) assert client.api_key == "your_api_key" assert client.api_secret == "your_api_secret" - assert client.timeout == 6.0 + assert client.timeout == 30.0 def test_create_user(client: Stream): @@ -493,7 +493,7 @@ async def test_async_client(): assert isinstance(client, AsyncStream) assert client.api_key == "your_api_key" assert client.api_secret == "your_api_secret" - assert client.timeout == 6.0 + assert client.timeout == 30.0 @pytest.mark.asyncio From 697b64cd28d5e6de7e5971ebbaaa9e0d99c3344a Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 12:01:08 +0200 Subject: [PATCH 12/23] docs: remove internal Notion links and spec-section refs from customer-facing content --- CHANGELOG.md | 5 +---- getstream/base.py | 2 +- getstream/stream.py | 2 +- getstream/tests/test_webhook.py | 2 +- tests/test_http_client.py | 8 ++++---- 5 files changed, 8 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fae77db7..538ce272 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,9 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. - See the [Connection Pooling Spec](https://www.notion.so/stream-wiki/Server-Side-SDK-Connection-Pooling-Spec-3496a5d7f9f680749b8be9ee238ae108). - INFO log on client construction (logger `getstream`) lists the effective pool - config and whether a user-supplied `http_client` is in use (spec §8). + config and whether a user-supplied `http_client` is in use. - Webhook handling spec helpers (CHA-2961): `UnknownEvent` dataclass for forward-compat; `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload` @@ -67,8 +66,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, `.post(...)`, etc., and pre-empts the client-level `request_timeout`. -[Spec](https://www.notion.so/stream-wiki/Server-Side-SDK-Webhook-Handling-Spec-34b6a5d7f9f681e78003c443f227493c) - ## [3.0.0b1] - 2026-02-27 ### Breaking Changes diff --git a/getstream/base.py b/getstream/base.py index 6653d3a6..9956d28e 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -28,7 +28,7 @@ import ijson -# ── Connection pool defaults (spec §4, CHA-2956) ───────────────────── +# ── Connection pool defaults (CHA-2956) ────────────────────────────── # Kept in sync with getstream.stream constants; duplicated here so # BaseClient/AsyncBaseClient can be instantiated standalone (e.g. by # sub-clients constructed directly without going through Stream/AsyncStream). diff --git a/getstream/stream.py b/getstream/stream.py index 94d2310c..8f372313 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -27,7 +27,7 @@ BASE_URL = "https://chat.stream-io-api.com/" -# ── Connection pool defaults (spec §4, CHA-2956) ───────────────────── +# ── Connection pool defaults (CHA-2956) ────────────────────────────── # DEFAULT_REQUEST_TIMEOUT is the default per-request timeout (was 6.0 prior to 3.5.0). DEFAULT_REQUEST_TIMEOUT = 30.0 # DEFAULT_MAX_CONNS_PER_HOST caps concurrent TCP connections per host. diff --git a/getstream/tests/test_webhook.py b/getstream/tests/test_webhook.py index 96f8fe22..8cfe1608 100644 --- a/getstream/tests/test_webhook.py +++ b/getstream/tests/test_webhook.py @@ -1109,7 +1109,7 @@ def test_invalid_json(self): # Conformance fixture suite (CHA-2961). The fixture set is generated by # chat/'s `chat-manager openapi generate-webhook-fixtures` subcommand; the # repo's generate.sh copies it into tests/assets/webhooks/ at codegen time -# (per stream-py AGENTS.md §8: test file assets live under tests/assets/). +# (test file assets live under tests/assets/). # --------------------------------------------------------------------------- FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "tests" / "assets" / "webhooks" diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 727ffd9b..796ef863 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -25,7 +25,7 @@ def handler(request: httpx.Request) -> httpx.Response: return httpx.MockTransport(handler), captured -# ── pool defaults (spec §4) ────────────────────────────────────────── +# ── pool defaults ──────────────────────────────────────────────────── class TestSyncPoolDefaults: @@ -343,7 +343,7 @@ async def test_pool_knobs_ignored_when_http_client_provided(self): await client.aclose() -# ── INFO log on construction (spec §8) ─────────────────────────────── +# ── INFO log on construction ───────────────────────────────────────── class TestSyncInfoLog: @@ -385,7 +385,7 @@ async def test_info_log_emitted_with_defaults(self, caplog): assert "user_http_client=False" in infos[0].getMessage() -# ── sync/async parity (spec §5.3) ──────────────────────────────────── +# ── sync/async parity ──────────────────────────────────────────────── @pytest.mark.asyncio @@ -420,7 +420,7 @@ async def test_same_kwargs_produce_same_pool_config(self): await async_c.aclose() -# ── per-call timeout override (spec §5.2) ──────────────────────────── +# ── per-call timeout override ──────────────────────────────────────── def _timeout_capture_transport(): From 9336c1ab8849ddee74e3c35b259f22363cb4d9aa Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 12:18:19 +0200 Subject: [PATCH 13/23] docs: unwrap hard-wrapped CHA-2956 prose into natural lines --- CHANGELOG.md | 22 ++++------------ getstream/base.py | 22 ++++------------ getstream/stream.py | 61 +++++++++++++-------------------------------- 3 files changed, 27 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 538ce272..90b6a9c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,13 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `connect_timeout: float` (seconds) — default `10.0` - `request_timeout: float` (seconds) — default `30.0` (was `6.0`; see Behavior changes) - These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing - `http_client=` and `transport=` kwargs continue to act as escape hatches — when - `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for - the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, - `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. -- INFO log on client construction (logger `getstream`) lists the effective pool - config and whether a user-supplied `http_client` is in use. + These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing `http_client=` and `transport=` kwargs continue to act as escape hatches — when `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. +- INFO log on client construction (logger `getstream`) lists the effective pool config and whether a user-supplied `http_client` is in use. - Webhook handling spec helpers (CHA-2961): `UnknownEvent` dataclass for forward-compat; `gunzip_payload`, `decode_sqs_payload`, `decode_sns_payload` @@ -50,21 +45,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns - stream-py with the cross-SDK contract in CHA-2956. Existing callers using - `timeout=` are unaffected — `timeout` is kept as an alias for `request_timeout`. - Callers relying on the 6s ceiling for fail-fast behavior should pass - `request_timeout=6.0` (or `timeout=6.0`) explicitly. -- Default HTTP transport now caps connections per host at `5` and closes idle - sockets after `55.0s`. Previous default was httpx's `100` max-connections - with `5.0s` keep-alive expiry. +- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected — `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly. +- Default HTTP transport now caps connections per host at `5` and closes idle sockets after `55.0s`. Previous default was httpx's `100` max-connections with `5.0s` keep-alive expiry. - No breaking changes. All existing webhook helpers (`verify_webhook_signature`, `parse_webhook_event`, `get_event_type`, event type constants) are preserved. ### Notes -- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, - `.post(...)`, etc., and pre-empts the client-level `request_timeout`. +- Per-call `timeout=httpx.Timeout(...)` continues to work through `.get(...)`, `.post(...)`, etc., and pre-empts the client-level `request_timeout`. ## [3.0.0b1] - 2026-02-27 diff --git a/getstream/base.py b/getstream/base.py index 9956d28e..c1eeecdc 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -29,9 +29,7 @@ # ── Connection pool defaults (CHA-2956) ────────────────────────────── -# Kept in sync with getstream.stream constants; duplicated here so -# BaseClient/AsyncBaseClient can be instantiated standalone (e.g. by -# sub-clients constructed directly without going through Stream/AsyncStream). +# Kept in sync with getstream.stream constants; duplicated here so BaseClient/AsyncBaseClient can be instantiated standalone (e.g. by sub-clients constructed directly without going through Stream/AsyncStream). DEFAULT_MAX_CONNS_PER_HOST = 5 DEFAULT_IDLE_TIMEOUT = 55.0 DEFAULT_CONNECT_TIMEOUT = 10.0 @@ -41,13 +39,9 @@ def _resolve_pool_knobs(obj): - """Pull the 3 pool knobs off ``obj`` if BaseStream has set them, else - fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets - them on ``self`` before calling ``super().__init__()``, so a directly - instantiated sub-client (or test fixture) still gets sane values. + """Pull the 3 pool knobs off ``obj`` if BaseStream has set them, else fall back to spec defaults. Top-level ``Stream``/``AsyncStream`` sets them on ``self`` before calling ``super().__init__()``, so a directly instantiated sub-client (or test fixture) still gets sane values. - `is None` (not truthiness) so an explicit `0` / `0.0` from the caller is - preserved rather than silently swapped for a default. + `is None` (not truthiness) so an explicit `0` / `0.0` from the caller is preserved rather than silently swapped for a default. """ max_conns_per_host = getattr(obj, "max_conns_per_host", None) idle_timeout = getattr(obj, "idle_timeout", None) @@ -203,10 +197,7 @@ def __init__( timeout=None, user_agent=None, ): - # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) - # are set on ``self`` by BaseStream prior to this call when used via - # the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients - # constructed directly use the spec defaults via _resolve_pool_knobs. + # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) are set on ``self`` by BaseStream prior to this call when used via the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients constructed directly use the spec defaults via _resolve_pool_knobs. max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self) super().__init__( api_key=api_key, @@ -466,10 +457,7 @@ def __init__( timeout=None, user_agent=None, ): - # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) - # are set on ``self`` by BaseStream prior to this call when used via - # the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients - # constructed directly use the spec defaults via _resolve_pool_knobs. + # The 3 pool knobs (max_conns_per_host, idle_timeout, connect_timeout) are set on ``self`` by BaseStream prior to this call when used via the top-level ``Stream``/``AsyncStream`` constructors. Sub-clients constructed directly use the spec defaults via _resolve_pool_knobs. max_conns_per_host, idle_timeout, connect_timeout = _resolve_pool_knobs(self) super().__init__( api_key=api_key, diff --git a/getstream/stream.py b/getstream/stream.py index 8f372313..eb21098d 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -39,9 +39,7 @@ class Settings(BaseSettings): - # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, - # STREAM_TIMEOUT, STREAM_REQUEST_TIMEOUT, STREAM_MAX_CONNS_PER_HOST, - # STREAM_IDLE_TIMEOUT, STREAM_CONNECT_TIMEOUT + # Env names: STREAM_API_KEY, STREAM_API_SECRET, STREAM_BASE_URL, STREAM_TIMEOUT, STREAM_REQUEST_TIMEOUT, STREAM_MAX_CONNS_PER_HOST, STREAM_IDLE_TIMEOUT, STREAM_CONNECT_TIMEOUT api_key: str api_secret: Optional[str] = None base_url: Optional[str] = None @@ -76,47 +74,27 @@ def __init__( """Build a Stream client. Pass exactly one of ``api_secret`` or ``token``: - - ``api_secret`` enables a server-side client that can mint user tokens - and call protected admin endpoints. - - ``token`` enables a client-side client authenticated as a single - user. Token-only clients cannot mint tokens or call admin endpoints. + - ``api_secret`` enables a server-side client that can mint user tokens and call protected admin endpoints. + - ``token`` enables a client-side client authenticated as a single user. Token-only clients cannot mint tokens or call admin endpoints. - Any of ``api_key``, ``api_secret``, ``base_url`` left as ``None`` are - loaded from ``STREAM_*`` env vars; passing ``token`` skips the - ``api_secret`` env fallback. + Any of ``api_key``, ``api_secret``, ``base_url`` left as ``None`` are loaded from ``STREAM_*`` env vars; passing ``token`` skips the ``api_secret`` env fallback. Args: api_key: Project API key. Falls back to ``STREAM_API_KEY``. - api_secret: Project API secret. Mutually exclusive with ``token``. - Falls back to ``STREAM_API_SECRET`` only when ``token`` is also - ``None``. - timeout: HTTP request timeout in seconds; must be > 0. Kept as a - backward-compat alias for ``request_timeout``. - base_url: API base URL. Falls back to ``STREAM_BASE_URL`` then to - the SDK default. + api_secret: Project API secret. Mutually exclusive with ``token``. Falls back to ``STREAM_API_SECRET`` only when ``token`` is also ``None``. + timeout: HTTP request timeout in seconds; must be > 0. Kept as a backward-compat alias for ``request_timeout``. + base_url: API base URL. Falls back to ``STREAM_BASE_URL`` then to the SDK default. user_agent: Optional custom ``User-Agent`` string. - transport: Optional ``httpx`` transport. Mutually exclusive with - ``http_client``. - http_client: Optional pre-built ``httpx`` client. Mutually - exclusive with ``transport``. When provided, sub-clients - (video/chat/moderation) reuse it instead of opening their own. + transport: Optional ``httpx`` transport. Mutually exclusive with ``http_client``. + http_client: Optional pre-built ``httpx`` client. Mutually exclusive with ``transport``. When provided, sub-clients (video/chat/moderation) reuse it instead of opening their own. token: Pre-minted user JWT. Mutually exclusive with ``api_secret``. - request_timeout: Default per-request timeout in seconds. Default - 30.0. Replaces the older ``timeout`` kwarg; ``timeout`` is - kept as an alias for backward compatibility. - max_conns_per_host: Max concurrent TCP connections per host. - Default 5. Ignored when ``http_client`` is set. - idle_timeout: Idle connection lifetime in seconds. Default 55.0 - (sits 5s under the typical 60s LB idle timeout). Ignored - when ``http_client`` is set. - connect_timeout: TCP + TLS handshake timeout in seconds. - Default 10.0. Ignored when ``http_client`` is set. + request_timeout: Default per-request timeout in seconds. Default 30.0. Replaces the older ``timeout`` kwarg; ``timeout`` is kept as an alias for backward compatibility. + max_conns_per_host: Max concurrent TCP connections per host. Default 5. Ignored when ``http_client`` is set. + idle_timeout: Idle connection lifetime in seconds. Default 55.0 (sits 5s under the typical 60s LB idle timeout). Ignored when ``http_client`` is set. + connect_timeout: TCP + TLS handshake timeout in seconds. Default 10.0. Ignored when ``http_client`` is set. Raises: - ValueError: If both ``transport`` and ``http_client`` are set; if - neither ``api_secret`` nor ``token`` can be resolved; if both - are provided; if either is the empty string; if ``api_key`` is - missing; or if ``request_timeout`` is not a positive number. + ValueError: If both ``transport`` and ``http_client`` are set; if neither ``api_secret`` nor ``token`` can be resolved; if both are provided; if either is the empty string; if ``api_key`` is missing; or if ``request_timeout`` is not a positive number. """ if transport is not None and http_client is not None: raise ValueError("Cannot specify both 'transport' and 'http_client'") @@ -135,8 +113,7 @@ def __init__( if token is None and api_secret is None: api_secret = s.api_secret - # Env fallback + defaults for the 4 pool knobs and request_timeout. - # `timeout` is a backward-compat alias for `request_timeout`. + # Env fallback + defaults for the 4 pool knobs and request_timeout. `timeout` is a backward-compat alias for `request_timeout`. s_for_pool: Optional[Settings] = None def _settings() -> Settings: @@ -145,8 +122,7 @@ def _settings() -> Settings: s_for_pool = Settings() return s_for_pool - # request_timeout precedence: explicit kwarg > explicit timeout kwarg - # > STREAM_REQUEST_TIMEOUT > STREAM_TIMEOUT > DEFAULT_REQUEST_TIMEOUT. + # request_timeout precedence: explicit kwarg > explicit timeout kwarg > STREAM_REQUEST_TIMEOUT > STREAM_TIMEOUT > DEFAULT_REQUEST_TIMEOUT. if request_timeout is None: if timeout is not None: request_timeout = timeout @@ -188,10 +164,7 @@ def _settings() -> Settings: self._transport = transport self._http_client = http_client self.token = token or self._create_token() - # Pool knobs are read by BaseClient via getattr(self, ...) since the - # intermediate generated REST clients (CommonRestClient etc.) do not - # forward these kwargs. self.max_conns_per_host / idle_timeout / - # connect_timeout were set above before super().__init__(). + # Pool knobs are read by BaseClient via getattr(self, ...) since the intermediate generated REST clients (CommonRestClient etc.) do not forward these kwargs. self.max_conns_per_host / idle_timeout / connect_timeout were set above before super().__init__(). super().__init__( self.api_key, self.base_url, self.token, self.timeout, self.user_agent ) From ecc7d1cb034f5a770c53872ed2fc3e500317bda2 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 12:24:37 +0200 Subject: [PATCH 14/23] docs: replace em dashes with plain ASCII in CHA-2956 content --- CHANGELOG.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90b6a9c0..f81d9814 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,12 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)). Four new kwargs on `Stream(...)` and `AsyncStream(...)`: - - `max_conns_per_host: int` — default `5` - - `idle_timeout: float` (seconds) — default `55.0` - - `connect_timeout: float` (seconds) — default `10.0` - - `request_timeout: float` (seconds) — default `30.0` (was `6.0`; see Behavior changes) + - `max_conns_per_host: int` - default `5` + - `idle_timeout: float` (seconds) - default `55.0` + - `connect_timeout: float` (seconds) - default `10.0` + - `request_timeout: float` (seconds) - default `30.0` (was `6.0`; see Behavior changes) - These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing `http_client=` and `transport=` kwargs continue to act as escape hatches — when `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. + These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing `http_client=` and `transport=` kwargs continue to act as escape hatches; when `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. - INFO log on client construction (logger `getstream`) lists the effective pool config and whether a user-supplied `http_client` is in use. - Webhook handling spec helpers (CHA-2961): `UnknownEvent` dataclass for @@ -45,7 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected — `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly. +- **Default `request_timeout` is now `30.0` seconds (was `6.0`).** Aligns stream-py with the cross-SDK contract in CHA-2956. Existing callers using `timeout=` are unaffected; `timeout` is kept as an alias for `request_timeout`. Callers relying on the 6s ceiling for fail-fast behavior should pass `request_timeout=6.0` (or `timeout=6.0`) explicitly. - Default HTTP transport now caps connections per host at `5` and closes idle sockets after `55.0s`. Previous default was httpx's `100` max-connections with `5.0s` keep-alive expiry. - No breaking changes. All existing webhook helpers (`verify_webhook_signature`, `parse_webhook_event`, `get_event_type`, event type constants) are preserved. From cfe2b02da4ed9be39e4ccda811d31be009af7755 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 13:07:24 +0200 Subject: [PATCH 15/23] docs: replace em dashes with natural punctuation --- CHANGELOG.md | 4 ++-- getstream/base.py | 4 ++-- getstream/stream.py | 2 +- getstream/tests/test_webhook.py | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f81d9814..1c4f3ece 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,12 +25,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 unrecognized discriminators); `verify_signature` canonical alias of `verify_webhook_signature`; `verify_and_parse_webhook` HTTP composite (gunzip + verify + parse); `parse_sqs` and `parse_sns` queue composites - (no signature parameter — queue transports are authenticated by AWS IAM, + (no signature parameter: queue transports are authenticated by AWS IAM, so the backend emits no HMAC for queue messages today). Transparent gzip via magic-byte detection. - New instance methods on `Stream` and `AsyncStream`: `verify_signature(body, signature)` and - `verify_and_parse_webhook(body, signature)` — drop the api_secret parameter + `verify_and_parse_webhook(body, signature)` that drop the api_secret parameter in favor of the client's stored secret. Dual API: the module-level functions in `getstream.webhook` remain available for callers who want explicit secret control. diff --git a/getstream/base.py b/getstream/base.py index c1eeecdc..d57d98f2 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -441,7 +441,7 @@ def close(self): Close HTTPX client. If the client was provided externally via ``http_client``, this is a - no-op — the caller that created the client is responsible for closing + no-op; the caller that created the client is responsible for closing it. """ if getattr(self, "_owns_http_client", True): @@ -526,7 +526,7 @@ async def aclose(self): """Close HTTPX async client (closes pools/keep-alives). If the client was provided externally via ``http_client``, this is a - no-op — the caller that created the client is responsible for closing + no-op; the caller that created the client is responsible for closing it. """ if getattr(self, "_owns_http_client", True): diff --git a/getstream/stream.py b/getstream/stream.py index eb21098d..16ebd129 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -424,7 +424,7 @@ def verify_and_parse_webhook(self, body, signature): parse failures. Note: this is intentionally a synchronous ``def`` rather than ``async - def`` because it performs no I/O — it's CPU-bound (HMAC + gzip + JSON + def`` because it performs no I/O; it's CPU-bound (HMAC + gzip + JSON parsing). """ from .webhook import verify_and_parse_webhook as _verify_and_parse_webhook diff --git a/getstream/tests/test_webhook.py b/getstream/tests/test_webhook.py index 8cfe1608..07b6bfad 100644 --- a/getstream/tests/test_webhook.py +++ b/getstream/tests/test_webhook.py @@ -1237,7 +1237,7 @@ def test_bad_base64(self): # Per CHA-3071 wire format: decode_sqs_payload falls back to raw bytes when # base64 decoding fails (uncompressed wire format). For input that is # neither valid base64 nor valid JSON nor gzip-prefixed, parse_sqs still - # raises InvalidWebhookError — just down the chain at JSON parsing. + # raises InvalidWebhookError, just down the chain at JSON parsing. msg = _read_str(self._neg("bad_base64") / "sqs_body.txt") with pytest.raises(InvalidWebhookError): parse_sqs(msg) From 689bbf35d697acb15554d19ecdc500df4ad43d3d Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 14:02:47 +0200 Subject: [PATCH 16/23] docs: use colon separator in CHANGELOG option list --- CHANGELOG.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c4f3ece..af84cc54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Explicit HTTP connection pool configuration ([CHA-2956](https://linear.app/stream/issue/CHA-2956/connection-pooling)). Four new kwargs on `Stream(...)` and `AsyncStream(...)`: - - `max_conns_per_host: int` - default `5` - - `idle_timeout: float` (seconds) - default `55.0` - - `connect_timeout: float` (seconds) - default `10.0` - - `request_timeout: float` (seconds) - default `30.0` (was `6.0`; see Behavior changes) + - `max_conns_per_host: int`: default `5` + - `idle_timeout: float` (seconds): default `55.0` + - `connect_timeout: float` (seconds): default `10.0` + - `request_timeout: float` (seconds): default `30.0` (was `6.0`; see Behavior changes) These tune the underlying `httpx.Limits` and `httpx.Timeout`. The existing `http_client=` and `transport=` kwargs continue to act as escape hatches; when `http_client` is set, none of the four new kwargs apply. Env-var fallbacks for the new kwargs: `STREAM_MAX_CONNS_PER_HOST`, `STREAM_IDLE_TIMEOUT`, `STREAM_CONNECT_TIMEOUT`, `STREAM_REQUEST_TIMEOUT`. - INFO log on client construction (logger `getstream`) lists the effective pool config and whether a user-supplied `http_client` is in use. From 0759b65d5b35c2c17107b44a6c5dd3405541949b Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 14:58:55 +0200 Subject: [PATCH 17/23] fix: forward pool config in clone_for_token and as_async Both built fresh top-level clients passing only timeout, so cloned and async-converted clients silently reverted to default pool config. Forward request_timeout + max_conns_per_host + idle_timeout + connect_timeout. --- getstream/stream.py | 10 ++++++++-- tests/test_http_client.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index 16ebd129..f5187328 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -248,7 +248,10 @@ def clone_for_token(self, token: str): api_key=self.api_key, token=token, base_url=self.base_url, - timeout=self.timeout, + request_timeout=self.request_timeout, + max_conns_per_host=self.max_conns_per_host, + idle_timeout=self.idle_timeout, + connect_timeout=self.connect_timeout, user_agent=self.user_agent, ) @@ -481,7 +484,10 @@ def as_async(self) -> "AsyncStream": api_key=self.api_key, api_secret=self._api_secret, token=None if self.has_api_secret else self.token, - timeout=self.timeout, + request_timeout=self.request_timeout, + max_conns_per_host=self.max_conns_per_host, + idle_timeout=self.idle_timeout, + connect_timeout=self.connect_timeout, base_url=self.base_url, user_agent=self.user_agent, ) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 796ef863..703c1414 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -521,6 +521,45 @@ class Obj: ) +class TestPoolConfigForwardedOnClone: + """clone_for_token and as_async build fresh top-level clients; they must + carry the source client's pool config, not silently revert to defaults. + """ + + def _make(self): + return Stream( + api_key="k", + api_secret="s", + base_url="http://test", + max_conns_per_host=20, + idle_timeout=120.0, + connect_timeout=3.0, + request_timeout=15.0, + ) + + def test_clone_for_token_preserves_pool_config(self): + clone = self._make().clone_for_token("user-token") + assert clone.max_conns_per_host == 20 + assert clone.idle_timeout == 120.0 + assert clone.connect_timeout == 3.0 + assert clone.request_timeout == 15.0 + pool = clone.client._transport._pool + assert pool._max_connections == 20 + assert pool._keepalive_expiry == 120.0 + + @pytest.mark.asyncio + async def test_as_async_preserves_pool_config(self): + sync_client = self._make() + aclient = sync_client.as_async() + assert aclient.max_conns_per_host == 20 + assert aclient.idle_timeout == 120.0 + assert aclient.connect_timeout == 3.0 + assert aclient.request_timeout == 15.0 + assert aclient.client._transport._pool._max_connections == 20 + await aclient.aclose() + sync_client.close() + + class TestValidation: def test_transport_and_http_client_mutually_exclusive(self): with pytest.raises(ValueError, match="Cannot specify both"): From a960e635b0ea81fe546fdfa40d14bb4f9aa172f9 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 15:31:49 +0200 Subject: [PATCH 18/23] fix: propagate pool config to sub-clients Sub-clients (video/chat/moderation/feeds) are what issue the real API calls, but on the default path they each built their own httpx client via the generated REST clients, which do not forward the pool kwargs. The pool knobs were therefore silently dropped and sub-client pools fell back to the SDK defaults (5/55), making max_conns_per_host/idle_timeout/ connect_timeout a no-op for actual traffic. Always share the parent's single fully-configured client with the sub-clients (the same mechanism already used for the transport/http_client paths), so the resolved pool config reaches the clients that send requests. --- getstream/stream.py | 16 +++++---- tests/test_http_client.py | 69 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index f5187328..5483abe2 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -168,13 +168,15 @@ def _settings() -> Settings: super().__init__( self.api_key, self.base_url, self.token, self.timeout, self.user_agent ) - # After super().__init__(), self.client is fully built and configured. - # When the user provided custom HTTP config, sub-clients share this - # client instead of each building their own. - if transport is not None or http_client is not None: - self._shared_client = self.client - else: - self._shared_client = None + # After super().__init__(), self.client is fully built and configured + # with the resolved pool knobs (max_conns_per_host/idle_timeout/ + # connect_timeout) and request timeout. Sub-clients (video/chat/ + # moderation/feeds) always share this single client so the pool config + # actually reaches the clients that issue requests. The intermediate + # generated REST clients do not forward the pool kwargs, so a + # per-sub-client client would silently fall back to defaults; sharing + # the parent's client avoids that and keeps one pool per Stream. + self._shared_client = self.client @property def api_secret(self) -> str: diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 703c1414..59fa8ef8 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -126,6 +126,68 @@ async def test_legacy_timeout_alias_still_works(self): await c.aclose() +# ── sub-clients inherit the configured pool (default path) ─────────── + + +class TestSubClientPoolPropagation: + """Regression for CHA-2956 BLOCKER 1: on the DEFAULT path (no transport / + no http_client) the sub-clients (video/chat/moderation/feeds) are what + issue the real API calls, so their httpx pool must reflect the configured + knobs — not the SDK defaults. They were silently falling back to 5/55 + because the intermediate generated REST clients don't forward the pool + kwargs and ``stream`` was assigned after ``super().__init__()``. + """ + + KNOBS = dict( + max_conns_per_host=99, + idle_timeout=11.0, + connect_timeout=3.0, + request_timeout=7.0, + ) + + def _assert_pool(self, sub_client, parent_client): + # Sub-clients share the parent's single configured client. + assert sub_client.client is parent_client + pool = sub_client.client._transport._pool + assert pool._max_connections == 99 + assert pool._max_keepalive_connections == 99 + assert pool._keepalive_expiry == 11.0 + t = sub_client.client.timeout + assert t.connect == 3.0 + assert t.read == 7.0 + + def test_sync_sub_client_pools_match_configured_knobs(self): + client = Stream( + api_key="k", api_secret="s", base_url="http://test", **self.KNOBS + ) + for name in ("video", "chat", "moderation", "feeds"): + self._assert_pool(getattr(client, name), client.client) + client.close() + + @pytest.mark.asyncio + async def test_async_sub_client_pools_match_configured_knobs(self): + client = AsyncStream( + api_key="k", api_secret="s", base_url="http://test", **self.KNOBS + ) + for name in ("video", "chat", "moderation"): + self._assert_pool(getattr(client, name), client.client) + await client.aclose() + + def test_sync_sub_client_pools_match_defaults(self): + # Even with no explicit knobs, sub-clients must carry the SDK defaults + # (5/55/10/30), not whatever a freshly-built sub-client would default to. + client = Stream(api_key="k", api_secret="s", base_url="http://test") + for name in ("video", "chat", "moderation", "feeds"): + sub = getattr(client, name) + assert sub.client is client.client + pool = sub.client._transport._pool + assert pool._max_connections == 5 + assert pool._keepalive_expiry == 55.0 + assert sub.client.timeout.connect == 10.0 + assert sub.client.timeout.read == 30.0 + client.close() + + # ── transport (primary API) ────────────────────────────────────────── @@ -189,11 +251,14 @@ def test_custom_limits_propagated(self): assert pool._max_connections == 42 assert pool._max_keepalive_connections == 10 - def test_default_path_unchanged(self): + def test_default_path_owns_and_shares_client(self): + # On the default path (no transport/http_client) the top-level client + # owns the single httpx client AND shares it with sub-clients, so the + # resolved pool config reaches the clients that issue requests. client = Stream(api_key="k", api_secret="s", base_url="http://test") assert client._owns_http_client is True assert isinstance(client.client, httpx.Client) - assert client._shared_client is None + assert client._shared_client is client.client @pytest.mark.asyncio From 15124e0e34bf7bc49f0bd5a3e9e1fee7654fd0a2 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 15:32:30 +0200 Subject: [PATCH 19/23] fix: resolve pool env fallbacks without requiring STREAM_API_KEY The pool-knob / request-timeout env fallbacks instantiated the full Settings model whenever a knob or timeout was unset (the normal case). Settings.api_key is a required field, so Settings() raised a pydantic ValidationError in any environment without STREAM_* vars, crashing Stream(api_key="k", api_secret="s") in a clean environment. The test suite hid this because the fixtures call load_dotenv(). Introduce a small _PoolSettings model that reads the same STREAM_* env vars but has no required api_key, and use it for knob resolution. Existing env behavior (including the STREAM_TIMEOUT alias) is unchanged. --- getstream/stream.py | 30 +++++++++++++++++++++++++++--- tests/test_http_client.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index 5483abe2..3e545f4c 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -55,6 +55,27 @@ class Settings(BaseSettings): ) +class _PoolSettings(BaseSettings): + """Env-only view of the pool knobs and request timeout. + + Unlike ``Settings`` this has no required ``api_key`` field, so it can be + instantiated for knob resolution even when no ``STREAM_*`` credentials are + present in the environment (the normal case for callers that pass + ``api_key``/``api_secret`` explicitly). Reads the same ``STREAM_*`` env + vars with the same types as ``Settings``. + """ + + timeout: float = DEFAULT_REQUEST_TIMEOUT + request_timeout: Optional[float] = None + max_conns_per_host: int = DEFAULT_MAX_CONNS_PER_HOST + idle_timeout: float = DEFAULT_IDLE_TIMEOUT + connect_timeout: float = DEFAULT_CONNECT_TIMEOUT + + model_config = SettingsConfigDict( + env_prefix="STREAM_", + ) + + class BaseStream: def __init__( self, @@ -114,12 +135,15 @@ def __init__( api_secret = s.api_secret # Env fallback + defaults for the 4 pool knobs and request_timeout. `timeout` is a backward-compat alias for `request_timeout`. - s_for_pool: Optional[Settings] = None + # _PoolSettings (not Settings) is used here so missing STREAM_API_KEY + # in the environment does not crash construction when api_key was + # passed explicitly. It reads the same STREAM_* env vars. + s_for_pool: Optional[_PoolSettings] = None - def _settings() -> Settings: + def _settings() -> _PoolSettings: nonlocal s_for_pool if s_for_pool is None: - s_for_pool = Settings() + s_for_pool = _PoolSettings() return s_for_pool # request_timeout precedence: explicit kwarg > explicit timeout kwarg > STREAM_REQUEST_TIMEOUT > STREAM_TIMEOUT > DEFAULT_REQUEST_TIMEOUT. diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 59fa8ef8..03f3c672 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -1,4 +1,5 @@ import logging +import os import httpx import pytest @@ -625,6 +626,43 @@ async def test_as_async_preserves_pool_config(self): sync_client.close() +class TestConstructsWithoutStreamEnv: + """Regression for CHA-2956 BLOCKER 2: knob resolution must not require + STREAM_API_KEY in the environment. Passing api_key/api_secret explicitly + in a clean environment (no STREAM_* vars) must succeed and fall back to the + spec defaults, instead of crashing inside ``Settings()`` (api_key is a + required field). + """ + + @staticmethod + def _clear_stream_env(monkeypatch): + for key in list(os.environ): + if key.startswith("STREAM_"): + monkeypatch.delenv(key, raising=False) + + def test_sync_constructs_with_spec_defaults(self, monkeypatch): + self._clear_stream_env(monkeypatch) + client = Stream(api_key="k", api_secret="s", base_url="http://test") + assert client.max_conns_per_host == 5 + assert client.idle_timeout == 55.0 + assert client.connect_timeout == 10.0 + assert client.request_timeout == 30.0 + pool = client.client._transport._pool + assert pool._max_connections == 5 + assert pool._keepalive_expiry == 55.0 + client.close() + + @pytest.mark.asyncio + async def test_async_constructs_with_spec_defaults(self, monkeypatch): + self._clear_stream_env(monkeypatch) + client = AsyncStream(api_key="k", api_secret="s", base_url="http://test") + assert client.max_conns_per_host == 5 + assert client.idle_timeout == 55.0 + assert client.connect_timeout == 10.0 + assert client.request_timeout == 30.0 + await client.aclose() + + class TestValidation: def test_transport_and_http_client_mutually_exclusive(self): with pytest.raises(ValueError, match="Cannot specify both"): From 78b019151c279e55ab5e64d783550c1a980fb78f Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 15:32:35 +0200 Subject: [PATCH 20/23] chore: revert generated test_webhook.py getstream/tests/test_webhook.py carries a "Code generated ... DO NOT EDIT." header but was modified by the em-dash cleanup. Restore it to origin/main so it leaves this PR's diff; the generated content is handled via the chat/ template separately. --- getstream/tests/test_webhook.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/getstream/tests/test_webhook.py b/getstream/tests/test_webhook.py index 07b6bfad..96f8fe22 100644 --- a/getstream/tests/test_webhook.py +++ b/getstream/tests/test_webhook.py @@ -1109,7 +1109,7 @@ def test_invalid_json(self): # Conformance fixture suite (CHA-2961). The fixture set is generated by # chat/'s `chat-manager openapi generate-webhook-fixtures` subcommand; the # repo's generate.sh copies it into tests/assets/webhooks/ at codegen time -# (test file assets live under tests/assets/). +# (per stream-py AGENTS.md §8: test file assets live under tests/assets/). # --------------------------------------------------------------------------- FIXTURE_ROOT = Path(__file__).resolve().parents[2] / "tests" / "assets" / "webhooks" @@ -1237,7 +1237,7 @@ def test_bad_base64(self): # Per CHA-3071 wire format: decode_sqs_payload falls back to raw bytes when # base64 decoding fails (uncompressed wire format). For input that is # neither valid base64 nor valid JSON nor gzip-prefixed, parse_sqs still - # raises InvalidWebhookError, just down the chain at JSON parsing. + # raises InvalidWebhookError — just down the chain at JSON parsing. msg = _read_str(self._neg("bad_base64") / "sqs_body.txt") with pytest.raises(InvalidWebhookError): parse_sqs(msg) From 792abdcf05b4ba749c569f9e501807f5bebdfab7 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 15:33:30 +0200 Subject: [PATCH 21/23] fix: emit pool config INFO log once at top level The pool-config INFO line was emitted from every BaseClient/AsyncBaseClient construction, so a single Stream logged it once per sub-client (about five lines) and the sub-client lines reported the default knobs rather than the configured ones. Emit it exactly once from BaseStream after the top-level client is built, reflecting the resolved knobs, and drop the per-sub-client emission. Also default Stream.from_env(timeout=...) to None so it inherits the new 30.0s default request timeout instead of the old hard-coded 6.0s. --- getstream/base.py | 8 ++++++-- getstream/stream.py | 11 ++++++++++- tests/test_http_client.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/getstream/base.py b/getstream/base.py index d57d98f2..27ea400a 100644 --- a/getstream/base.py +++ b/getstream/base.py @@ -254,7 +254,9 @@ def __init__( limits=limits, ) self._owns_http_client = True - _log_pool_config(self, user_http_client=http_client is not None) + # The pool-config INFO line is emitted once by BaseStream after the + # top-level client is built, not here, to avoid one line per + # sub-client construction. def __enter__(self): return self @@ -514,7 +516,9 @@ def __init__( limits=limits, ) self._owns_http_client = True - _log_pool_config(self, user_http_client=http_client is not None) + # The pool-config INFO line is emitted once by BaseStream after the + # top-level client is built, not here, to avoid one line per + # sub-client construction. async def __aenter__(self): return self diff --git a/getstream/stream.py b/getstream/stream.py index 3e545f4c..b1e0ff40 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -10,6 +10,7 @@ import jwt from pydantic_settings import BaseSettings, SettingsConfigDict +from getstream.base import _log_pool_config from getstream.common import telemetry from getstream.chat.client import ChatClient from getstream.chat.async_client import ChatClient as AsyncChatClient @@ -202,6 +203,10 @@ def _settings() -> _PoolSettings: # the parent's client avoids that and keeps one pool per Stream. self._shared_client = self.client + # Emit the pool-config INFO line exactly once per Stream, reflecting the + # resolved knobs on the top-level client. Sub-clients no longer log. + _log_pool_config(self, user_http_client=http_client is not None) + @property def api_secret(self) -> str: """ @@ -491,10 +496,14 @@ class Stream(BaseStream, CommonClient): @classmethod @deprecated("from_env is deprecated, use __init__ instead") - def from_env(cls, timeout: float = 6.0) -> Stream: + def from_env(cls, timeout: Optional[float] = None) -> Stream: """ Construct a StreamClient by loading its credentials and base_url from environment variables (via our pydantic Settings). + + ``timeout`` defaults to ``None`` so the client inherits the SDK default + request timeout (``DEFAULT_REQUEST_TIMEOUT``, 30.0s) rather than the + old hard-coded 6.0s. """ settings = Settings() diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 03f3c672..3f21a26f 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -425,6 +425,35 @@ def test_info_log_emitted_with_defaults(self, caplog): assert "request_timeout=30.0s" in msg assert "user_http_client=False" in msg + def test_info_log_emitted_once_even_with_sub_clients(self, caplog): + # Regression for CHA-2956 MINOR: the pool-config INFO line must fire + # exactly once per Stream (at the top level), not once per sub-client. + with caplog.at_level(logging.INFO, logger="getstream"): + client = Stream(api_key="k", api_secret="s", base_url="http://test") + _ = (client.video, client.chat, client.moderation, client.feeds) + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + client.close() + + def test_info_log_reflects_resolved_knobs(self, caplog): + with caplog.at_level(logging.INFO, logger="getstream"): + Stream( + api_key="k", + api_secret="s", + base_url="http://test", + max_conns_per_host=33, + idle_timeout=12.0, + connect_timeout=4.0, + request_timeout=9.0, + ) + infos = [r for r in caplog.records if r.name == "getstream"] + assert len(infos) == 1 + msg = infos[0].getMessage() + assert "max_conns_per_host=33" in msg + assert "idle_timeout=12.0s" in msg + assert "connect_timeout=4.0s" in msg + assert "request_timeout=9.0s" in msg + def test_info_log_when_user_http_client_provided(self, caplog): custom = httpx.Client(transport=_mock_transport()) with caplog.at_level(logging.INFO, logger="getstream"): From 70a660d600c9baf18743c04283efcb6d2c92cce1 Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 16:38:52 +0200 Subject: [PATCH 22/23] docs: remove em dashes from generated webhook files Applied the em-dash delta directly because a clean regen is blocked by a pre-existing generator drift (duplicate AsyncExportErrorEvent imports); the template fix is in chat/ cha-2956_sdk-templates, so this matches what a future clean regen will produce. --- getstream/tests/test_webhook.py | 2 +- getstream/webhook.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/getstream/tests/test_webhook.py b/getstream/tests/test_webhook.py index 96f8fe22..3c653da0 100644 --- a/getstream/tests/test_webhook.py +++ b/getstream/tests/test_webhook.py @@ -1237,7 +1237,7 @@ def test_bad_base64(self): # Per CHA-3071 wire format: decode_sqs_payload falls back to raw bytes when # base64 decoding fails (uncompressed wire format). For input that is # neither valid base64 nor valid JSON nor gzip-prefixed, parse_sqs still - # raises InvalidWebhookError — just down the chain at JSON parsing. + # raises InvalidWebhookError, just down the chain at JSON parsing. msg = _read_str(self._neg("bad_base64") / "sqs_body.txt") with pytest.raises(InvalidWebhookError): parse_sqs(msg) diff --git a/getstream/webhook.py b/getstream/webhook.py index b3b3478d..3d1109e9 100644 --- a/getstream/webhook.py +++ b/getstream/webhook.py @@ -677,7 +677,7 @@ def decode_sqs_payload(message_body: str) -> bytes: magic-byte detection decide whether to decompress. parse_sqs sits on top of this and works transparently for both wire - formats — no caller code change, no flag, no header. + formats: no caller code change, no flag, no header. """ if not isinstance(message_body, str): raise InvalidWebhookError( @@ -686,7 +686,7 @@ def decode_sqs_payload(message_body: str) -> bytes: try: decoded = base64.b64decode(message_body, validate=True) except ValueError: - # Not base64 — treat input as raw bytes (uncompressed wire format). + # Not base64, so treat input as raw bytes (uncompressed wire format). # base64.binascii.Error is a subclass of ValueError so a single catch suffices. decoded = message_body.encode("utf-8") return gunzip_payload(decoded) @@ -697,7 +697,7 @@ def _unwrap_sns_notification_body(body: str) -> str: else return body unchanged so a pre-extracted Message string flows through. Heuristic: try to JSON-parse the input. If it yields a dict with a string - 'Message' field, that's the envelope shape — return the Message. Otherwise + 'Message' field, that's the envelope shape; return the Message. Otherwise the input is presumed to BE the pre-extracted Message (base64-encoded bytes are not valid JSON, so this falls through cleanly). """ From 44d1307dc2b1dc421091aaf39e39139e63a50e9d Mon Sep 17 00:00:00 2001 From: Yun Wang Date: Wed, 27 May 2026 17:05:37 +0200 Subject: [PATCH 23/23] test: use a pytest fixture for the clone/as_async pool-config tests --- tests/test_http_client.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/test_http_client.py b/tests/test_http_client.py index 3f21a26f..b440b4be 100644 --- a/tests/test_http_client.py +++ b/tests/test_http_client.py @@ -621,8 +621,9 @@ class TestPoolConfigForwardedOnClone: carry the source client's pool config, not silently revert to defaults. """ - def _make(self): - return Stream( + @pytest.fixture + def stream_client(self): + client = Stream( api_key="k", api_secret="s", base_url="http://test", @@ -631,9 +632,11 @@ def _make(self): connect_timeout=3.0, request_timeout=15.0, ) + yield client + client.close() - def test_clone_for_token_preserves_pool_config(self): - clone = self._make().clone_for_token("user-token") + def test_clone_for_token_preserves_pool_config(self, stream_client): + clone = stream_client.clone_for_token("user-token") assert clone.max_conns_per_host == 20 assert clone.idle_timeout == 120.0 assert clone.connect_timeout == 3.0 @@ -643,16 +646,14 @@ def test_clone_for_token_preserves_pool_config(self): assert pool._keepalive_expiry == 120.0 @pytest.mark.asyncio - async def test_as_async_preserves_pool_config(self): - sync_client = self._make() - aclient = sync_client.as_async() + async def test_as_async_preserves_pool_config(self, stream_client): + aclient = stream_client.as_async() assert aclient.max_conns_per_host == 20 assert aclient.idle_timeout == 120.0 assert aclient.connect_timeout == 3.0 assert aclient.request_timeout == 15.0 assert aclient.client._transport._pool._max_connections == 20 await aclient.aclose() - sync_client.close() class TestConstructsWithoutStreamEnv: