diff --git a/AGENTS.md b/AGENTS.md index 6179106348..1667524880 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -65,3 +65,18 @@ Do NOT edit these directly — modify source scripts instead: | `sentry_sdk/profiler/` | Performance profiling | | `tests/integrations/{name}/` | Integration test suites | | `scripts/populate_tox/config.py` | Test suite configuration | + + +## Long-term Knowledge + +### Gotcha + + +* **AGENTS.md must be excluded from markdown linters**: AGENTS.md is auto-managed by lore and uses \`\*\` list markers and long lines that violate typical remark-lint rules (unordered-list-marker-style, maximum-line-length). When a project uses remark with \`--frail\` (warnings become errors), AGENTS.md will fail CI. Fix: add \`AGENTS.md\` to \`.remarkignore\`. This applies to any lore-managed project with markdown linting. + + +* **Consola prompt cancel returns truthy Symbol, not false**: When a user cancels a \`consola\` / \`@clack/prompts\` confirmation prompt (Ctrl+C), the return value is \`Symbol(clack:cancel)\`, not \`false\`. Since Symbols are truthy in JavaScript, checking \`!confirmed\` will be \`false\` and the code falls through as if the user confirmed. Fix: use \`confirmed !== true\` (strict equality) instead of \`!confirmed\` to correctly handle cancel, false, and any other non-true values. + + +* **Zod z.coerce.number() converts null to 0 silently**: Zod gotchas in this codebase: (1) \`z.coerce.number()\` passes input through \`Number()\`, so \`null\` silently becomes \`0\`. Be aware if \`null\` vs \`0\` distinction matters. (2) Zod v4 \`.default({})\` short-circuits — it returns the default value without parsing through inner schema defaults. So \`.object({ enabled: z.boolean().default(true) }).default({})\` returns \`{}\`, not \`{ enabled: true }\`. Fix: provide fully-populated default objects. This affected nested config sections in src/config.ts during the v3→v4 upgrade. + diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py index c6921754b4..1f403285c5 100644 --- a/scripts/populate_tox/config.py +++ b/scripts/populate_tox/config.py @@ -122,7 +122,7 @@ "pytest-asyncio", "python-multipart", "requests", - "anyio<4", + "anyio>=3,<5", ], # There's an incompatibility between FastAPI's TestClient, which is # actually Starlette's TestClient, which is actually httpx's Client. @@ -132,6 +132,7 @@ # FastAPI versions we use older httpx which still supports the # deprecated argument. "<0.110.1": ["httpx<0.28.0"], + "<0.80": ["anyio<4"], "py3.6": ["aiocontextvars"], }, }, @@ -170,7 +171,7 @@ "httpx": { "package": "httpx", "deps": { - "*": ["anyio<4.0.0"], + "*": ["anyio>=3,<5"], ">=0.16,<0.17": ["pytest-httpx==0.10.0"], ">=0.17,<0.19": ["pytest-httpx==0.12.0"], ">=0.19,<0.21": ["pytest-httpx==0.14.0"], diff --git a/sentry_sdk/__init__.py b/sentry_sdk/__init__.py index fda2f18dd1..7fd0e1953d 100644 --- a/sentry_sdk/__init__.py +++ b/sentry_sdk/__init__.py @@ -25,6 +25,7 @@ "configure_scope", "continue_trace", "flush", + "flush_async", "get_baggage", "get_client", "get_global_scope", diff --git a/sentry_sdk/api.py b/sentry_sdk/api.py index bea22d8be7..7607f045c7 100644 --- a/sentry_sdk/api.py +++ b/sentry_sdk/api.py @@ -58,6 +58,7 @@ def overload(x: "T") -> "T": "configure_scope", "continue_trace", "flush", + "flush_async", "get_baggage", "get_client", "get_global_scope", @@ -349,6 +350,14 @@ def flush( return get_client().flush(timeout=timeout, callback=callback) +@clientmethod +async def flush_async( + timeout: "Optional[float]" = None, + callback: "Optional[Callable[[int, float], None]]" = None, +) -> None: + return await get_client().flush_async(timeout=timeout, callback=callback) + + @scopemethod def start_span( **kwargs: "Any", diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index f540a49f35..8b7bdb30b3 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -33,7 +33,7 @@ from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace from sentry_sdk.tracing_utils import has_span_streaming_enabled -from sentry_sdk.transport import BaseHttpTransport, make_transport +from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport from sentry_sdk.consts import ( SPANDATA, DEFAULT_MAX_VALUE_LENGTH, @@ -253,6 +253,12 @@ def close(self, *args: "Any", **kwargs: "Any") -> None: def flush(self, *args: "Any", **kwargs: "Any") -> None: return None + async def close_async(self, *args: "Any", **kwargs: "Any") -> None: + return None + + async def flush_async(self, *args: "Any", **kwargs: "Any") -> None: + return None + def __enter__(self) -> "BaseClient": return self @@ -474,7 +480,7 @@ def _record_lost_event( or self.metrics_batcher or self.span_batcher or has_profiling_enabled(self.options) - or isinstance(self.transport, BaseHttpTransport) + or isinstance(self.transport, HttpTransportCore) ): # If we have anything on that could spawn a background thread, we # need to check if it's safe to use them. @@ -1001,6 +1007,28 @@ def get_integration( return self.integrations.get(integration_name) + def _close_components(self) -> None: + """Kill all client components in the correct order.""" + self.session_flusher.kill() + if self.log_batcher is not None: + self.log_batcher.kill() + if self.metrics_batcher is not None: + self.metrics_batcher.kill() + if self.span_batcher is not None: + self.span_batcher.kill() + if self.monitor: + self.monitor.kill() + + def _flush_components(self) -> None: + """Flush all client components.""" + self.session_flusher.flush() + if self.log_batcher is not None: + self.log_batcher.flush() + if self.metrics_batcher is not None: + self.metrics_batcher.flush() + if self.span_batcher is not None: + self.span_batcher.flush() + def close( self, timeout: "Optional[float]" = None, @@ -1011,19 +1039,45 @@ def close( semantics as :py:meth:`Client.flush`. """ if self.transport is not None: - self.flush(timeout=timeout, callback=callback) - self.session_flusher.kill() - if self.log_batcher is not None: - self.log_batcher.kill() - if self.metrics_batcher is not None: - self.metrics_batcher.kill() - if self.span_batcher is not None: - self.span_batcher.kill() - if self.monitor: - self.monitor.kill() + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.warning( + "close() used with AsyncHttpTransport. " + "Prefer close_async() for graceful async shutdown. " + "Performing synchronous best-effort cleanup." + ) + else: + self.flush(timeout=timeout, callback=callback) + self._close_components() self.transport.kill() self.transport = None + async def close_async( + self, + timeout: "Optional[float]" = None, + callback: "Optional[Callable[[int, float], None]]" = None, + ) -> None: + """ + Asynchronously close the client and shut down the transport. Arguments have the same + semantics as :py:meth:`Client.flush_async`. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "close_async() used with non-async transport, aborting. Please use close() instead." + ) + return + await self.flush_async(timeout=timeout, callback=callback) + self._close_components() + kill_task = self.transport.kill() # type: ignore + if kill_task is not None: + await kill_task + self.transport = None + def flush( self, timeout: "Optional[float]" = None, @@ -1037,23 +1091,59 @@ def flush( :param callback: Is invoked with the number of pending events and the configured timeout. """ if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.warning( + "flush() used with AsyncHttpTransport. Please use flush_async() instead." + ) + return if timeout is None: timeout = self.options["shutdown_timeout"] - self.session_flusher.flush() - if self.log_batcher is not None: - self.log_batcher.flush() - if self.metrics_batcher is not None: - self.metrics_batcher.flush() - if self.span_batcher is not None: - self.span_batcher.flush() + self._flush_components() + self.transport.flush(timeout=timeout, callback=callback) + async def flush_async( + self, + timeout: "Optional[float]" = None, + callback: "Optional[Callable[[int, float], None]]" = None, + ) -> None: + """ + Asynchronously wait for the current events to be sent. + + :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. + + :param callback: Is invoked with the number of pending events and the configured timeout. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "flush_async() used with non-async transport, aborting. Please use flush() instead." + ) + return + if timeout is None: + timeout = self.options["shutdown_timeout"] + self._flush_components() + flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore + if flush_task is not None: + await flush_task + def __enter__(self) -> "_Client": return self def __exit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None: self.close() + async def __aenter__(self) -> "_Client": + return self + + async def __aexit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None: + await self.close_async() + from typing import TYPE_CHECKING diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index eb077ef62f..3c4e175079 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -79,6 +79,7 @@ class CompressionAlgo(Enum): "transport_compression_algo": Optional[CompressionAlgo], "transport_num_pools": Optional[int], "transport_http2": Optional[bool], + "transport_async": Optional[bool], "enable_logs": Optional[bool], "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], "enable_metrics": Optional[bool], diff --git a/sentry_sdk/integrations/asyncio.py b/sentry_sdk/integrations/asyncio.py index b7aa0a7202..91fd54bf3a 100644 --- a/sentry_sdk/integrations/asyncio.py +++ b/sentry_sdk/integrations/asyncio.py @@ -5,7 +5,13 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.integrations._wsgi_common import nullcontext -from sentry_sdk.utils import event_from_exception, logger, reraise +from sentry_sdk.utils import ( + event_from_exception, + logger, + reraise, + is_internal_task, +) +from sentry_sdk.transport import AsyncHttpTransport try: import asyncio @@ -13,7 +19,7 @@ except ImportError: raise DidNotEnable("asyncio not available") -from typing import cast, TYPE_CHECKING +from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Any, Callable, TypeVar @@ -42,6 +48,76 @@ def _wrap_coroutine(wrapped: "Coroutine[Any, Any, Any]") -> "Callable[[T], T]": ) +def patch_loop_close() -> None: + """Patch loop.close to flush pending events before shutdown.""" + # Atexit shutdown hook happens after the event loop is closed. + # Therefore, it is necessary to patch the loop.close method to ensure + # that pending events are flushed before the interpreter shuts down. + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop → cannot patch now + return + + if getattr(loop, "_sentry_flush_patched", False): + return + + async def _flush() -> None: + client = sentry_sdk.get_client() + if not client.is_active(): + return + + try: + if not isinstance(client.transport, AsyncHttpTransport): + return + + await client.close_async() + except Exception: + logger.warning("Sentry flush failed during loop shutdown", exc_info=True) + + orig_close = loop.close + + def _patched_close() -> None: + try: + loop.run_until_complete(_flush()) + except Exception: + logger.debug( + "Could not flush Sentry events during loop close", exc_info=True + ) + finally: + orig_close() + + loop.close = _patched_close # type: ignore + loop._sentry_flush_patched = True # type: ignore + + +def _create_task_with_factory( + orig_task_factory: "Any", + loop: "asyncio.AbstractEventLoop", + coro: "Coroutine[Any, Any, Any]", + **kwargs: "Any", +) -> "asyncio.Task[Any]": + task = None + + # Trying to use user set task factory (if there is one) + if orig_task_factory: + task = orig_task_factory(loop, coro, **kwargs) + + if task is None: + # The default task factory in `asyncio` does not have its own function + # but is just a couple of lines in `asyncio.base_events.create_task()` + # Those lines are copied here. + + # WARNING: + # If the default behavior of the task creation in asyncio changes, + # this will break! + task = Task(coro, loop=loop, **kwargs) + if task._source_traceback: # type: ignore + del task._source_traceback[-1] # type: ignore + + return task + + def patch_asyncio() -> None: orig_task_factory = None try: @@ -57,6 +133,12 @@ def _sentry_task_factory( coro: "Coroutine[Any, Any, Any]", **kwargs: "Any", ) -> "asyncio.Future[Any]": + # Check if this is an internal Sentry task + if is_internal_task(): + return _create_task_with_factory( + orig_task_factory, loop, coro, **kwargs + ) + @_wrap_coroutine(coro) async def _task_with_sentry_span_creation() -> "Any": result = None @@ -85,31 +167,13 @@ async def _task_with_sentry_span_creation() -> "Any": return result - task = None - - # Trying to use user set task factory (if there is one) - if orig_task_factory: - task = orig_task_factory( - loop, _task_with_sentry_span_creation(), **kwargs - ) - - if task is None: - # The default task factory in `asyncio` does not have its own function - # but is just a couple of lines in `asyncio.base_events.create_task()` - # Those lines are copied here. - - # WARNING: - # If the default behavior of the task creation in asyncio changes, - # this will break! - task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs) - if task._source_traceback: # type: ignore - del task._source_traceback[-1] # type: ignore + task = _create_task_with_factory( + orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs + ) # Set the task name to include the original coroutine's name try: - cast("asyncio.Task[Any]", task).set_name( - f"{get_name(coro)} (Sentry-wrapped)" - ) + task.set_name(f"{get_name(coro)} (Sentry-wrapped)") except AttributeError: # set_name might not be available in all Python versions pass @@ -156,6 +220,7 @@ def __init__(self, task_spans: bool = True) -> None: @staticmethod def setup_once() -> None: patch_asyncio() + patch_loop_close() def enable_asyncio_integration(*args: "Any", **kwargs: "Any") -> None: diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index dcfe55406b..28e1c8b34e 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +import asyncio import io import os import gzip @@ -15,13 +16,37 @@ except ImportError: brotli = None +try: + import httpcore +except ImportError: + httpcore = None # type: ignore[assignment,unused-ignore] + +try: + import h2 # noqa: F401 + + HTTP2_ENABLED = httpcore is not None +except ImportError: + HTTP2_ENABLED = False + +try: + import anyio # noqa: F401 + + ASYNC_TRANSPORT_ENABLED = httpcore is not None +except ImportError: + ASYNC_TRANSPORT_ENABLED = False + import urllib3 import certifi import sentry_sdk from sentry_sdk.consts import EndpointType -from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.utils import ( + Dsn, + logger, + capture_internal_exceptions, + mark_sentry_task_internal, +) +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING, cast, List, Dict @@ -170,8 +195,8 @@ def _parse_rate_limits( continue -class BaseHttpTransport(Transport): - """The base HTTP transport.""" +class HttpTransportCore(Transport): + """Shared base class for sync and async transports.""" TIMEOUT = 30 # seconds @@ -181,7 +206,7 @@ def __init__(self: "Self", options: "Dict[str, Any]") -> None: Transport.__init__(self, options) assert self.parsed_dsn is not None self.options: "Dict[str, Any]" = options - self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) + self._worker = self._create_worker(options) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until: "Dict[Optional[EventDataCategory], datetime]" = {} # We only use this Retry() class for the `get_retry_after` method it exposes @@ -235,6 +260,9 @@ def __init__(self: "Self", options: "Dict[str, Any]") -> None: elif self._compression_algo == "br": self._compression_level = 4 + def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker": + raise NotImplementedError() + def record_lost_event( self, reason: str, @@ -305,12 +333,11 @@ def _update_rate_limits( seconds=retry_after ) - def _send_request( + def _handle_request_error( self: "Self", - body: bytes, - headers: "Dict[str, str]", - endpoint_type: "EndpointType" = EndpointType.ENVELOPE, - envelope: "Optional[Envelope]" = None, + envelope: "Optional[Envelope]", + loss_reason: str = "network", + record_reason: str = "network_error", ) -> None: def record_loss(reason: str) -> None: if envelope is None: @@ -319,59 +346,59 @@ def record_loss(reason: str) -> None: for item in envelope.items: self.record_lost_event(reason, item=item) + self.on_dropped_event(loss_reason) + record_loss(record_reason) + + def _handle_response( + self: "Self", + response: "Union[urllib3.BaseHTTPResponse, httpcore.Response]", + envelope: "Optional[Envelope]", + ) -> None: + self._update_rate_limits(response) + + if response.status == 413: + size_exceeded_message = ( + "HTTP 413: Event dropped due to exceeded envelope size limit" + ) + response_message = getattr( + response, "data", getattr(response, "content", None) + ) + if response_message is not None: + size_exceeded_message += f" (body: {response_message})" + + logger.error(size_exceeded_message) + self._handle_request_error( + envelope=envelope, loss_reason="status_413", record_reason="send_error" + ) + + elif response.status == 429: + # if we hit a 429. Something was rate limited but we already + # acted on this in `self._update_rate_limits`. Note that we + # do not want to record event loss here as we will have recorded + # an outcome in relay already. + self.on_dropped_event("status_429") + pass + + elif response.status >= 300 or response.status < 200: + logger.error( + "Unexpected status code: %s (body: %s)", + response.status, + getattr(response, "data", getattr(response, "content", None)), + ) + self._handle_request_error( + envelope=envelope, loss_reason="status_{}".format(response.status) + ) + + def _update_headers( + self: "Self", + headers: "Dict[str, str]", + ) -> None: headers.update( { "User-Agent": str(self._auth.client), "X-Sentry-Auth": str(self._auth.to_header()), } ) - try: - response = self._request( - "POST", - endpoint_type, - body, - headers, - ) - except Exception: - self.on_dropped_event("network") - record_loss("network_error") - raise - - try: - self._update_rate_limits(response) - - if response.status == 413: - size_exceeded_message = ( - "HTTP 413: Event dropped due to exceeded envelope size limit" - ) - response_message = getattr( - response, "data", getattr(response, "content", None) - ) - if response_message is not None: - size_exceeded_message += f" (body: {response_message})" - - logger.error(size_exceeded_message) - self.on_dropped_event("status_413") - record_loss("send_error") - - elif response.status == 429: - # if we hit a 429. Something was rate limited but we already - # acted on this in `self._update_rate_limits`. Note that we - # do not want to record event loss here as we will have recorded - # an outcome in relay already. - self.on_dropped_event("status_429") - pass - - elif response.status >= 300 or response.status < 200: - logger.error( - "Unexpected status code: %s (body: %s)", - response.status, - getattr(response, "data", getattr(response, "content", None)), - ) - self.on_dropped_event("status_{}".format(response.status)) - record_loss("network_error") - finally: - response.close() def on_dropped_event(self: "Self", _reason: str) -> None: return None @@ -408,11 +435,6 @@ def _fetch_pending_client_report( type="client_report", ) - def _flush_client_reports(self: "Self", force: bool = False) -> None: - client_report = self._fetch_pending_client_report(force=force, interval=60) - if client_report is not None: - self.capture_envelope(Envelope(items=[client_report])) - def _check_disabled(self, category: str) -> bool: def _disabled(bucket: "Any") -> bool: ts = self._disabled_until.get(bucket) @@ -431,7 +453,9 @@ def _is_worker_full(self: "Self") -> bool: def is_healthy(self: "Self") -> bool: return not (self._is_worker_full() or self._is_rate_limited()) - def _send_envelope(self: "Self", envelope: "Envelope") -> None: + def _prepare_envelope( + self: "Self", envelope: "Envelope" + ) -> "Optional[Tuple[Envelope, io.BytesIO, Dict[str, str]]]": # remove all items from the envelope which are over quota new_items = [] for item in envelope.items: @@ -468,19 +492,13 @@ def _send_envelope(self: "Self", envelope: "Envelope") -> None: self.parsed_dsn.host, ) - headers = { + headers: "Dict[str, str]" = { "Content-Type": "application/x-sentry-envelope", } if content_encoding: headers["Content-Encoding"] = content_encoding - self._send_request( - body.getvalue(), - headers=headers, - endpoint_type=EndpointType.ENVELOPE, - envelope=envelope, - ) - return None + return envelope, body, headers def _serialize_envelope( self: "Self", envelope: "Envelope" @@ -520,7 +538,7 @@ def _in_no_proxy(self: "Self", parsed_dsn: "Dsn") -> bool: def _make_pool( self: "Self", - ) -> "Union[PoolManager, ProxyManager, httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool]": + ) -> "Union[PoolManager, ProxyManager, httpcore.SOCKSProxy, httpcore.HTTPProxy, httpcore.ConnectionPool, httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool]": raise NotImplementedError() def _request( @@ -532,6 +550,59 @@ def _request( ) -> "Union[urllib3.BaseHTTPResponse, httpcore.Response]": raise NotImplementedError() + def kill(self: "Self") -> None: + logger.debug("Killing HTTP transport") + self._worker.kill() + + +# Keep BaseHttpTransport as an alias for backwards compatibility +# and for the sync transport implementation +class BaseHttpTransport(HttpTransportCore): + """The base HTTP transport (synchronous).""" + + def _send_envelope(self: "Self", envelope: "Envelope") -> None: + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + def _send_request( + self: "Self", + body: bytes, + headers: "Dict[str, str]", + endpoint_type: "EndpointType", + envelope: "Optional[Envelope]" = None, + ) -> None: + self._update_headers(headers) + try: + response = self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + response.close() + + def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker": + return BackgroundWorker(queue_size=options["transport_queue_size"]) + + def _flush_client_reports(self: "Self", force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + def capture_envelope( self, envelope: "Envelope", @@ -557,10 +628,6 @@ def flush( self._worker.submit(lambda: self._flush_client_reports(force=True)) self._worker.flush(timeout, callback) - def kill(self: "Self") -> None: - logger.debug("Killing HTTP transport") - self._worker.kill() - @staticmethod def _warn_hub_cls() -> None: """Convenience method to warn users about the deprecation of the `hub_cls` attribute.""" @@ -688,10 +755,223 @@ def _request( ) -try: - import httpcore - import h2 # noqa: F401 -except ImportError: +if not ASYNC_TRANSPORT_ENABLED: + # Sorry, no AsyncHttpTransport for you + AsyncHttpTransport = HttpTransport # type: ignore[misc,unused-ignore] + +else: + + class AsyncHttpTransport(HttpTransportCore): # type: ignore[no-redef] + def __init__(self: "Self", options: "Dict[str, Any]") -> None: + super().__init__(options) + # Requires event loop at init time + self.loop = asyncio.get_running_loop() + + def _create_worker(self: "Self", options: "Dict[str, Any]") -> "Worker": + return AsyncWorker(queue_size=options["transport_queue_size"]) + + def _get_header_value( + self: "Self", response: "Any", header: str + ) -> "Optional[str]": + header_lower = header.lower() + return next( + ( + val.decode("ascii") + for key, val in response.headers + if key.decode("ascii").lower() == header_lower + ), + None, + ) + + async def _send_envelope(self: "Self", envelope: "Envelope") -> None: # type: ignore[override,unused-ignore] + _prepared_envelope = self._prepare_envelope(envelope) + if _prepared_envelope is not None: + envelope, body, headers = _prepared_envelope + await self._send_request( + body.getvalue(), + headers=headers, + endpoint_type=EndpointType.ENVELOPE, + envelope=envelope, + ) + return None + + async def _send_request( # type: ignore[override,unused-ignore] + self: "Self", + body: bytes, + headers: "Dict[str, str]", + endpoint_type: "EndpointType", + envelope: "Optional[Envelope]" = None, + ) -> None: + self._update_headers(headers) + try: + response = await self._request( + "POST", + endpoint_type, + body, + headers, + ) + except Exception: + self._handle_request_error(envelope=envelope, loss_reason="network") + raise + try: + self._handle_response(response=response, envelope=envelope) + finally: + await response.aclose() + + async def _request( # type: ignore[override,unused-ignore] + self: "Self", + method: str, + endpoint_type: "EndpointType", + body: "Any", + headers: "Mapping[str, str]", + ) -> "httpcore.Response": + return await self._pool.request( # type: ignore[misc,unused-ignore] + method, + self._auth.get_api_url(endpoint_type), + content=body, + headers=headers, # type: ignore[arg-type,unused-ignore] + extensions={ + "timeout": { + "pool": self.TIMEOUT, + "connect": self.TIMEOUT, + "write": self.TIMEOUT, + "read": self.TIMEOUT, + } + }, + ) + + async def _flush_client_reports(self: "Self", force: bool = False) -> None: + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + self.capture_envelope(Envelope(items=[client_report])) + + def _capture_envelope(self: "Self", envelope: "Envelope") -> None: + async def send_envelope_wrapper() -> None: + with capture_internal_exceptions(): + await self._send_envelope(envelope) + await self._flush_client_reports() + + if not self._worker.submit(send_envelope_wrapper): + self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) + + def capture_envelope(self: "Self", envelope: "Envelope") -> None: + # Synchronous entry point + if self.loop and self.loop.is_running(): + self.loop.call_soon_threadsafe(self._capture_envelope, envelope) + else: + # The event loop is no longer running + logger.warning("Async Transport is not running in an event loop.") + self.on_dropped_event("internal_sdk_error") + for item in envelope.items: + self.record_lost_event("internal_sdk_error", item=item) + + def flush( # type: ignore[override] + self: "Self", + timeout: float, + callback: "Optional[Callable[[int, float], None]]" = None, + ) -> "Optional[asyncio.Task[None]]": + logger.debug("Flushing HTTP transport") + + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) + return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] + return None + + def _get_pool_options(self: "Self") -> "Dict[str, Any]": + options: "Dict[str, Any]" = { + "http2": False, # no HTTP2 for now + "retries": 3, + } + + socket_options = ( + self.options["socket_options"] + if self.options["socket_options"] is not None + else [] + ) + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + options["socket_options"] = socket_options + + ssl_context = ssl.create_default_context() + ssl_context.load_verify_locations( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") + key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") + if cert_file is not None: + ssl_context.load_cert_chain(cert_file, key_file) + + options["ssl_context"] = ssl_context + + return options + + def _make_pool( + self: "Self", + ) -> "Union[httpcore.AsyncSOCKSProxy, httpcore.AsyncHTTPProxy, httpcore.AsyncConnectionPool]": + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + try: + socks_opts = opts.copy() + if "socket_options" in socks_opts: + socket_options = socks_opts.pop("socket_options") + if socket_options: + logger.warning( + "You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." + ) + return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **socks_opts) + except RuntimeError: + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", + proxy, + ) + else: + return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) + + return httpcore.AsyncConnectionPool(**opts) + + def kill(self: "Self") -> "Optional[asyncio.Task[None]]": # type: ignore[override] + logger.debug("Killing HTTP transport") + self._worker.kill() + try: + # Return the pool cleanup task so caller can await it if needed + with mark_sentry_task_internal(): + return self.loop.create_task(self._pool.aclose()) # type: ignore[union-attr,unused-ignore] + except RuntimeError: + logger.warning("Event loop not running, aborting kill.") + return None + + +if not HTTP2_ENABLED: # Sorry, no Http2Transport for you class Http2Transport(HttpTransport): def __init__(self: "Self", options: "Dict[str, Any]") -> None: @@ -735,7 +1015,7 @@ def _request( method, self._auth.get_api_url(endpoint_type), content=body, - headers=headers, # type: ignore + headers=headers, # type: ignore[arg-type,unused-ignore] extensions={ "timeout": { "pool": self.TIMEOUT, @@ -861,12 +1141,39 @@ def make_transport(options: "Dict[str, Any]") -> "Optional[Transport]": ref_transport = options["transport"] use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) + use_async_transport = options.get("_experiments", {}).get("transport_async", False) + async_integration = any( + integration.__class__.__name__ == "AsyncioIntegration" + for integration in options.get("integrations") or [] + ) # By default, we use the http transport class transport_cls: "Type[Transport]" = ( Http2Transport if use_http2_transport else HttpTransport ) + if use_async_transport and ASYNC_TRANSPORT_ENABLED: + try: + asyncio.get_running_loop() + if async_integration: + if use_http2_transport: + logger.warning( + "HTTP/2 transport is not supported with async transport. " + "Ignoring transport_http2 experiment." + ) + transport_cls = AsyncHttpTransport + else: + logger.warning( + "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport." + ) + except RuntimeError: + # No event loop running, fall back to sync transport + logger.warning("No event loop running, falling back to sync transport.") + elif use_async_transport: + logger.warning( + "You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." + ) + if isinstance(ref_transport, Transport): return ref_transport elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport): diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index a333467ae9..ae6924d87f 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -1,4 +1,5 @@ import base64 +import contextvars import json import linecache import logging @@ -12,6 +13,7 @@ import threading import time from collections import namedtuple +from contextlib import contextmanager from datetime import datetime, timezone from decimal import Decimal from functools import partial, partialmethod, wraps @@ -44,6 +46,7 @@ Callable, ContextManager, Dict, + Generator, Iterator, List, Literal, @@ -82,6 +85,25 @@ _installed_modules = None +_is_sentry_internal_task = contextvars.ContextVar( + "is_sentry_internal_task", default=False +) + + +def is_internal_task() -> bool: + return _is_sentry_internal_task.get() + + +@contextmanager +def mark_sentry_task_internal() -> "Generator[None, None, None]": + """Context manager to mark a task as Sentry internal.""" + token = _is_sentry_internal_task.set(True) + try: + yield + finally: + _is_sentry_internal_task.reset(token) + + BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$") FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0")) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 3d85a653d6..45435ef56c 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,9 +1,13 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +import asyncio import os import threading from time import sleep, time from sentry_sdk._queue import Queue, FullError -from sentry_sdk.utils import logger +from sentry_sdk.utils import logger, mark_sentry_task_internal from sentry_sdk.consts import DEFAULT_QUEUE_SIZE from typing import TYPE_CHECKING @@ -17,12 +21,70 @@ _TERMINATOR = object() -class BackgroundWorker: +class Worker(ABC): + """ + Base class for all workers. + + A worker is used to process events in the background and send them to Sentry. + """ + + @property + @abstractmethod + def is_alive(self) -> bool: + """ + Checks whether the worker is alive and running. + + Returns True if the worker is alive, False otherwise. + """ + pass + + @abstractmethod + def kill(self) -> None: + """ + Kills the worker. + + This method is used to kill the worker. The queue will be drained up to the point where the worker is killed. + The worker will not be able to process any more events. + """ + pass + + def flush( + self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None + ) -> None: + """ + Flush the worker. + + This method blocks until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. + """ + return None + + @abstractmethod + def full(self) -> bool: + """ + Checks whether the worker's queue is full. + + Returns True if the queue is full, False otherwise. + """ + pass + + @abstractmethod + def submit(self, callback: Callable[[], Any]) -> bool: + """ + Schedule a callback to be executed by the worker. + + Returns True if the callback was scheduled, False if the queue is full. + """ + pass + + +class BackgroundWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: - self._queue: "Queue" = Queue(queue_size) + self._queue: Queue = Queue(queue_size) self._lock = threading.Lock() - self._thread: "Optional[threading.Thread]" = None - self._thread_for_pid: "Optional[int]" = None + self._thread: Optional[threading.Thread] = None + self._thread_for_pid: Optional[int] = None @property def is_alive(self) -> bool: @@ -85,7 +147,7 @@ def kill(self) -> None: self._thread = None self._thread_for_pid = None - def flush(self, timeout: float, callback: "Optional[Any]" = None) -> None: + def flush(self, timeout: float, callback: Optional[Any] = None) -> None: logger.debug("background worker got flush request") with self._lock: if self.is_alive and timeout > 0.0: @@ -95,7 +157,7 @@ def flush(self, timeout: float, callback: "Optional[Any]" = None) -> None: def full(self) -> bool: return self._queue.full() - def _wait_flush(self, timeout: float, callback: "Optional[Any]") -> None: + def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None: initial_timeout = min(0.1, timeout) if not self._timed_queue_join(initial_timeout): pending = self._queue.qsize() + 1 @@ -107,7 +169,7 @@ def _wait_flush(self, timeout: float, callback: "Optional[Any]") -> None: pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - def submit(self, callback: "Callable[[], None]") -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_thread() try: self._queue.put_nowait(callback) @@ -128,3 +190,143 @@ def _target(self) -> None: finally: self._queue.task_done() sleep(0) + + +class AsyncWorker(Worker): + def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: + self._queue: Optional[asyncio.Queue[Any]] = None + self._queue_size = queue_size + self._task: Optional[asyncio.Task[None]] = None + # Event loop needs to remain in the same process + self._task_for_pid: Optional[int] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + # Track active callback tasks so they have a strong reference and can be cancelled on kill + self._active_tasks: set[asyncio.Task[None]] = set() + + @property + def is_alive(self) -> bool: + if self._task_for_pid != os.getpid(): + return False + if not self._task or not self._loop: + return False + return self._loop.is_running() and not self._task.done() + + def kill(self) -> None: + if self._task: + # Cancel the main consumer task to prevent duplicate consumers + self._task.cancel() + if self._queue is not None: + try: + self._queue.put_nowait(_TERMINATOR) + except asyncio.QueueFull: + logger.debug("async worker queue full, kill failed") + # Also cancel any active callback tasks + # Avoid modifying the set while cancelling tasks + tasks_to_cancel = set(self._active_tasks) + for task in tasks_to_cancel: + task.cancel() + self._active_tasks.clear() + self._loop = None + self._task = None + self._task_for_pid = None + + def start(self) -> None: + if not self.is_alive: + try: + self._loop = asyncio.get_running_loop() + if self._queue is None: + self._queue = asyncio.Queue(maxsize=self._queue_size) + with mark_sentry_task_internal(): + self._task = self._loop.create_task(self._target()) + self._task_for_pid = os.getpid() + except RuntimeError: + # There is no event loop running + logger.warning("No event loop running, async worker not started") + self._loop = None + self._task = None + self._task_for_pid = None + + def full(self) -> bool: + if self._queue is None: + return True + return self._queue.full() + + def _ensure_task(self) -> None: + if not self.is_alive: + self.start() + + async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> None: + if not self._loop or not self._loop.is_running() or self._queue is None: + return + + initial_timeout = min(0.1, timeout) + + # Timeout on the join + try: + await asyncio.wait_for(self._queue.join(), timeout=initial_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + len(self._active_tasks) + logger.debug("%d event(s) pending on flush", pending) + if callback is not None: + callback(pending, timeout) + + try: + remaining_timeout = timeout - initial_timeout + await asyncio.wait_for(self._queue.join(), timeout=remaining_timeout) + except asyncio.TimeoutError: + pending = self._queue.qsize() + len(self._active_tasks) + logger.error("flush timed out, dropped %s events", pending) + + def flush( # type: ignore[override] + self, timeout: float, callback: Optional[Any] = None + ) -> Optional[asyncio.Task[None]]: + if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running(): + with mark_sentry_task_internal(): + return self._loop.create_task(self._wait_flush(timeout, callback)) + return None + + def submit(self, callback: Callable[[], Any]) -> bool: + self._ensure_task() + if self._queue is None: + return False + try: + self._queue.put_nowait(callback) + return True + except asyncio.QueueFull: + return False + + async def _target(self) -> None: + if self._queue is None: + return + while True: + callback = await self._queue.get() + if callback is _TERMINATOR: + self._queue.task_done() + break + # Firing tasks instead of awaiting them allows for concurrent requests + with mark_sentry_task_internal(): + task = asyncio.create_task(self._process_callback(callback)) + # Create a strong reference to the task so it can be cancelled on kill + # and does not get garbage collected while running + self._active_tasks.add(task) + task.add_done_callback(self._on_task_complete) + # Yield to let the event loop run other tasks + await asyncio.sleep(0) + + async def _process_callback(self, callback: Callable[[], Any]) -> None: + # Callback is an async coroutine, need to await it + await callback() + + def _on_task_complete(self, task: asyncio.Task[None]) -> None: + try: + task.result() + except asyncio.CancelledError: + pass # Task was cancelled, expected during shutdown + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + # Mark the task as done and remove it from the active tasks set + # This happens only after the task has completed + if self._queue is not None: + self._queue.task_done() + self._active_tasks.discard(task) diff --git a/setup.py b/setup.py index eb8ee4bd4a..8c173465fd 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,7 @@ def get_file_text(file_name): "flask": ["flask>=0.11", "blinker>=1.1", "markupsafe"], "grpcio": ["grpcio>=1.21.1", "protobuf>=3.8.0"], "http2": ["httpcore[http2]==1.*"], + "asyncio": ["httpcore[asyncio]==1.*"], "httpx": ["httpx>=0.16.0"], "huey": ["huey>=2"], "huggingface_hub": ["huggingface_hub>=0.22"], diff --git a/tests/integrations/asyncio/test_asyncio.py b/tests/integrations/asyncio/test_asyncio.py index b41aa244cb..c7eb057416 100644 --- a/tests/integrations/asyncio/test_asyncio.py +++ b/tests/integrations/asyncio/test_asyncio.py @@ -1,7 +1,10 @@ import asyncio import inspect import sys -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch + +if sys.version_info >= (3, 8): + from unittest.mock import AsyncMock import pytest @@ -12,6 +15,7 @@ patch_asyncio, enable_asyncio_integration, ) +from sentry_sdk.utils import mark_sentry_task_internal try: from contextvars import Context, ContextVar @@ -24,6 +28,11 @@ ) +minimum_python_39 = pytest.mark.skipif( + sys.version_info < (3, 9), reason="Test requires Python >= 3.9" +) + + minimum_python_311 = pytest.mark.skipif( sys.version_info < (3, 11), reason="Asyncio task context parameter was introduced in Python 3.11", @@ -564,3 +573,107 @@ async def test_delayed_enable_integration_after_disabling(sentry_init, capture_e (transaction,) = events assert transaction["spans"] assert transaction["spans"][0]["origin"] == "auto.function.asyncio" + + +@minimum_python_39 +@pytest.mark.asyncio(loop_scope="module") +async def test_internal_tasks_not_wrapped(sentry_init, capture_events): + sentry_init(integrations=[AsyncioIntegration()], traces_sample_rate=1.0) + events = capture_events() + + # Create a user task that should be wrapped + async def user_task(): + await asyncio.sleep(0.01) + return "user_result" + + # Create an internal task that should NOT be wrapped + async def internal_task(): + await asyncio.sleep(0.01) + return "internal_result" + + with sentry_sdk.start_transaction(name="test_transaction"): + user_task_obj = asyncio.create_task(user_task()) + + with mark_sentry_task_internal(): + internal_task_obj = asyncio.create_task(internal_task()) + + user_result = await user_task_obj + internal_result = await internal_task_obj + + assert user_result == "user_result" + assert internal_result == "internal_result" + + assert len(events) == 1 + transaction = events[0] + + user_spans = [] + internal_spans = [] + + for span in transaction.get("spans", []): + if "user_task" in span.get("description", ""): + user_spans.append(span) + elif "internal_task" in span.get("description", ""): + internal_spans.append(span) + + assert len(user_spans) > 0, ( + f"User task should have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}" + ) + assert len(internal_spans) == 0, ( + f"Internal task should NOT have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}" + ) + + +@minimum_python_38 +def test_loop_close_patching(sentry_init): + sentry_init(integrations=[AsyncioIntegration()]) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + with patch("asyncio.get_running_loop", return_value=loop): + assert not hasattr(loop, "_sentry_flush_patched") + AsyncioIntegration.setup_once() + assert hasattr(loop, "_sentry_flush_patched") + assert loop._sentry_flush_patched is True + + finally: + if not loop.is_closed(): + loop.close() + + +@minimum_python_38 +def test_loop_close_flushes_async_transport(sentry_init): + from sentry_sdk.transport import AsyncHttpTransport + + sentry_init(integrations=[AsyncioIntegration()]) + + # Save the current event loop to restore it later + try: + original_loop = asyncio.get_event_loop() + except RuntimeError: + original_loop = None + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + with patch("asyncio.get_running_loop", return_value=loop): + AsyncioIntegration.setup_once() + + mock_client = Mock() + mock_transport = Mock(spec=AsyncHttpTransport) + mock_client.transport = mock_transport + mock_client.close_async = AsyncMock(return_value=None) + + with patch("sentry_sdk.get_client", return_value=mock_client): + loop.close() + + mock_client.close_async.assert_called_once() + mock_client.close_async.assert_awaited_once() + + finally: + if not loop.is_closed(): + loop.close() + if original_loop: + asyncio.set_event_loop(original_loop) diff --git a/tests/test_client.py b/tests/test_client.py index 96ebcf1790..5032bdabe2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -26,9 +26,22 @@ from sentry_sdk.spotlight import DEFAULT_SPOTLIGHT_URL from sentry_sdk.utils import capture_internal_exception from sentry_sdk.integrations.executing import ExecutingIntegration -from sentry_sdk.transport import Transport +from sentry_sdk.integrations.asyncio import AsyncioIntegration +from sentry_sdk.transport import Transport, AsyncHttpTransport from sentry_sdk.serializer import MAX_DATABAG_BREADTH from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, DEFAULT_MAX_VALUE_LENGTH +from sentry_sdk._compat import PY38 + +try: + import gevent # noqa: F401 + + running_under_gevent = True +except ImportError: + running_under_gevent = False + +skip_under_gevent = pytest.mark.skipif( + running_under_gevent, reason="Async tests not compatible with gevent" +) from typing import TYPE_CHECKING @@ -1585,3 +1598,325 @@ def test_keep_alive(env_value, arg_value, expected_value): ) assert transport_cls.options["keep_alive"] is expected_value + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "https://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + # NO_PROXY testcases + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "env_no_proxy": "example.com,sentry.io", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + "arg_proxy_headers": {"Test-Header": "foo-bar"}, + }, + ], +) +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_proxy(monkeypatch, testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + if testcase["env_http_proxy"] is not None: + monkeypatch.setenv("HTTP_PROXY", testcase["env_http_proxy"]) + if testcase["env_https_proxy"] is not None: + monkeypatch.setenv("HTTPS_PROXY", testcase["env_https_proxy"]) + if testcase.get("env_no_proxy") is not None: + monkeypatch.setenv("NO_PROXY", testcase["env_no_proxy"]) + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + if testcase.get("arg_proxy_headers") is not None: + kwargs["proxy_headers"] = testcase["arg_proxy_headers"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + proxy = getattr( + client.transport._pool, + "proxy", + getattr(client.transport._pool, "_proxy_url", None), + ) + if testcase["expected_proxy_scheme"] is None: + assert proxy is None + else: + scheme = ( + proxy.scheme.decode("ascii") + if isinstance(proxy.scheme, bytes) + else proxy.scheme + ) + assert scheme == testcase["expected_proxy_scheme"] + + if testcase.get("arg_proxy_headers") is not None: + proxy_headers = dict( + (k.decode("ascii"), v.decode("ascii")) + for k, v in client.transport._pool._proxy_headers + ) + assert proxy_headers == testcase["arg_proxy_headers"] + + await client.close_async() + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": False, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4a://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5h://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4a://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5h://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5://localhost/123", + "should_be_socks_proxy": True, + }, + ], +) +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_socks_proxy(testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + assert ("socks" in str(type(client.transport._pool)).lower()) == testcase[ + "should_be_socks_proxy" + ], ( + f"Expected {kwargs} to result in SOCKS == {testcase['should_be_socks_proxy']}" + f"but got {str(type(client.transport._pool))}" + ) + + await client.close_async() diff --git a/tests/test_transport.py b/tests/test_transport.py index 8601a4f138..c366e275b2 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -3,6 +3,8 @@ import os import socket import sys +import asyncio +import threading from collections import defaultdict from datetime import datetime, timedelta, timezone from unittest import mock @@ -15,6 +17,17 @@ except (ImportError, ModuleNotFoundError): httpcore = None +try: + import gevent # noqa: F401 + + running_under_gevent = True +except ImportError: + running_under_gevent = False + +skip_under_gevent = pytest.mark.skipif( + running_under_gevent, reason="Async tests not compatible with gevent" +) + import sentry_sdk from sentry_sdk import ( Client, @@ -29,9 +42,11 @@ from sentry_sdk.transport import ( KEEP_ALIVE_SOCKET_OPTIONS, _parse_rate_limits, + AsyncHttpTransport, HttpTransport, ) from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger +from sentry_sdk.integrations.asyncio import AsyncioIntegration server = None @@ -841,3 +856,301 @@ def test_record_lost_event_transaction_item(capturing_server, make_client, span_ "reason": "test", "quantity": span_count + 1, } in discarded_events + + +def test_handle_unexpected_status_invokes_handle_request_error( + make_client, monkeypatch +): + client = make_client() + transport = client.transport + + monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True) + + def stub_request(method, endpoint, body=None, headers=None): + class MockResponse: + def __init__(self): + self.status = 500 # Integer + self.data = b"server error" + self.headers = {} + + def close(self): + pass + + return MockResponse() + + monkeypatch.setattr(transport, "_request", stub_request) + + seen = [] + monkeypatch.setattr( + transport, + "_handle_request_error", + lambda envelope, loss_reason: seen.append(loss_reason), + ) + + client.capture_event({"message": "test"}) + client.flush() + + assert seen == ["status_500"] + + +def test_handle_request_error_basic_coverage(make_client, monkeypatch): + client = make_client() + transport = client.transport + + monkeypatch.setattr(transport._worker, "submit", lambda fn: fn() or True) + + # Track method calls + calls = [] + + def mock_on_dropped_event(reason): + calls.append(("on_dropped_event", reason)) + + def mock_record_lost_event(reason, data_category=None, item=None): + calls.append(("record_lost_event", reason, data_category, item)) + + monkeypatch.setattr(transport, "on_dropped_event", mock_on_dropped_event) + monkeypatch.setattr(transport, "record_lost_event", mock_record_lost_event) + + # Test case 1: envelope is None + transport._handle_request_error(envelope=None, loss_reason="test_reason") + + assert len(calls) == 2 + assert calls[0] == ("on_dropped_event", "test_reason") + assert calls[1] == ("record_lost_event", "network_error", "error", None) + + # Reset + calls.clear() + + # Test case 2: envelope with items + envelope = Envelope() + envelope.add_item(mock.MagicMock()) # Simple mock item + envelope.add_item(mock.MagicMock()) # Another mock item + + transport._handle_request_error(envelope=envelope, loss_reason="connection_error") + + assert len(calls) == 3 + assert calls[0] == ("on_dropped_event", "connection_error") + assert calls[1][0:2] == ("record_lost_event", "network_error") + assert calls[2][0:2] == ("record_lost_event", "network_error") + + +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.parametrize("debug", (True, False)) +@pytest.mark.parametrize("client_flush_method", ["close", "flush"]) +@pytest.mark.parametrize("use_pickle", (True, False)) +@pytest.mark.parametrize("compression_level", (0, 9, None)) +@pytest.mark.parametrize("compression_algo", ("gzip", "br", "", None)) +@pytest.mark.skipif(not PY38, reason="Async transport only supported in Python 3.8+") +async def test_transport_works_async( + capturing_server, + request, + capsys, + caplog, + debug, + make_client, + client_flush_method, + use_pickle, + compression_level, + compression_algo, +): + caplog.set_level(logging.DEBUG) + + experiments = {} + if compression_level is not None: + experiments["transport_compression_level"] = compression_level + + if compression_algo is not None: + experiments["transport_compression_algo"] = compression_algo + + # Enable async transport + experiments["transport_async"] = True + + client = make_client( + debug=debug, + _experiments=experiments, + integrations=[AsyncioIntegration()], + ) + + if use_pickle: + client = pickle.loads(pickle.dumps(client)) + + # Verify we're using async transport + assert isinstance(client.transport, AsyncHttpTransport), ( + "Expected AsyncHttpTransport" + ) + + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + + add_breadcrumb( + level="info", message="i like bread", timestamp=datetime.now(timezone.utc) + ) + capture_message("löl") + + if client_flush_method == "close": + await client.close_async(timeout=2.0) + if client_flush_method == "flush": + await client.flush_async(timeout=2.0) + + out, err = capsys.readouterr() + assert not err and not out + assert capturing_server.captured + should_compress = ( + # default is to compress with brotli if available, gzip otherwise + (compression_level is None) + or ( + # setting compression level to 0 means don't compress + compression_level > 0 + ) + ) and ( + # if we couldn't resolve to a known algo, we don't compress + compression_algo != "" + ) + + assert capturing_server.captured[0].compressed == should_compress + # After flush, the worker task is still running, but the end of the test will shut down the event loop + # Therefore, we need to explicitly close the client to clean up the worker task + assert any("Sending envelope" in record.msg for record in caplog.records) == debug + if client_flush_method == "flush": + await client.close_async(timeout=2.0) + + +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_background_thread_capture( + capturing_server, make_client, caplog +): + """Test capture_envelope from background threads uses run_coroutine_threadsafe""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + captured_from_thread = [] + exception_from_thread = [] + + def background_thread_work(): + try: + # This should use run_coroutine_threadsafe path + capture_message("from background thread") + captured_from_thread.append(True) + except Exception as e: + exception_from_thread.append(e) + + thread = threading.Thread(target=background_thread_work) + thread.start() + thread.join() + assert not exception_from_thread + assert captured_from_thread + await client.close_async(timeout=2.0) + assert capturing_server.captured + + +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_event_loop_closed_scenario( + capturing_server, make_client, caplog +): + """Test behavior when trying to capture after event loop context ends""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + sentry_sdk.get_global_scope().set_client(client) + original_loop = client.transport.loop + + with mock.patch("asyncio.get_running_loop", side_effect=RuntimeError("no loop")): + with mock.patch.object(client.transport.loop, "is_running", return_value=False): + with mock.patch("sentry_sdk.transport.logger") as mock_logger: + # This should trigger the "no_async_context" path + capture_message("after loop closed") + + mock_logger.warning.assert_called_with( + "Async Transport is not running in an event loop." + ) + + client.transport.loop = original_loop + await client.close_async(timeout=2.0) + + +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_concurrent_requests( + capturing_server, make_client, caplog +): + """Test multiple simultaneous envelope submissions""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + + num_messages = 15 + + async def send_message(i): + capture_message(f"concurrent message {i}") + + tasks = [send_message(i) for i in range(num_messages)] + await asyncio.gather(*tasks) + await client.close_async(timeout=2.0) + assert len(capturing_server.captured) == num_messages + + +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_rate_limiting_with_concurrency( + capturing_server, make_client, request +): + """Test async transport rate limiting with concurrent requests""" + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + capturing_server.respond_with( + code=429, headers={"X-Sentry-Rate-Limits": "60:error:organization"} + ) + + # Send one request first to trigger rate limiting + capture_message("initial message") + await asyncio.sleep(0.1) # Wait for request to execute + assert client.transport._check_disabled("error") is True + capturing_server.clear_captured() + + async def send_message(i): + capture_message(f"message {i}") + await asyncio.sleep(0.01) + + await asyncio.gather(*[send_message(i) for i in range(5)]) + await asyncio.sleep(0.1) + # New request should be dropped due to rate limiting + assert len(capturing_server.captured) == 0 + await client.close_async(timeout=2.0) + + +@skip_under_gevent +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_two_way_ssl_authentication(): + current_dir = os.path.dirname(__file__) + cert_file = f"{current_dir}/test.pem" + key_file = f"{current_dir}/test.key" + + client = Client( + "https://foo@sentry.io/123", + cert_file=cert_file, + key_file=key_file, + _experiments={"transport_async": True}, + integrations=[AsyncioIntegration()], + ) + assert isinstance(client.transport, AsyncHttpTransport) + + options = client.transport._get_pool_options() + assert options["ssl_context"] is not None + + await client.close_async() diff --git a/tox.ini b/tox.ini index 3f8ae2b530..339a9814d6 100644 --- a/tox.ini +++ b/tox.ini @@ -327,14 +327,17 @@ deps = linters: -r requirements-linting.txt linters: werkzeug<2.3.0 + linters: httpcore[asyncio] mypy: -r requirements-linting.txt mypy: werkzeug<2.3.0 + mypy: httpcore[asyncio] ruff: -r requirements-linting.txt # === Common === py3.8-common: hypothesis common: pytest-asyncio + common: httpcore[asyncio] # See https://github.com/pytest-dev/pytest/issues/9621 # and https://github.com/pytest-dev/pytest-forked/issues/67 # for justification of the upper bound on pytest @@ -587,7 +590,7 @@ deps = httpx-v0.20.0: httpx==0.20.0 httpx-v0.24.1: httpx==0.24.1 httpx-v0.28.1: httpx==0.28.1 - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16.1: pytest-httpx==0.10.0 httpx-v0.20.0: pytest-httpx==0.14.0 httpx-v0.24.1: pytest-httpx==0.22.0 @@ -701,7 +704,8 @@ deps = fastapi: pytest-asyncio fastapi: python-multipart fastapi: requests - fastapi: anyio<4 + fastapi: anyio>=3,<5 + fastapi-v0.79.1: anyio<4 fastapi-v0.79.1: httpx<0.28.0 fastapi-v0.98.0: httpx<0.28.0 {py3.6}-fastapi: aiocontextvars