From b423a3716d98dc472ba5fffce903746d28fddc58 Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Tue, 19 May 2026 13:39:42 -0700 Subject: [PATCH 1/4] Add client-side token bucket rate limiter for telemetry export Addresses resilience finding: the Azure Monitor OpenTelemetry exporter had no client-side send-rate cap, allowing telemetry bursts to overload shared ingestion infrastructure. Changes: - Add _TokenBucketRateLimiter in export/_rate_limiter.py with configurable max_envelopes_per_second (default 10,000/sec, 1s burst capacity) - Integrate rate limiting into BaseExporter._transmit() so all exporter types (traces, logs, metrics) are protected - Excess envelopes are routed to local storage for retry, not dropped - Stats/internal exporters bypass rate limiting to preserve observability - Rate limiting can be disabled via max_envelopes_per_second=0 - Add 19 unit and integration tests in tests/test_rate_limiter.py --- .../opentelemetry/exporter/export/_base.py | 188 ++++++++++++--- .../exporter/export/_rate_limiter.py | 66 ++++++ .../tests/test_rate_limiter.py | 215 ++++++++++++++++++ 3 files changed, 434 insertions(+), 35 deletions(-) create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py create mode 100644 sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 14354219d503..3f46a250731b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -44,12 +44,18 @@ DropCode, _exception_categories, ) -from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser +from azure.monitor.opentelemetry.exporter._connection_string_parser import ( + ConnectionStringParser, +) from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage from azure.monitor.opentelemetry.exporter._utils import ( _get_auth_policy, _get_sha256_hash, ) +from azure.monitor.opentelemetry.exporter.export._rate_limiter import ( + _TokenBucketRateLimiter, + _DEFAULT_MAX_ENVELOPES_PER_SECOND, +) from azure.monitor.opentelemetry.exporter.statsbeat._state import ( get_statsbeat_initial_success, get_statsbeat_shutdown, @@ -70,7 +76,6 @@ get_customer_stats_manager, ) - logger = logging.getLogger(__name__) _AZURE_TEMPDIR_PREFIX = "Microsoft-AzureMonitor-" @@ -98,9 +103,12 @@ def __init__(self, **kwargs: Any) -> None: :keyword ManagedIdentityCredential/ClientSecretCredential credential: Token credential, such as ManagedIdentityCredential or ClientSecretCredential, used for Azure Active Directory (AAD) authentication. Defaults to None. :keyword bool disable_offline_storage: Determines whether to disable storing failed telemetry records for retry. Defaults to `False`. :keyword str storage_directory: Storage path in which to store retry files. Defaults to `/opentelemetry-python-`. + :keyword int max_envelopes_per_second: Maximum number of telemetry envelopes sent per second. Acts as a client-side safety cap to prevent overloading shared ingestion infrastructure during telemetry bursts. Defaults to 10000. Set to 0 to disable rate limiting. :rtype: None """ - parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string")) + parsed_connection_string = ConnectionStringParser( + kwargs.get("connection_string") + ) # TODO: Uncomment configuration changes once testing is completed # Get the configuration manager @@ -132,18 +140,31 @@ def __init__(self, **kwargs: Any) -> None: if "storage_directory" in kwargs: self._storage_directory = kwargs.get("storage_directory") elif not self._disable_offline_storage: - self._storage_directory = _get_storage_directory(self._instrumentation_key or "") + self._storage_directory = _get_storage_directory( + self._instrumentation_key or "" + ) else: self._storage_directory = None self._storage_retention_period = kwargs.get( "storage_retention_period", 48 * 60 * 60 ) # Retention period in seconds (default 48 hrs) self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds + max_eps = kwargs.get( + "max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND + ) + if max_eps and max_eps > 0: + self._rate_limiter: Optional[_TokenBucketRateLimiter] = ( + _TokenBucketRateLimiter(max_eps) + ) + else: + self._rate_limiter = None self._distro_version = kwargs.get( _AZURE_MONITOR_DISTRO_VERSION_ARG, "" ) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version. # specifies whether current exporter is used for collection of instrumentation metrics - self._instrumentation_collection = kwargs.get("instrumentation_collection", False) + self._instrumentation_collection = kwargs.get( + "instrumentation_collection", False + ) config = AzureMonitorClientConfiguration(self._endpoint, **kwargs) policies = [ @@ -155,7 +176,9 @@ def __init__(self, **kwargs: Any) -> None: # Handle redirects in exporter, set new endpoint if redirected RedirectPolicy(permit_redirects=False), config.retry_policy, - _get_auth_policy(self._credential, config.authentication_policy, self._aad_audience), + _get_auth_policy( + self._credential, config.authentication_policy, self._aad_audience + ), config.custom_hook_policy, config.logging_policy, # Explicitly disabling to avoid infinite loop of Span creation when data is exported @@ -168,7 +191,10 @@ def __init__(self, **kwargs: Any) -> None: policies.append(config.http_logging_policy or HttpLoggingPolicy(**kwargs)) self.client: AzureMonitorClient = AzureMonitorClient( - host=self._endpoint, connection_timeout=self._timeout, policies=policies, **kwargs + host=self._endpoint, + connection_timeout=self._timeout, + policies=policies, + **kwargs, ) # TODO: Uncomment configuration changes once testing is completed # if self._configuration_manager: @@ -195,7 +221,9 @@ def __init__(self, **kwargs: Any) -> None: if self._should_collect_stats(): try: # Import here to avoid circular dependencies - from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics + from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import ( + collect_statsbeat_metrics, + ) collect_statsbeat_metrics(self) except Exception as e: # pylint: disable=broad-except @@ -205,7 +233,9 @@ def __init__(self, **kwargs: Any) -> None: # customer sdkstats initialization if self._should_collect_customer_sdkstats(): - from azure.monitor.opentelemetry.exporter.statsbeat.customer import collect_customer_sdkstats + from azure.monitor.opentelemetry.exporter.statsbeat.customer import ( + collect_customer_sdkstats, + ) # Collect customer sdkstats metrics collect_customer_sdkstats(self) @@ -229,7 +259,9 @@ def _transmit_from_storage(self) -> None: # If blob.get() returns None, delete the corrupted blob blob.delete() - def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None: + def _handle_transmit_from_storage( + self, envelopes: List[TelemetryItem], result: ExportResult + ) -> None: if self.storage: if result == ExportResult.FAILED_RETRYABLE: envelopes_to_store = [x.as_dict() for x in envelopes] @@ -260,6 +292,35 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: :rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult """ if len(envelopes) > 0: + # Client-side rate limiting: cap send rate to protect shared ingestion infrastructure. + # Stats exporters bypass rate limiting to ensure observability data is not lost. + if ( + self._rate_limiter + and not self._is_stats_exporter() + and not self._is_customer_sdkstats_exporter() + ): + granted = self._rate_limiter.try_consume(len(envelopes)) + if granted == 0: + logger.warning( + "Rate limiter rejected entire batch of %d envelopes. " + "Routing to local storage for retry.", + len(envelopes), + ) + return ExportResult.FAILED_RETRYABLE + if granted < len(envelopes): + # Send what we can, route the rest to local storage via FAILED_RETRYABLE + overflow = envelopes[granted:] + envelopes = envelopes[:granted] + logger.info( + "Rate limiter admitted %d of %d envelopes; " + "%d envelopes deferred to local storage.", + granted, + granted + len(overflow), + len(overflow), + ) + if self.storage: + self.storage.put([x.as_dict() for x in overflow]) + result = ExportResult.SUCCESS # Track whether or not exporter has successfully reached ingestion # Currently only used for statsbeat exporter to detect shutdown cases @@ -295,7 +356,11 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: logger.info( "Data dropped due to ingestion sampling: %s %s.", error.message, - envelopes[error.index] if error.index is not None else "", + ( + envelopes[error.index] + if error.index is not None + else "" + ), ) elif _is_retryable_code(error.status_code): resend_envelopes.append(envelopes[error.index]) # type: ignore @@ -312,23 +377,33 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: and error.index is not None and isinstance(error.status_code, int) ): - track_dropped_items([envelopes[error.index]], error.status_code) + track_dropped_items( + [envelopes[error.index]], error.status_code + ) logger.error( "Data drop %s: %s %s.", error.status_code, error.message, - envelopes[error.index] if error.index is not None else "", + ( + envelopes[error.index] + if error.index is not None + else "" + ), ) if self.storage and resend_envelopes: envelopes_to_store = [x.as_dict() for x in resend_envelopes] result_from_storage = self.storage.put(envelopes_to_store, 0) if self._should_collect_customer_sdkstats(): - track_dropped_items_from_storage(result_from_storage, resend_envelopes) + track_dropped_items_from_storage( + result_from_storage, resend_envelopes + ) self._consecutive_redirects = 0 elif resend_envelopes: # Track items that would have been retried but are dropped since client has local storage disabled if self._should_collect_customer_sdkstats(): - track_dropped_items(resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED) + track_dropped_items( + resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED + ) # Mark as not retryable because we already write to storage here result = ExportResult.FAILED_NOT_RETRYABLE except HttpResponseError as response_error: @@ -337,7 +412,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: reach_ingestion = True if _is_retryable_code(response_error.status_code): if self._should_collect_stats(): - _update_requests_map(_REQ_RETRY_NAME[1], value=response_error.status_code) + _update_requests_map( + _REQ_RETRY_NAME[1], value=response_error.status_code + ) result = ExportResult.FAILED_RETRYABLE # Log error for 401: Unauthorized, 403: Forbidden to assist with customer troubleshooting if not self._is_stats_exporter(): @@ -361,11 +438,15 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) elif _is_throttle_code(response_error.status_code): if self._should_collect_stats(): - _update_requests_map(_REQ_THROTTLE_NAME[1], value=response_error.status_code) + _update_requests_map( + _REQ_THROTTLE_NAME[1], value=response_error.status_code + ) result = ExportResult.FAILED_NOT_RETRYABLE if not self._is_stats_exporter(): - if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int): + if self._should_collect_customer_sdkstats() and isinstance( + response_error.status_code, int + ): track_dropped_items(envelopes, response_error.status_code) elif _is_redirect_code(response_error.status_code): self._consecutive_redirects = self._consecutive_redirects + 1 @@ -377,9 +458,13 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: url = urlparse(location) else: redirect_has_headers = False - if redirect_has_headers and url.scheme and url.netloc: # pylint: disable=E0606 + if ( + redirect_has_headers and url.scheme and url.netloc + ): # pylint: disable=E0606 # Change the host to the new redirected host - self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212 + self.client._config.host = "{}://{}".format( + url.scheme, url.netloc + ) # pylint: disable=W0212 # Attempt to export again result = self._transmit(envelopes) else: @@ -399,7 +484,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: # Track dropped items in customer sdkstats, non-retryable scenario if self._should_collect_customer_sdkstats(): track_dropped_items( - envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value + envelopes, + DropCode.CLIENT_EXCEPTION, + _exception_categories.CLIENT_EXCEPTION.value, ) logger.error( "Error sending telemetry because of circular redirects. " @@ -407,21 +494,27 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) # If redirect but did not return, exception occurred if self._should_collect_stats(): - _update_requests_map(_REQ_EXCEPTION_NAME[1], value="Circular Redirect") + _update_requests_map( + _REQ_EXCEPTION_NAME[1], value="Circular Redirect" + ) result = ExportResult.FAILED_NOT_RETRYABLE else: # Any other status code counts as failure (non-retryable) # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey, etc.) # 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string if self._should_collect_stats(): - _update_requests_map(_REQ_FAILURE_NAME[1], value=response_error.status_code) + _update_requests_map( + _REQ_FAILURE_NAME[1], value=response_error.status_code + ) if not self._is_stats_exporter(): logger.error( "Non-retryable server side error: %s.", response_error.message, ) # Track dropped items in customer sdkstats, non-retryable scenario - if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int): + if self._should_collect_customer_sdkstats() and isinstance( + response_error.status_code, int + ): track_dropped_items(envelopes, response_error.status_code) if _is_invalid_code(response_error.status_code): # Shutdown statsbeat on invalid code from customer endpoint @@ -441,7 +534,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: # Errors when we're fairly sure that the server did not receive the # request, so it should be safe to retry. # ServiceRequestError is raised by azure.core for these cases - logger.warning("Retrying due to server request error: %s.", request_error.message) + logger.warning( + "Retrying due to server request error: %s.", request_error.message + ) # Track retry items in customer sdkstats for client-side exceptions if self._should_collect_customer_sdkstats(): @@ -449,7 +544,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: if self._should_collect_stats(): exc_type = request_error.exc_type - if exc_type is None or exc_type is type(None): # pylint: disable=unidiomatic-typecheck + if exc_type is None or exc_type is type( + None + ): # pylint: disable=unidiomatic-typecheck exc_type = request_error.__class__.__name__ # type: ignore _update_requests_map(_REQ_EXCEPTION_NAME[1], value=exc_type) result = ExportResult.FAILED_RETRYABLE @@ -461,17 +558,23 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: # Track dropped items in customer sdkstats for general exceptions if self._should_collect_customer_sdkstats(): track_dropped_items( - envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value + envelopes, + DropCode.CLIENT_EXCEPTION, + _exception_categories.CLIENT_EXCEPTION.value, ) if self._should_collect_stats(): - _update_requests_map(_REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__) + _update_requests_map( + _REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__ + ) result = ExportResult.FAILED_NOT_RETRYABLE finally: if self._should_collect_stats(): end_time = time.time() _update_requests_map("count", 1) - _update_requests_map(_REQ_DURATION_NAME[1], value=end_time - start_time) + _update_requests_map( + _REQ_DURATION_NAME[1], value=end_time - start_time + ) if self._is_statsbeat_initializing_state(): # Update statsbeat initial success state if reached ingestion if reach_ingestion: @@ -519,7 +622,11 @@ def _should_collect_customer_sdkstats(self): # check to see if statsbeat is in "attempting to be initialized" state def _is_statsbeat_initializing_state(self): - return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success() + return ( + self._is_stats_exporter() + and not get_statsbeat_shutdown() + and not get_statsbeat_initial_success() + ) def _is_stats_exporter(self): return getattr(self, "_is_sdkstats", False) @@ -595,7 +702,9 @@ def _is_sampling_rejection(message: Optional[str]) -> bool: # mypy: disable-error-code="union-attr" -def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCredential]: +def _get_authentication_credential( + **kwargs: Any, +) -> Optional[ManagedIdentityCredential]: if "credential" in kwargs: return kwargs.get("credential") auth_string = os.getenv(_APPLICATIONINSIGHTS_AUTHENTICATION_STRING, "") @@ -604,15 +713,22 @@ def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCre kv_pairs = auth_string.split(";") auth_string_d = dict(s.split("=") for s in kv_pairs) auth_string_d = {key.lower(): value for key, value in auth_string_d.items()} - if "authorization" in auth_string_d and auth_string_d["authorization"] == "AAD": + if ( + "authorization" in auth_string_d + and auth_string_d["authorization"] == "AAD" + ): if "clientid" in auth_string_d: - credential = ManagedIdentityCredential(client_id=auth_string_d["clientid"]) + credential = ManagedIdentityCredential( + client_id=auth_string_d["clientid"] + ) return credential credential = ManagedIdentityCredential() return credential except ValueError as exc: logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug - "APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s", auth_string, exc + "APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s", + auth_string, + exc, ) except Exception as e: logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug @@ -675,6 +791,8 @@ def _safe_psutil_call(func, default=""): ) subdirectory = _get_sha256_hash(hash_input) storage_directory = os.path.join( - shared_root, _AZURE_TEMPDIR_PREFIX + subdirectory, _TEMPDIR_PREFIX + instrumentation_key + shared_root, + _AZURE_TEMPDIR_PREFIX + subdirectory, + _TEMPDIR_PREFIX + instrumentation_key, ) return storage_directory diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py new file mode 100644 index 000000000000..86ae7fb7ae4b --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py @@ -0,0 +1,66 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import logging +import threading +import time + +logger = logging.getLogger(__name__) + +# Default maximum envelopes per second across all telemetry types. +# This is a client-side safety cap to prevent self-inflicted overload +# of shared ingestion infrastructure during telemetry bursts. +_DEFAULT_MAX_ENVELOPES_PER_SECOND = 10000 + +# Minimum allowed value to prevent misconfiguration +_MIN_MAX_ENVELOPES_PER_SECOND = 1 + + +class _TokenBucketRateLimiter: + """Thread-safe token bucket rate limiter for outbound telemetry. + + The bucket refills at ``max_per_second`` tokens per second and holds + at most ``max_per_second`` tokens (i.e. one second of burst capacity). + + :param float max_per_second: Maximum tokens (envelopes) allowed per second. + """ + + def __init__(self, max_per_second: float) -> None: + if max_per_second < _MIN_MAX_ENVELOPES_PER_SECOND: + raise ValueError( + f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}" + ) + self._max_per_second = float(max_per_second) + self._tokens = self._max_per_second # start full + self._last_refill = time.monotonic() + self._lock = threading.Lock() + + def try_consume(self, count: int) -> int: + """Try to consume *count* tokens from the bucket. + + Returns the number of tokens actually consumed (i.e. how many + envelopes may be sent). The caller should handle the remainder + (e.g. store for retry or drop). + + :param int count: Number of tokens requested. + :return: Number of tokens granted (<= *count*). + :rtype: int + """ + if count <= 0: + return 0 + + with self._lock: + now = time.monotonic() + elapsed = now - self._last_refill + self._last_refill = now + + # Refill tokens based on elapsed time, capped at bucket capacity + self._tokens = min( + self._max_per_second, + self._tokens + elapsed * self._max_per_second, + ) + + granted = min(count, int(self._tokens)) + self._tokens -= granted + + return granted diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py new file mode 100644 index 000000000000..08cb01358ed1 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py @@ -0,0 +1,215 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import threading +import time +import unittest + +from azure.monitor.opentelemetry.exporter.export._rate_limiter import ( + _TokenBucketRateLimiter, + _DEFAULT_MAX_ENVELOPES_PER_SECOND, + _MIN_MAX_ENVELOPES_PER_SECOND, +) + + +class TestTokenBucketRateLimiter(unittest.TestCase): + """Unit tests for _TokenBucketRateLimiter.""" + + def test_constructor_valid(self): + limiter = _TokenBucketRateLimiter(100) + self.assertIsNotNone(limiter) + + def test_constructor_rejects_zero(self): + with self.assertRaises(ValueError): + _TokenBucketRateLimiter(0) + + def test_constructor_rejects_negative(self): + with self.assertRaises(ValueError): + _TokenBucketRateLimiter(-1) + + def test_constructor_minimum(self): + limiter = _TokenBucketRateLimiter(_MIN_MAX_ENVELOPES_PER_SECOND) + self.assertIsNotNone(limiter) + + def test_consume_zero_returns_zero(self): + limiter = _TokenBucketRateLimiter(100) + self.assertEqual(limiter.try_consume(0), 0) + + def test_consume_negative_returns_zero(self): + limiter = _TokenBucketRateLimiter(100) + self.assertEqual(limiter.try_consume(-5), 0) + + def test_full_bucket_grants_all(self): + limiter = _TokenBucketRateLimiter(100) + # Bucket starts full at 100 tokens + granted = limiter.try_consume(50) + self.assertEqual(granted, 50) + + def test_full_bucket_grants_up_to_capacity(self): + limiter = _TokenBucketRateLimiter(100) + # Request more than capacity + granted = limiter.try_consume(200) + self.assertEqual(granted, 100) + + def test_bucket_depletes(self): + limiter = _TokenBucketRateLimiter(100) + # Drain the bucket + granted1 = limiter.try_consume(100) + self.assertEqual(granted1, 100) + # Immediately request more - bucket should be empty (or nearly so) + granted2 = limiter.try_consume(100) + # Should get very few or zero tokens + self.assertLessEqual(granted2, 5) + + def test_bucket_refills_over_time(self): + limiter = _TokenBucketRateLimiter(1000) + # Drain the bucket + limiter.try_consume(1000) + # Wait for partial refill + time.sleep(0.1) + granted = limiter.try_consume(1000) + # Should have refilled ~100 tokens (1000/sec * 0.1sec) + self.assertGreater(granted, 50) + self.assertLessEqual(granted, 200) + + def test_bucket_caps_at_capacity(self): + limiter = _TokenBucketRateLimiter(100) + # Wait a while - bucket should not exceed capacity + time.sleep(0.2) + granted = limiter.try_consume(200) + self.assertEqual(granted, 100) + + def test_thread_safety(self): + limiter = _TokenBucketRateLimiter(1000) + results = [] + errors = [] + + def consume(): + try: + for _ in range(100): + limiter.try_consume(1) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=consume) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertEqual(len(errors), 0) + + def test_default_constant_value(self): + self.assertEqual(_DEFAULT_MAX_ENVELOPES_PER_SECOND, 10000) + + +class TestBaseExporterRateLimiting(unittest.TestCase): + """Integration tests for rate limiting in BaseExporter.""" + + @classmethod + def setUpClass(cls): + import os + + os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = ( + "1234abcd-5678-4efa-8abc-1234567890ab" + ) + os.environ["APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"] = "true" + os.environ["APPLICATIONINSIGHTS_SDKSTATS_DISABLED"] = "true" + + def test_rate_limiter_initialized_by_default(self): + from azure.monitor.opentelemetry.exporter.export._base import BaseExporter + + base = BaseExporter(disable_offline_storage=True) + self.assertIsNotNone(base._rate_limiter) + + def test_rate_limiter_disabled_with_zero(self): + from azure.monitor.opentelemetry.exporter.export._base import BaseExporter + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=0) + self.assertIsNone(base._rate_limiter) + + def test_rate_limiter_custom_value(self): + from azure.monitor.opentelemetry.exporter.export._base import BaseExporter + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=500) + self.assertIsNotNone(base._rate_limiter) + self.assertEqual(base._rate_limiter._max_per_second, 500.0) + + def test_transmit_rate_limited_batch_returns_retryable(self): + """When the entire batch is rejected by the rate limiter, _transmit returns FAILED_RETRYABLE.""" + from unittest import mock + from datetime import datetime + from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, + ) + from azure.monitor.opentelemetry.exporter._generated.exporter.models import ( + TelemetryItem, + ) + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=10) + # Drain the bucket + base._rate_limiter.try_consume(10) + + envelopes = [TelemetryItem(name="Test", time=datetime.now()) for _ in range(5)] + result = base._transmit(envelopes) + self.assertEqual(result, ExportResult.FAILED_RETRYABLE) + + def test_transmit_partial_rate_limit_sends_admitted(self): + """When the rate limiter admits only part of the batch, the admitted portion is sent.""" + from unittest import mock + from datetime import datetime + from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, + ) + from azure.monitor.opentelemetry.exporter._generated.exporter.models import ( + TelemetryItem, + TrackResponse, + ) + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=5) + # Drain bucket, then let a few tokens refill + base._rate_limiter.try_consume(5) + + # Manually set tokens to 3 for deterministic test + base._rate_limiter._tokens = 3 + + envelopes = [TelemetryItem(name="Test", time=datetime.now()) for _ in range(10)] + + with mock.patch.object(base.client, "track") as mock_track: + mock_track.return_value = TrackResponse(items_received=3, items_accepted=3) + result = base._transmit(envelopes) + # Should have sent only 3 envelopes + self.assertEqual(len(mock_track.call_args[0][0]), 3) + self.assertEqual(result, ExportResult.SUCCESS) + + def test_stats_exporter_bypasses_rate_limiter(self): + """Stats exporters should not be rate limited.""" + from unittest import mock + from datetime import datetime + from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, + ) + from azure.monitor.opentelemetry.exporter._generated.exporter.models import ( + TelemetryItem, + TrackResponse, + ) + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=1) + base._is_sdkstats = True # Mark as stats exporter + # Drain the rate limiter + base._rate_limiter.try_consume(1) + + envelopes = [TelemetryItem(name="Test", time=datetime.now()) for _ in range(5)] + + with mock.patch.object(base.client, "track") as mock_track: + mock_track.return_value = TrackResponse(items_received=5, items_accepted=5) + result = base._transmit(envelopes) + # All 5 should be sent despite rate limiter being empty + self.assertEqual(len(mock_track.call_args[0][0]), 5) + self.assertEqual(result, ExportResult.SUCCESS) + + +if __name__ == "__main__": + unittest.main() From 2b4ce8cc1695e2893e4c66985b79cbcfacebe794 Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Tue, 19 May 2026 14:41:56 -0700 Subject: [PATCH 2/4] Address PR review: fix overflow double-persist, validate negative rate limit values - Mutate envelopes list in-place (del envelopes[granted:]) so callers' _handle_transmit_from_storage sees only admitted envelopes, preventing duplicate storage of overflow on retryable failures - Log a warning when overflow is deferred but storage is disabled - Reject negative max_envelopes_per_second with ValueError instead of silently disabling rate limiting (only 0 disables, per documentation) - Add clarifying comment that per-exporter rate limiting is intentional - Add tests: negative value rejection, in-place mutation, no-storage overflow --- .../opentelemetry/exporter/export/_base.py | 147 ++++++------------ .../exporter/export/_rate_limiter.py | 4 +- .../tests/test_rate_limiter.py | 38 ++++- 3 files changed, 84 insertions(+), 105 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 3f46a250731b..041f8b0a4d24 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -106,9 +106,7 @@ def __init__(self, **kwargs: Any) -> None: :keyword int max_envelopes_per_second: Maximum number of telemetry envelopes sent per second. Acts as a client-side safety cap to prevent overloading shared ingestion infrastructure during telemetry bursts. Defaults to 10000. Set to 0 to disable rate limiting. :rtype: None """ - parsed_connection_string = ConnectionStringParser( - kwargs.get("connection_string") - ) + parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string")) # TODO: Uncomment configuration changes once testing is completed # Get the configuration manager @@ -140,31 +138,29 @@ def __init__(self, **kwargs: Any) -> None: if "storage_directory" in kwargs: self._storage_directory = kwargs.get("storage_directory") elif not self._disable_offline_storage: - self._storage_directory = _get_storage_directory( - self._instrumentation_key or "" - ) + self._storage_directory = _get_storage_directory(self._instrumentation_key or "") else: self._storage_directory = None self._storage_retention_period = kwargs.get( "storage_retention_period", 48 * 60 * 60 ) # Retention period in seconds (default 48 hrs) self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds - max_eps = kwargs.get( - "max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND - ) + max_eps = kwargs.get("max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND) + if max_eps is not None and max_eps < 0: + raise ValueError("max_envelopes_per_second must be non-negative (0 disables rate limiting)") + # Each exporter instance gets its own rate limiter. This is intentional: + # different telemetry types (traces, logs, metrics) have different + # ingestion characteristics and burst profiles, so per-exporter caps + # provide more predictable behaviour than a shared process-wide bucket. if max_eps and max_eps > 0: - self._rate_limiter: Optional[_TokenBucketRateLimiter] = ( - _TokenBucketRateLimiter(max_eps) - ) + self._rate_limiter: Optional[_TokenBucketRateLimiter] = _TokenBucketRateLimiter(max_eps) else: self._rate_limiter = None self._distro_version = kwargs.get( _AZURE_MONITOR_DISTRO_VERSION_ARG, "" ) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version. # specifies whether current exporter is used for collection of instrumentation metrics - self._instrumentation_collection = kwargs.get( - "instrumentation_collection", False - ) + self._instrumentation_collection = kwargs.get("instrumentation_collection", False) config = AzureMonitorClientConfiguration(self._endpoint, **kwargs) policies = [ @@ -176,9 +172,7 @@ def __init__(self, **kwargs: Any) -> None: # Handle redirects in exporter, set new endpoint if redirected RedirectPolicy(permit_redirects=False), config.retry_policy, - _get_auth_policy( - self._credential, config.authentication_policy, self._aad_audience - ), + _get_auth_policy(self._credential, config.authentication_policy, self._aad_audience), config.custom_hook_policy, config.logging_policy, # Explicitly disabling to avoid infinite loop of Span creation when data is exported @@ -259,9 +253,7 @@ def _transmit_from_storage(self) -> None: # If blob.get() returns None, delete the corrupted blob blob.delete() - def _handle_transmit_from_storage( - self, envelopes: List[TelemetryItem], result: ExportResult - ) -> None: + def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None: if self.storage: if result == ExportResult.FAILED_RETRYABLE: envelopes_to_store = [x.as_dict() for x in envelopes] @@ -294,32 +286,36 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: if len(envelopes) > 0: # Client-side rate limiting: cap send rate to protect shared ingestion infrastructure. # Stats exporters bypass rate limiting to ensure observability data is not lost. - if ( - self._rate_limiter - and not self._is_stats_exporter() - and not self._is_customer_sdkstats_exporter() - ): + if self._rate_limiter and not self._is_stats_exporter() and not self._is_customer_sdkstats_exporter(): granted = self._rate_limiter.try_consume(len(envelopes)) if granted == 0: logger.warning( - "Rate limiter rejected entire batch of %d envelopes. " - "Routing to local storage for retry.", + "Rate limiter rejected entire batch of %d envelopes. " "Routing to local storage for retry.", len(envelopes), ) return ExportResult.FAILED_RETRYABLE if granted < len(envelopes): - # Send what we can, route the rest to local storage via FAILED_RETRYABLE + # Send what we can, route the rest to local storage. + # We mutate the list in-place so that the caller's reference + # (used later in _handle_transmit_from_storage) only sees + # the admitted envelopes, preventing double-persist of the + # overflow on a subsequent retryable failure. overflow = envelopes[granted:] - envelopes = envelopes[:granted] + del envelopes[granted:] logger.info( - "Rate limiter admitted %d of %d envelopes; " - "%d envelopes deferred to local storage.", + "Rate limiter admitted %d of %d envelopes; " "%d envelopes deferred to local storage.", granted, granted + len(overflow), len(overflow), ) if self.storage: self.storage.put([x.as_dict() for x in overflow]) + else: + logger.warning( + "Rate limiter deferred %d envelopes but offline " + "storage is disabled; these envelopes are dropped.", + len(overflow), + ) result = ExportResult.SUCCESS # Track whether or not exporter has successfully reached ingestion @@ -356,11 +352,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: logger.info( "Data dropped due to ingestion sampling: %s %s.", error.message, - ( - envelopes[error.index] - if error.index is not None - else "" - ), + (envelopes[error.index] if error.index is not None else ""), ) elif _is_retryable_code(error.status_code): resend_envelopes.append(envelopes[error.index]) # type: ignore @@ -377,33 +369,23 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: and error.index is not None and isinstance(error.status_code, int) ): - track_dropped_items( - [envelopes[error.index]], error.status_code - ) + track_dropped_items([envelopes[error.index]], error.status_code) logger.error( "Data drop %s: %s %s.", error.status_code, error.message, - ( - envelopes[error.index] - if error.index is not None - else "" - ), + (envelopes[error.index] if error.index is not None else ""), ) if self.storage and resend_envelopes: envelopes_to_store = [x.as_dict() for x in resend_envelopes] result_from_storage = self.storage.put(envelopes_to_store, 0) if self._should_collect_customer_sdkstats(): - track_dropped_items_from_storage( - result_from_storage, resend_envelopes - ) + track_dropped_items_from_storage(result_from_storage, resend_envelopes) self._consecutive_redirects = 0 elif resend_envelopes: # Track items that would have been retried but are dropped since client has local storage disabled if self._should_collect_customer_sdkstats(): - track_dropped_items( - resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED - ) + track_dropped_items(resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED) # Mark as not retryable because we already write to storage here result = ExportResult.FAILED_NOT_RETRYABLE except HttpResponseError as response_error: @@ -412,9 +394,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: reach_ingestion = True if _is_retryable_code(response_error.status_code): if self._should_collect_stats(): - _update_requests_map( - _REQ_RETRY_NAME[1], value=response_error.status_code - ) + _update_requests_map(_REQ_RETRY_NAME[1], value=response_error.status_code) result = ExportResult.FAILED_RETRYABLE # Log error for 401: Unauthorized, 403: Forbidden to assist with customer troubleshooting if not self._is_stats_exporter(): @@ -438,15 +418,11 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) elif _is_throttle_code(response_error.status_code): if self._should_collect_stats(): - _update_requests_map( - _REQ_THROTTLE_NAME[1], value=response_error.status_code - ) + _update_requests_map(_REQ_THROTTLE_NAME[1], value=response_error.status_code) result = ExportResult.FAILED_NOT_RETRYABLE if not self._is_stats_exporter(): - if self._should_collect_customer_sdkstats() and isinstance( - response_error.status_code, int - ): + if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int): track_dropped_items(envelopes, response_error.status_code) elif _is_redirect_code(response_error.status_code): self._consecutive_redirects = self._consecutive_redirects + 1 @@ -458,13 +434,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: url = urlparse(location) else: redirect_has_headers = False - if ( - redirect_has_headers and url.scheme and url.netloc - ): # pylint: disable=E0606 + if redirect_has_headers and url.scheme and url.netloc: # pylint: disable=E0606 # Change the host to the new redirected host - self.client._config.host = "{}://{}".format( - url.scheme, url.netloc - ) # pylint: disable=W0212 + self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212 # Attempt to export again result = self._transmit(envelopes) else: @@ -494,27 +466,21 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) # If redirect but did not return, exception occurred if self._should_collect_stats(): - _update_requests_map( - _REQ_EXCEPTION_NAME[1], value="Circular Redirect" - ) + _update_requests_map(_REQ_EXCEPTION_NAME[1], value="Circular Redirect") result = ExportResult.FAILED_NOT_RETRYABLE else: # Any other status code counts as failure (non-retryable) # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey, etc.) # 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string if self._should_collect_stats(): - _update_requests_map( - _REQ_FAILURE_NAME[1], value=response_error.status_code - ) + _update_requests_map(_REQ_FAILURE_NAME[1], value=response_error.status_code) if not self._is_stats_exporter(): logger.error( "Non-retryable server side error: %s.", response_error.message, ) # Track dropped items in customer sdkstats, non-retryable scenario - if self._should_collect_customer_sdkstats() and isinstance( - response_error.status_code, int - ): + if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int): track_dropped_items(envelopes, response_error.status_code) if _is_invalid_code(response_error.status_code): # Shutdown statsbeat on invalid code from customer endpoint @@ -534,9 +500,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: # Errors when we're fairly sure that the server did not receive the # request, so it should be safe to retry. # ServiceRequestError is raised by azure.core for these cases - logger.warning( - "Retrying due to server request error: %s.", request_error.message - ) + logger.warning("Retrying due to server request error: %s.", request_error.message) # Track retry items in customer sdkstats for client-side exceptions if self._should_collect_customer_sdkstats(): @@ -544,9 +508,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: if self._should_collect_stats(): exc_type = request_error.exc_type - if exc_type is None or exc_type is type( - None - ): # pylint: disable=unidiomatic-typecheck + if exc_type is None or exc_type is type(None): # pylint: disable=unidiomatic-typecheck exc_type = request_error.__class__.__name__ # type: ignore _update_requests_map(_REQ_EXCEPTION_NAME[1], value=exc_type) result = ExportResult.FAILED_RETRYABLE @@ -564,17 +526,13 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) if self._should_collect_stats(): - _update_requests_map( - _REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__ - ) + _update_requests_map(_REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__) result = ExportResult.FAILED_NOT_RETRYABLE finally: if self._should_collect_stats(): end_time = time.time() _update_requests_map("count", 1) - _update_requests_map( - _REQ_DURATION_NAME[1], value=end_time - start_time - ) + _update_requests_map(_REQ_DURATION_NAME[1], value=end_time - start_time) if self._is_statsbeat_initializing_state(): # Update statsbeat initial success state if reached ingestion if reach_ingestion: @@ -622,11 +580,7 @@ def _should_collect_customer_sdkstats(self): # check to see if statsbeat is in "attempting to be initialized" state def _is_statsbeat_initializing_state(self): - return ( - self._is_stats_exporter() - and not get_statsbeat_shutdown() - and not get_statsbeat_initial_success() - ) + return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success() def _is_stats_exporter(self): return getattr(self, "_is_sdkstats", False) @@ -713,14 +667,9 @@ def _get_authentication_credential( kv_pairs = auth_string.split(";") auth_string_d = dict(s.split("=") for s in kv_pairs) auth_string_d = {key.lower(): value for key, value in auth_string_d.items()} - if ( - "authorization" in auth_string_d - and auth_string_d["authorization"] == "AAD" - ): + if "authorization" in auth_string_d and auth_string_d["authorization"] == "AAD": if "clientid" in auth_string_d: - credential = ManagedIdentityCredential( - client_id=auth_string_d["clientid"] - ) + credential = ManagedIdentityCredential(client_id=auth_string_d["clientid"]) return credential credential = ManagedIdentityCredential() return credential diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py index 86ae7fb7ae4b..dbe36344609b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py @@ -27,9 +27,7 @@ class _TokenBucketRateLimiter: def __init__(self, max_per_second: float) -> None: if max_per_second < _MIN_MAX_ENVELOPES_PER_SECOND: - raise ValueError( - f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}" - ) + raise ValueError(f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}") self._max_per_second = float(max_per_second) self._tokens = self._max_per_second # start full self._last_refill = time.monotonic() diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py index 08cb01358ed1..33df5fefcfa1 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py @@ -109,9 +109,7 @@ class TestBaseExporterRateLimiting(unittest.TestCase): def setUpClass(cls): import os - os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = ( - "1234abcd-5678-4efa-8abc-1234567890ab" - ) + os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = "1234abcd-5678-4efa-8abc-1234567890ab" os.environ["APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"] = "true" os.environ["APPLICATIONINSIGHTS_SDKSTATS_DISABLED"] = "true" @@ -127,6 +125,12 @@ def test_rate_limiter_disabled_with_zero(self): base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=0) self.assertIsNone(base._rate_limiter) + def test_rate_limiter_rejects_negative(self): + from azure.monitor.opentelemetry.exporter.export._base import BaseExporter + + with self.assertRaises(ValueError): + BaseExporter(disable_offline_storage=True, max_envelopes_per_second=-1) + def test_rate_limiter_custom_value(self): from azure.monitor.opentelemetry.exporter.export._base import BaseExporter @@ -182,6 +186,34 @@ def test_transmit_partial_rate_limit_sends_admitted(self): # Should have sent only 3 envelopes self.assertEqual(len(mock_track.call_args[0][0]), 3) self.assertEqual(result, ExportResult.SUCCESS) + # envelopes list should be mutated in-place to only contain the admitted portion + self.assertEqual(len(envelopes), 3) + + def test_transmit_partial_rate_limit_no_storage_drops_overflow(self): + """When storage is disabled and overflow occurs, overflow is dropped and logged.""" + from unittest import mock + from datetime import datetime + from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, + ) + from azure.monitor.opentelemetry.exporter._generated.exporter.models import ( + TelemetryItem, + TrackResponse, + ) + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=5) + base._rate_limiter._tokens = 3 + + envelopes = [TelemetryItem(name="Test", time=datetime.now()) for _ in range(10)] + + with mock.patch.object(base.client, "track") as mock_track: + mock_track.return_value = TrackResponse(items_received=3, items_accepted=3) + # Should not raise even with no storage + result = base._transmit(envelopes) + self.assertEqual(result, ExportResult.SUCCESS) + # Only admitted envelopes remain + self.assertEqual(len(envelopes), 3) def test_stats_exporter_bypasses_rate_limiter(self): """Stats exporters should not be rate limited.""" From 49c8b8ed05c8e4bf075fa3da2c70efb56e5005f9 Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Wed, 20 May 2026 14:07:41 -0700 Subject: [PATCH 3/4] Fix pylint warnings: implicit-str-concat and too-many-statements --- .../azure/monitor/opentelemetry/exporter/export/_base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 041f8b0a4d24..b0343b8f384a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -91,6 +91,7 @@ class ExportResult(Enum): # pylint: disable=broad-except # pylint: disable=too-many-instance-attributes +# pylint: disable=too-many-statements # pylint: disable=C0301 class BaseExporter: """Azure Monitor base exporter for OpenTelemetry.""" @@ -290,7 +291,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: granted = self._rate_limiter.try_consume(len(envelopes)) if granted == 0: logger.warning( - "Rate limiter rejected entire batch of %d envelopes. " "Routing to local storage for retry.", + "Rate limiter rejected entire batch of %d envelopes. Routing to local storage for retry.", len(envelopes), ) return ExportResult.FAILED_RETRYABLE @@ -303,7 +304,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: overflow = envelopes[granted:] del envelopes[granted:] logger.info( - "Rate limiter admitted %d of %d envelopes; " "%d envelopes deferred to local storage.", + "Rate limiter admitted %d of %d envelopes; %d envelopes deferred to local storage.", granted, granted + len(overflow), len(overflow), From 44a2d771394435d52ce6f9956076a6403d632efb Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Wed, 20 May 2026 17:29:30 -0700 Subject: [PATCH 4/4] Fix redirect double rate-limit: skip rate limiting on recursive _transmit calls Address JacksonWeber review: when _transmit() hits a 307/308 redirect and calls itself recursively, the rate-limiting logic at the top would consume tokens a second time for the same batch. Add _skip_rate_limit parameter that is set to True on recursive calls to prevent this. --- .../opentelemetry/exporter/export/_base.py | 136 ++++++++++++++---- .../exporter/export/_rate_limiter.py | 4 +- .../tests/test_rate_limiter.py | 42 +++++- 3 files changed, 149 insertions(+), 33 deletions(-) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index b0343b8f384a..4bd5f27f913f 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -107,7 +107,9 @@ def __init__(self, **kwargs: Any) -> None: :keyword int max_envelopes_per_second: Maximum number of telemetry envelopes sent per second. Acts as a client-side safety cap to prevent overloading shared ingestion infrastructure during telemetry bursts. Defaults to 10000. Set to 0 to disable rate limiting. :rtype: None """ - parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string")) + parsed_connection_string = ConnectionStringParser( + kwargs.get("connection_string") + ) # TODO: Uncomment configuration changes once testing is completed # Get the configuration manager @@ -139,29 +141,39 @@ def __init__(self, **kwargs: Any) -> None: if "storage_directory" in kwargs: self._storage_directory = kwargs.get("storage_directory") elif not self._disable_offline_storage: - self._storage_directory = _get_storage_directory(self._instrumentation_key or "") + self._storage_directory = _get_storage_directory( + self._instrumentation_key or "" + ) else: self._storage_directory = None self._storage_retention_period = kwargs.get( "storage_retention_period", 48 * 60 * 60 ) # Retention period in seconds (default 48 hrs) self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds - max_eps = kwargs.get("max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND) + max_eps = kwargs.get( + "max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND + ) if max_eps is not None and max_eps < 0: - raise ValueError("max_envelopes_per_second must be non-negative (0 disables rate limiting)") + raise ValueError( + "max_envelopes_per_second must be non-negative (0 disables rate limiting)" + ) # Each exporter instance gets its own rate limiter. This is intentional: # different telemetry types (traces, logs, metrics) have different # ingestion characteristics and burst profiles, so per-exporter caps # provide more predictable behaviour than a shared process-wide bucket. if max_eps and max_eps > 0: - self._rate_limiter: Optional[_TokenBucketRateLimiter] = _TokenBucketRateLimiter(max_eps) + self._rate_limiter: Optional[_TokenBucketRateLimiter] = ( + _TokenBucketRateLimiter(max_eps) + ) else: self._rate_limiter = None self._distro_version = kwargs.get( _AZURE_MONITOR_DISTRO_VERSION_ARG, "" ) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version. # specifies whether current exporter is used for collection of instrumentation metrics - self._instrumentation_collection = kwargs.get("instrumentation_collection", False) + self._instrumentation_collection = kwargs.get( + "instrumentation_collection", False + ) config = AzureMonitorClientConfiguration(self._endpoint, **kwargs) policies = [ @@ -173,7 +185,9 @@ def __init__(self, **kwargs: Any) -> None: # Handle redirects in exporter, set new endpoint if redirected RedirectPolicy(permit_redirects=False), config.retry_policy, - _get_auth_policy(self._credential, config.authentication_policy, self._aad_audience), + _get_auth_policy( + self._credential, config.authentication_policy, self._aad_audience + ), config.custom_hook_policy, config.logging_policy, # Explicitly disabling to avoid infinite loop of Span creation when data is exported @@ -254,7 +268,9 @@ def _transmit_from_storage(self) -> None: # If blob.get() returns None, delete the corrupted blob blob.delete() - def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None: + def _handle_transmit_from_storage( + self, envelopes: List[TelemetryItem], result: ExportResult + ) -> None: if self.storage: if result == ExportResult.FAILED_RETRYABLE: envelopes_to_store = [x.as_dict() for x in envelopes] @@ -273,7 +289,9 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: # pylint: disable=too-many-branches # pylint: disable=too-many-nested-blocks # pylint: disable=too-many-statements - def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: + def _transmit( + self, envelopes: List[TelemetryItem], _skip_rate_limit: bool = False + ) -> ExportResult: """ Transmit the data envelopes to the ingestion service. @@ -281,13 +299,22 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: throw an exception. :param envelopes: The list of telemetry items to transmit. :type envelopes: list of ~azure.monitor.opentelemetry.exporter._generated.exporter.models.TelemetryItem + :param _skip_rate_limit: Internal flag to skip rate limiting on recursive calls (e.g. redirects). + :type _skip_rate_limit: bool :return: The result of the export. :rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult """ if len(envelopes) > 0: # Client-side rate limiting: cap send rate to protect shared ingestion infrastructure. # Stats exporters bypass rate limiting to ensure observability data is not lost. - if self._rate_limiter and not self._is_stats_exporter() and not self._is_customer_sdkstats_exporter(): + # Skip rate limiting on recursive calls (e.g. 307/308 redirects) to avoid + # double-consuming tokens for the same batch. + if ( + not _skip_rate_limit + and self._rate_limiter + and not self._is_stats_exporter() + and not self._is_customer_sdkstats_exporter() + ): granted = self._rate_limiter.try_consume(len(envelopes)) if granted == 0: logger.warning( @@ -353,7 +380,11 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: logger.info( "Data dropped due to ingestion sampling: %s %s.", error.message, - (envelopes[error.index] if error.index is not None else ""), + ( + envelopes[error.index] + if error.index is not None + else "" + ), ) elif _is_retryable_code(error.status_code): resend_envelopes.append(envelopes[error.index]) # type: ignore @@ -370,23 +401,33 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: and error.index is not None and isinstance(error.status_code, int) ): - track_dropped_items([envelopes[error.index]], error.status_code) + track_dropped_items( + [envelopes[error.index]], error.status_code + ) logger.error( "Data drop %s: %s %s.", error.status_code, error.message, - (envelopes[error.index] if error.index is not None else ""), + ( + envelopes[error.index] + if error.index is not None + else "" + ), ) if self.storage and resend_envelopes: envelopes_to_store = [x.as_dict() for x in resend_envelopes] result_from_storage = self.storage.put(envelopes_to_store, 0) if self._should_collect_customer_sdkstats(): - track_dropped_items_from_storage(result_from_storage, resend_envelopes) + track_dropped_items_from_storage( + result_from_storage, resend_envelopes + ) self._consecutive_redirects = 0 elif resend_envelopes: # Track items that would have been retried but are dropped since client has local storage disabled if self._should_collect_customer_sdkstats(): - track_dropped_items(resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED) + track_dropped_items( + resend_envelopes, DropCode.CLIENT_STORAGE_DISABLED + ) # Mark as not retryable because we already write to storage here result = ExportResult.FAILED_NOT_RETRYABLE except HttpResponseError as response_error: @@ -395,7 +436,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: reach_ingestion = True if _is_retryable_code(response_error.status_code): if self._should_collect_stats(): - _update_requests_map(_REQ_RETRY_NAME[1], value=response_error.status_code) + _update_requests_map( + _REQ_RETRY_NAME[1], value=response_error.status_code + ) result = ExportResult.FAILED_RETRYABLE # Log error for 401: Unauthorized, 403: Forbidden to assist with customer troubleshooting if not self._is_stats_exporter(): @@ -419,11 +462,15 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) elif _is_throttle_code(response_error.status_code): if self._should_collect_stats(): - _update_requests_map(_REQ_THROTTLE_NAME[1], value=response_error.status_code) + _update_requests_map( + _REQ_THROTTLE_NAME[1], value=response_error.status_code + ) result = ExportResult.FAILED_NOT_RETRYABLE if not self._is_stats_exporter(): - if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int): + if self._should_collect_customer_sdkstats() and isinstance( + response_error.status_code, int + ): track_dropped_items(envelopes, response_error.status_code) elif _is_redirect_code(response_error.status_code): self._consecutive_redirects = self._consecutive_redirects + 1 @@ -435,11 +482,15 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: url = urlparse(location) else: redirect_has_headers = False - if redirect_has_headers and url.scheme and url.netloc: # pylint: disable=E0606 + if ( + redirect_has_headers and url.scheme and url.netloc + ): # pylint: disable=E0606 # Change the host to the new redirected host - self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212 + self.client._config.host = "{}://{}".format( + url.scheme, url.netloc + ) # pylint: disable=W0212 # Attempt to export again - result = self._transmit(envelopes) + result = self._transmit(envelopes, _skip_rate_limit=True) else: if not self._is_stats_exporter(): if self._should_collect_customer_sdkstats(): @@ -467,21 +518,27 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) # If redirect but did not return, exception occurred if self._should_collect_stats(): - _update_requests_map(_REQ_EXCEPTION_NAME[1], value="Circular Redirect") + _update_requests_map( + _REQ_EXCEPTION_NAME[1], value="Circular Redirect" + ) result = ExportResult.FAILED_NOT_RETRYABLE else: # Any other status code counts as failure (non-retryable) # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey, etc.) # 404 - Ingestion is allowed only from stamp specific endpoint - must update connection string if self._should_collect_stats(): - _update_requests_map(_REQ_FAILURE_NAME[1], value=response_error.status_code) + _update_requests_map( + _REQ_FAILURE_NAME[1], value=response_error.status_code + ) if not self._is_stats_exporter(): logger.error( "Non-retryable server side error: %s.", response_error.message, ) # Track dropped items in customer sdkstats, non-retryable scenario - if self._should_collect_customer_sdkstats() and isinstance(response_error.status_code, int): + if self._should_collect_customer_sdkstats() and isinstance( + response_error.status_code, int + ): track_dropped_items(envelopes, response_error.status_code) if _is_invalid_code(response_error.status_code): # Shutdown statsbeat on invalid code from customer endpoint @@ -501,7 +558,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: # Errors when we're fairly sure that the server did not receive the # request, so it should be safe to retry. # ServiceRequestError is raised by azure.core for these cases - logger.warning("Retrying due to server request error: %s.", request_error.message) + logger.warning( + "Retrying due to server request error: %s.", request_error.message + ) # Track retry items in customer sdkstats for client-side exceptions if self._should_collect_customer_sdkstats(): @@ -509,7 +568,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: if self._should_collect_stats(): exc_type = request_error.exc_type - if exc_type is None or exc_type is type(None): # pylint: disable=unidiomatic-typecheck + if exc_type is None or exc_type is type( + None + ): # pylint: disable=unidiomatic-typecheck exc_type = request_error.__class__.__name__ # type: ignore _update_requests_map(_REQ_EXCEPTION_NAME[1], value=exc_type) result = ExportResult.FAILED_RETRYABLE @@ -527,13 +588,17 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult: ) if self._should_collect_stats(): - _update_requests_map(_REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__) + _update_requests_map( + _REQ_EXCEPTION_NAME[1], value=ex.__class__.__name__ + ) result = ExportResult.FAILED_NOT_RETRYABLE finally: if self._should_collect_stats(): end_time = time.time() _update_requests_map("count", 1) - _update_requests_map(_REQ_DURATION_NAME[1], value=end_time - start_time) + _update_requests_map( + _REQ_DURATION_NAME[1], value=end_time - start_time + ) if self._is_statsbeat_initializing_state(): # Update statsbeat initial success state if reached ingestion if reach_ingestion: @@ -581,7 +646,11 @@ def _should_collect_customer_sdkstats(self): # check to see if statsbeat is in "attempting to be initialized" state def _is_statsbeat_initializing_state(self): - return self._is_stats_exporter() and not get_statsbeat_shutdown() and not get_statsbeat_initial_success() + return ( + self._is_stats_exporter() + and not get_statsbeat_shutdown() + and not get_statsbeat_initial_success() + ) def _is_stats_exporter(self): return getattr(self, "_is_sdkstats", False) @@ -668,9 +737,14 @@ def _get_authentication_credential( kv_pairs = auth_string.split(";") auth_string_d = dict(s.split("=") for s in kv_pairs) auth_string_d = {key.lower(): value for key, value in auth_string_d.items()} - if "authorization" in auth_string_d and auth_string_d["authorization"] == "AAD": + if ( + "authorization" in auth_string_d + and auth_string_d["authorization"] == "AAD" + ): if "clientid" in auth_string_d: - credential = ManagedIdentityCredential(client_id=auth_string_d["clientid"]) + credential = ManagedIdentityCredential( + client_id=auth_string_d["clientid"] + ) return credential credential = ManagedIdentityCredential() return credential diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py index dbe36344609b..86ae7fb7ae4b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_rate_limiter.py @@ -27,7 +27,9 @@ class _TokenBucketRateLimiter: def __init__(self, max_per_second: float) -> None: if max_per_second < _MIN_MAX_ENVELOPES_PER_SECOND: - raise ValueError(f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}") + raise ValueError( + f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}" + ) self._max_per_second = float(max_per_second) self._tokens = self._max_per_second # start full self._last_refill = time.monotonic() diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py index 33df5fefcfa1..ae32ef8e31d7 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_rate_limiter.py @@ -109,7 +109,9 @@ class TestBaseExporterRateLimiting(unittest.TestCase): def setUpClass(cls): import os - os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = "1234abcd-5678-4efa-8abc-1234567890ab" + os.environ["APPINSIGHTS_INSTRUMENTATIONKEY"] = ( + "1234abcd-5678-4efa-8abc-1234567890ab" + ) os.environ["APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL"] = "true" os.environ["APPLICATIONINSIGHTS_SDKSTATS_DISABLED"] = "true" @@ -242,6 +244,44 @@ def test_stats_exporter_bypasses_rate_limiter(self): self.assertEqual(len(mock_track.call_args[0][0]), 5) self.assertEqual(result, ExportResult.SUCCESS) + def test_redirect_does_not_double_consume_rate_limiter(self): + """When _transmit recurses on a 307/308 redirect, rate limiting should be skipped.""" + from unittest import mock + from datetime import datetime + from azure.core.exceptions import HttpResponseError + from azure.monitor.opentelemetry.exporter.export._base import ( + BaseExporter, + ExportResult, + ) + from azure.monitor.opentelemetry.exporter._generated.exporter.models import ( + TelemetryItem, + TrackResponse, + ) + + base = BaseExporter(disable_offline_storage=True, max_envelopes_per_second=5) + + envelopes = [TelemetryItem(name="Test", time=datetime.now()) for _ in range(5)] + + # First call raises a 307 redirect, second call succeeds + mock_response = mock.Mock() + mock_response.headers = {"location": "https://redirected.example.com/v2/track"} + redirect_error = HttpResponseError( + message="Temporary Redirect", + response=mock_response, + ) + redirect_error.status_code = 307 + + with mock.patch.object(base.client, "track") as mock_track: + mock_track.side_effect = [ + redirect_error, + TrackResponse(items_received=5, items_accepted=5), + ] + result = base._transmit(envelopes) + # Should succeed after redirect + self.assertEqual(result, ExportResult.SUCCESS) + # track should have been called twice (original + redirect) + self.assertEqual(mock_track.call_count, 2) + if __name__ == "__main__": unittest.main()