66
77from sentry_sdk .hub import Hub
88from sentry_sdk .utils import capture_internal_exceptions , event_from_exception
9+ from sentry_sdk .tracing import SpanContext
910from sentry_sdk ._compat import reraise
1011from sentry_sdk .integrations import Integration
1112from sentry_sdk .integrations .logging import ignore_logger
1415class CeleryIntegration (Integration ):
1516 identifier = "celery"
1617
18+ def __init__ (self , propagate_traces = True ):
19+ self .propagate_traces = propagate_traces
20+
1721 @staticmethod
1822 def setup_once ():
1923 import celery .app .trace as trace # type: ignore
@@ -25,6 +29,7 @@ def sentry_build_tracer(name, task, *args, **kwargs):
2529 # short-circuits to task.run if it thinks it's safe.
2630 task .__call__ = _wrap_task_call (task , task .__call__ )
2731 task .run = _wrap_task_call (task , task .run )
32+ task .apply_async = _wrap_apply_async (task , task .apply_async )
2833 return _wrap_tracer (task , old_build_tracer (name , task , * args , ** kwargs ))
2934
3035 trace .build_tracer = sentry_build_tracer
@@ -37,6 +42,23 @@ def sentry_build_tracer(name, task, *args, **kwargs):
3742 ignore_logger ("celery.worker.job" )
3843
3944
45+ def _wrap_apply_async (task , f ):
46+ def apply_async (self , * args , ** kwargs ):
47+ hub = Hub .current
48+ integration = hub .get_integration (CeleryIntegration )
49+ if integration is not None and integration .propagate_traces :
50+ headers = None
51+ for key , value in hub .iter_trace_propagation_headers ():
52+ if headers is None :
53+ headers = dict (kwargs .get ("headers" ) or {})
54+ headers [key ] = value
55+ if headers is not None :
56+ kwargs ["headers" ] = headers
57+ return f (self , * args , ** kwargs )
58+
59+ return apply_async
60+
61+
4062def _wrap_tracer (task , f ):
4163 # Need to wrap tracer for pushing the scope before prerun is sent, and
4264 # popping it after postrun is sent.
@@ -52,13 +74,22 @@ def _inner(*args, **kwargs):
5274 with hub .push_scope () as scope :
5375 scope ._name = "celery"
5476 scope .clear_breadcrumbs ()
77+ _continue_trace (args [3 ].get ("headers" ) or {}, scope )
5578 scope .add_event_processor (_make_event_processor (task , * args , ** kwargs ))
5679
5780 return f (* args , ** kwargs )
5881
5982 return _inner
6083
6184
85+ def _continue_trace (headers , scope ):
86+ if headers :
87+ span_context = SpanContext .continue_from_headers (headers )
88+ else :
89+ span_context = SpanContext .start_trace ()
90+ scope .set_span_context (span_context )
91+
92+
6293def _wrap_task_call (task , f ):
6394 # Need to wrap task call because the exception is caught before we get to
6495 # see it. Also celery's reported stacktrace is untrustworthy.
0 commit comments