Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath"
version = "2.9.10"
version = "2.9.11"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
116 changes: 108 additions & 8 deletions src/uipath/telemetry/_track.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,47 @@
from applicationinsights import ( # type: ignore[import-untyped]
TelemetryClient as AppInsightsTelemetryClient,
)
from applicationinsights.channel import ( # type: ignore[import-untyped]
SynchronousQueue,
SynchronousSender,
TelemetryChannel,
)

_HAS_APPINSIGHTS = True
except ImportError:
_HAS_APPINSIGHTS = False
AppInsightsTelemetryClient = None
SynchronousSender = None
SynchronousQueue = None
TelemetryChannel = None


def _parse_connection_string(connection_string: str) -> Optional[str]:
"""Parse Azure Application Insights connection string to get instrumentation key.
def _parse_connection_string(
connection_string: str,
) -> Optional[Dict[str, str]]:
"""Parse Azure Application Insights connection string.

Args:
connection_string: The full connection string from Azure.

Returns:
The instrumentation key if found, None otherwise.
Dict with 'InstrumentationKey' and optionally 'IngestionEndpoint',
or None if InstrumentationKey is not found.
"""
try:
parts = {}
parts: Dict[str, str] = {}
for part in connection_string.split(";"):
if "=" in part:
key, value = part.split("=", 1)
parts[key] = value
return parts.get("InstrumentationKey")
ikey = parts.get("InstrumentationKey")
if not ikey:
return None
result: Dict[str, str] = {"InstrumentationKey": ikey}
ingestion = parts.get("IngestionEndpoint")
if ingestion:
result["IngestionEndpoint"] = ingestion
return result
except Exception:
return None

Expand Down Expand Up @@ -136,6 +154,59 @@ def _get_attributes(record: LogRecord) -> Mapping[str, AnyValue]:
return attributes


class _DiagnosticSender(SynchronousSender):
"""SynchronousSender that logs HTTP failures the base SDK silently discards."""

def send(self, data_to_send: Any) -> None:
"""Send telemetry data with diagnostic logging.

The base SDK silently discards HTTP 400 responses and swallows all
other network errors. This override adds WARNING-level logs so
silent data loss becomes visible in logs.
"""
import json as _json

try:
import urllib.request as HTTPClient
from urllib.error import HTTPError
except ImportError:
super().send(data_to_send)
return

request_payload = _json.dumps([a.write() for a in data_to_send])
request = HTTPClient.Request(
self._service_endpoint_uri,
bytearray(request_payload, "utf-8"),
{
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
},
)
try:
response = HTTPClient.urlopen(request, timeout=self._timeout)
status_code = response.getcode()
if 200 <= status_code < 300:
return
except HTTPError as e:
if e.getcode() == 400:
_logger.warning(
"AppInsights send: HTTP 400 — payload rejected (%d items discarded)",
len(data_to_send),
)
return
_logger.warning(
"AppInsights send: HTTP %d (%d items re-queued)",
e.getcode(),
len(data_to_send),
)
except Exception as e:
_logger.warning("AppInsights send: %s (%s)", type(e).__name__, e)

# Re-queue unsent data
for data in data_to_send:
self._queue.put(data)


class _AppInsightsEventClient:
"""Application Insights SDK client for sending custom events.

Expand Down Expand Up @@ -168,12 +239,25 @@ def _initialize() -> None:
return

try:
instrumentation_key = _parse_connection_string(connection_string)
if not instrumentation_key:
parsed = _parse_connection_string(connection_string)
if not parsed:
return

instrumentation_key = parsed["InstrumentationKey"]
ingestion_endpoint = parsed.get("IngestionEndpoint")

# Build custom channel: DiagnosticSender → SynchronousQueue → TelemetryChannel
if ingestion_endpoint:
endpoint_url = ingestion_endpoint.rstrip("/") + "/v2/track"
else:
endpoint_url = None # SDK default

sender = _DiagnosticSender(service_endpoint_uri=endpoint_url)
queue = SynchronousQueue(sender)
channel = TelemetryChannel(queue=queue)

_AppInsightsEventClient._client = AppInsightsTelemetryClient(
instrumentation_key
instrumentation_key, telemetry_channel=channel
)

# Set application version
Expand Down Expand Up @@ -222,6 +306,18 @@ def flush() -> None:
if _AppInsightsEventClient._client:
try:
_AppInsightsEventClient._client.flush()
# Check if items remain after flush (indicates send failure)
try:
remaining = (
_AppInsightsEventClient._client.channel.queue._queue.qsize()
)
if remaining > 0:
_logger.warning(
"AppInsights flush: %d items still in queue after flush",
remaining,
)
except Exception:
pass
except Exception as e:
# Log but don't raise - telemetry should never break the main application
_logger.warning(f"Failed to flush telemetry events: {e}")
Expand Down Expand Up @@ -308,6 +404,10 @@ def track_event(

try:
_AppInsightsEventClient.track_event(name, properties)
# Safety net: register atexit flush so events are sent even if
# the caller never explicitly flushes (e.g. serverless containers).
# Idempotent — only registers once.
_AppInsightsEventClient.register_atexit_flush()
except Exception as e:
# Log but don't raise - telemetry should never break the main application
_logger.warning(f"Failed to track event '{name}': {e}")
Expand Down
61 changes: 51 additions & 10 deletions tests/telemetry/test_track.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,18 @@ def test_parse_valid_connection_string(self):

result = _parse_connection_string(connection_string)

assert result == "test-key-123"
assert result == {
"InstrumentationKey": "test-key-123",
"IngestionEndpoint": "https://example.com/",
}

def test_parse_connection_string_only_instrumentation_key(self):
"""Test parsing connection string with only InstrumentationKey."""
connection_string = "InstrumentationKey=simple-key"

result = _parse_connection_string(connection_string)

assert result == "simple-key"
assert result == {"InstrumentationKey": "simple-key"}

def test_parse_connection_string_missing_instrumentation_key(self):
"""Test parsing connection string without InstrumentationKey."""
Expand Down Expand Up @@ -68,7 +71,7 @@ def test_parse_connection_string_with_special_chars_in_value(self):

result = _parse_connection_string(connection_string)

assert result == "key=with=equals"
assert result == {"InstrumentationKey": "key=with=equals"}


class TestAppInsightsEventClient:
Expand Down Expand Up @@ -96,14 +99,17 @@ def test_initialize_no_connection_string(self):
assert _AppInsightsEventClient._initialized is True
assert _AppInsightsEventClient._client is None

@patch("uipath.telemetry._track.TelemetryChannel")
@patch("uipath.telemetry._track.SynchronousQueue")
@patch("uipath.telemetry._track._DiagnosticSender")
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
@patch(
"uipath.telemetry._track._CONNECTION_STRING",
"InstrumentationKey=builtin-key;IngestionEndpoint=https://example.com/",
)
def test_initialize_falls_back_to_builtin_connection_string(
self, mock_client_class
self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class
):
"""Test initialization uses _CONNECTION_STRING when env var is not set."""
mock_client = MagicMock()
Expand All @@ -116,7 +122,12 @@ def test_initialize_falls_back_to_builtin_connection_string(

assert _AppInsightsEventClient._initialized is True
assert _AppInsightsEventClient._client is mock_client
mock_client_class.assert_called_once_with("builtin-key")
mock_sender_class.assert_called_once_with(
service_endpoint_uri="https://example.com/v2/track"
)
mock_client_class.assert_called_once_with(
"builtin-key", telemetry_channel=mock_channel_class.return_value
)

@patch("uipath.telemetry._track._HAS_APPINSIGHTS", False)
def test_initialize_no_appinsights_package(self):
Expand All @@ -126,9 +137,14 @@ def test_initialize_no_appinsights_package(self):
assert _AppInsightsEventClient._initialized is True
assert _AppInsightsEventClient._client is None

@patch("uipath.telemetry._track.TelemetryChannel")
@patch("uipath.telemetry._track.SynchronousQueue")
@patch("uipath.telemetry._track._DiagnosticSender")
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
def test_initialize_creates_client(self, mock_client_class):
def test_initialize_creates_client(
self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class
):
"""Test that initialization creates Application Insights client."""
mock_client = MagicMock()
mock_client_class.return_value = mock_client
Expand All @@ -145,11 +161,19 @@ def test_initialize_creates_client(self, mock_client_class):

assert _AppInsightsEventClient._initialized is True
assert _AppInsightsEventClient._client is mock_client
mock_client_class.assert_called_once_with("test-key")
mock_sender_class.assert_called_once_with(
service_endpoint_uri="https://example.com/v2/track"
)
mock_client_class.assert_called_once_with(
"test-key", telemetry_channel=mock_channel_class.return_value
)

@patch("uipath.telemetry._track._DiagnosticSender")
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
def test_initialize_invalid_connection_string(self, mock_client_class):
def test_initialize_invalid_connection_string(
self, mock_client_class, mock_sender_class
):
"""Test initialization with invalid connection string."""
with patch.dict(
os.environ,
Expand Down Expand Up @@ -276,16 +300,28 @@ def test_track_event_disabled(self, mock_is_enabled):

mock_track.assert_not_called()

@patch.object(_AppInsightsEventClient, "register_atexit_flush")
@patch.object(_TelemetryClient, "_is_enabled", return_value=True)
@patch.object(_AppInsightsEventClient, "track_event")
def test_track_event_enabled(self, mock_track, mock_is_enabled):
def test_track_event_enabled(self, mock_track, mock_is_enabled, mock_atexit):
"""Test that track_event calls AppInsightsEventClient when enabled."""
properties = {"key": "value"}

_TelemetryClient.track_event("test_event", properties)

mock_track.assert_called_once_with("test_event", properties)

@patch.object(_AppInsightsEventClient, "register_atexit_flush")
@patch.object(_TelemetryClient, "_is_enabled", return_value=True)
@patch.object(_AppInsightsEventClient, "track_event")
def test_track_event_registers_atexit_handler(
self, mock_track, mock_is_enabled, mock_atexit
):
"""Test that track_event registers atexit flush handler."""
_TelemetryClient.track_event("test_event", {"key": "value"})

mock_atexit.assert_called_once()


class TestPublicFunctions:
"""Test public telemetry functions."""
Expand Down Expand Up @@ -488,9 +524,14 @@ def test_flush_handles_exception(self):
# Should not raise exception
_AppInsightsEventClient.flush()

@patch("uipath.telemetry._track.TelemetryChannel")
@patch("uipath.telemetry._track.SynchronousQueue")
@patch("uipath.telemetry._track._DiagnosticSender")
@patch("uipath.telemetry._track._HAS_APPINSIGHTS", True)
@patch("uipath.telemetry._track.AppInsightsTelemetryClient")
def test_initialize_handles_exception(self, mock_client_class):
def test_initialize_handles_exception(
self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class
):
"""Test that initialization handles exceptions."""
mock_client_class.side_effect = Exception("Init error")

Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading