diff --git a/requirements-testing.txt b/requirements-testing.txt index 5cd669af9a..a6041972cd 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -4,6 +4,7 @@ tomli;python_version<"3.11" # Only needed for pytest on Python < 3.11 pytest-cov pytest-forked pytest-localserver +pytest-timeout pytest-watch jsonschema executing diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 3b188a694b..4ba8046814 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -31,6 +31,7 @@ def __init__( self._record_lost_func = record_lost_func self._running = True self._lock = threading.Lock() + self._active: "threading.local" = threading.local() self._flush_event: "threading.Event" = threading.Event() @@ -70,23 +71,40 @@ def _ensure_thread(self) -> bool: return True def _flush_loop(self) -> None: + # Mark the flush-loop thread as active for its entire lifetime so + # that any re-entrant add() triggered by GC warnings during wait(), + # flush(), or Event operations is silently dropped instead of + # deadlocking on internal locks. + self._active.flag = True while self._running: self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random()) self._flush_event.clear() self._flush() def add(self, item: "T") -> None: - if not self._ensure_thread() or self._flusher is None: + # Bail out if the current thread is already executing batcher code. + # This prevents deadlocks when code running inside the batcher (e.g. + # _add_to_envelope during flush, or _flush_event.wait/set) triggers + # a GC-emitted warning that routes back through the logging + # integration into add(). + if getattr(self._active, "flag", False): return None - with self._lock: - if len(self._buffer) >= self.MAX_BEFORE_DROP: - self._record_lost(item) + self._active.flag = True + try: + if not self._ensure_thread() or self._flusher is None: return None - self._buffer.append(item) - if len(self._buffer) >= self.MAX_BEFORE_FLUSH: - self._flush_event.set() + with self._lock: + if len(self._buffer) >= self.MAX_BEFORE_DROP: + self._record_lost(item) + return None + + self._buffer.append(item) + if len(self._buffer) >= self.MAX_BEFORE_FLUSH: + self._flush_event.set() + finally: + self._active.flag = False def kill(self) -> None: if self._flusher is None: @@ -97,7 +115,12 @@ def kill(self) -> None: self._flusher = None def flush(self) -> None: - self._flush() + was_active = getattr(self._active, "flag", False) + self._active.flag = True + try: + self._flush() + finally: + self._active.flag = was_active def _add_to_envelope(self, envelope: "Envelope") -> None: envelope.add_item( diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 132760625f..967011c083 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -43,6 +43,7 @@ def __init__( self._record_lost_func = record_lost_func self._running = True self._lock = threading.Lock() + self._active: "threading.local" = threading.local() self._flush_event: "threading.Event" = threading.Event() @@ -50,29 +51,42 @@ def __init__( self._flusher_pid: "Optional[int]" = None def add(self, span: "StreamedSpan") -> None: - if not self._ensure_thread() or self._flusher is None: + # Bail out if the current thread is already executing batcher code. + # This prevents deadlocks when code running inside the batcher (e.g. + # _add_to_envelope during flush, or _flush_event.wait/set) triggers + # a GC-emitted warning that routes back through the logging + # integration into add(). + if getattr(self._active, "flag", False): return None - with self._lock: - size = len(self._span_buffer[span.trace_id]) - if size >= self.MAX_BEFORE_DROP: - self._record_lost_func( - reason="queue_overflow", - data_category="span", - quantity=1, - ) - return None - - self._span_buffer[span.trace_id].append(span) - self._running_size[span.trace_id] += self._estimate_size(span) + self._active.flag = True - if size + 1 >= self.MAX_BEFORE_FLUSH: - self._flush_event.set() - return + try: + if not self._ensure_thread() or self._flusher is None: + return None - if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: - self._flush_event.set() - return + with self._lock: + size = len(self._span_buffer[span.trace_id]) + if size >= self.MAX_BEFORE_DROP: + self._record_lost_func( + reason="queue_overflow", + data_category="span", + quantity=1, + ) + return None + + self._span_buffer[span.trace_id].append(span) + self._running_size[span.trace_id] += self._estimate_size(span) + + if size + 1 >= self.MAX_BEFORE_FLUSH: + self._flush_event.set() + return + + if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: + self._flush_event.set() + return + finally: + self._active.flag = False @staticmethod def _estimate_size(item: "StreamedSpan") -> int: diff --git a/tests/test_logs.py b/tests/test_logs.py index 5eb699f534..86861cfe90 100644 --- a/tests/test_logs.py +++ b/tests/test_logs.py @@ -783,3 +783,39 @@ def before_send_log(log, _): ) get_client().flush() + + +@minimum_python_37 +@pytest.mark.timeout(5) +def test_reentrant_add_does_not_deadlock(sentry_init, capture_envelopes): + """Adding to the batcher from within a flush must not deadlock. + + This covers the scenario where GC emits a ResourceWarning during + _add_to_envelope (or _flush_event.wait/set), and the warning is + routed through the logging integration back into batcher.add(). + See https://github.com/getsentry/sentry-python/issues/5681 + """ + sentry_init(enable_logs=True) + capture_envelopes() + + client = sentry_sdk.get_client() + batcher = client.log_batcher + + reentrant_add_called = False + original_add_to_envelope = batcher._add_to_envelope + + def add_to_envelope_with_reentrant_add(envelope): + nonlocal reentrant_add_called + # Simulate a GC warning routing back into add() during flush + batcher.add({"fake": "log"}) + reentrant_add_called = True + original_add_to_envelope(envelope) + + batcher._add_to_envelope = add_to_envelope_with_reentrant_add + + sentry_sdk.logger.warning("test log") + client.flush() + + assert reentrant_add_called + # If the re-entrancy guard didn't work, this test would hang and it'd + # eventually be timed out by pytest-timeout