diff --git a/.sampo/changesets/exception-autocapture-rate-limiting.md b/.sampo/changesets/exception-autocapture-rate-limiting.md new file mode 100644 index 00000000..ae6743e7 --- /dev/null +++ b/.sampo/changesets/exception-autocapture-rate-limiting.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: minor +--- + +Add opt-in client-side rate limiting for exception autocapture, using the same token bucket algorithm as the posthog-js and posthog-node SDKs: a bucket per exception type allows a burst of captures, then refills over time. Rate-limited exceptions are skipped before they reach the ingestion queue. Disabled by default; enable with the new `enable_exception_autocapture_rate_limiting` client option and tune via `exception_autocapture_bucket_size` (default 50), `exception_autocapture_refill_rate` (default 10), and `exception_autocapture_refill_interval_seconds` (default 10). diff --git a/posthog/__init__.py b/posthog/__init__.py index 892a68c6..a8657b71 100644 --- a/posthog/__init__.py +++ b/posthog/__init__.py @@ -5,6 +5,7 @@ from posthog.args import ExceptionArg, OptionalCaptureArgs, OptionalSetArgs from posthog.client import Client +from posthog.exception_capture import ExceptionCapture from posthog.contexts import ( identify_context as inner_identify_context, ) @@ -304,6 +305,15 @@ def get_tags() -> Dict[str, Any]: code variables. in_app_modules: Module/package prefixes treated as in-app frames in captured exceptions. + enable_exception_autocapture_rate_limiting: Rate limit autocaptured + exceptions client-side with a token bucket per exception type. Disabled + by default. + exception_autocapture_bucket_size: Maximum burst of autocaptured exceptions + allowed per exception type (token bucket size, clamped to 0-100). + exception_autocapture_refill_rate: Tokens restored per refill interval for + each exception type's bucket. + exception_autocapture_refill_interval_seconds: Seconds between token refills + for autocaptured exception rate limiting. """ api_key = None # type: Optional[str] host = None # type: Optional[str] @@ -337,6 +347,12 @@ def get_tags() -> Dict[str, Any]: code_variables_mask_patterns = DEFAULT_CODE_VARIABLES_MASK_PATTERNS code_variables_ignore_patterns = DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS in_app_modules = None # type: Optional[list[str]] +enable_exception_autocapture_rate_limiting = False # type: bool +exception_autocapture_bucket_size = ExceptionCapture.DEFAULT_BUCKET_SIZE # type: int +exception_autocapture_refill_rate = ExceptionCapture.DEFAULT_REFILL_RATE # type: int +exception_autocapture_refill_interval_seconds = ( + ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS +) # type: float # NOTE - this and following functions take unpacked kwargs because we needed to make @@ -1103,6 +1119,10 @@ def setup() -> Client: code_variables_mask_patterns=code_variables_mask_patterns, code_variables_ignore_patterns=code_variables_ignore_patterns, in_app_modules=in_app_modules, + enable_exception_autocapture_rate_limiting=enable_exception_autocapture_rate_limiting, + exception_autocapture_bucket_size=exception_autocapture_bucket_size, + exception_autocapture_refill_rate=exception_autocapture_refill_rate, + exception_autocapture_refill_interval_seconds=exception_autocapture_refill_interval_seconds, ) # Always set in case user changes it. Preserve Client's auto-disabled state diff --git a/posthog/bucketed_rate_limiter.py b/posthog/bucketed_rate_limiter.py new file mode 100644 index 00000000..7cf24be9 --- /dev/null +++ b/posthog/bucketed_rate_limiter.py @@ -0,0 +1,119 @@ +# Python port of the posthog-js BucketedRateLimiter: +# https://github.com/PostHog/posthog-js/blob/main/packages/core/src/utils/bucketed-rate-limiter.ts +# Kept behaviorally identical so rate limiting is consistent across SDKs. + +import logging +import threading +import time +from typing import Callable, Dict, Hashable, Optional, Union + +ONE_DAY_IN_SECONDS = 86400.0 + +log = logging.getLogger("posthog") + +Number = Union[int, float] + + +def _clamp_to_range(value, min_value: Number, max_value: Number, label: str) -> Number: + if isinstance(value, bool) or not isinstance(value, (int, float)): + log.warning(f"{label} must be a number. Using max value {max_value}.") + return max_value + if value > max_value: + log.warning(f"{label} cannot be greater than {max_value}. Using {max_value}.") + return max_value + if value < min_value: + log.warning(f"{label} cannot be less than {min_value}. Using {min_value}.") + return min_value + return value + + +class _Bucket: + __slots__ = ("tokens", "last_access") + + def __init__(self, tokens: Number, last_access: float): + self.tokens = tokens + self.last_access = last_access + + +class BucketedRateLimiter: + """Token bucket rate limiter that tracks a separate bucket per key. + + Each key starts with a full bucket of ``bucket_size`` tokens and every + call to :meth:`consume_rate_limit` consumes one token. ``refill_rate`` + tokens are restored per elapsed ``refill_interval_seconds`` (whole + intervals only, fractional elapsed time is carried over), capped at + ``bucket_size``. + + The call that empties a bucket is itself reported as rate limited — a + burst over a fresh bucket lets ``bucket_size - 1`` events through before + limiting kicks in — and ``on_bucket_rate_limited`` fires once each time a + bucket is drained. + + Thread-safe. ``clock`` must return seconds and is injectable for tests. + """ + + def __init__( + self, + bucket_size: Number, + refill_rate: Number, + refill_interval_seconds: Number, + on_bucket_rate_limited: Optional[Callable[[Hashable], None]] = None, + clock: Callable[[], float] = time.monotonic, + ): + self._bucket_size = _clamp_to_range(bucket_size, 0, 100, "bucket_size") + self._refill_rate = _clamp_to_range( + refill_rate, 0, self._bucket_size, "refill_rate" + ) + self._refill_interval = _clamp_to_range( + refill_interval_seconds, 0, ONE_DAY_IN_SECONDS, "refill_interval_seconds" + ) + self._on_bucket_rate_limited = on_bucket_rate_limited + self._clock = clock + self._buckets: Dict[Hashable, _Bucket] = {} + self._lock = threading.Lock() + + def _apply_refill(self, bucket: _Bucket, now: float) -> None: + if self._refill_interval <= 0: + bucket.tokens = self._bucket_size + bucket.last_access = now + return + + elapsed = now - bucket.last_access + refill_intervals = int(elapsed // self._refill_interval) + + if refill_intervals > 0: + tokens_to_add = refill_intervals * self._refill_rate + bucket.tokens = min(bucket.tokens + tokens_to_add, self._bucket_size) + # advance by whole intervals so fractional elapsed time still + # counts towards the next refill + bucket.last_access += refill_intervals * self._refill_interval + + def consume_rate_limit(self, key: Hashable) -> bool: + """Consume one token for ``key``. Returns True if rate limited.""" + callback = None + + with self._lock: + now = self._clock() + bucket = self._buckets.get(key) + + if bucket is None: + bucket = _Bucket(tokens=self._bucket_size, last_access=now) + self._buckets[key] = bucket + else: + self._apply_refill(bucket, now) + + if bucket.tokens <= 0: + return True + + bucket.tokens -= 1 + rate_limited = bucket.tokens <= 0 + if rate_limited: + callback = self._on_bucket_rate_limited + + if callback is not None: + callback(key) + return rate_limited + + def stop(self) -> None: + with self._lock: + self._buckets.clear() diff --git a/posthog/client.py b/posthog/client.py index e653a10e..6fc4481b 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -211,6 +211,10 @@ def __init__( code_variables_mask_patterns=None, code_variables_ignore_patterns=None, in_app_modules: list[str] | None = None, + enable_exception_autocapture_rate_limiting=False, + exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, + exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, + exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, _dedicated_ai_endpoint=False, ): """ @@ -273,6 +277,16 @@ def __init__( capturing code variables. in_app_modules: Module/package prefixes treated as in-app frames in captured exceptions. + enable_exception_autocapture_rate_limiting: Rate limit + autocaptured exceptions client-side with a token bucket per + exception type. Disabled by default. + exception_autocapture_bucket_size: Maximum burst of autocaptured + exceptions allowed per exception type (token bucket size, + clamped to 0-100). + exception_autocapture_refill_rate: Tokens restored per refill + interval for each exception type's bucket. + exception_autocapture_refill_interval_seconds: Seconds between + token refills for autocaptured exception rate limiting. Examples: ```python @@ -330,6 +344,14 @@ def __init__( self.super_properties = super_properties self.enable_exception_autocapture = enable_exception_autocapture self.log_captured_exceptions = log_captured_exceptions + self.enable_exception_autocapture_rate_limiting = ( + enable_exception_autocapture_rate_limiting + ) + self.exception_autocapture_bucket_size = exception_autocapture_bucket_size + self.exception_autocapture_refill_rate = exception_autocapture_refill_rate + self.exception_autocapture_refill_interval_seconds = ( + exception_autocapture_refill_interval_seconds + ) self.exception_capture = None self.privacy_mode = privacy_mode self.enable_local_evaluation = enable_local_evaluation @@ -377,7 +399,13 @@ def __init__( self._set_before_send(before_send) if self.enable_exception_autocapture: - self.exception_capture = ExceptionCapture(self) + self.exception_capture = ExceptionCapture( + self, + rate_limiting_enabled=self.enable_exception_autocapture_rate_limiting, + bucket_size=self.exception_autocapture_bucket_size, + refill_rate=self.exception_autocapture_refill_rate, + refill_interval_seconds=self.exception_autocapture_refill_interval_seconds, + ) if sync_mode: self.consumers = None diff --git a/posthog/exception_capture.py b/posthog/exception_capture.py index 9d055351..d22eb970 100644 --- a/posthog/exception_capture.py +++ b/posthog/exception_capture.py @@ -9,23 +9,47 @@ import threading from typing import TYPE_CHECKING +from posthog.bucketed_rate_limiter import BucketedRateLimiter + if TYPE_CHECKING: from posthog.client import Client class ExceptionCapture: - # TODO: Add client side rate limiting to prevent spamming the server with exceptions - log = logging.getLogger("posthog") - def __init__(self, client: "Client"): + # more generous defaults than the browser SDK (10, 1, 10) because one + # server process aggregates exceptions across many users' requests + DEFAULT_BUCKET_SIZE = 50 + DEFAULT_REFILL_RATE = 10 + DEFAULT_REFILL_INTERVAL_SECONDS = 10 + + def __init__( + self, + client: "Client", + rate_limiting_enabled=False, + bucket_size=DEFAULT_BUCKET_SIZE, + refill_rate=DEFAULT_REFILL_RATE, + refill_interval_seconds=DEFAULT_REFILL_INTERVAL_SECONDS, + ): self.client = client self.original_excepthook = sys.excepthook sys.excepthook = self.exception_handler threading.excepthook = self.thread_exception_handler + # opt-in client-side rate limiting: per exception type, allow a burst + # of captures, then refill over time + self._rate_limiter = None + if rate_limiting_enabled: + self._rate_limiter = BucketedRateLimiter( + bucket_size=bucket_size, + refill_rate=refill_rate, + refill_interval_seconds=refill_interval_seconds, + ) def close(self): sys.excepthook = self.original_excepthook + if self._rate_limiter is not None: + self._rate_limiter.stop() def exception_handler(self, exc_type, exc_value, exc_traceback): # don't affect default behaviour. @@ -44,7 +68,20 @@ def exception_receiver(self, exc_info, extra_properties): def capture_exception(self, exception, metadata=None): try: + if self._rate_limiter is not None: + exception_type = self._exception_type(exception) + if self._rate_limiter.consume_rate_limit(exception_type): + self.log.info( + f"Skipping exception capture because of client rate limiting. exception={exception_type}" + ) + return + distinct_id = metadata.get("distinct_id") if metadata else None self.client.capture_exception(exception, distinct_id=distinct_id) except Exception as e: self.log.exception(f"Failed to capture exception: {e}") + + @staticmethod + def _exception_type(exception): + exc_type = exception[0] if isinstance(exception, tuple) else type(exception) + return getattr(exc_type, "__name__", None) or "Exception" diff --git a/posthog/test/test_bucketed_rate_limiter.py b/posthog/test/test_bucketed_rate_limiter.py new file mode 100644 index 00000000..0464716a --- /dev/null +++ b/posthog/test/test_bucketed_rate_limiter.py @@ -0,0 +1,312 @@ +import threading +from unittest.mock import MagicMock + +import pytest + +from posthog.bucketed_rate_limiter import BucketedRateLimiter + + +class FakeClock: + def __init__(self, start=0.0): + self.now = float(start) + + def advance(self, seconds): + self.now += seconds + + def __call__(self): + return self.now + + +def make_limiter( + clock, bucket_size=10, refill_rate=1, refill_interval_seconds=1, **kwargs +): + return BucketedRateLimiter( + bucket_size=bucket_size, + refill_rate=refill_rate, + refill_interval_seconds=refill_interval_seconds, + clock=clock, + **kwargs, + ) + + +def test_not_rate_limited_by_default(): + assert make_limiter(FakeClock()).consume_rate_limit("ResizeObserver") is False + + +@pytest.mark.parametrize("bucket_size", [1, 5, 10, 50]) +def test_exhausts_bucket_after_bucket_size_consumptions(bucket_size): + limiter = make_limiter(FakeClock(), bucket_size=bucket_size) + + # the call that drains the bucket is itself rate limited, so + # bucket_size - 1 events pass + for _ in range(bucket_size - 1): + assert limiter.consume_rate_limit("test") is False + + assert limiter.consume_rate_limit("test") is True + # can check the same bucket more than once + assert limiter.consume_rate_limit("test") is True + + +def test_refills_tokens_based_on_elapsed_time(): + clock = FakeClock() + limiter = make_limiter(clock) + + for _ in range(9): + assert limiter.consume_rate_limit("key") is False + assert limiter.consume_rate_limit("key") is True + + clock.advance(2) + + assert limiter.consume_rate_limit("key") is False + assert limiter._buckets["key"].tokens == 1 + + +def test_refills_to_bucket_size_maximum(): + clock = FakeClock() + limiter = make_limiter(clock) + limiter.consume_rate_limit("key") + + clock.advance(20) + + limiter.consume_rate_limit("key") + assert limiter._buckets["key"].tokens == 9 + + +def test_partial_refill_intervals_do_not_refill_tokens(): + clock = FakeClock() + limiter = make_limiter(clock) + + for _ in range(9): + limiter.consume_rate_limit("test") + + clock.advance(0.999) + + limiter.consume_rate_limit("test") + assert limiter._buckets["test"].tokens == 0 + + +@pytest.mark.parametrize( + "refill_rate, intervals, tokens_left, expected", + [ + (1, 1, 9, 9), + (2, 1, 9, 9), + (1, 2, 9, 9), + (3, 1, 5, 7), + (2, 2, 5, 8), + ], +) +def test_refill_rates(refill_rate, intervals, tokens_left, expected): + clock = FakeClock() + limiter = make_limiter(clock, refill_rate=refill_rate) + + for _ in range(10 - tokens_left): + limiter.consume_rate_limit("test") + + clock.advance(intervals) + + limiter.consume_rate_limit("test") + assert limiter._buckets["test"].tokens == expected + + +def test_different_keys_maintain_separate_buckets(): + limiter = make_limiter(FakeClock()) + + for _ in range(9): + limiter.consume_rate_limit("bucket1") + + assert limiter.consume_rate_limit("bucket1") is True + assert limiter.consume_rate_limit("bucket2") is False + + assert limiter._buckets["bucket1"].tokens == 0 + assert limiter._buckets["bucket2"].tokens == 9 + + +def test_invokes_callback_once_when_bucket_reaches_zero(): + callback = MagicMock() + limiter = make_limiter(FakeClock(), bucket_size=3, on_bucket_rate_limited=callback) + + limiter.consume_rate_limit("test") + limiter.consume_rate_limit("test") + callback.assert_not_called() + + limiter.consume_rate_limit("test") + callback.assert_called_once_with("test") + + # not invoked again while the bucket stays empty + limiter.consume_rate_limit("test") + callback.assert_called_once() + + +def test_invokes_callback_again_after_refill_and_re_exhaustion(): + callback = MagicMock() + clock = FakeClock() + limiter = make_limiter(clock, bucket_size=2, on_bucket_rate_limited=callback) + + limiter.consume_rate_limit("test") + limiter.consume_rate_limit("test") + assert callback.call_count == 1 + + clock.advance(2) + + limiter.consume_rate_limit("test") + limiter.consume_rate_limit("test") + assert callback.call_count == 2 + + +def test_stop_clears_all_buckets_and_resets_state(): + clock = FakeClock() + limiter = make_limiter(clock) + + for _ in range(9): + limiter.consume_rate_limit("test") + limiter.consume_rate_limit("other") + assert len(limiter._buckets) == 2 + + limiter.stop() + assert len(limiter._buckets) == 0 + + assert limiter.consume_rate_limit("test") is False + assert limiter._buckets["test"].tokens == 9 + + +def test_last_access_advances_by_complete_intervals_preserving_fraction(): + clock = FakeClock() + limiter = make_limiter(clock) + + limiter.consume_rate_limit("test") + assert limiter._buckets["test"].last_access == 0.0 + assert limiter._buckets["test"].tokens == 9 + + clock.advance(0.5) + limiter.consume_rate_limit("test") + assert limiter._buckets["test"].last_access == 0.0 + assert limiter._buckets["test"].tokens == 8 + + clock.advance(0.6) + limiter.consume_rate_limit("test") + assert limiter._buckets["test"].last_access == 1.0 + assert limiter._buckets["test"].tokens == 8 + + +def test_clamps_out_of_range_options(): + clock = FakeClock() + limiter = make_limiter( + clock, bucket_size=1000, refill_rate=-5, refill_interval_seconds="nope" + ) + + assert limiter._bucket_size == 100 + assert limiter._refill_rate == 0 + assert limiter._refill_interval == 86400.0 + + +def test_thread_safety_allows_exactly_bucket_size_minus_one(): + limiter = make_limiter(FakeClock(), bucket_size=50) + allowed = [] + + def consume(): + for _ in range(20): + if limiter.consume_rate_limit("shared") is False: + allowed.append(1) + + threads = [threading.Thread(target=consume) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(allowed) == 49 + + +def test_exception_capture_rate_limiting_is_disabled_by_default(): + from posthog.exception_capture import ExceptionCapture + + client = MagicMock() + capture = ExceptionCapture(client) + try: + assert capture._rate_limiter is None + + for _ in range(100): + capture.capture_exception((ValueError, ValueError("boom"), None)) + assert client.capture_exception.call_count == 100 + finally: + capture.close() + + +def test_exception_capture_default_configuration_when_enabled(): + from posthog.exception_capture import ExceptionCapture + + capture = ExceptionCapture(MagicMock(), rate_limiting_enabled=True) + try: + assert capture._rate_limiter._bucket_size == 50 + assert capture._rate_limiter._refill_rate == 10 + assert capture._rate_limiter._refill_interval == 10 + finally: + capture.close() + + +def test_exception_capture_rate_limiting_is_configurable(): + from posthog.exception_capture import ExceptionCapture + + capture = ExceptionCapture( + MagicMock(), + rate_limiting_enabled=True, + bucket_size=3, + refill_rate=2, + refill_interval_seconds=5, + ) + try: + assert capture._rate_limiter._bucket_size == 3 + assert capture._rate_limiter._refill_rate == 2 + assert capture._rate_limiter._refill_interval == 5 + finally: + capture.close() + + +def test_client_passes_rate_limiter_configuration_through(): + from posthog.client import Client + + client = Client( + "phc_test", + sync_mode=True, + disabled=True, + enable_exception_autocapture=True, + enable_exception_autocapture_rate_limiting=True, + exception_autocapture_bucket_size=3, + exception_autocapture_refill_rate=2, + exception_autocapture_refill_interval_seconds=5, + ) + try: + limiter = client.exception_capture._rate_limiter + assert limiter._bucket_size == 3 + assert limiter._refill_rate == 2 + assert limiter._refill_interval == 5 + finally: + client.shutdown() + + +def test_exception_capture_rate_limits_per_exception_type(): + from posthog.exception_capture import ExceptionCapture + + client = MagicMock() + capture = ExceptionCapture(client, rate_limiting_enabled=True, bucket_size=10) + try: + + def exc_info(error): + try: + raise error + except type(error): + import sys + + return sys.exc_info() + + for _ in range(15): + capture.capture_exception(exc_info(ValueError("boom"))) + + # bucket size 10 -> 9 captured, the rest rate limited + assert client.capture_exception.call_count == 9 + + # a different exception type has its own bucket + capture.capture_exception(exc_info(ZeroDivisionError("zero"))) + assert client.capture_exception.call_count == 10 + finally: + capture.close()