From bb3648be53f298631480bd4b295e82992590c8df Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 11:04:04 +0100 Subject: [PATCH 1/7] fix(logging): Fix deadlock in log batcher --- sentry_sdk/integrations/logging.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/sentry_sdk/integrations/logging.py b/sentry_sdk/integrations/logging.py index 42029c5a7a..ef2fc8d5b0 100644 --- a/sentry_sdk/integrations/logging.py +++ b/sentry_sdk/integrations/logging.py @@ -1,5 +1,6 @@ import logging import sys +import threading from datetime import datetime, timezone from fnmatch import fnmatch @@ -321,20 +322,29 @@ class SentryLogsHandler(_BaseHandler): Note that you do not have to use this class if the logging integration is enabled, which it is by default. """ + _emitting = threading.local() + def emit(self, record: "LogRecord") -> "Any": - with capture_internal_exceptions(): - self.format(record) - if not self._can_record(record): - return + if getattr(self._emitting, "active", False): + return + + self._emitting.active = True + try: + with capture_internal_exceptions(): + self.format(record) + if not self._can_record(record): + return - client = sentry_sdk.get_client() - if not client.is_active(): - return + client = sentry_sdk.get_client() + if not client.is_active(): + return - if not has_logs_enabled(client.options): - return + if not has_logs_enabled(client.options): + return - self._capture_log_from_record(client, record) + self._capture_log_from_record(client, record) + finally: + self._emitting.active = False def _capture_log_from_record( self, client: "BaseClient", record: "LogRecord" From a46b0da54fd25ec587400abe375f9bc28359524b Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 11:19:57 +0100 Subject: [PATCH 2/7] . --- sentry_sdk/_batcher.py | 38 +++++++++++++++++++++++------- sentry_sdk/integrations/logging.py | 30 ++++++++--------------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 3b188a694b..861a8aefce 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -31,6 +31,7 @@ def __init__( self._record_lost_func = record_lost_func self._running = True self._lock = threading.Lock() + self._active: "threading.local" = threading.local() self._flush_event: "threading.Event" = threading.Event() @@ -70,23 +71,40 @@ def _ensure_thread(self) -> bool: return True def _flush_loop(self) -> None: + # Mark the flush-loop thread as active for its entire lifetime so + # that any re-entrant add() triggered by GC warnings during wait(), + # flush(), or Event operations is silently dropped instead of + # deadlocking on internal locks. + self._active.flag = True while self._running: self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random()) self._flush_event.clear() self._flush() def add(self, item: "T") -> None: - if not self._ensure_thread() or self._flusher is None: + # Bail out if the current thread is already executing batcher code. + # This prevents deadlocks when code running inside the batcher (e.g. + # _add_to_envelope during flush, or _flush_event.wait/set) triggers + # a GC-emitted warning that routes back through the logging + # integration into add(). + if getattr(self._active, "flag", False): return None - with self._lock: - if len(self._buffer) >= self.MAX_BEFORE_DROP: - self._record_lost(item) + self._active.flag = True + try: + if not self._ensure_thread() or self._flusher is None: return None - self._buffer.append(item) - if len(self._buffer) >= self.MAX_BEFORE_FLUSH: - self._flush_event.set() + with self._lock: + if len(self._buffer) >= self.MAX_BEFORE_DROP: + self._record_lost(item) + return None + + self._buffer.append(item) + if len(self._buffer) >= self.MAX_BEFORE_FLUSH: + self._flush_event.set() + finally: + self._active.flag = False def kill(self) -> None: if self._flusher is None: @@ -97,7 +115,11 @@ def kill(self) -> None: self._flusher = None def flush(self) -> None: - self._flush() + self._active.flag = True + try: + self._flush() + finally: + self._active.flag = False def _add_to_envelope(self, envelope: "Envelope") -> None: envelope.add_item( diff --git a/sentry_sdk/integrations/logging.py b/sentry_sdk/integrations/logging.py index ef2fc8d5b0..42029c5a7a 100644 --- a/sentry_sdk/integrations/logging.py +++ b/sentry_sdk/integrations/logging.py @@ -1,6 +1,5 @@ import logging import sys -import threading from datetime import datetime, timezone from fnmatch import fnmatch @@ -322,29 +321,20 @@ class SentryLogsHandler(_BaseHandler): Note that you do not have to use this class if the logging integration is enabled, which it is by default. """ - _emitting = threading.local() - def emit(self, record: "LogRecord") -> "Any": - if getattr(self._emitting, "active", False): - return - - self._emitting.active = True - try: - with capture_internal_exceptions(): - self.format(record) - if not self._can_record(record): - return + with capture_internal_exceptions(): + self.format(record) + if not self._can_record(record): + return - client = sentry_sdk.get_client() - if not client.is_active(): - return + client = sentry_sdk.get_client() + if not client.is_active(): + return - if not has_logs_enabled(client.options): - return + if not has_logs_enabled(client.options): + return - self._capture_log_from_record(client, record) - finally: - self._emitting.active = False + self._capture_log_from_record(client, record) def _capture_log_from_record( self, client: "BaseClient", record: "LogRecord" From eec68c4f66bad551ca27f36dcb79735ae41a9ea1 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 12:51:24 +0100 Subject: [PATCH 3/7] . --- sentry_sdk/_span_batcher.py | 55 +++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 132760625f..f2335feada 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -43,6 +43,7 @@ def __init__( self._record_lost_func = record_lost_func self._running = True self._lock = threading.Lock() + self._active: "threading.local" = threading.local() self._flush_event: "threading.Event" = threading.Event() @@ -50,29 +51,43 @@ def __init__( self._flusher_pid: "Optional[int]" = None def add(self, span: "StreamedSpan") -> None: - if not self._ensure_thread() or self._flusher is None: + # Bail out if the current thread is already executing batcher code. + # This prevents deadlocks when code running inside the batcher (e.g. + # _add_to_envelope during flush, or _flush_event.wait/set) triggers + # a GC-emitted warning that routes back through the logging + # integration into add(). + + if getattr(self._active, "flag", False): return None - with self._lock: - size = len(self._span_buffer[span.trace_id]) - if size >= self.MAX_BEFORE_DROP: - self._record_lost_func( - reason="queue_overflow", - data_category="span", - quantity=1, - ) - return None - - self._span_buffer[span.trace_id].append(span) - self._running_size[span.trace_id] += self._estimate_size(span) - - if size + 1 >= self.MAX_BEFORE_FLUSH: - self._flush_event.set() - return + self._active.flag = True - if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: - self._flush_event.set() - return + if not self._ensure_thread() or self._flusher is None: + return None + + try: + with self._lock: + size = len(self._span_buffer[span.trace_id]) + if size >= self.MAX_BEFORE_DROP: + self._record_lost_func( + reason="queue_overflow", + data_category="span", + quantity=1, + ) + return None + + self._span_buffer[span.trace_id].append(span) + self._running_size[span.trace_id] += self._estimate_size(span) + + if size + 1 >= self.MAX_BEFORE_FLUSH: + self._flush_event.set() + return + + if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: + self._flush_event.set() + return + finally: + self._active.flag = False @staticmethod def _estimate_size(item: "StreamedSpan") -> int: From ac5c6d109bb6f35b8e049479c4ffd69dfd356a47 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 12:56:41 +0100 Subject: [PATCH 4/7] fix --- sentry_sdk/_span_batcher.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index f2335feada..967011c083 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -56,16 +56,15 @@ def add(self, span: "StreamedSpan") -> None: # _add_to_envelope during flush, or _flush_event.wait/set) triggers # a GC-emitted warning that routes back through the logging # integration into add(). - if getattr(self._active, "flag", False): return None self._active.flag = True - if not self._ensure_thread() or self._flusher is None: - return None - try: + if not self._ensure_thread() or self._flusher is None: + return None + with self._lock: size = len(self._span_buffer[span.trace_id]) if size >= self.MAX_BEFORE_DROP: From 870996b33f625edf5abed9784c955ace32dbac8c Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 14:05:55 +0100 Subject: [PATCH 5/7] add test --- tests/test_logs.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/test_logs.py b/tests/test_logs.py index 5eb699f534..86861cfe90 100644 --- a/tests/test_logs.py +++ b/tests/test_logs.py @@ -783,3 +783,39 @@ def before_send_log(log, _): ) get_client().flush() + + +@minimum_python_37 +@pytest.mark.timeout(5) +def test_reentrant_add_does_not_deadlock(sentry_init, capture_envelopes): + """Adding to the batcher from within a flush must not deadlock. + + This covers the scenario where GC emits a ResourceWarning during + _add_to_envelope (or _flush_event.wait/set), and the warning is + routed through the logging integration back into batcher.add(). + See https://github.com/getsentry/sentry-python/issues/5681 + """ + sentry_init(enable_logs=True) + capture_envelopes() + + client = sentry_sdk.get_client() + batcher = client.log_batcher + + reentrant_add_called = False + original_add_to_envelope = batcher._add_to_envelope + + def add_to_envelope_with_reentrant_add(envelope): + nonlocal reentrant_add_called + # Simulate a GC warning routing back into add() during flush + batcher.add({"fake": "log"}) + reentrant_add_called = True + original_add_to_envelope(envelope) + + batcher._add_to_envelope = add_to_envelope_with_reentrant_add + + sentry_sdk.logger.warning("test log") + client.flush() + + assert reentrant_add_called + # If the re-entrancy guard didn't work, this test would hang and it'd + # eventually be timed out by pytest-timeout From 41bc366e4db303425cfc5f5c184295a901d37c0e Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 14:18:26 +0100 Subject: [PATCH 6/7] . --- requirements-testing.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-testing.txt b/requirements-testing.txt index 5cd669af9a..a6041972cd 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -4,6 +4,7 @@ tomli;python_version<"3.11" # Only needed for pytest on Python < 3.11 pytest-cov pytest-forked pytest-localserver +pytest-timeout pytest-watch jsonschema executing From ed408f0d65e4ccc36653c0e607091d5e2a80c45a Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Tue, 17 Mar 2026 14:21:53 +0100 Subject: [PATCH 7/7] merge --- sentry_sdk/_batcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/_batcher.py b/sentry_sdk/_batcher.py index 861a8aefce..4ba8046814 100644 --- a/sentry_sdk/_batcher.py +++ b/sentry_sdk/_batcher.py @@ -115,11 +115,12 @@ def kill(self) -> None: self._flusher = None def flush(self) -> None: + was_active = getattr(self._active, "flag", False) self._active.flag = True try: self._flush() finally: - self._active.flag = False + self._active.flag = was_active def _add_to_envelope(self, envelope: "Envelope") -> None: envelope.add_item(