diff --git a/pyproject.toml b/pyproject.toml index e3f470499..6ced56904 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath" -version = "2.9.10" +version = "2.9.11" description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools." readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/telemetry/_track.py b/src/uipath/telemetry/_track.py index d7709f6f7..64742b318 100644 --- a/src/uipath/telemetry/_track.py +++ b/src/uipath/telemetry/_track.py @@ -41,29 +41,47 @@ from applicationinsights import ( # type: ignore[import-untyped] TelemetryClient as AppInsightsTelemetryClient, ) + from applicationinsights.channel import ( # type: ignore[import-untyped] + SynchronousQueue, + SynchronousSender, + TelemetryChannel, + ) _HAS_APPINSIGHTS = True except ImportError: _HAS_APPINSIGHTS = False AppInsightsTelemetryClient = None + SynchronousSender = None + SynchronousQueue = None + TelemetryChannel = None -def _parse_connection_string(connection_string: str) -> Optional[str]: - """Parse Azure Application Insights connection string to get instrumentation key. +def _parse_connection_string( + connection_string: str, +) -> Optional[Dict[str, str]]: + """Parse Azure Application Insights connection string. Args: connection_string: The full connection string from Azure. Returns: - The instrumentation key if found, None otherwise. + Dict with 'InstrumentationKey' and optionally 'IngestionEndpoint', + or None if InstrumentationKey is not found. """ try: - parts = {} + parts: Dict[str, str] = {} for part in connection_string.split(";"): if "=" in part: key, value = part.split("=", 1) parts[key] = value - return parts.get("InstrumentationKey") + ikey = parts.get("InstrumentationKey") + if not ikey: + return None + result: Dict[str, str] = {"InstrumentationKey": ikey} + ingestion = parts.get("IngestionEndpoint") + if ingestion: + result["IngestionEndpoint"] = ingestion + return result except Exception: return None @@ -136,6 +154,59 @@ def _get_attributes(record: LogRecord) -> Mapping[str, AnyValue]: return attributes +class _DiagnosticSender(SynchronousSender): + """SynchronousSender that logs HTTP failures the base SDK silently discards.""" + + def send(self, data_to_send: Any) -> None: + """Send telemetry data with diagnostic logging. + + The base SDK silently discards HTTP 400 responses and swallows all + other network errors. This override adds WARNING-level logs so + silent data loss becomes visible in logs. + """ + import json as _json + + try: + import urllib.request as HTTPClient + from urllib.error import HTTPError + except ImportError: + super().send(data_to_send) + return + + request_payload = _json.dumps([a.write() for a in data_to_send]) + request = HTTPClient.Request( + self._service_endpoint_uri, + bytearray(request_payload, "utf-8"), + { + "Accept": "application/json", + "Content-Type": "application/json; charset=utf-8", + }, + ) + try: + response = HTTPClient.urlopen(request, timeout=self._timeout) + status_code = response.getcode() + if 200 <= status_code < 300: + return + except HTTPError as e: + if e.getcode() == 400: + _logger.warning( + "AppInsights send: HTTP 400 — payload rejected (%d items discarded)", + len(data_to_send), + ) + return + _logger.warning( + "AppInsights send: HTTP %d (%d items re-queued)", + e.getcode(), + len(data_to_send), + ) + except Exception as e: + _logger.warning("AppInsights send: %s (%s)", type(e).__name__, e) + + # Re-queue unsent data + for data in data_to_send: + self._queue.put(data) + + class _AppInsightsEventClient: """Application Insights SDK client for sending custom events. @@ -168,12 +239,25 @@ def _initialize() -> None: return try: - instrumentation_key = _parse_connection_string(connection_string) - if not instrumentation_key: + parsed = _parse_connection_string(connection_string) + if not parsed: return + instrumentation_key = parsed["InstrumentationKey"] + ingestion_endpoint = parsed.get("IngestionEndpoint") + + # Build custom channel: DiagnosticSender → SynchronousQueue → TelemetryChannel + if ingestion_endpoint: + endpoint_url = ingestion_endpoint.rstrip("/") + "/v2/track" + else: + endpoint_url = None # SDK default + + sender = _DiagnosticSender(service_endpoint_uri=endpoint_url) + queue = SynchronousQueue(sender) + channel = TelemetryChannel(queue=queue) + _AppInsightsEventClient._client = AppInsightsTelemetryClient( - instrumentation_key + instrumentation_key, telemetry_channel=channel ) # Set application version @@ -222,6 +306,18 @@ def flush() -> None: if _AppInsightsEventClient._client: try: _AppInsightsEventClient._client.flush() + # Check if items remain after flush (indicates send failure) + try: + remaining = ( + _AppInsightsEventClient._client.channel.queue._queue.qsize() + ) + if remaining > 0: + _logger.warning( + "AppInsights flush: %d items still in queue after flush", + remaining, + ) + except Exception: + pass except Exception as e: # Log but don't raise - telemetry should never break the main application _logger.warning(f"Failed to flush telemetry events: {e}") @@ -308,6 +404,10 @@ def track_event( try: _AppInsightsEventClient.track_event(name, properties) + # Safety net: register atexit flush so events are sent even if + # the caller never explicitly flushes (e.g. serverless containers). + # Idempotent — only registers once. + _AppInsightsEventClient.register_atexit_flush() except Exception as e: # Log but don't raise - telemetry should never break the main application _logger.warning(f"Failed to track event '{name}': {e}") diff --git a/tests/telemetry/test_track.py b/tests/telemetry/test_track.py index efe234daa..0c47d21e7 100644 --- a/tests/telemetry/test_track.py +++ b/tests/telemetry/test_track.py @@ -27,7 +27,10 @@ def test_parse_valid_connection_string(self): result = _parse_connection_string(connection_string) - assert result == "test-key-123" + assert result == { + "InstrumentationKey": "test-key-123", + "IngestionEndpoint": "https://example.com/", + } def test_parse_connection_string_only_instrumentation_key(self): """Test parsing connection string with only InstrumentationKey.""" @@ -35,7 +38,7 @@ def test_parse_connection_string_only_instrumentation_key(self): result = _parse_connection_string(connection_string) - assert result == "simple-key" + assert result == {"InstrumentationKey": "simple-key"} def test_parse_connection_string_missing_instrumentation_key(self): """Test parsing connection string without InstrumentationKey.""" @@ -68,7 +71,7 @@ def test_parse_connection_string_with_special_chars_in_value(self): result = _parse_connection_string(connection_string) - assert result == "key=with=equals" + assert result == {"InstrumentationKey": "key=with=equals"} class TestAppInsightsEventClient: @@ -96,6 +99,9 @@ def test_initialize_no_connection_string(self): assert _AppInsightsEventClient._initialized is True assert _AppInsightsEventClient._client is None + @patch("uipath.telemetry._track.TelemetryChannel") + @patch("uipath.telemetry._track.SynchronousQueue") + @patch("uipath.telemetry._track._DiagnosticSender") @patch("uipath.telemetry._track._HAS_APPINSIGHTS", True) @patch("uipath.telemetry._track.AppInsightsTelemetryClient") @patch( @@ -103,7 +109,7 @@ def test_initialize_no_connection_string(self): "InstrumentationKey=builtin-key;IngestionEndpoint=https://example.com/", ) def test_initialize_falls_back_to_builtin_connection_string( - self, mock_client_class + self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class ): """Test initialization uses _CONNECTION_STRING when env var is not set.""" mock_client = MagicMock() @@ -116,7 +122,12 @@ def test_initialize_falls_back_to_builtin_connection_string( assert _AppInsightsEventClient._initialized is True assert _AppInsightsEventClient._client is mock_client - mock_client_class.assert_called_once_with("builtin-key") + mock_sender_class.assert_called_once_with( + service_endpoint_uri="https://example.com/v2/track" + ) + mock_client_class.assert_called_once_with( + "builtin-key", telemetry_channel=mock_channel_class.return_value + ) @patch("uipath.telemetry._track._HAS_APPINSIGHTS", False) def test_initialize_no_appinsights_package(self): @@ -126,9 +137,14 @@ def test_initialize_no_appinsights_package(self): assert _AppInsightsEventClient._initialized is True assert _AppInsightsEventClient._client is None + @patch("uipath.telemetry._track.TelemetryChannel") + @patch("uipath.telemetry._track.SynchronousQueue") + @patch("uipath.telemetry._track._DiagnosticSender") @patch("uipath.telemetry._track._HAS_APPINSIGHTS", True) @patch("uipath.telemetry._track.AppInsightsTelemetryClient") - def test_initialize_creates_client(self, mock_client_class): + def test_initialize_creates_client( + self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class + ): """Test that initialization creates Application Insights client.""" mock_client = MagicMock() mock_client_class.return_value = mock_client @@ -145,11 +161,19 @@ def test_initialize_creates_client(self, mock_client_class): assert _AppInsightsEventClient._initialized is True assert _AppInsightsEventClient._client is mock_client - mock_client_class.assert_called_once_with("test-key") + mock_sender_class.assert_called_once_with( + service_endpoint_uri="https://example.com/v2/track" + ) + mock_client_class.assert_called_once_with( + "test-key", telemetry_channel=mock_channel_class.return_value + ) + @patch("uipath.telemetry._track._DiagnosticSender") @patch("uipath.telemetry._track._HAS_APPINSIGHTS", True) @patch("uipath.telemetry._track.AppInsightsTelemetryClient") - def test_initialize_invalid_connection_string(self, mock_client_class): + def test_initialize_invalid_connection_string( + self, mock_client_class, mock_sender_class + ): """Test initialization with invalid connection string.""" with patch.dict( os.environ, @@ -276,9 +300,10 @@ def test_track_event_disabled(self, mock_is_enabled): mock_track.assert_not_called() + @patch.object(_AppInsightsEventClient, "register_atexit_flush") @patch.object(_TelemetryClient, "_is_enabled", return_value=True) @patch.object(_AppInsightsEventClient, "track_event") - def test_track_event_enabled(self, mock_track, mock_is_enabled): + def test_track_event_enabled(self, mock_track, mock_is_enabled, mock_atexit): """Test that track_event calls AppInsightsEventClient when enabled.""" properties = {"key": "value"} @@ -286,6 +311,17 @@ def test_track_event_enabled(self, mock_track, mock_is_enabled): mock_track.assert_called_once_with("test_event", properties) + @patch.object(_AppInsightsEventClient, "register_atexit_flush") + @patch.object(_TelemetryClient, "_is_enabled", return_value=True) + @patch.object(_AppInsightsEventClient, "track_event") + def test_track_event_registers_atexit_handler( + self, mock_track, mock_is_enabled, mock_atexit + ): + """Test that track_event registers atexit flush handler.""" + _TelemetryClient.track_event("test_event", {"key": "value"}) + + mock_atexit.assert_called_once() + class TestPublicFunctions: """Test public telemetry functions.""" @@ -488,9 +524,14 @@ def test_flush_handles_exception(self): # Should not raise exception _AppInsightsEventClient.flush() + @patch("uipath.telemetry._track.TelemetryChannel") + @patch("uipath.telemetry._track.SynchronousQueue") + @patch("uipath.telemetry._track._DiagnosticSender") @patch("uipath.telemetry._track._HAS_APPINSIGHTS", True) @patch("uipath.telemetry._track.AppInsightsTelemetryClient") - def test_initialize_handles_exception(self, mock_client_class): + def test_initialize_handles_exception( + self, mock_client_class, mock_sender_class, mock_queue_class, mock_channel_class + ): """Test that initialization handles exceptions.""" mock_client_class.side_effect = Exception("Init error") diff --git a/uv.lock b/uv.lock index bd2107b63..c3e5492f0 100644 --- a/uv.lock +++ b/uv.lock @@ -2531,7 +2531,7 @@ wheels = [ [[package]] name = "uipath" -version = "2.9.10" +version = "2.9.11" source = { editable = "." } dependencies = [ { name = "applicationinsights" },