Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@
DropCode,
_exception_categories,
)
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
from azure.monitor.opentelemetry.exporter._connection_string_parser import (
ConnectionStringParser,
)
from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage
from azure.monitor.opentelemetry.exporter._utils import (
_get_auth_policy,
_get_sha256_hash,
)
from azure.monitor.opentelemetry.exporter.export._rate_limiter import (
_TokenBucketRateLimiter,
_DEFAULT_MAX_ENVELOPES_PER_SECOND,
)
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
get_statsbeat_initial_success,
get_statsbeat_shutdown,
Expand All @@ -70,7 +76,6 @@
get_customer_stats_manager,
)


logger = logging.getLogger(__name__)

_AZURE_TEMPDIR_PREFIX = "Microsoft-AzureMonitor-"
Expand All @@ -86,6 +91,7 @@ class ExportResult(Enum):

# pylint: disable=broad-except
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-statements
# pylint: disable=C0301
class BaseExporter:
"""Azure Monitor base exporter for OpenTelemetry."""
Expand All @@ -98,6 +104,7 @@ def __init__(self, **kwargs: Any) -> None:
:keyword ManagedIdentityCredential/ClientSecretCredential credential: Token credential, such as ManagedIdentityCredential or ClientSecretCredential, used for Azure Active Directory (AAD) authentication. Defaults to None.
:keyword bool disable_offline_storage: Determines whether to disable storing failed telemetry records for retry. Defaults to `False`.
:keyword str storage_directory: Storage path in which to store retry files. Defaults to `<tempfile.gettempdir()>/opentelemetry-python-<your-instrumentation-key>`.
:keyword int max_envelopes_per_second: Maximum number of telemetry envelopes sent per second. Acts as a client-side safety cap to prevent overloading shared ingestion infrastructure during telemetry bursts. Defaults to 10000. Set to 0 to disable rate limiting.
:rtype: None
"""
parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string"))
Expand Down Expand Up @@ -139,6 +146,17 @@ def __init__(self, **kwargs: Any) -> None:
"storage_retention_period", 48 * 60 * 60
) # Retention period in seconds (default 48 hrs)
self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds
max_eps = kwargs.get("max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND)
if max_eps is not None and max_eps < 0:
raise ValueError("max_envelopes_per_second must be non-negative (0 disables rate limiting)")
# Each exporter instance gets its own rate limiter. This is intentional:
# different telemetry types (traces, logs, metrics) have different
# ingestion characteristics and burst profiles, so per-exporter caps
# provide more predictable behaviour than a shared process-wide bucket.
if max_eps and max_eps > 0:
self._rate_limiter: Optional[_TokenBucketRateLimiter] = _TokenBucketRateLimiter(max_eps)
else:
self._rate_limiter = None
Comment thread
hectorhdzg marked this conversation as resolved.
self._distro_version = kwargs.get(
_AZURE_MONITOR_DISTRO_VERSION_ARG, ""
) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version.
Expand Down Expand Up @@ -168,7 +186,10 @@ def __init__(self, **kwargs: Any) -> None:
policies.append(config.http_logging_policy or HttpLoggingPolicy(**kwargs))

self.client: AzureMonitorClient = AzureMonitorClient(
host=self._endpoint, connection_timeout=self._timeout, policies=policies, **kwargs
host=self._endpoint,
connection_timeout=self._timeout,
policies=policies,
**kwargs,
)
# TODO: Uncomment configuration changes once testing is completed
# if self._configuration_manager:
Expand All @@ -195,7 +216,9 @@ def __init__(self, **kwargs: Any) -> None:
if self._should_collect_stats():
try:
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
collect_statsbeat_metrics,
)

collect_statsbeat_metrics(self)
except Exception as e: # pylint: disable=broad-except
Expand All @@ -205,7 +228,9 @@ def __init__(self, **kwargs: Any) -> None:

# customer sdkstats initialization
if self._should_collect_customer_sdkstats():
from azure.monitor.opentelemetry.exporter.statsbeat.customer import collect_customer_sdkstats
from azure.monitor.opentelemetry.exporter.statsbeat.customer import (
collect_customer_sdkstats,
)

# Collect customer sdkstats metrics
collect_customer_sdkstats(self)
Expand Down Expand Up @@ -260,6 +285,39 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
:rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult
"""
if len(envelopes) > 0:
# Client-side rate limiting: cap send rate to protect shared ingestion infrastructure.
# Stats exporters bypass rate limiting to ensure observability data is not lost.
if self._rate_limiter and not self._is_stats_exporter() and not self._is_customer_sdkstats_exporter():
granted = self._rate_limiter.try_consume(len(envelopes))
if granted == 0:
logger.warning(
"Rate limiter rejected entire batch of %d envelopes. Routing to local storage for retry.",
len(envelopes),
)
return ExportResult.FAILED_RETRYABLE
if granted < len(envelopes):
# Send what we can, route the rest to local storage.
# We mutate the list in-place so that the caller's reference
# (used later in _handle_transmit_from_storage) only sees
# the admitted envelopes, preventing double-persist of the
# overflow on a subsequent retryable failure.
overflow = envelopes[granted:]
del envelopes[granted:]
logger.info(
"Rate limiter admitted %d of %d envelopes; %d envelopes deferred to local storage.",
granted,
granted + len(overflow),
len(overflow),
)
if self.storage:
self.storage.put([x.as_dict() for x in overflow])
Comment thread
hectorhdzg marked this conversation as resolved.
else:
logger.warning(
"Rate limiter deferred %d envelopes but offline "
"storage is disabled; these envelopes are dropped.",
len(overflow),
)

result = ExportResult.SUCCESS
# Track whether or not exporter has successfully reached ingestion
# Currently only used for statsbeat exporter to detect shutdown cases
Expand Down Expand Up @@ -295,7 +353,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
logger.info(
"Data dropped due to ingestion sampling: %s %s.",
error.message,
envelopes[error.index] if error.index is not None else "",
(envelopes[error.index] if error.index is not None else ""),
)
elif _is_retryable_code(error.status_code):
resend_envelopes.append(envelopes[error.index]) # type: ignore
Expand All @@ -317,7 +375,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
"Data drop %s: %s %s.",
error.status_code,
error.message,
envelopes[error.index] if error.index is not None else "",
(envelopes[error.index] if error.index is not None else ""),
)
if self.storage and resend_envelopes:
envelopes_to_store = [x.as_dict() for x in resend_envelopes]
Expand Down Expand Up @@ -399,7 +457,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
# Track dropped items in customer sdkstats, non-retryable scenario
if self._should_collect_customer_sdkstats():
track_dropped_items(
envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value
envelopes,
DropCode.CLIENT_EXCEPTION,
_exception_categories.CLIENT_EXCEPTION.value,
)
logger.error(
"Error sending telemetry because of circular redirects. "
Expand Down Expand Up @@ -461,7 +521,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
# Track dropped items in customer sdkstats for general exceptions
if self._should_collect_customer_sdkstats():
track_dropped_items(
envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value
envelopes,
DropCode.CLIENT_EXCEPTION,
_exception_categories.CLIENT_EXCEPTION.value,
)

if self._should_collect_stats():
Expand Down Expand Up @@ -595,7 +657,9 @@ def _is_sampling_rejection(message: Optional[str]) -> bool:


# mypy: disable-error-code="union-attr"
def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCredential]:
def _get_authentication_credential(
**kwargs: Any,
) -> Optional[ManagedIdentityCredential]:
if "credential" in kwargs:
return kwargs.get("credential")
auth_string = os.getenv(_APPLICATIONINSIGHTS_AUTHENTICATION_STRING, "")
Expand All @@ -612,7 +676,9 @@ def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCre
return credential
except ValueError as exc:
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug
"APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s", auth_string, exc
"APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s",
auth_string,
exc,
)
except Exception as e:
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug
Expand Down Expand Up @@ -675,6 +741,8 @@ def _safe_psutil_call(func, default=""):
)
subdirectory = _get_sha256_hash(hash_input)
storage_directory = os.path.join(
shared_root, _AZURE_TEMPDIR_PREFIX + subdirectory, _TEMPDIR_PREFIX + instrumentation_key
shared_root,
_AZURE_TEMPDIR_PREFIX + subdirectory,
_TEMPDIR_PREFIX + instrumentation_key,
)
return storage_directory
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import logging
import threading
import time

logger = logging.getLogger(__name__)

# Default maximum envelopes per second across all telemetry types.
# This is a client-side safety cap to prevent self-inflicted overload
# of shared ingestion infrastructure during telemetry bursts.
_DEFAULT_MAX_ENVELOPES_PER_SECOND = 10000

# Minimum allowed value to prevent misconfiguration
_MIN_MAX_ENVELOPES_PER_SECOND = 1


class _TokenBucketRateLimiter:
"""Thread-safe token bucket rate limiter for outbound telemetry.

The bucket refills at ``max_per_second`` tokens per second and holds
at most ``max_per_second`` tokens (i.e. one second of burst capacity).

:param float max_per_second: Maximum tokens (envelopes) allowed per second.
"""

def __init__(self, max_per_second: float) -> None:
if max_per_second < _MIN_MAX_ENVELOPES_PER_SECOND:
raise ValueError(f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}")
self._max_per_second = float(max_per_second)
self._tokens = self._max_per_second # start full
self._last_refill = time.monotonic()
self._lock = threading.Lock()

def try_consume(self, count: int) -> int:
"""Try to consume *count* tokens from the bucket.

Returns the number of tokens actually consumed (i.e. how many
envelopes may be sent). The caller should handle the remainder
(e.g. store for retry or drop).

:param int count: Number of tokens requested.
:return: Number of tokens granted (<= *count*).
:rtype: int
"""
if count <= 0:
return 0

with self._lock:
now = time.monotonic()
elapsed = now - self._last_refill
self._last_refill = now

# Refill tokens based on elapsed time, capped at bucket capacity
self._tokens = min(
self._max_per_second,
self._tokens + elapsed * self._max_per_second,
)

granted = min(count, int(self._tokens))
self._tokens -= granted

return granted
Loading