Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,18 @@ def __init__(self, **kwargs: Any) -> None:
# Collect customer sdkstats metrics
collect_customer_sdkstats(self)

# Maximum number of blobs to drain from storage per invocation.
# Prevents a retry storm when many blobs have accumulated during
# sustained throttling (e.g. 429).
_MAX_STORAGE_DRAIN_BATCH = 10

def _transmit_from_storage(self) -> None:
if not self.storage:
return
drained = 0
for blob in self.storage.gets():
if drained >= self._MAX_STORAGE_DRAIN_BATCH:
break
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self._timeout + 5):
Expand All @@ -223,11 +231,16 @@ def _transmit_from_storage(self) -> None:
result = self._transmit(envelopes)
if result == ExportResult.FAILED_RETRYABLE:
blob.lease(1)
else:
blob.delete()
# Stop draining: the service is still under
# pressure. Remaining blobs will be retried on
# the next successful export cycle, avoiding a
# burst of requests that re-triggers throttling.
break
blob.delete()
else:
# If blob.get() returns None, delete the corrupted blob
blob.delete()
drained += 1

def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None:
if self.storage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,60 @@ def test_transmit_from_storage_blob_get_returns_none(self):
blob_mock.delete.assert_called_once() # Corrupted blob should be deleted
transmit_mock.assert_not_called() # No transmission should occur

def test_transmit_from_storage_stops_on_retryable_failure(self):
"""Test that _transmit_from_storage stops draining blobs when
a retryable failure (e.g. 429) occurs, preventing retry storms."""
exporter = BaseExporter()
exporter.storage = mock.Mock()
envelope_mock = {"name": "test", "time": "time"}
# Create three blobs in storage
blob1 = mock.Mock()
blob1.lease.return_value = True
blob1.get.return_value = [envelope_mock]
blob2 = mock.Mock()
blob2.lease.return_value = True
blob2.get.return_value = [envelope_mock]
blob3 = mock.Mock()
blob3.lease.return_value = True
blob3.get.return_value = [envelope_mock]
exporter.storage.gets.return_value = [blob1, blob2, blob3]
with mock.patch.object(exporter, "_transmit") as transmit_mock:
# First blob transmit fails with retryable error
transmit_mock.return_value = ExportResult.FAILED_RETRYABLE
exporter._transmit_from_storage()
# Only the first blob should have been attempted; loop should break
transmit_mock.assert_called_once()
blob1.lease.assert_called()
# blob2 and blob3 should not have been touched
blob2.lease.assert_not_called()
blob3.lease.assert_not_called()

def test_transmit_from_storage_caps_drain_batch_size(self):
"""Test that _transmit_from_storage processes at most
_MAX_STORAGE_DRAIN_BATCH blobs per invocation to prevent
flooding the service on recovery from throttling."""
exporter = BaseExporter()
exporter.storage = mock.Mock()
envelope_mock = {"name": "test", "time": "time"}
num_blobs = exporter._MAX_STORAGE_DRAIN_BATCH + 5
blobs = []
for _ in range(num_blobs):
b = mock.Mock()
b.lease.return_value = True
b.get.return_value = [envelope_mock]
blobs.append(b)
exporter.storage.gets.return_value = blobs
with mock.patch.object(exporter, "_transmit") as transmit_mock:
transmit_mock.return_value = ExportResult.SUCCESS
exporter._transmit_from_storage()
# Should only process _MAX_STORAGE_DRAIN_BATCH blobs
self.assertEqual(transmit_mock.call_count, exporter._MAX_STORAGE_DRAIN_BATCH)
# The extra blobs beyond the cap should not be deleted
for i in range(exporter._MAX_STORAGE_DRAIN_BATCH):
blobs[i].delete.assert_called_once()
for i in range(exporter._MAX_STORAGE_DRAIN_BATCH, num_blobs):
blobs[i].delete.assert_not_called()

def test_telemetry_item_dict_roundtrip(self):
"""Test that TelemetryItem correctly round-trips through as_dict() -> TelemetryItem(dict)
for all telemetry data types used in offline storage."""
Expand Down