@@ -35,28 +35,48 @@ def _process_failure_signal(sender, task_id, einfo, **kw):
3535 if integration is None :
3636 return
3737
38- if hasattr (sender , "throws" ) and isinstance (einfo .exception , sender .throws ):
39- return
40-
41- if isinstance (einfo .exception , SoftTimeLimitExceeded ):
42- # TODO: Move this into event processor
43- with hub .push_scope () as scope :
44- scope .fingerprint = [
45- "celery" ,
46- "SoftTimeLimitExceeded" ,
47- getattr (sender , "name" , sender ),
48- ]
49- _capture_event (hub , exc_info )
50- else :
51- _capture_event (hub , exc_info )
38+ _capture_event (hub , exc_info )
5239
5340
54- def _handle_task_prerun (sender , task , ** kw ):
41+ def _handle_task_prerun (sender , task , args , kwargs , ** _ ):
5542 hub = Hub .current
5643 if hub .get_integration (CeleryIntegration ) is not None :
5744 scope = hub .push_scope ().__enter__ ()
45+ scope .add_event_processor (_make_event_processor (args , kwargs , task ))
46+
47+
48+ def _make_event_processor (args , kwargs , task ):
49+ def event_processor (event , hint ):
50+ with capture_internal_exceptions ():
51+ if "transaction" not in event :
52+ event ["transaction" ] = task .name
53+
5854 with capture_internal_exceptions ():
59- scope .transaction = task .name
55+ extra = event .setdefault ("extra" , {})
56+ extra ["celery-job" ] = {
57+ "task_name" : task .name ,
58+ "args" : args ,
59+ "kwargs" : kwargs ,
60+ }
61+
62+ if "exc_info" in hint :
63+ with capture_internal_exceptions ():
64+ if issubclass (hint ["exc_info" ][0 ], SoftTimeLimitExceeded ):
65+ event ["fingerprint" ] = [
66+ "celery" ,
67+ "SoftTimeLimitExceeded" ,
68+ getattr (task , "name" , task ),
69+ ]
70+
71+ with capture_internal_exceptions ():
72+ if hasattr (task , "throws" ) and isinstance (
73+ hint ["exc_info" ][1 ], task .throws
74+ ):
75+ return None
76+
77+ return event
78+
79+ return event_processor
6080
6181
6282def _handle_task_postrun (sender , task_id , task , ** kw ):
0 commit comments