Skip to content

Commit b105491

Browse files
authored
Merge branch 'main' into upgrade-latest-psycopg-version
2 parents 4cd238d + ee8f34d commit b105491

File tree

74 files changed

+1194
-1534
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1194
-1534
lines changed

.gitlab/benchmarks/macrobenchmarks.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ candidate:
1717
image: $PACKAGE_IMAGE
1818
stage: build
1919
tags: [ "arch:amd64" ]
20+
rules:
21+
- if: $CI_COMMIT_BRANCH == "main" || $CI_COMMIT_BRANCH =~ /^[0-9]+\.[0-9]+$/
22+
interruptible: false
23+
- interruptible: true
2024
needs:
2125
- pipeline: $PARENT_PIPELINE_ID
2226
job: download_ddtrace_artifacts
@@ -33,6 +37,10 @@ candidate:
3337
stage: test
3438
needs: [ "candidate" ]
3539
tags: ["runner:apm-k8s-same-cpu"]
40+
rules:
41+
- if: $CI_COMMIT_BRANCH == "main" || $CI_COMMIT_BRANCH =~ /^[0-9]+\.[0-9]+$/
42+
interruptible: false
43+
- interruptible: true
3644
timeout: 1h
3745
retry:
3846
max: 2

.gitlab/benchmarks/microbenchmarks.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ variables:
1616
when: on_success
1717
tags: ["runner:apm-k8s-tweaked-metal"]
1818
image: $MICROBENCHMARKS_CI_IMAGE
19-
interruptible: true
19+
rules:
20+
- if: $CI_COMMIT_BRANCH == "main" || $CI_COMMIT_BRANCH =~ /^[0-9]+\.[0-9]+$/
21+
interruptible: false
22+
- interruptible: true
2023
timeout: 30m
2124
dependencies: [ "baseline:build", "candidate" ]
2225
script: |

ddtrace/_trace/_span_pointer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ class _SpanPointerDirection(Enum):
2424
DOWNSTREAM = "d"
2525

2626

27+
class _SpanPointerDirectionName(Enum):
28+
UPSTREAM = "span-pointer-up"
29+
DOWNSTREAM = "span-pointer-down"
30+
31+
2732
class _SpanPointerDescription(NamedTuple):
2833
# Not to be confused with _SpanPointer. This class describes the parameters
2934
# required to attach a span pointer to a Span. It lets us decouple code

ddtrace/_trace/trace_handlers.py

Lines changed: 145 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
import ddtrace
1616
from ddtrace import config
1717
from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist
18+
from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind
1819
from ddtrace._trace._span_pointer import _SpanPointerDescription
20+
from ddtrace._trace._span_pointer import _SpanPointerDirection
21+
from ddtrace._trace._span_pointer import _SpanPointerDirectionName
1922
from ddtrace._trace.span import Span
2023
from ddtrace._trace.utils import extract_DD_context_from_messages
2124
from ddtrace.constants import _SPAN_MEASURED_KEY
@@ -31,6 +34,7 @@
3134
from ddtrace.contrib.internal.trace_utils import _set_url_tag
3235
from ddtrace.ext import SpanKind
3336
from ddtrace.ext import SpanLinkKind
37+
from ddtrace.ext import SpanTypes
3438
from ddtrace.ext import db
3539
from ddtrace.ext import http
3640
from ddtrace.ext import net
@@ -43,6 +47,7 @@
4347
from ddtrace.internal.constants import FLASK_ENDPOINT
4448
from ddtrace.internal.constants import FLASK_URL_RULE
4549
from ddtrace.internal.constants import FLASK_VIEW_ARGS
50+
from ddtrace.internal.constants import HTTP_REQUEST_UPGRADED
4651
from ddtrace.internal.constants import MESSAGING_BATCH_COUNT
4752
from ddtrace.internal.constants import MESSAGING_DESTINATION_NAME
4853
from ddtrace.internal.constants import MESSAGING_MESSAGE_ID
@@ -57,6 +62,9 @@
5762

5863
log = get_logger(__name__)
5964

65+
_WEBSOCKET_LINK_ATTRS_EXECUTED = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}
66+
_WEBSOCKET_LINK_ATTRS_RESUMING = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}
67+
6068

6169
class _TracedIterable(wrapt.ObjectProxy):
6270
def __init__(self, wrapped, span, parent_span, wrapped_is_iterator=False):
@@ -992,12 +1000,109 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span):
9921000
log.debug("Could not validate client IP address for websocket send message: %s", str(e))
9931001

9941002

1003+
def _init_websocket_message_counters(scope: Dict[str, Any]) -> None:
1004+
if "datadog" not in scope:
1005+
scope["datadog"] = {}
1006+
if "websocket_receive_counter" not in scope["datadog"]:
1007+
scope["datadog"]["websocket_receive_counter"] = 0
1008+
if "websocket_send_counter" not in scope["datadog"]:
1009+
scope["datadog"]["websocket_send_counter"] = 0
1010+
1011+
1012+
def _increment_websocket_counter(scope: Dict[str, Any], counter_type: str) -> int:
1013+
"""
1014+
Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter)
1015+
"""
1016+
scope["datadog"][counter_type] += 1
1017+
return scope["datadog"][counter_type]
1018+
1019+
1020+
def _build_websocket_span_pointer_hash(
1021+
handshake_trace_id: int,
1022+
handshake_span_id: int,
1023+
counter: int,
1024+
is_server: bool,
1025+
is_incoming: bool,
1026+
) -> str:
1027+
"""
1028+
Build websocket span pointer hash.
1029+
1030+
Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
1031+
Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
1032+
"""
1033+
if (is_server and not is_incoming) or (not is_server and is_incoming):
1034+
prefix = "S"
1035+
else:
1036+
prefix = "C"
1037+
1038+
trace_id_hex = f"{handshake_trace_id:032x}"
1039+
span_id_hex = f"{handshake_span_id:016x}"
1040+
counter_hex = f"{counter:08x}"
1041+
1042+
return f"{prefix}{trace_id_hex}{span_id_hex}{counter_hex}"
1043+
1044+
1045+
def _has_distributed_tracing_context(span: Span) -> bool:
1046+
"""
1047+
Check if the handshake span has extracted distributed tracing context.
1048+
1049+
A websocket server must not set the span pointer if the handshake has not extracted a context
1050+
1051+
A span has distributed tracing context if it has a parent context that was
1052+
extracted from headers.
1053+
"""
1054+
if not span or not span._parent_context:
1055+
return False
1056+
return span._parent_context._is_remote
1057+
1058+
1059+
def _add_websocket_span_pointer_attributes(
1060+
link_attributes: Dict[str, Any],
1061+
integration_config: Any,
1062+
handshake_span: Span,
1063+
scope: Dict[str, Any],
1064+
is_incoming: bool,
1065+
) -> None:
1066+
"""
1067+
Add span pointer attributes to link_attributes for websocket message correlation.
1068+
"""
1069+
1070+
if not integration_config.distributed_tracing or not _has_distributed_tracing_context(handshake_span):
1071+
return
1072+
1073+
# Increment counter based on message direction
1074+
counter_type = "websocket_receive_counter" if is_incoming else "websocket_send_counter"
1075+
counter = _increment_websocket_counter(scope, counter_type)
1076+
1077+
ptr_hash = _build_websocket_span_pointer_hash(
1078+
handshake_trace_id=handshake_span.trace_id,
1079+
handshake_span_id=handshake_span.span_id,
1080+
counter=counter,
1081+
is_server=True,
1082+
is_incoming=is_incoming,
1083+
)
1084+
1085+
if is_incoming:
1086+
link_name = _SpanPointerDirectionName.UPSTREAM
1087+
ptr_direction = _SpanPointerDirection.UPSTREAM
1088+
else:
1089+
link_name = _SpanPointerDirectionName.DOWNSTREAM
1090+
ptr_direction = _SpanPointerDirection.DOWNSTREAM
1091+
1092+
link_attributes.update(
1093+
{
1094+
"link.name": link_name,
1095+
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
1096+
"ptr.kind": SpanTypes.WEBSOCKET,
1097+
"ptr.dir": ptr_direction,
1098+
"ptr.hash": ptr_hash,
1099+
}
1100+
)
1101+
1102+
9951103
def _on_asgi_websocket_receive_message(ctx, scope, message):
9961104
"""
9971105
Handle websocket receive message events.
998-
999-
This handler is called when a websocket receive message event is dispatched.
1000-
It sets up the span with appropriate tags, metrics, and links.
10011106
"""
10021107
span = ctx.span
10031108
integration_config = ctx.get_item("integration_config")
@@ -1011,24 +1116,24 @@ def _on_asgi_websocket_receive_message(ctx, scope, message):
10111116
span.set_metric(websocket.MESSAGE_FRAMES, 1)
10121117

10131118
if hasattr(ctx, "parent") and ctx.parent.span:
1014-
span.set_link(
1015-
trace_id=ctx.parent.span.trace_id,
1016-
span_id=ctx.parent.span.span_id,
1017-
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
1119+
handshake_span = ctx.parent.span
1120+
link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy()
1121+
1122+
_add_websocket_span_pointer_attributes(
1123+
link_attributes, integration_config, handshake_span, scope, is_incoming=True
10181124
)
10191125

1126+
span.link_span(handshake_span.context, link_attributes)
1127+
10201128
if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
1021-
_inherit_sampling_tags(span, ctx.parent.span._local_root)
1129+
_inherit_sampling_tags(span, handshake_span._local_root)
10221130

1023-
_copy_trace_level_tags(span, ctx.parent.span)
1131+
_copy_trace_level_tags(span, handshake_span)
10241132

10251133

10261134
def _on_asgi_websocket_send_message(ctx, scope, message):
10271135
"""
10281136
Handle websocket send message events.
1029-
1030-
This handler is called when a websocket send message event is dispatched.
1031-
It sets up the span with appropriate tags, metrics, and links.
10321137
"""
10331138
span = ctx.span
10341139
integration_config = ctx.get_item("integration_config")
@@ -1041,19 +1146,19 @@ def _on_asgi_websocket_send_message(ctx, scope, message):
10411146
span.set_metric(websocket.MESSAGE_FRAMES, 1)
10421147

10431148
if hasattr(ctx, "parent") and ctx.parent.span:
1044-
span.set_link(
1045-
trace_id=ctx.parent.span.trace_id,
1046-
span_id=ctx.parent.span.span_id,
1047-
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
1149+
handshake_span = ctx.parent.span
1150+
link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy()
1151+
1152+
_add_websocket_span_pointer_attributes(
1153+
link_attributes, integration_config, handshake_span, scope, is_incoming=False
10481154
)
10491155

1156+
span.link_span(handshake_span.context, link_attributes)
1157+
10501158

10511159
def _on_asgi_websocket_close_message(ctx, scope, message):
10521160
"""
10531161
Handle websocket close message events.
1054-
1055-
This handler is called when a websocket close message event is dispatched.
1056-
It sets up the span with appropriate tags, metrics, and links.
10571162
"""
10581163
span = ctx.span
10591164
integration_config = ctx.get_item("integration_config")
@@ -1068,21 +1173,21 @@ def _on_asgi_websocket_close_message(ctx, scope, message):
10681173
_set_websocket_close_tags(span, message)
10691174

10701175
if hasattr(ctx, "parent") and ctx.parent.span:
1071-
span.set_link(
1072-
trace_id=ctx.parent.span.trace_id,
1073-
span_id=ctx.parent.span.span_id,
1074-
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
1176+
handshake_span = ctx.parent.span
1177+
link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy()
1178+
1179+
_add_websocket_span_pointer_attributes(
1180+
link_attributes, integration_config, handshake_span, scope, is_incoming=False
10751181
)
10761182

1077-
_copy_trace_level_tags(span, ctx.parent.span)
1183+
span.link_span(handshake_span.context, link_attributes)
1184+
1185+
_copy_trace_level_tags(span, handshake_span)
10781186

10791187

10801188
def _on_asgi_websocket_disconnect_message(ctx, scope, message):
10811189
"""
10821190
Handle websocket disconnect message events.
1083-
1084-
This handler is called when a websocket disconnect message event is dispatched.
1085-
It sets up the span with appropriate tags, metrics, and links.
10861191
"""
10871192
span = ctx.span
10881193
integration_config = ctx.get_item("integration_config")
@@ -1093,16 +1198,19 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message):
10931198
_set_websocket_close_tags(span, message)
10941199

10951200
if hasattr(ctx, "parent") and ctx.parent.span:
1096-
span.set_link(
1097-
trace_id=ctx.parent_span.trace_id,
1098-
span_id=ctx.parent_span.span_id,
1099-
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
1201+
handshake_span = ctx.parent.span
1202+
link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy()
1203+
1204+
_add_websocket_span_pointer_attributes(
1205+
link_attributes, integration_config, handshake_span, scope, is_incoming=True
11001206
)
11011207

1208+
span.link_span(handshake_span.context, link_attributes)
1209+
11021210
if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
1103-
_inherit_sampling_tags(span, ctx.parent.span._local_root)
1211+
_inherit_sampling_tags(span, handshake_span._local_root)
11041212

1105-
_copy_trace_level_tags(span, ctx.parent.span)
1213+
_copy_trace_level_tags(span, handshake_span)
11061214

11071215

11081216
def _on_asgi_request(ctx: core.ExecutionContext) -> None:
@@ -1115,14 +1223,15 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None:
11151223
span = _start_span(ctx)
11161224
ctx.set_item("req_span", span)
11171225

1118-
if scope["type"] == "websocket":
1119-
span._set_tag_str("http.upgraded", "websocket")
1120-
11211226
if "datadog" not in scope:
11221227
scope["datadog"] = {"request_spans": [span]}
11231228
else:
11241229
scope["datadog"]["request_spans"].append(span)
11251230

1231+
if scope["type"] == "websocket":
1232+
span._set_tag_str(HTTP_REQUEST_UPGRADED, SpanTypes.WEBSOCKET)
1233+
_init_websocket_message_counters(scope)
1234+
11261235

11271236
def listen():
11281237
core.on("wsgi.request.prepare", _on_request_prepare)

ddtrace/appsec/_exploit_prevention/__init__.py

Whitespace-only changes.

ddtrace/contrib/internal/aiohttp/__init__.py

Whitespace-only changes.

ddtrace/contrib/internal/asgi/__init__.py

Whitespace-only changes.

ddtrace/contrib/internal/botocore/services/__init__.py

Whitespace-only changes.

ddtrace/contrib/internal/bottle/__init__.py

Whitespace-only changes.

ddtrace/contrib/internal/celery/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)