diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py index 14354219d503..c6ff7eebc152 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py @@ -210,10 +210,18 @@ def __init__(self, **kwargs: Any) -> None: # Collect customer sdkstats metrics collect_customer_sdkstats(self) + # Maximum number of blobs to drain from storage per invocation. + # Prevents a retry storm when many blobs have accumulated during + # sustained throttling (e.g. 429). + _MAX_STORAGE_DRAIN_BATCH = 10 + def _transmit_from_storage(self) -> None: if not self.storage: return + drained = 0 for blob in self.storage.gets(): + if drained >= self._MAX_STORAGE_DRAIN_BATCH: + break # give a few more seconds for blob lease operation # to reduce the chance of race (for perf consideration) if blob.lease(self._timeout + 5): @@ -223,11 +231,16 @@ def _transmit_from_storage(self) -> None: result = self._transmit(envelopes) if result == ExportResult.FAILED_RETRYABLE: blob.lease(1) - else: - blob.delete() + # Stop draining: the service is still under + # pressure. Remaining blobs will be retried on + # the next successful export cycle, avoiding a + # burst of requests that re-triggers throttling. 
+ break + blob.delete() else: # If blob.get() returns None, delete the corrupted blob blob.delete() + drained += 1 def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None: if self.storage: diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py index 33d8634e04da..96eb47890399 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py @@ -423,6 +423,60 @@ def test_transmit_from_storage_blob_get_returns_none(self): blob_mock.delete.assert_called_once() # Corrupted blob should be deleted transmit_mock.assert_not_called() # No transmission should occur + def test_transmit_from_storage_stops_on_retryable_failure(self): + """Test that _transmit_from_storage stops draining blobs when + a retryable failure (e.g. 429) occurs, preventing retry storms.""" + exporter = BaseExporter() + exporter.storage = mock.Mock() + envelope_mock = {"name": "test", "time": "time"} + # Create three blobs in storage + blob1 = mock.Mock() + blob1.lease.return_value = True + blob1.get.return_value = [envelope_mock] + blob2 = mock.Mock() + blob2.lease.return_value = True + blob2.get.return_value = [envelope_mock] + blob3 = mock.Mock() + blob3.lease.return_value = True + blob3.get.return_value = [envelope_mock] + exporter.storage.gets.return_value = [blob1, blob2, blob3] + with mock.patch.object(exporter, "_transmit") as transmit_mock: + # First blob transmit fails with retryable error + transmit_mock.return_value = ExportResult.FAILED_RETRYABLE + exporter._transmit_from_storage() + # Only the first blob should have been attempted; loop should break + transmit_mock.assert_called_once() + blob1.lease.assert_called() + # blob2 and blob3 should not have been touched + blob2.lease.assert_not_called() + 
blob3.lease.assert_not_called() + + def test_transmit_from_storage_caps_drain_batch_size(self): + """Test that _transmit_from_storage processes at most + _MAX_STORAGE_DRAIN_BATCH blobs per invocation to prevent + flooding the service on recovery from throttling.""" + exporter = BaseExporter() + exporter.storage = mock.Mock() + envelope_mock = {"name": "test", "time": "time"} + num_blobs = exporter._MAX_STORAGE_DRAIN_BATCH + 5 + blobs = [] + for _ in range(num_blobs): + b = mock.Mock() + b.lease.return_value = True + b.get.return_value = [envelope_mock] + blobs.append(b) + exporter.storage.gets.return_value = blobs + with mock.patch.object(exporter, "_transmit") as transmit_mock: + transmit_mock.return_value = ExportResult.SUCCESS + exporter._transmit_from_storage() + # Should only process _MAX_STORAGE_DRAIN_BATCH blobs + self.assertEqual(transmit_mock.call_count, exporter._MAX_STORAGE_DRAIN_BATCH) + # Blobs within the cap are deleted; the extra blobs beyond the cap must not be deleted + for i in range(exporter._MAX_STORAGE_DRAIN_BATCH): + blobs[i].delete.assert_called_once() + for i in range(exporter._MAX_STORAGE_DRAIN_BATCH, num_blobs): + blobs[i].delete.assert_not_called() + def test_telemetry_item_dict_roundtrip(self): """Test that TelemetryItem correctly round-trips through as_dict() -> TelemetryItem(dict) for all telemetry data types used in offline storage."""