Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .sampo/changesets/exception-autocapture-rate-limiting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
pypi/posthog: minor
---

Add opt-in client-side rate limiting for exception autocapture, using the same token bucket algorithm as the posthog-js and posthog-node SDKs: a bucket per exception type allows a burst of captures, then refills over time. Rate-limited exceptions are skipped before they reach the ingestion queue. Disabled by default; enable with the new `enable_exception_autocapture_rate_limiting` client option and tune via `exception_autocapture_bucket_size` (default 50), `exception_autocapture_refill_rate` (default 10), and `exception_autocapture_refill_interval_seconds` (default 10).
20 changes: 20 additions & 0 deletions posthog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from posthog.args import ExceptionArg, OptionalCaptureArgs, OptionalSetArgs
from posthog.client import Client
from posthog.exception_capture import ExceptionCapture
from posthog.contexts import (
identify_context as inner_identify_context,
)
Expand Down Expand Up @@ -304,6 +305,15 @@ def get_tags() -> Dict[str, Any]:
code variables.
in_app_modules: Module/package prefixes treated as in-app frames in captured
exceptions.
enable_exception_autocapture_rate_limiting: Rate limit autocaptured
exceptions client-side with a token bucket per exception type. Disabled
by default.
exception_autocapture_bucket_size: Maximum burst of autocaptured exceptions
allowed per exception type (token bucket size, clamped to 0-100).
exception_autocapture_refill_rate: Tokens restored per refill interval for
each exception type's bucket.
exception_autocapture_refill_interval_seconds: Seconds between token refills
for autocaptured exception rate limiting.
"""
api_key = None # type: Optional[str]
host = None # type: Optional[str]
Expand Down Expand Up @@ -337,6 +347,12 @@ def get_tags() -> Dict[str, Any]:
code_variables_mask_patterns = DEFAULT_CODE_VARIABLES_MASK_PATTERNS
code_variables_ignore_patterns = DEFAULT_CODE_VARIABLES_IGNORE_PATTERNS
in_app_modules = None # type: Optional[list[str]]
enable_exception_autocapture_rate_limiting = False # type: bool
exception_autocapture_bucket_size = ExceptionCapture.DEFAULT_BUCKET_SIZE # type: int
exception_autocapture_refill_rate = ExceptionCapture.DEFAULT_REFILL_RATE # type: int
exception_autocapture_refill_interval_seconds = (
ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS
) # type: float


# NOTE - this and following functions take unpacked kwargs because we needed to make
Expand Down Expand Up @@ -1103,6 +1119,10 @@ def setup() -> Client:
code_variables_mask_patterns=code_variables_mask_patterns,
code_variables_ignore_patterns=code_variables_ignore_patterns,
in_app_modules=in_app_modules,
enable_exception_autocapture_rate_limiting=enable_exception_autocapture_rate_limiting,
exception_autocapture_bucket_size=exception_autocapture_bucket_size,
exception_autocapture_refill_rate=exception_autocapture_refill_rate,
exception_autocapture_refill_interval_seconds=exception_autocapture_refill_interval_seconds,
)

# Always set in case user changes it. Preserve Client's auto-disabled state
Expand Down
119 changes: 119 additions & 0 deletions posthog/bucketed_rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Python port of the posthog-js BucketedRateLimiter:
# https://github.com/PostHog/posthog-js/blob/main/packages/core/src/utils/bucketed-rate-limiter.ts
# Kept behaviorally identical so rate limiting is consistent across SDKs.

import logging
import threading
import time
from typing import Callable, Dict, Hashable, Optional, Union

ONE_DAY_IN_SECONDS = 86400.0

log = logging.getLogger("posthog")

Number = Union[int, float]


def _clamp_to_range(value, min_value: Number, max_value: Number, label: str) -> Number:
if isinstance(value, bool) or not isinstance(value, (int, float)):
log.warning(f"{label} must be a number. Using max value {max_value}.")
return max_value
if value > max_value:
log.warning(f"{label} cannot be greater than {max_value}. Using {max_value}.")
return max_value
if value < min_value:
log.warning(f"{label} cannot be less than {min_value}. Using {min_value}.")
return min_value
return value


class _Bucket:
__slots__ = ("tokens", "last_access")

def __init__(self, tokens: Number, last_access: float):
self.tokens = tokens
self.last_access = last_access


class BucketedRateLimiter:
"""Token bucket rate limiter that tracks a separate bucket per key.

Each key starts with a full bucket of ``bucket_size`` tokens and every
call to :meth:`consume_rate_limit` consumes one token. ``refill_rate``
tokens are restored per elapsed ``refill_interval_seconds`` (whole
intervals only, fractional elapsed time is carried over), capped at
``bucket_size``.

The call that empties a bucket is itself reported as rate limited β€” a
burst over a fresh bucket lets ``bucket_size - 1`` events through before
limiting kicks in β€” and ``on_bucket_rate_limited`` fires once each time a
bucket is drained.

Thread-safe. ``clock`` must return seconds and is injectable for tests.
"""

def __init__(
self,
bucket_size: Number,
refill_rate: Number,
refill_interval_seconds: Number,
on_bucket_rate_limited: Optional[Callable[[Hashable], None]] = None,
clock: Callable[[], float] = time.monotonic,
):
self._bucket_size = _clamp_to_range(bucket_size, 0, 100, "bucket_size")
self._refill_rate = _clamp_to_range(
refill_rate, 0, self._bucket_size, "refill_rate"
)
self._refill_interval = _clamp_to_range(
refill_interval_seconds, 0, ONE_DAY_IN_SECONDS, "refill_interval_seconds"
)
self._on_bucket_rate_limited = on_bucket_rate_limited
self._clock = clock
self._buckets: Dict[Hashable, _Bucket] = {}
self._lock = threading.Lock()

def _apply_refill(self, bucket: _Bucket, now: float) -> None:
if self._refill_interval <= 0:
bucket.tokens = self._bucket_size
bucket.last_access = now
return

elapsed = now - bucket.last_access
refill_intervals = int(elapsed // self._refill_interval)

if refill_intervals > 0:
tokens_to_add = refill_intervals * self._refill_rate
bucket.tokens = min(bucket.tokens + tokens_to_add, self._bucket_size)
# advance by whole intervals so fractional elapsed time still
# counts towards the next refill
bucket.last_access += refill_intervals * self._refill_interval

def consume_rate_limit(self, key: Hashable) -> bool:
"""Consume one token for ``key``. Returns True if rate limited."""
callback = None

with self._lock:
now = self._clock()
bucket = self._buckets.get(key)

if bucket is None:
bucket = _Bucket(tokens=self._bucket_size, last_access=now)
self._buckets[key] = bucket
else:
self._apply_refill(bucket, now)

if bucket.tokens <= 0:
return True

bucket.tokens -= 1
rate_limited = bucket.tokens <= 0
if rate_limited:
callback = self._on_bucket_rate_limited

if callback is not None:
callback(key)
return rate_limited

def stop(self) -> None:
with self._lock:
self._buckets.clear()
30 changes: 29 additions & 1 deletion posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ def __init__(
code_variables_mask_patterns=None,
code_variables_ignore_patterns=None,
in_app_modules: list[str] | None = None,
enable_exception_autocapture_rate_limiting=False,
exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE,
exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE,
exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS,
_dedicated_ai_endpoint=False,
):
"""
Expand Down Expand Up @@ -273,6 +277,16 @@ def __init__(
capturing code variables.
in_app_modules: Module/package prefixes treated as in-app frames in
captured exceptions.
enable_exception_autocapture_rate_limiting: Rate limit
autocaptured exceptions client-side with a token bucket per
exception type. Disabled by default.
exception_autocapture_bucket_size: Maximum burst of autocaptured
exceptions allowed per exception type (token bucket size,
clamped to 0-100).
exception_autocapture_refill_rate: Tokens restored per refill
interval for each exception type's bucket.
exception_autocapture_refill_interval_seconds: Seconds between
token refills for autocaptured exception rate limiting.

Examples:
```python
Expand Down Expand Up @@ -330,6 +344,14 @@ def __init__(
self.super_properties = super_properties
self.enable_exception_autocapture = enable_exception_autocapture
self.log_captured_exceptions = log_captured_exceptions
self.enable_exception_autocapture_rate_limiting = (
enable_exception_autocapture_rate_limiting
)
self.exception_autocapture_bucket_size = exception_autocapture_bucket_size
self.exception_autocapture_refill_rate = exception_autocapture_refill_rate
self.exception_autocapture_refill_interval_seconds = (
exception_autocapture_refill_interval_seconds
)
self.exception_capture = None
self.privacy_mode = privacy_mode
self.enable_local_evaluation = enable_local_evaluation
Expand Down Expand Up @@ -377,7 +399,13 @@ def __init__(
self._set_before_send(before_send)

if self.enable_exception_autocapture:
self.exception_capture = ExceptionCapture(self)
self.exception_capture = ExceptionCapture(
self,
rate_limiting_enabled=self.enable_exception_autocapture_rate_limiting,
bucket_size=self.exception_autocapture_bucket_size,
refill_rate=self.exception_autocapture_refill_rate,
refill_interval_seconds=self.exception_autocapture_refill_interval_seconds,
)

if sync_mode:
self.consumers = None
Expand Down
43 changes: 40 additions & 3 deletions posthog/exception_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,47 @@
import threading
from typing import TYPE_CHECKING

from posthog.bucketed_rate_limiter import BucketedRateLimiter

if TYPE_CHECKING:
from posthog.client import Client


class ExceptionCapture:
# TODO: Add client side rate limiting to prevent spamming the server with exceptions

log = logging.getLogger("posthog")

def __init__(self, client: "Client"):
# more generous defaults than the browser SDK (10, 1, 10) because one
# server process aggregates exceptions across many users' requests
DEFAULT_BUCKET_SIZE = 50
DEFAULT_REFILL_RATE = 10
DEFAULT_REFILL_INTERVAL_SECONDS = 10

def __init__(
self,
client: "Client",
rate_limiting_enabled=False,
bucket_size=DEFAULT_BUCKET_SIZE,
refill_rate=DEFAULT_REFILL_RATE,
refill_interval_seconds=DEFAULT_REFILL_INTERVAL_SECONDS,
):
self.client = client
self.original_excepthook = sys.excepthook
sys.excepthook = self.exception_handler
threading.excepthook = self.thread_exception_handler
# opt-in client-side rate limiting: per exception type, allow a burst
# of captures, then refill over time
self._rate_limiter = None
if rate_limiting_enabled:
self._rate_limiter = BucketedRateLimiter(
bucket_size=bucket_size,
refill_rate=refill_rate,
refill_interval_seconds=refill_interval_seconds,
)

def close(self):
sys.excepthook = self.original_excepthook
if self._rate_limiter is not None:
self._rate_limiter.stop()

def exception_handler(self, exc_type, exc_value, exc_traceback):
# don't affect default behaviour.
Expand All @@ -44,7 +68,20 @@ def exception_receiver(self, exc_info, extra_properties):

def capture_exception(self, exception, metadata=None):
try:
if self._rate_limiter is not None:
exception_type = self._exception_type(exception)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the correct order with chained exceptions? I think that might consume the top-level one instead, i.e. RuntimeError from ZeroDivisionError consumes RuntimeError instead of ZeroDivisionError

client = MagicMock()
capture = ExceptionCapture(client, rate_limiting_enabled=True, bucket_size=2)

capture.capture_exception(exc_info_for(RuntimeError("wrapped", ZeroDivisionError())))
capture.capture_exception(exc_info_for(RuntimeError("wrapped", KeyError())))

assert client.capture_exception.call_count == 2

I think this fails, but we'd want it to pass?

if self._rate_limiter.consume_rate_limit(exception_type):
self.log.info(
f"Skipping exception capture because of client rate limiting. exception={exception_type}"
)
return

distinct_id = metadata.get("distinct_id") if metadata else None
self.client.capture_exception(exception, distinct_id=distinct_id)
except Exception as e:
self.log.exception(f"Failed to capture exception: {e}")

@staticmethod
def _exception_type(exception):
exc_type = exception[0] if isinstance(exception, tuple) else type(exception)
return getattr(exc_type, "__name__", None) or "Exception"
Loading
Loading