diff --git a/.gitignore b/.gitignore
index c5b6d2e..f83c870 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,6 +92,7 @@ $RECYCLE.BIN/
x64/
x86/
bld/
+build/
[Bb]in/
[Oo]bj/
[Ll]og/
diff --git a/Analytics-CSharp/Analytics-CSharp.csproj b/Analytics-CSharp/Analytics-CSharp.csproj
index 7729c72..32c7619 100644
--- a/Analytics-CSharp/Analytics-CSharp.csproj
+++ b/Analytics-CSharp/Analytics-CSharp.csproj
@@ -49,5 +49,8 @@
<_Parameter1>DynamicProxyGenAssembly2
+
+ <_Parameter1>e2e-cli
+
diff --git a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs
index 9586be0..de0dd9d 100644
--- a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs
+++ b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs
@@ -1,3 +1,4 @@
+using Segment.Analytics.Retry;
using Segment.Analytics.Utilities;
using Segment.Serialization;
using Segment.Sovran;
@@ -77,10 +78,23 @@ public override void Update(Settings settings, UpdateType type)
base.Update(settings, type);
JsonObject segmentInfo = settings.Integrations?.GetJsonObject(Key);
+
string apiHost = segmentInfo?.GetString(ApiHost);
if (apiHost != null && _pipeline != null)
- {
_pipeline.ApiHost = apiHost;
+
+ JsonObject httpConfigJson = segmentInfo?.GetJsonObject("httpConfig");
+ if (httpConfigJson != null)
+ {
+ HttpConfig parsedConfig = HttpConfigParser.Parse(httpConfigJson);
+ if (parsedConfig != null)
+ {
+ EventPipeline concretePipeline = _pipeline as EventPipeline;
+ concretePipeline?.UpdateHttpConfig(parsedConfig);
+
+ SyncEventPipeline syncPipeline = _pipeline as SyncEventPipeline;
+ syncPipeline?.UpdateHttpConfig(parsedConfig);
+ }
}
}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs b/Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs
new file mode 100644
index 0000000..8412944
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs
@@ -0,0 +1,128 @@
+using System.Collections.Generic;
+using System.Globalization;
+using Segment.Serialization;
+
+namespace Segment.Analytics.Retry
+{
+ internal static class HttpConfigParser
+ {
+ public static HttpConfig Parse(JsonObject httpConfigJson)
+ {
+ if (httpConfigJson == null)
+ return null;
+
+ JsonObject rateLimitJson = httpConfigJson.GetJsonObject("rateLimitConfig");
+ JsonObject backoffJson = httpConfigJson.GetJsonObject("backoffConfig");
+
+ // CDN-sourced config defaults enabled to true (presence implies active).
+ // Only honor explicit enabled: false from CDN.
+ bool rateLimitEnabled = true;
+ if (rateLimitJson != null)
+ {
+ string enabledStr = rateLimitJson.GetString("enabled");
+ if (enabledStr != null && bool.TryParse(enabledStr, out bool parsed))
+ rateLimitEnabled = parsed;
+ }
+
+ bool backoffEnabled = true;
+ if (backoffJson != null)
+ {
+ string enabledStr = backoffJson.GetString("enabled");
+ if (enabledStr != null && bool.TryParse(enabledStr, out bool parsed))
+ backoffEnabled = parsed;
+ }
+
+ RateLimitConfig rateLimitConfig = ParseRateLimitConfig(rateLimitJson, rateLimitEnabled);
+ BackoffConfig backoffConfig = ParseBackoffConfig(backoffJson, backoffEnabled);
+
+ return new HttpConfig(
+ rateLimitConfig: rateLimitConfig.Validated(),
+ backoffConfig: backoffConfig.Validated()
+ );
+ }
+
+ private static RateLimitConfig ParseRateLimitConfig(JsonObject json, bool enabled)
+ {
+ if (json == null)
+ return new RateLimitConfig(enabled: enabled);
+
+ int maxRetryCount = 100;
+ string maxRetriesStr = json.GetString("maxRetryCount");
+ if (maxRetriesStr != null && int.TryParse(maxRetriesStr, out int parsedMaxRetries))
+ maxRetryCount = parsedMaxRetries;
+
+ int maxRetryInterval = 300;
+ string intervalStr = json.GetString("maxRetryInterval");
+ if (intervalStr != null && int.TryParse(intervalStr, out int parsedInterval))
+ maxRetryInterval = parsedInterval;
+
+ return new RateLimitConfig(
+ enabled: enabled,
+ maxRetryCount: maxRetryCount,
+ maxRetryInterval: maxRetryInterval
+ );
+ }
+
+ private static BackoffConfig ParseBackoffConfig(JsonObject json, bool enabled)
+ {
+ if (json == null)
+ return new BackoffConfig(enabled: enabled);
+
+ int maxRetryCount = 100;
+ string maxRetriesStr = json.GetString("maxRetryCount");
+ if (maxRetriesStr != null && int.TryParse(maxRetriesStr, out int parsedMaxRetries))
+ maxRetryCount = parsedMaxRetries;
+
+ double baseBackoffInterval = 0.5;
+ string baseStr = json.GetString("baseBackoffInterval");
+ if (baseStr != null && double.TryParse(baseStr, NumberStyles.Float, CultureInfo.InvariantCulture, out double parsedBase))
+ baseBackoffInterval = parsedBase;
+
+ int maxBackoffInterval = 300;
+ string maxStr = json.GetString("maxBackoffInterval");
+ if (maxStr != null && int.TryParse(maxStr, out int parsedMax))
+ maxBackoffInterval = parsedMax;
+
+ long maxTotalBackoffDuration = 43200;
+ string durationStr = json.GetString("maxTotalBackoffDuration");
+ if (durationStr != null && long.TryParse(durationStr, out long parsedDuration))
+ maxTotalBackoffDuration = parsedDuration;
+
+ int jitterPercent = 10;
+ string jitterStr = json.GetString("jitterPercent");
+ if (jitterStr != null && int.TryParse(jitterStr, out int parsedJitter))
+ jitterPercent = parsedJitter;
+
+ Dictionary statusCodeOverrides = null;
+ JsonObject overridesJson = json.GetJsonObject("statusCodeOverrides");
+ if (overridesJson != null)
+ statusCodeOverrides = ParseStatusCodeOverrides(overridesJson);
+
+ return new BackoffConfig(
+ enabled: enabled,
+ maxRetryCount: maxRetryCount,
+ baseBackoffInterval: baseBackoffInterval,
+ maxBackoffInterval: maxBackoffInterval,
+ maxTotalBackoffDuration: maxTotalBackoffDuration,
+ jitterPercent: jitterPercent,
+ statusCodeOverrides: statusCodeOverrides
+ );
+ }
+
+ private static Dictionary ParseStatusCodeOverrides(JsonObject json)
+ {
+ var result = new Dictionary();
+ foreach (string key in json.Keys)
+ {
+ if (!int.TryParse(key, out int code) || code < 100 || code > 599)
+ continue;
+ string val = json.GetString(key);
+ if (val == "retry")
+ result[code] = RetryBehavior.Retry;
+ else if (val == "drop")
+ result[code] = RetryBehavior.Drop;
+ }
+ return result;
+ }
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs
new file mode 100644
index 0000000..5be1331
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs
@@ -0,0 +1,123 @@
+using System;
+using System.Collections.Generic;
+
+namespace Segment.Analytics.Retry
+{
+ internal class RateLimitConfig
+ {
+ public bool Enabled { get; }
+ public int MaxRetryCount { get; }
+ public int MaxRetryInterval { get; }
+
+ public RateLimitConfig(bool enabled = false, int maxRetryCount = 100, int maxRetryInterval = 300)
+ {
+ Enabled = enabled;
+ MaxRetryCount = maxRetryCount;
+ MaxRetryInterval = maxRetryInterval;
+ }
+
+ public RateLimitConfig Validated() => new RateLimitConfig(
+ enabled: Enabled,
+ maxRetryCount: Math.Max(0, Math.Min(MaxRetryCount, 1000)),
+ maxRetryInterval: Math.Max(1, Math.Min(MaxRetryInterval, 3600))
+ );
+ }
+
+ internal class BackoffConfig
+ {
+ public bool Enabled { get; }
+ public int MaxRetryCount { get; }
+ public double BaseBackoffInterval { get; }
+ public int MaxBackoffInterval { get; }
+ public long MaxTotalBackoffDuration { get; }
+ public int JitterPercent { get; }
+ public RetryBehavior Default4xxBehavior { get; }
+ public RetryBehavior Default5xxBehavior { get; }
+ public RetryBehavior UnknownCodeBehavior { get; }
+ public Dictionary StatusCodeOverrides { get; }
+
+ public BackoffConfig(
+ bool enabled = false,
+ int maxRetryCount = 100,
+ double baseBackoffInterval = 0.5,
+ int maxBackoffInterval = 300,
+ long maxTotalBackoffDuration = 43200,
+ int jitterPercent = 10,
+ RetryBehavior default4xxBehavior = RetryBehavior.Drop,
+ RetryBehavior default5xxBehavior = RetryBehavior.Retry,
+ RetryBehavior unknownCodeBehavior = RetryBehavior.Drop,
+ Dictionary statusCodeOverrides = null)
+ {
+ Enabled = enabled;
+ MaxRetryCount = maxRetryCount;
+ BaseBackoffInterval = baseBackoffInterval;
+ MaxBackoffInterval = maxBackoffInterval;
+ MaxTotalBackoffDuration = maxTotalBackoffDuration;
+ JitterPercent = jitterPercent;
+ Default4xxBehavior = default4xxBehavior;
+ Default5xxBehavior = default5xxBehavior;
+ UnknownCodeBehavior = unknownCodeBehavior;
+ StatusCodeOverrides = statusCodeOverrides ?? DefaultStatusCodeOverrides;
+ }
+
+ public BackoffConfig Validated() => new BackoffConfig(
+ enabled: Enabled,
+ maxRetryCount: Math.Max(0, Math.Min(MaxRetryCount, 1000)),
+ baseBackoffInterval: Math.Max(0.1, Math.Min(BaseBackoffInterval, 60.0)),
+ maxBackoffInterval: Math.Max(1, Math.Min(MaxBackoffInterval, 3600)),
+ maxTotalBackoffDuration: Math.Max(0, Math.Min(MaxTotalBackoffDuration, 604800)),
+ jitterPercent: Math.Max(0, Math.Min(JitterPercent, 50)),
+ default4xxBehavior: Default4xxBehavior,
+ default5xxBehavior: Default5xxBehavior,
+ unknownCodeBehavior: UnknownCodeBehavior,
+ statusCodeOverrides: ValidateOverrides(StatusCodeOverrides)
+ );
+
+ private static Dictionary ValidateOverrides(
+ Dictionary overrides)
+ {
+ var result = new Dictionary();
+ foreach (var kvp in overrides)
+ {
+ if (kvp.Key >= 100 && kvp.Key <= 599)
+ result[kvp.Key] = kvp.Value;
+ }
+ return result;
+ }
+
+ private static readonly Dictionary DefaultStatusCodeOverrides =
+ new Dictionary
+ {
+ { 408, RetryBehavior.Retry },
+ { 410, RetryBehavior.Retry },
+ { 429, RetryBehavior.Retry },
+ { 460, RetryBehavior.Retry },
+ { 501, RetryBehavior.Drop },
+ { 505, RetryBehavior.Drop }
+ };
+ }
+
+ internal class RetryConfig
+ {
+ public RateLimitConfig RateLimitConfig { get; }
+ public BackoffConfig BackoffConfig { get; }
+
+ public RetryConfig(RateLimitConfig rateLimitConfig = null, BackoffConfig backoffConfig = null)
+ {
+ RateLimitConfig = rateLimitConfig ?? new RateLimitConfig();
+ BackoffConfig = backoffConfig ?? new BackoffConfig();
+ }
+ }
+
+ internal class HttpConfig
+ {
+ public RateLimitConfig RateLimitConfig { get; }
+ public BackoffConfig BackoffConfig { get; }
+
+ public HttpConfig(RateLimitConfig rateLimitConfig = null, BackoffConfig backoffConfig = null)
+ {
+ RateLimitConfig = rateLimitConfig ?? new RateLimitConfig();
+ BackoffConfig = backoffConfig ?? new BackoffConfig();
+ }
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs
new file mode 100644
index 0000000..f56b6a0
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs
@@ -0,0 +1,93 @@
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Segment.Analytics.Retry
+{
+ internal class BatchMetadata
+ {
+ public int FailureCount { get; }
+ public long? NextRetryTime { get; }
+ public long? FirstFailureTime { get; }
+
+ public BatchMetadata(int failureCount = 0, long? nextRetryTime = null, long? firstFailureTime = null)
+ {
+ FailureCount = failureCount;
+ NextRetryTime = nextRetryTime;
+ FirstFailureTime = firstFailureTime;
+ }
+
+ public bool ShouldRetry(long currentTime)
+ {
+ if (NextRetryTime == null) return true;
+ return currentTime >= NextRetryTime.Value;
+ }
+
+ public bool ExceedsMaxDuration(long currentTime, long maxDurationMs)
+ {
+ if (FirstFailureTime == null) return false;
+ return (currentTime - FirstFailureTime.Value) > maxDurationMs;
+ }
+ }
+
+ internal class RetryState
+ {
+ public PipelineState PipelineState { get; }
+ public long? WaitUntilTime { get; }
+ public int GlobalRetryCount { get; }
+ public Dictionary BatchMetadata { get; }
+
+ private static readonly Dictionary s_emptyMetadata =
+ new Dictionary();
+
+ public RetryState(
+ PipelineState pipelineState = PipelineState.Ready,
+ long? waitUntilTime = null,
+ int globalRetryCount = 0,
+ Dictionary batchMetadata = null)
+ {
+ PipelineState = pipelineState;
+ WaitUntilTime = waitUntilTime;
+ GlobalRetryCount = globalRetryCount;
+ BatchMetadata = batchMetadata ?? s_emptyMetadata;
+ }
+
+ public bool IsRateLimited(long currentTime)
+ {
+ return PipelineState == PipelineState.RateLimited
+ && WaitUntilTime != null
+ && currentTime < WaitUntilTime.Value;
+ }
+
+ public RetryState With(
+ PipelineState? pipelineState = null,
+ long? waitUntilTime = null,
+ bool clearWaitUntilTime = false,
+ int? globalRetryCount = null,
+ Dictionary batchMetadata = null)
+ {
+ return new RetryState(
+ pipelineState: pipelineState ?? PipelineState,
+ waitUntilTime: clearWaitUntilTime ? null : (waitUntilTime ?? WaitUntilTime),
+ globalRetryCount: globalRetryCount ?? GlobalRetryCount,
+ batchMetadata: batchMetadata ?? BatchMetadata
+ );
+ }
+
+ public RetryState RemoveBatch(string batchFile)
+ {
+ if (!BatchMetadata.ContainsKey(batchFile))
+ return this;
+
+ var newMetadata = BatchMetadata.Where(kvp => kvp.Key != batchFile)
+ .ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
+ return With(batchMetadata: newMetadata);
+ }
+
+ public RetryState SetBatchMetadata(string batchFile, BatchMetadata metadata)
+ {
+ var newMetadata = new Dictionary(BatchMetadata);
+ newMetadata[batchFile] = metadata;
+ return With(batchMetadata: newMetadata);
+ }
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs
new file mode 100644
index 0000000..3538bc8
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs
@@ -0,0 +1,226 @@
+using System;
+using System.Collections.Generic;
+
+namespace Segment.Analytics.Retry
+{
+ internal class RetryStateMachine
+ {
+ private readonly RetryConfig _config;
+ private readonly ITimeProvider _timeProvider;
+ private readonly Random _random;
+
+ public bool IsLegacyMode => !_config.RateLimitConfig.Enabled && !_config.BackoffConfig.Enabled;
+
+ public RetryStateMachine(RetryConfig config, ITimeProvider timeProvider = null, Random random = null)
+ {
+ _config = config ?? new RetryConfig();
+ _timeProvider = timeProvider ?? new SystemTimeProvider();
+ _random = random ?? new Random();
+ }
+
+ public RetryState HandleResponse(RetryState state, ResponseInfo response)
+ {
+ if (IsLegacyMode)
+ {
+ if (response.StatusCode >= 200 && response.StatusCode <= 299)
+ return state.RemoveBatch(response.BatchFile);
+ if (response.StatusCode == 429 || (response.StatusCode >= 500 && response.StatusCode <= 599))
+ return state; // Keep
+ return state.RemoveBatch(response.BatchFile); // Drop on 4xx
+ }
+
+ long currentTime = response.CurrentTime;
+
+ if (response.StatusCode >= 200 && response.StatusCode <= 299)
+ {
+ return state.With(
+ pipelineState: PipelineState.Ready,
+ clearWaitUntilTime: true,
+ globalRetryCount: 0,
+ batchMetadata: RemoveFromMetadata(state, response.BatchFile)
+ );
+ }
+
+ if (response.StatusCode == 429)
+ {
+ if (_config.RateLimitConfig.Enabled)
+ return HandleRateLimitResponse(state, response, currentTime);
+ return state.RemoveBatch(response.BatchFile);
+ }
+
+ RetryBehavior behavior = ResolveStatusCodeBehavior(response.StatusCode);
+ if (behavior == RetryBehavior.Retry && _config.BackoffConfig.Enabled)
+ return HandleRetryableError(state, response, currentTime);
+
+ return state.RemoveBatch(response.BatchFile);
+ }
+
+ public Tuple ShouldUploadBatch(RetryState state, string batchFile)
+ {
+ if (IsLegacyMode)
+ return Tuple.Create(UploadDecision.Proceed, state);
+
+ long currentTime = _timeProvider.CurrentTimeMillis();
+
+ // Check 1: Global rate limiting
+ if (state.IsRateLimited(currentTime))
+ return Tuple.Create(UploadDecision.SkipAllBatches, state);
+
+ // Clear stale rate limit state if it has expired
+ RetryState clearedState = state;
+ if (state.PipelineState == PipelineState.RateLimited
+ && state.WaitUntilTime != null
+ && currentTime >= state.WaitUntilTime.Value)
+ {
+ clearedState = state.With(
+ pipelineState: PipelineState.Ready,
+ clearWaitUntilTime: true
+ );
+ }
+
+ // Check 2: Global rate limit retry count
+ if (_config.RateLimitConfig.Enabled
+ && clearedState.GlobalRetryCount >= _config.RateLimitConfig.MaxRetryCount)
+ {
+ RetryState resetState = clearedState
+ .With(globalRetryCount: 0)
+ .RemoveBatch(batchFile);
+ return Tuple.Create(
+ UploadDecision.DropBatch(DropReason.MaxRetriesExceeded),
+ resetState);
+ }
+
+ // Check 3: Per-batch metadata
+ BatchMetadata metadata;
+ if (clearedState.BatchMetadata.TryGetValue(batchFile, out metadata))
+ {
+ // Check retry count limit
+ if (_config.BackoffConfig.Enabled
+ && metadata.FailureCount >= _config.BackoffConfig.MaxRetryCount)
+ {
+ return Tuple.Create(
+ UploadDecision.DropBatch(DropReason.MaxRetriesExceeded),
+ clearedState.RemoveBatch(batchFile));
+ }
+
+ // Check duration limit
+ if (_config.BackoffConfig.Enabled
+ && metadata.ExceedsMaxDuration(currentTime, _config.BackoffConfig.MaxTotalBackoffDuration * 1000))
+ {
+ return Tuple.Create(
+ UploadDecision.DropBatch(DropReason.MaxDurationExceeded),
+ clearedState.RemoveBatch(batchFile));
+ }
+
+ // Check if backoff time has passed
+ if (_config.BackoffConfig.Enabled && !metadata.ShouldRetry(currentTime))
+ {
+ return Tuple.Create(UploadDecision.SkipThisBatch, clearedState);
+ }
+ }
+
+ return Tuple.Create(UploadDecision.Proceed, clearedState);
+ }
+
+ public int GetRetryCount(RetryState state, string batchFile)
+ {
+ BatchMetadata metadata;
+ int batchRetryCount = state.BatchMetadata.TryGetValue(batchFile, out metadata)
+ ? metadata.FailureCount
+ : 0;
+ return Math.Max(batchRetryCount, state.GlobalRetryCount);
+ }
+
+ public bool ShouldDeleteBatch(int statusCode)
+ {
+ if (IsLegacyMode)
+ return statusCode >= 400 && statusCode <= 499 && statusCode != 429;
+
+ if (statusCode >= 200 && statusCode <= 299)
+ return true;
+
+ if (statusCode == 429)
+ return !_config.RateLimitConfig.Enabled;
+
+ RetryBehavior behavior = ResolveStatusCodeBehavior(statusCode);
+ if (behavior == RetryBehavior.Retry && !_config.BackoffConfig.Enabled)
+ return true;
+
+ return behavior == RetryBehavior.Drop;
+ }
+
+ private RetryState HandleRateLimitResponse(RetryState state, ResponseInfo response, long currentTime)
+ {
+ long waitUntilTimeMs = CalculateWaitUntilTimeMs(response.RetryAfterSeconds, currentTime);
+ return state.With(
+ pipelineState: PipelineState.RateLimited,
+ waitUntilTime: waitUntilTimeMs,
+ globalRetryCount: state.GlobalRetryCount + 1
+ );
+ }
+
+ private long CalculateWaitUntilTimeMs(int? retryAfterSeconds, long currentTime)
+ {
+ int seconds = retryAfterSeconds.HasValue
+ ? Math.Max(retryAfterSeconds.Value, 0)
+ : _config.RateLimitConfig.MaxRetryInterval;
+ int clampedSeconds = Math.Min(seconds, _config.RateLimitConfig.MaxRetryInterval);
+ return currentTime + (clampedSeconds * 1000L);
+ }
+
+ private RetryState HandleRetryableError(RetryState state, ResponseInfo response, long currentTime)
+ {
+ BatchMetadata existing;
+ state.BatchMetadata.TryGetValue(response.BatchFile, out existing);
+
+ int newFailureCount = (existing?.FailureCount ?? 0) + 1;
+ long firstFailureTime = existing?.FirstFailureTime ?? currentTime;
+ long nextRetryTime = currentTime + CalculateBackoffMs(newFailureCount);
+
+ var newMetadata = new BatchMetadata(
+ failureCount: newFailureCount,
+ nextRetryTime: nextRetryTime,
+ firstFailureTime: firstFailureTime
+ );
+
+ return state.SetBatchMetadata(response.BatchFile, newMetadata);
+ }
+
+ private long CalculateBackoffMs(int failureCount)
+ {
+ double baseMs = _config.BackoffConfig.BaseBackoffInterval * 1000;
+ long maxMs = _config.BackoffConfig.MaxBackoffInterval * 1000L;
+
+ double exponentialBackoff = baseMs * Math.Pow(2.0, failureCount - 1);
+ double cappedBackoff = Math.Min(exponentialBackoff, maxMs);
+
+ double jitterAmount = cappedBackoff * (_config.BackoffConfig.JitterPercent / 100.0);
+ double jitter = _random.NextDouble() * jitterAmount;
+
+ return (long)Math.Min(cappedBackoff + jitter, maxMs);
+ }
+
+ private RetryBehavior ResolveStatusCodeBehavior(int code)
+ {
+ RetryBehavior overrideBehavior;
+ if (_config.BackoffConfig.StatusCodeOverrides.TryGetValue(code, out overrideBehavior))
+ return overrideBehavior;
+
+ if (code >= 400 && code <= 499)
+ return _config.BackoffConfig.Default4xxBehavior;
+ if (code >= 500 && code <= 599)
+ return _config.BackoffConfig.Default5xxBehavior;
+ return _config.BackoffConfig.UnknownCodeBehavior;
+ }
+
+ private static Dictionary RemoveFromMetadata(RetryState state, string batchFile)
+ {
+ if (!state.BatchMetadata.ContainsKey(batchFile))
+ return state.BatchMetadata;
+
+ var newMetadata = new Dictionary(state.BatchMetadata);
+ newMetadata.Remove(batchFile);
+ return newMetadata;
+ }
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs
new file mode 100644
index 0000000..f646b65
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs
@@ -0,0 +1,220 @@
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Text;
+using Segment.Analytics.Utilities;
+
+namespace Segment.Analytics.Retry
+{
+ internal static class RetryStateStorage
+ {
+ public static void SaveRetryState(IStorage storage, RetryState state)
+ {
+ try
+ {
+ string json = SerializeState(state);
+ storage.WritePrefs(StorageConstants.RetryState, json);
+ }
+ catch (Exception)
+ {
+ // Defensive: never crash on serialization failure
+ }
+ }
+
+ public static RetryState LoadRetryState(IStorage storage)
+ {
+ try
+ {
+ string json = storage.Read(StorageConstants.RetryState);
+ if (string.IsNullOrEmpty(json))
+ return new RetryState();
+ return DeserializeState(json);
+ }
+ catch (Exception)
+ {
+ return new RetryState();
+ }
+ }
+
+ public static void ClearRetryState(IStorage storage)
+ {
+ storage.Remove(StorageConstants.RetryState);
+ }
+
+ private static string SerializeState(RetryState state)
+ {
+ var sb = new StringBuilder();
+ sb.Append("{");
+ sb.Append("\"pipelineState\":\"").Append((int)state.PipelineState).Append("\"");
+ if (state.WaitUntilTime.HasValue)
+ sb.Append(",\"waitUntilTime\":\"").Append(state.WaitUntilTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\"");
+ sb.Append(",\"globalRetryCount\":\"").Append(state.GlobalRetryCount.ToString(CultureInfo.InvariantCulture)).Append("\"");
+
+ if (state.BatchMetadata.Count > 0)
+ {
+ sb.Append(",\"batchMetadata\":{");
+ bool first = true;
+ foreach (var kvp in state.BatchMetadata)
+ {
+ if (!first) sb.Append(",");
+ first = false;
+ sb.Append("\"").Append(EscapeJsonString(kvp.Key)).Append("\":{");
+ sb.Append("\"failureCount\":\"").Append(kvp.Value.FailureCount.ToString(CultureInfo.InvariantCulture)).Append("\"");
+ if (kvp.Value.NextRetryTime.HasValue)
+ sb.Append(",\"nextRetryTime\":\"").Append(kvp.Value.NextRetryTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\"");
+ if (kvp.Value.FirstFailureTime.HasValue)
+ sb.Append(",\"firstFailureTime\":\"").Append(kvp.Value.FirstFailureTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\"");
+ sb.Append("}");
+ }
+ sb.Append("}");
+ }
+
+ sb.Append("}");
+ return sb.ToString();
+ }
+
+ private static string EscapeJsonString(string s)
+ {
+ return s.Replace("\\", "\\\\").Replace("\"", "\\\"");
+ }
+
+ private static RetryState DeserializeState(string json)
+ {
+ // Manual parsing to avoid Serialization.NET's numeric string coercion.
+ // Format is well-defined since we control serialization.
+ var fields = ParseJsonFields(json);
+
+ PipelineState pipelineState = PipelineState.Ready;
+ if (fields.TryGetValue("pipelineState", out string psVal)
+ && int.TryParse(psVal, NumberStyles.Integer, CultureInfo.InvariantCulture, out int psInt)
+ && psInt == 1)
+ pipelineState = PipelineState.RateLimited;
+
+ long? waitUntilTime = null;
+ if (fields.TryGetValue("waitUntilTime", out string waitStr)
+ && long.TryParse(waitStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out long waitVal))
+ waitUntilTime = waitVal;
+
+ int globalRetryCount = 0;
+ if (fields.TryGetValue("globalRetryCount", out string grcStr)
+ && int.TryParse(grcStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out int grcVal))
+ globalRetryCount = grcVal;
+
+ var batchMetadata = new Dictionary();
+ int bmStart = json.IndexOf("\"batchMetadata\":{", StringComparison.Ordinal);
+ if (bmStart >= 0)
+ {
+ int objStart = json.IndexOf('{', bmStart + 16);
+ string bmJson = ExtractBalancedBraces(json, objStart);
+ if (bmJson != null)
+ batchMetadata = ParseBatchMetadata(bmJson);
+ }
+
+ return new RetryState(pipelineState, waitUntilTime, globalRetryCount, batchMetadata);
+ }
+
+ private static Dictionary ParseJsonFields(string json)
+ {
+ var result = new Dictionary();
+ int i = 0;
+ while (i < json.Length)
+ {
+ int keyStart = json.IndexOf('"', i);
+ if (keyStart < 0) break;
+ int keyEnd = json.IndexOf('"', keyStart + 1);
+ if (keyEnd < 0) break;
+ string key = json.Substring(keyStart + 1, keyEnd - keyStart - 1);
+
+ int colonIdx = json.IndexOf(':', keyEnd + 1);
+ if (colonIdx < 0) break;
+
+ int valStart = colonIdx + 1;
+ while (valStart < json.Length && json[valStart] == ' ') valStart++;
+
+ if (valStart >= json.Length) break;
+
+ if (json[valStart] == '{')
+ {
+ // Skip nested objects
+ i = SkipBraces(json, valStart) + 1;
+ continue;
+ }
+
+ if (json[valStart] == '"')
+ {
+ int valEnd = json.IndexOf('"', valStart + 1);
+ if (valEnd < 0) break;
+ result[key] = json.Substring(valStart + 1, valEnd - valStart - 1);
+ i = valEnd + 1;
+ }
+ else
+ {
+ int valEnd = valStart;
+ while (valEnd < json.Length && json[valEnd] != ',' && json[valEnd] != '}')
+ valEnd++;
+ result[key] = json.Substring(valStart, valEnd - valStart).Trim();
+ i = valEnd;
+ }
+ }
+ return result;
+ }
+
+ private static int SkipBraces(string json, int start)
+ {
+ int depth = 0;
+ for (int i = start; i < json.Length; i++)
+ {
+ if (json[i] == '{') depth++;
+ else if (json[i] == '}') { depth--; if (depth == 0) return i; }
+ }
+ return json.Length - 1;
+ }
+
+ private static string ExtractBalancedBraces(string json, int start)
+ {
+ if (start < 0 || start >= json.Length || json[start] != '{')
+ return null;
+ int end = SkipBraces(json, start);
+ return json.Substring(start, end - start + 1);
+ }
+
+ private static Dictionary ParseBatchMetadata(string json)
+ {
+ var result = new Dictionary();
+ int i = 1; // skip opening {
+ while (i < json.Length)
+ {
+ int keyStart = json.IndexOf('"', i);
+ if (keyStart < 0) break;
+ int keyEnd = json.IndexOf('"', keyStart + 1);
+ if (keyEnd < 0) break;
+ string batchFile = json.Substring(keyStart + 1, keyEnd - keyStart - 1);
+
+ int objStart = json.IndexOf('{', keyEnd + 1);
+ if (objStart < 0) break;
+ string metaJson = ExtractBalancedBraces(json, objStart);
+ if (metaJson == null) break;
+
+ var fields = ParseJsonFields(metaJson);
+
+ int failureCount = 0;
+ if (fields.TryGetValue("failureCount", out string fcStr))
+ int.TryParse(fcStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out failureCount);
+
+ long? nextRetryTime = null;
+ if (fields.TryGetValue("nextRetryTime", out string nrtStr)
+ && long.TryParse(nrtStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out long nrtVal))
+ nextRetryTime = nrtVal;
+
+ long? firstFailureTime = null;
+ if (fields.TryGetValue("firstFailureTime", out string fftStr)
+ && long.TryParse(fftStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out long fftVal))
+ firstFailureTime = fftVal;
+
+ result[batchFile] = new BatchMetadata(failureCount, nextRetryTime, firstFailureTime);
+ i = objStart + metaJson.Length + 1;
+ }
+ return result;
+ }
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs
new file mode 100644
index 0000000..7a87348
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs
@@ -0,0 +1,58 @@
+namespace Segment.Analytics.Retry
+{
+ internal enum PipelineState
+ {
+ Ready,
+ RateLimited
+ }
+
+ internal enum RetryBehavior
+ {
+ Retry,
+ Drop
+ }
+
+ internal enum DropReason
+ {
+ MaxRetriesExceeded,
+ MaxDurationExceeded,
+ NonRetryableError
+ }
+
+ internal abstract class UploadDecision
+ {
+ public static readonly UploadDecision Proceed = new ProceedDecision();
+ public static readonly UploadDecision SkipThisBatch = new SkipThisBatchDecision();
+ public static readonly UploadDecision SkipAllBatches = new SkipAllBatchesDecision();
+
+ public static UploadDecision DropBatch(DropReason reason) => new DropBatchDecision(reason);
+
+ private UploadDecision() { }
+
+ internal sealed class ProceedDecision : UploadDecision { }
+ internal sealed class SkipThisBatchDecision : UploadDecision { }
+ internal sealed class SkipAllBatchesDecision : UploadDecision { }
+
+ internal sealed class DropBatchDecision : UploadDecision
+ {
+ public DropReason Reason { get; }
+ public DropBatchDecision(DropReason reason) { Reason = reason; }
+ }
+ }
+
+ internal class ResponseInfo
+ {
+ public int StatusCode { get; }
+ public int? RetryAfterSeconds { get; }
+ public string BatchFile { get; }
+ public long CurrentTime { get; }
+
+ public ResponseInfo(int statusCode, int? retryAfterSeconds, string batchFile, long currentTime)
+ {
+ StatusCode = statusCode;
+ RetryAfterSeconds = retryAfterSeconds;
+ BatchFile = batchFile;
+ CurrentTime = currentTime;
+ }
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs b/Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs
new file mode 100644
index 0000000..bdfecca
--- /dev/null
+++ b/Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs
@@ -0,0 +1,14 @@
+using System;
+
+namespace Segment.Analytics.Retry
+{
+ internal interface ITimeProvider
+ {
+ long CurrentTimeMillis();
+ }
+
+ internal class SystemTimeProvider : ITimeProvider
+ {
+ public long CurrentTimeMillis() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ }
+}
diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs
index 21fe7ce..e390bae 100644
--- a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs
+++ b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs
@@ -2,6 +2,7 @@
using global::System;
using global::System.Linq;
using Segment.Analytics.Policies;
+using Segment.Analytics.Retry;
using Segment.Concurrent;
using Segment.Serialization;
@@ -19,10 +20,14 @@ public class EventPipeline: IEventPipeline
private Channel _uploadChannel;
- private readonly HTTPClient _httpClient;
+ internal readonly HTTPClient _httpClient;
private readonly IStorage _storage;
+ internal RetryStateMachine _retryStateMachine;
+
+ private RetryState _retryState;
+
public string ApiHost { get; set; }
public bool Running { get; private set; }
@@ -39,6 +44,15 @@ public EventPipeline(
string apiKey,
IList flushPolicies,
string apiHost = HTTPClient.DefaultAPIHost)
+ : this(analytics, logTag, apiKey, flushPolicies, apiHost, (HttpConfig)null) { }
+
+ internal EventPipeline(
+ Analytics analytics,
+ string logTag,
+ string apiKey,
+ IList flushPolicies,
+ string apiHost,
+ HttpConfig httpConfig)
{
_analytics = analytics;
_logTag = logTag;
@@ -51,6 +65,20 @@ public EventPipeline(
_httpClient.AnalyticsRef = analytics;
_storage = analytics.Storage;
Running = false;
+
+ var retryConfig = httpConfig != null
+ ? new RetryConfig(httpConfig.RateLimitConfig, httpConfig.BackoffConfig)
+ : new RetryConfig();
+ _retryStateMachine = new RetryStateMachine(retryConfig);
+ _retryState = RetryStateStorage.LoadRetryState(_storage);
+ }
+
+ internal void UpdateHttpConfig(HttpConfig config)
+ {
+ var retryConfig = config != null
+ ? new RetryConfig(config.RateLimitConfig, config.BackoffConfig)
+ : new RetryConfig();
+ _retryStateMachine = new RetryStateMachine(retryConfig);
}
public void Put(RawEvent @event) => _writeChannel.Send(@event);
@@ -134,27 +162,85 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi
foreach (string url in fileUrlList)
{
if (string.IsNullOrEmpty(url))
+ continue;
+
+ var decision = _retryStateMachine.ShouldUploadBatch(_retryState, url);
+ _retryState = decision.Item2;
+
+ if (decision.Item1 is UploadDecision.SkipAllBatchesDecision)
+ {
+ Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping uploads: pipeline is rate-limited");
+ break;
+ }
+ if (decision.Item1 is UploadDecision.SkipThisBatchDecision)
{
+ Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping batch " + url + ": not ready for retry");
+ continue;
+ }
+ if (decision.Item1 is UploadDecision.DropBatchDecision dropDecision)
+ {
+ Analytics.Logger.Log(LogLevel.Error, message: _logTag + " dropping batch " + url + ": " + dropDecision.Reason);
+ _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected,
+ message: "Batch dropped: " + dropDecision.Reason);
+ _storage.RemoveFile(url);
+ RetryStateStorage.SaveRetryState(_storage, _retryState);
continue;
}
+ // Proceed with upload
byte[] data = _storage.ReadAsBytes(url);
if (data == null)
- {
continue;
- }
+ int retryCount = _retryStateMachine.GetRetryCount(_retryState, url);
+ int statusCode = 0;
+ int? retryAfterSeconds = null;
bool shouldCleanup = true;
+
try
{
- shouldCleanup = await _httpClient.Upload(data);
- Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
+ HTTPClient.Response response = await _httpClient.UploadWithResponse(data, retryCount);
+ statusCode = response.StatusCode;
+
+ if (!string.IsNullOrEmpty(response.RetryAfterHeader)
+ && int.TryParse(response.RetryAfterHeader.Trim(), out int parsedRetryAfter))
+ {
+ retryAfterSeconds = parsedRetryAfter;
+ }
+
+ if (response.IsSuccessStatusCode)
+ {
+ Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
+ shouldCleanup = true;
+ }
+ else
+ {
+ Analytics.Logger.Log(LogLevel.Error, message: "Error " + statusCode + " uploading " + url);
+ shouldCleanup = _retryStateMachine.ShouldDeleteBatch(statusCode);
+ if (shouldCleanup)
+ {
+ _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected,
+ message: "HTTP " + statusCode + ": batch rejected by server");
+ }
+ }
}
catch (Exception e)
{
Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url");
+ statusCode = 0;
+ shouldCleanup = false;
}
+ // Update retry state based on response
+ var responseInfo = new ResponseInfo(
+ statusCode: statusCode > 0 ? statusCode : 500,
+ retryAfterSeconds: retryAfterSeconds,
+ batchFile: url,
+ currentTime: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+ );
+ _retryState = _retryStateMachine.HandleResponse(_retryState, responseInfo);
+ RetryStateStorage.SaveRetryState(_storage, _retryState);
+
if (shouldCleanup)
{
_storage.RemoveFile(url);
diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs b/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs
index 552383b..85b955f 100644
--- a/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs
+++ b/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs
@@ -1,10 +1,12 @@
using System;
using System.IO;
using System.IO.Compression;
+using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
+using Segment.Analytics.Retry;
using Segment.Serialization;
namespace Segment.Analytics.Utilities
@@ -105,22 +107,18 @@ public virtual async Task Upload(byte[] data)
{
Analytics.Logger.Log(LogLevel.Error, message: "Error " + response.StatusCode + " uploading to url");
- switch (response.StatusCode)
+ if (response.StatusCode == 429)
{
- case var n when n >= 1 && n < 300:
- return false;
- case var n when n >= 300 && n < 400:
- AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkUnexpectedHttpCode, message: "Response code: " + n);
- return false;
- case 429:
- AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerLimited, message: "Response code: 429");
- return false;
- case var n when n >= 400 && n < 500:
- AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, message: "Response code: " + n + ". Payloads were rejected by server. Marked for removal.");
- return true;
- default:
- return false;
+ AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerLimited, message: "Response code: 429");
+ return false;
}
+ if (response.StatusCode >= 400 && response.StatusCode < 500)
+ {
+ AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, message: "Response code: " + response.StatusCode + ". Payloads were rejected by server. Marked for removal.");
+ return true;
+ }
+ // 5xx and others — keep for retry
+ return false;
}
return true;
@@ -133,6 +131,12 @@ public virtual async Task Upload(byte[] data)
return false;
}
+ internal virtual async Task UploadWithResponse(byte[] data, int retryCount = 0)
+ {
+ string uploadURL = SegmentURL(_apiHost, "/b");
+ return await DoPost(uploadURL, data, retryCount);
+ }
+
///
/// Handle GET request
///
@@ -148,6 +152,16 @@ public virtual async Task Upload(byte[] data)
/// Awaitable response of the POST request
public abstract Task DoPost(string url, byte[] data);
+ ///
+ /// Handle POST request with retry count for the X-Retry-Count header.
+ /// Default implementation calls DoPost(url, data) — override in subclasses
+ /// that support the retry count header.
+ ///
+ public virtual Task DoPost(string url, byte[] data, int retryCount)
+ {
+ return DoPost(url, data);
+ }
+
///
/// A wrapper class for http response, so that the HTTPClient is
/// not dependent on a specific network library.
@@ -164,6 +178,11 @@ public class Response
///
public string Content { get; set; }
+ ///
+ /// Value of the Retry-After response header, or null if absent.
+ ///
+ public string RetryAfterHeader { get; set; }
+
///
/// A convenient method to check if the http request is successful
///
@@ -201,7 +220,12 @@ public override async Task DoGet(string url)
return result;
}
- public override async Task DoPost(string url, byte[] data)
+ public override Task DoPost(string url, byte[] data)
+ {
+ return DoPost(url, data, 0);
+ }
+
+ public override async Task DoPost(string url, byte[] data, int retryCount)
{
using (MemoryStream ms = new MemoryStream())
{
@@ -217,10 +241,21 @@ public override async Task DoPost(string url, byte[] data)
var request = new HttpRequestMessage(HttpMethod.Post, url);
request.Headers.Add("Connection", "close");
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));
+ if (retryCount > 0)
+ request.Headers.Add("X-Retry-Count", retryCount.ToString());
request.Content = streamContent;
HttpResponseMessage response = await _httpClient.SendAsync(request);
- var result = new Response {StatusCode = (int)response.StatusCode};
+
+ string retryAfterHeader = null;
+ if (response.Headers.TryGetValues("Retry-After", out var values))
+ retryAfterHeader = values.FirstOrDefault();
+
+ var result = new Response
+ {
+ StatusCode = (int)response.StatusCode,
+ RetryAfterHeader = retryAfterHeader
+ };
response.Dispose();
return result;
diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs b/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs
index a3ae9b9..b543a0e 100644
--- a/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs
+++ b/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs
@@ -29,12 +29,14 @@ public readonly struct StorageConstants
public const string _AnonymousId = "segment.anonymousId";
public const string _Settings = "segment.settings";
public const string _Events = "segment.events";
+ public const string _RetryState = "segment.retry.state";
// enum alternatives
public static readonly StorageConstants UserId = new StorageConstants(_UserId);
public static readonly StorageConstants Traits = new StorageConstants(_Traits);
public static readonly StorageConstants AnonymousId = new StorageConstants(_AnonymousId);
public static readonly StorageConstants Settings = new StorageConstants(_Settings);
public static readonly StorageConstants Events = new StorageConstants(_Events);
+ public static readonly StorageConstants RetryState = new StorageConstants(_RetryState);
}
#endregion
diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs
index 4411e6f..ad2e1a8 100644
--- a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs
+++ b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs
@@ -4,6 +4,7 @@
using global::System;
using global::System.Linq;
using Segment.Analytics.Policies;
+using Segment.Analytics.Retry;
using Segment.Concurrent;
using Segment.Serialization;
@@ -33,10 +34,14 @@ public class SyncEventPipeline: IEventPipeline
private Channel _uploadChannel;
- private readonly HTTPClient _httpClient;
+ internal readonly HTTPClient _httpClient;
private readonly IStorage _storage;
+ internal RetryStateMachine _retryStateMachine;
+
+ private RetryState _retryState;
+
public string ApiHost { get; set; }
public bool Running { get; private set; }
@@ -52,6 +57,17 @@ public SyncEventPipeline(
string apiHost = HTTPClient.DefaultAPIHost,
int flushTimeout = -1,
CancellationToken? flushCancellationToken = null)
+ : this(analytics, logTag, apiKey, flushPolicies, apiHost, flushTimeout, flushCancellationToken, null) { }
+
+ internal SyncEventPipeline(
+ Analytics analytics,
+ string logTag,
+ string apiKey,
+ IList flushPolicies,
+ string apiHost,
+ int flushTimeout,
+ CancellationToken? flushCancellationToken,
+ HttpConfig httpConfig)
{
_analytics = analytics;
_logTag = logTag;
@@ -66,6 +82,20 @@ public SyncEventPipeline(
Running = false;
_flushTimeout = flushTimeout;
_flushCancellationToken = flushCancellationToken ?? CancellationToken.None;
+
+ var retryConfig = httpConfig != null
+ ? new RetryConfig(httpConfig.RateLimitConfig, httpConfig.BackoffConfig)
+ : new RetryConfig();
+ _retryStateMachine = new RetryStateMachine(retryConfig);
+ _retryState = RetryStateStorage.LoadRetryState(_storage);
+ }
+
+ internal void UpdateHttpConfig(HttpConfig config)
+ {
+ var retryConfig = config != null
+ ? new RetryConfig(config.RateLimitConfig, config.BackoffConfig)
+ : new RetryConfig();
+ _retryStateMachine = new RetryStateMachine(retryConfig);
}
public void Put(RawEvent @event) => _writeChannel.Send(@event);
@@ -77,7 +107,7 @@ public void Flush() {
_writeChannel.Send(flushEvent);
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
}
- }
+ }
public void Start()
{
@@ -157,27 +187,85 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi
foreach (string url in fileUrlList)
{
if (string.IsNullOrEmpty(url))
+ continue;
+
+ var decision = _retryStateMachine.ShouldUploadBatch(_retryState, url);
+ _retryState = decision.Item2;
+
+ if (decision.Item1 is UploadDecision.SkipAllBatchesDecision)
+ {
+ Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping uploads: pipeline is rate-limited");
+ break;
+ }
+ if (decision.Item1 is UploadDecision.SkipThisBatchDecision)
{
+ Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping batch " + url + ": not ready for retry");
+ continue;
+ }
+ if (decision.Item1 is UploadDecision.DropBatchDecision dropDecision)
+ {
+ Analytics.Logger.Log(LogLevel.Error, message: _logTag + " dropping batch " + url + ": " + dropDecision.Reason);
+ _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected,
+ message: "Batch dropped: " + dropDecision.Reason);
+ _storage.RemoveFile(url);
+ RetryStateStorage.SaveRetryState(_storage, _retryState);
continue;
}
+ // Proceed with upload
byte[] data = _storage.ReadAsBytes(url);
if (data == null)
- {
continue;
- }
+ int retryCount = _retryStateMachine.GetRetryCount(_retryState, url);
+ int statusCode = 0;
+ int? retryAfterSeconds = null;
bool shouldCleanup = true;
+
try
{
- shouldCleanup = await _httpClient.Upload(data);
- Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
+ HTTPClient.Response response = await _httpClient.UploadWithResponse(data, retryCount);
+ statusCode = response.StatusCode;
+
+ if (!string.IsNullOrEmpty(response.RetryAfterHeader)
+ && int.TryParse(response.RetryAfterHeader.Trim(), out int parsedRetryAfter))
+ {
+ retryAfterSeconds = parsedRetryAfter;
+ }
+
+ if (response.IsSuccessStatusCode)
+ {
+ Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
+ shouldCleanup = true;
+ }
+ else
+ {
+ Analytics.Logger.Log(LogLevel.Error, message: "Error " + statusCode + " uploading " + url);
+ shouldCleanup = _retryStateMachine.ShouldDeleteBatch(statusCode);
+ if (shouldCleanup)
+ {
+ _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected,
+ message: "HTTP " + statusCode + ": batch rejected by server");
+ }
+ }
}
catch (Exception e)
{
Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url");
+ statusCode = 0;
+ shouldCleanup = false;
}
+ // Update retry state based on response
+ var responseInfo = new ResponseInfo(
+ statusCode: statusCode > 0 ? statusCode : 500,
+ retryAfterSeconds: retryAfterSeconds,
+ batchFile: url,
+ currentTime: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+ );
+ _retryState = _retryStateMachine.HandleResponse(_retryState, responseInfo);
+ RetryStateStorage.SaveRetryState(_storage, _retryState);
+
if (shouldCleanup)
{
_storage.RemoveFile(url);
diff --git a/Tests/Retry/HttpConfigParserTest.cs b/Tests/Retry/HttpConfigParserTest.cs
new file mode 100644
index 0000000..89ff4cf
--- /dev/null
+++ b/Tests/Retry/HttpConfigParserTest.cs
@@ -0,0 +1,107 @@
+using Segment.Analytics.Retry;
+using Segment.Serialization;
+using Xunit;
+
+namespace Tests.Retry
+{
+ public class HttpConfigParserTest
+ {
+ [Fact]
+ public void Parse_Null_ReturnsNull()
+ {
+ Assert.Null(HttpConfigParser.Parse(null));
+ }
+
+ [Fact]
+ public void Parse_EmptyObject_DefaultsEnabledTrue()
+ {
+ var json = JsonUtility.FromJson("{}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.NotNull(config);
+ Assert.True(config.RateLimitConfig.Enabled);
+ Assert.True(config.BackoffConfig.Enabled);
+ }
+
+ [Fact]
+ public void Parse_ExplicitEnabled_Respected()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"rateLimitConfig\":{\"enabled\":\"false\"},\"backoffConfig\":{\"enabled\":\"true\"}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.False(config.RateLimitConfig.Enabled);
+ Assert.True(config.BackoffConfig.Enabled);
+ }
+
+ [Fact]
+ public void Parse_BackoffConfig_ParsesValues()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"backoffConfig\":{\"maxRetryCount\":\"5\",\"baseBackoffInterval\":\"1.0\",\"maxBackoffInterval\":\"60\"}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.Equal(5, config.BackoffConfig.MaxRetryCount);
+ Assert.Equal(1.0, config.BackoffConfig.BaseBackoffInterval);
+ Assert.Equal(60, config.BackoffConfig.MaxBackoffInterval);
+ }
+
+ [Fact]
+ public void Parse_RateLimitConfig_ParsesValues()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"rateLimitConfig\":{\"maxRetryCount\":\"10\",\"maxRetryInterval\":\"120\"}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.Equal(10, config.RateLimitConfig.MaxRetryCount);
+ Assert.Equal(120, config.RateLimitConfig.MaxRetryInterval);
+ }
+
+ [Fact]
+ public void Parse_StatusCodeOverrides_Parsed()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"backoffConfig\":{\"statusCodeOverrides\":{\"400\":\"retry\",\"500\":\"drop\"}}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.Equal(RetryBehavior.Retry, config.BackoffConfig.StatusCodeOverrides[400]);
+ Assert.Equal(RetryBehavior.Drop, config.BackoffConfig.StatusCodeOverrides[500]);
+ }
+
+ [Fact]
+ public void Parse_InvalidStatusCodeOverrides_Filtered()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"backoffConfig\":{\"statusCodeOverrides\":{\"abc\":\"retry\",\"999\":\"retry\",\"200\":\"invalid\"}}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.Empty(config.BackoffConfig.StatusCodeOverrides);
+ }
+
+ [Fact]
+ public void Parse_ClampsValues()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"rateLimitConfig\":{\"maxRetryCount\":\"9999\",\"maxRetryInterval\":\"99999\"}," +
+ "\"backoffConfig\":{\"baseBackoffInterval\":\"999\",\"maxBackoffInterval\":\"99999\"}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.Equal(1000, config.RateLimitConfig.MaxRetryCount);
+ Assert.Equal(3600, config.RateLimitConfig.MaxRetryInterval);
+ Assert.Equal(60.0, config.BackoffConfig.BaseBackoffInterval);
+ Assert.Equal(3600, config.BackoffConfig.MaxBackoffInterval);
+ }
+
+ [Fact]
+ public void Parse_PartialConfig_UsesDefaults()
+ {
+ var json = JsonUtility.FromJson(
+ "{\"backoffConfig\":{\"maxRetryCount\":\"50\"}}");
+ HttpConfig config = HttpConfigParser.Parse(json);
+
+ Assert.Equal(50, config.BackoffConfig.MaxRetryCount);
+ Assert.Equal(0.5, config.BackoffConfig.BaseBackoffInterval); // default
+ Assert.Equal(300, config.BackoffConfig.MaxBackoffInterval); // default
+ }
+ }
+}
diff --git a/Tests/Retry/RetryStateMachineTest.cs b/Tests/Retry/RetryStateMachineTest.cs
new file mode 100644
index 0000000..69b0250
--- /dev/null
+++ b/Tests/Retry/RetryStateMachineTest.cs
@@ -0,0 +1,413 @@
+using System;
+using Segment.Analytics.Retry;
+using Xunit;
+
+namespace Tests.Retry
+{
+ public class FakeTimeProvider : ITimeProvider
+ {
+ public long Time { get; set; }
+ public long CurrentTimeMillis() => Time;
+ }
+
+ public class RetryStateMachineTest
+ {
+ private RetryStateMachine CreateMachine(
+ bool rateLimitEnabled = true,
+ bool backoffEnabled = true,
+ int maxRetryCount = 100,
+ int maxRetryInterval = 300,
+ FakeTimeProvider timeProvider = null)
+ {
+ var config = new RetryConfig(
+ new RateLimitConfig(enabled: rateLimitEnabled, maxRetryCount: maxRetryCount, maxRetryInterval: maxRetryInterval),
+ new BackoffConfig(enabled: backoffEnabled, maxRetryCount: maxRetryCount)
+ );
+ return new RetryStateMachine(config, timeProvider ?? new FakeTimeProvider(), new Random(42));
+ }
+
+ [Fact]
+ public void LegacyMode_BothDisabled()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ Assert.True(machine.IsLegacyMode);
+ }
+
+ [Fact]
+ public void NotLegacyMode_WhenEitherEnabled()
+ {
+ var machine = CreateMachine(rateLimitEnabled: true, backoffEnabled: false);
+ Assert.False(machine.IsLegacyMode);
+ }
+
+ // --- HandleResponse tests ---
+
+ [Fact]
+ public void HandleResponse_Success_ClearsState()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState(globalRetryCount: 5);
+ var response = new ResponseInfo(200, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.Equal(PipelineState.Ready, newState.PipelineState);
+ Assert.Equal(0, newState.GlobalRetryCount);
+ Assert.Null(newState.WaitUntilTime);
+ Assert.False(newState.BatchMetadata.ContainsKey("batch1.json"));
+ }
+
+ [Fact]
+ public void HandleResponse_429_SetsRateLimited()
+ {
+ var machine = CreateMachine(maxRetryInterval: 300);
+ var state = new RetryState();
+ var response = new ResponseInfo(429, 60, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.Equal(PipelineState.RateLimited, newState.PipelineState);
+ Assert.Equal(1, newState.GlobalRetryCount);
+ Assert.Equal(61000L, newState.WaitUntilTime); // 1000 + 60*1000
+ }
+
+ [Fact]
+ public void HandleResponse_429_ClampsRetryAfter()
+ {
+ var machine = CreateMachine(maxRetryInterval: 10);
+ var state = new RetryState();
+ var response = new ResponseInfo(429, 999, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ // Clamped to maxRetryInterval=10
+ Assert.Equal(11000L, newState.WaitUntilTime); // 1000 + 10*1000
+ }
+
+ [Fact]
+ public void HandleResponse_429_NullRetryAfter_UsesMaxInterval()
+ {
+ var machine = CreateMachine(maxRetryInterval: 300);
+ var state = new RetryState();
+ var response = new ResponseInfo(429, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.Equal(301000L, newState.WaitUntilTime); // 1000 + 300*1000
+ }
+
+ [Fact]
+ public void HandleResponse_429_RateLimitDisabled_DropsBatch()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: true);
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 1) }
+ });
+ var response = new ResponseInfo(429, 60, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.False(newState.BatchMetadata.ContainsKey("batch1.json"));
+ }
+
+ [Fact]
+ public void HandleResponse_500_TracksBackoff()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState();
+ var response = new ResponseInfo(500, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.True(newState.BatchMetadata.ContainsKey("batch1.json"));
+ Assert.Equal(1, newState.BatchMetadata["batch1.json"].FailureCount);
+ Assert.Equal(1000L, newState.BatchMetadata["batch1.json"].FirstFailureTime);
+ Assert.NotNull(newState.BatchMetadata["batch1.json"].NextRetryTime);
+ }
+
+ [Fact]
+ public void HandleResponse_500_IncreasesBackoff()
+ {
+ var machine = CreateMachine();
+ var existing = new BatchMetadata(failureCount: 2, firstFailureTime: 0, nextRetryTime: 500);
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", existing }
+ });
+ var response = new ResponseInfo(500, null, "batch1.json", 5000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.Equal(3, newState.BatchMetadata["batch1.json"].FailureCount);
+ Assert.True(newState.BatchMetadata["batch1.json"].NextRetryTime > 5000);
+ }
+
+ [Fact]
+ public void HandleResponse_400_DropsBatch()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 1) }
+ });
+ var response = new ResponseInfo(400, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.False(newState.BatchMetadata.ContainsKey("batch1.json"));
+ }
+
+ [Fact]
+ public void HandleResponse_501_DropsBatch()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState();
+ var response = new ResponseInfo(501, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.False(newState.BatchMetadata.ContainsKey("batch1.json"));
+ }
+
+ [Fact]
+ public void HandleResponse_408_RetriesWithBackoff()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState();
+ var response = new ResponseInfo(408, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.True(newState.BatchMetadata.ContainsKey("batch1.json"));
+ Assert.Equal(1, newState.BatchMetadata["batch1.json"].FailureCount);
+ }
+
+ // --- Legacy mode tests ---
+
+ [Fact]
+ public void HandleResponse_LegacyMode_429_KeepsBatch()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ var state = new RetryState();
+ var response = new ResponseInfo(429, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.Same(state, newState); // unchanged
+ }
+
+ [Fact]
+ public void HandleResponse_LegacyMode_500_KeepsBatch()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ var state = new RetryState();
+ var response = new ResponseInfo(500, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.Same(state, newState);
+ }
+
+ [Fact]
+ public void HandleResponse_LegacyMode_400_DropsBatch()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 1) }
+ });
+ var response = new ResponseInfo(400, null, "batch1.json", 1000);
+
+ RetryState newState = machine.HandleResponse(state, response);
+
+ Assert.False(newState.BatchMetadata.ContainsKey("batch1.json"));
+ }
+
+ // --- ShouldUploadBatch tests ---
+
+ [Fact]
+ public void ShouldUploadBatch_LegacyMode_AlwaysProceeds()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ var state = new RetryState();
+
+ var result = machine.ShouldUploadBatch(state, "batch1.json");
+
+ Assert.IsType(result.Item1);
+ }
+
+ [Fact]
+ public void ShouldUploadBatch_RateLimited_SkipsAll()
+ {
+ var tp = new FakeTimeProvider { Time = 5000 };
+ var machine = CreateMachine(timeProvider: tp);
+ var state = new RetryState(
+ pipelineState: PipelineState.RateLimited,
+ waitUntilTime: 10000);
+
+ var result = machine.ShouldUploadBatch(state, "batch1.json");
+
+ Assert.IsType(result.Item1);
+ }
+
+ [Fact]
+ public void ShouldUploadBatch_RateLimitExpired_Proceeds()
+ {
+ var tp = new FakeTimeProvider { Time = 15000 };
+ var machine = CreateMachine(timeProvider: tp);
+ var state = new RetryState(
+ pipelineState: PipelineState.RateLimited,
+ waitUntilTime: 10000);
+
+ var result = machine.ShouldUploadBatch(state, "batch1.json");
+
+ Assert.IsType(result.Item1);
+ Assert.Equal(PipelineState.Ready, result.Item2.PipelineState);
+ }
+
+ [Fact]
+ public void ShouldUploadBatch_MaxRetriesExceeded_DropsBatch()
+ {
+ var tp = new FakeTimeProvider { Time = 1000 };
+ var machine = CreateMachine(maxRetryCount: 3, timeProvider: tp);
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 3, nextRetryTime: 0, firstFailureTime: 0) }
+ });
+
+ var result = machine.ShouldUploadBatch(state, "batch1.json");
+
+ Assert.IsType(result.Item1);
+ Assert.Equal(DropReason.MaxRetriesExceeded,
+ ((UploadDecision.DropBatchDecision)result.Item1).Reason);
+ }
+
+ [Fact]
+ public void ShouldUploadBatch_BackoffNotReady_Skips()
+ {
+ var tp = new FakeTimeProvider { Time = 1000 };
+ var machine = CreateMachine(timeProvider: tp);
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 1, nextRetryTime: 5000, firstFailureTime: 0) }
+ });
+
+ var result = machine.ShouldUploadBatch(state, "batch1.json");
+
+ Assert.IsType(result.Item1);
+ }
+
+ [Fact]
+ public void ShouldUploadBatch_BackoffReady_Proceeds()
+ {
+ var tp = new FakeTimeProvider { Time = 6000 };
+ var machine = CreateMachine(timeProvider: tp);
+ var state = new RetryState(
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 1, nextRetryTime: 5000, firstFailureTime: 0) }
+ });
+
+ var result = machine.ShouldUploadBatch(state, "batch1.json");
+
+ Assert.IsType(result.Item1);
+ }
+
+ // --- ShouldDeleteBatch tests ---
+
+ [Fact]
+ public void ShouldDeleteBatch_LegacyMode_400_True()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ Assert.True(machine.ShouldDeleteBatch(400));
+ }
+
+ [Fact]
+ public void ShouldDeleteBatch_LegacyMode_429_False()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ Assert.False(machine.ShouldDeleteBatch(429));
+ }
+
+ [Fact]
+ public void ShouldDeleteBatch_LegacyMode_500_False()
+ {
+ var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false);
+ Assert.False(machine.ShouldDeleteBatch(500));
+ }
+
+ [Fact]
+ public void ShouldDeleteBatch_SmartMode_400_True()
+ {
+ var machine = CreateMachine();
+ Assert.True(machine.ShouldDeleteBatch(400));
+ }
+
+ [Fact]
+ public void ShouldDeleteBatch_SmartMode_500_False()
+ {
+ var machine = CreateMachine();
+ Assert.False(machine.ShouldDeleteBatch(500));
+ }
+
+ [Fact]
+ public void ShouldDeleteBatch_SmartMode_501_True()
+ {
+ var machine = CreateMachine();
+ Assert.True(machine.ShouldDeleteBatch(501));
+ }
+
+ [Fact]
+ public void ShouldDeleteBatch_SmartMode_408_False()
+ {
+ var machine = CreateMachine();
+ Assert.False(machine.ShouldDeleteBatch(408));
+ }
+
+ // --- GetRetryCount tests ---
+
+ [Fact]
+ public void GetRetryCount_NoMetadata_ReturnsZero()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState();
+
+ Assert.Equal(0, machine.GetRetryCount(state, "batch1.json"));
+ }
+
+ [Fact]
+ public void GetRetryCount_WithMetadata_ReturnsMax()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState(
+ globalRetryCount: 2,
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 5) }
+ });
+
+ Assert.Equal(5, machine.GetRetryCount(state, "batch1.json"));
+ }
+
+ [Fact]
+ public void GetRetryCount_GlobalHigher_ReturnsGlobal()
+ {
+ var machine = CreateMachine();
+ var state = new RetryState(
+ globalRetryCount: 10,
+ batchMetadata: new System.Collections.Generic.Dictionary
+ {
+ { "batch1.json", new BatchMetadata(failureCount: 2) }
+ });
+
+ Assert.Equal(10, machine.GetRetryCount(state, "batch1.json"));
+ }
+ }
+}
diff --git a/Tests/Retry/RetryStateStorageTest.cs b/Tests/Retry/RetryStateStorageTest.cs
new file mode 100644
index 0000000..187fab6
--- /dev/null
+++ b/Tests/Retry/RetryStateStorageTest.cs
@@ -0,0 +1,113 @@
+using System.Collections.Generic;
+using Moq;
+using Segment.Analytics.Retry;
+using Segment.Analytics.Utilities;
+using Xunit;
+
+namespace Tests.Retry
+{
+ public class RetryStateStorageTest
+ {
+ private readonly Mock _storage;
+ private string _savedValue;
+
+ public RetryStateStorageTest()
+ {
+ _storage = new Mock();
+ _storage
+ .Setup(s => s.WritePrefs(StorageConstants.RetryState, It.IsAny()))
+ .Callback((_, value) => _savedValue = value);
+ _storage
+ .Setup(s => s.Read(StorageConstants.RetryState))
+ .Returns(() => _savedValue);
+ }
+
+ [Fact]
+ public void RoundTrip_DefaultState()
+ {
+ var state = new RetryState();
+ RetryStateStorage.SaveRetryState(_storage.Object, state);
+ RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object);
+
+ Assert.Equal(PipelineState.Ready, loaded.PipelineState);
+ Assert.Null(loaded.WaitUntilTime);
+ Assert.Equal(0, loaded.GlobalRetryCount);
+ Assert.Empty(loaded.BatchMetadata);
+ }
+
+ [Fact]
+ public void RoundTrip_RateLimitedState()
+ {
+ var state = new RetryState(
+ pipelineState: PipelineState.RateLimited,
+ waitUntilTime: 123456789L,
+ globalRetryCount: 3);
+
+ RetryStateStorage.SaveRetryState(_storage.Object, state);
+ RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object);
+
+ Assert.Equal(PipelineState.RateLimited, loaded.PipelineState);
+ Assert.Equal(123456789L, loaded.WaitUntilTime);
+ Assert.Equal(3, loaded.GlobalRetryCount);
+ }
+
+ [Fact]
+ public void RoundTrip_WithBatchMetadata()
+ {
+ var metadata = new Dictionary
+ {
+ { "file1.json", new BatchMetadata(failureCount: 2, nextRetryTime: 5000, firstFailureTime: 1000) },
+ { "file2.json", new BatchMetadata(failureCount: 1, nextRetryTime: 3000, firstFailureTime: 2000) }
+ };
+ var state = new RetryState(batchMetadata: metadata);
+
+ RetryStateStorage.SaveRetryState(_storage.Object, state);
+ RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object);
+
+ Assert.Equal(2, loaded.BatchMetadata.Count);
+ Assert.Equal(2, loaded.BatchMetadata["file1.json"].FailureCount);
+ Assert.Equal(5000L, loaded.BatchMetadata["file1.json"].NextRetryTime);
+ Assert.Equal(1000L, loaded.BatchMetadata["file1.json"].FirstFailureTime);
+ Assert.Equal(1, loaded.BatchMetadata["file2.json"].FailureCount);
+ }
+
+ [Fact]
+ public void LoadRetryState_NullStorage_ReturnsDefault()
+ {
+ _storage.Setup(s => s.Read(StorageConstants.RetryState)).Returns((string)null);
+
+ RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object);
+
+ Assert.Equal(PipelineState.Ready, loaded.PipelineState);
+ Assert.Equal(0, loaded.GlobalRetryCount);
+ }
+
+ [Fact]
+ public void LoadRetryState_EmptyString_ReturnsDefault()
+ {
+ _storage.Setup(s => s.Read(StorageConstants.RetryState)).Returns("");
+
+ RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object);
+
+ Assert.Equal(PipelineState.Ready, loaded.PipelineState);
+ }
+
+ [Fact]
+ public void LoadRetryState_CorruptJson_ReturnsDefault()
+ {
+ _storage.Setup(s => s.Read(StorageConstants.RetryState)).Returns("not valid json{{{");
+
+ RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object);
+
+ Assert.Equal(PipelineState.Ready, loaded.PipelineState);
+ }
+
+ [Fact]
+ public void ClearRetryState_RemovesKey()
+ {
+ RetryStateStorage.ClearRetryState(_storage.Object);
+
+ _storage.Verify(s => s.Remove(StorageConstants.RetryState), Times.Once);
+ }
+ }
+}
diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj
index 099502d..407cbd6 100644
--- a/Tests/Tests.csproj
+++ b/Tests/Tests.csproj
@@ -1,7 +1,7 @@
- net6.0;net46
+ net10.0;net6.0
false
diff --git a/Tests/Utilities/EventPipelineTest.cs b/Tests/Utilities/EventPipelineTest.cs
index 45ff74c..389bebf 100644
--- a/Tests/Utilities/EventPipelineTest.cs
+++ b/Tests/Utilities/EventPipelineTest.cs
@@ -44,6 +44,9 @@ public EventPipelineTest()
_mockHttpClient
.Setup(httpclient => httpclient.Upload(It.IsAny()))
.ReturnsAsync(true);
+ _mockHttpClient
+ .Setup(httpclient => httpclient.UploadWithResponse(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(new HTTPClient.Response { StatusCode = 200 });
_storage = new Mock();
@@ -93,7 +96,7 @@ public async Task TestFlush(IEventPipelineProvider provider)
_storage.Verify(o => o.Rollover(), Times.Exactly(1));
_storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(1));
- _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(1));
+ _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(1));
_storage.Verify(o => o.RemoveFile(_file), Times.Exactly(1));
}
@@ -126,7 +129,7 @@ public async void TestStop(IEventPipelineProvider provider)
await Task.Delay(1000);
_storage.Verify(o => o.Rollover(), Times.Never);
_storage.Verify(o => o.Read(StorageConstants.Events), Times.Never);
- _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Never);
+ _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Never);
_storage.Verify(o => o.RemoveFile(_file), Times.Never);
}
@@ -143,7 +146,7 @@ public async Task TestFlushCausedByOverflow(IEventPipelineProvider provider)
_storage.Verify(o => o.Rollover(), Times.Exactly(1));
_storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(1));
- _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(1));
+ _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(1));
_storage.Verify(o => o.RemoveFile(_file), Times.Exactly(1));
}
@@ -182,7 +185,7 @@ public async Task TestPeriodicalFlush(IEventPipelineProvider provider)
_storage.Verify(o => o.Rollover(), Times.Exactly(2));
_storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(2));
- _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(2));
+ _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(2));
_storage.Verify(o => o.RemoveFile(_file), Times.Exactly(2));
}
@@ -203,7 +206,7 @@ public async Task TestFlushInterruptedWhenNoFileExist(IEventPipelineProvider pro
_storage.Verify(o => o.Rollover(), Times.Exactly(1));
_storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(1));
- _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(0));
+ _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(0));
_storage.Verify(o => o.RemoveFile(_file), Times.Exactly(0));
}
@@ -237,14 +240,14 @@ public void TestSyncEventPipelineProviderWaits()
int totalUploads = 0;
_mockHttpClient
- .Setup(client => client.Upload(It.IsAny()))
- .Callback(bytes =>
+ .Setup(client => client.UploadWithResponse(It.IsAny(), It.IsAny()))
+ .Callback((bytes, _retryCount) =>
{
string content = System.Text.Encoding.UTF8.GetString(bytes);
int count = content.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1;
totalUploads += count;
})
- .ReturnsAsync(true);
+ .ReturnsAsync(new HTTPClient.Response { StatusCode = 200 });
var config = new Configuration(
writeKey: "123",
@@ -272,9 +275,9 @@ public void TestSyncEventPipelineProviderWaits()
analytics.Flush();
#pragma warning disable CS4014 // Silly compiler, this isn't an invocation so it doesn't need to be awaited
- _mockHttpClient.Verify(client => client.Upload(It.IsAny()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}");
-#pragma warning restore CS4014
- IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "Upload");
+ _mockHttpClient.Verify(client => client.UploadWithResponse(It.IsAny(), It.IsAny()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}");
+#pragma warning restore CS4014
+ IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "UploadWithResponse");
int testsUploaded = System.Text.Encoding.UTF8
.GetString((byte[])lastUploadInvocation.Arguments[0])
.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1;
diff --git a/e2e-cli/Program.cs b/e2e-cli/Program.cs
index c2f1c31..acd729b 100644
--- a/e2e-cli/Program.cs
+++ b/e2e-cli/Program.cs
@@ -4,9 +4,12 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Text.Json;
using System.Threading;
using Segment.Analytics;
+using Segment.Analytics.Plugins;
+using Segment.Analytics.Retry;
using Segment.Analytics.Utilities;
using Segment.Serialization;
using JsonUtility = Segment.Serialization.JsonUtility;
@@ -54,6 +57,8 @@
// config block (optional)
int flushAt = 15;
int flushInterval = 10; // seconds
+int maxRetries = 100;
+int timeoutSeconds = 20;
if (root.TryGetProperty("config", out var configEl))
{
if (configEl.TryGetProperty("flushAt", out var fa)) flushAt = fa.GetInt32();
@@ -63,16 +68,16 @@
int fiMs = fi.GetInt32();
flushInterval = Math.Max(1, fiMs / 1000);
}
+ if (configEl.TryGetProperty("maxRetries", out var mr)) maxRetries = mr.GetInt32();
+ if (configEl.TryGetProperty("timeout", out var to)) timeoutSeconds = to.GetInt32();
}
// ── Error handler ────────────────────────────────────────────────────────────
-var errors = new List();
-var errorHandler = new CapturingErrorHandler(errors);
+var deliveryErrors = new List();
+var errorHandler = new CapturingErrorHandler(deliveryErrors);
// ── Build configuration ──────────────────────────────────────────────────────
-// Determine scheme from apiHost so we can override SegmentURL for http:// targets
-// (the SDK always prepends "https://" by default).
// Determine scheme and strip it — the SDK prepends scheme via SegmentURL,
// which we override in PlainHttpClient to respect http:// targets.
string scheme = "https://";
@@ -97,6 +102,13 @@
var httpClientProvider = new PlainHttpClientProvider(scheme);
+// Enable smart retry directly via a custom pipeline provider (same approach as Kotlin e2e-cli).
+var retryHttpConfig = new HttpConfig(
+ new RateLimitConfig(enabled: true, maxRetryCount: maxRetries),
+ new BackoffConfig(enabled: true, maxRetryCount: maxRetries, baseBackoffInterval: 0.5)
+);
+var pipelineProvider = new RetryEnabledPipelineProvider(retryHttpConfig);
+
var configBuilder = new Configuration(
writeKey,
flushAt: flushAt,
@@ -105,13 +117,18 @@
storageProvider: new InMemoryStorageProvider(),
apiHost: rawApiHost,
cdnHost: rawCdnHost,
- httpClientProvider: httpClientProvider
+ httpClientProvider: httpClientProvider,
+ eventPipelineProvider: pipelineProvider
);
-Console.Error.WriteLine($"[e2e-cli] Initialising analytics (writeKey={writeKey[..Math.Min(8, writeKey.Length)]}…, apiHost={apiHost ?? "default"})");
+Console.Error.WriteLine($"[e2e-cli] Initialising analytics (writeKey={writeKey[..Math.Min(8, writeKey.Length)]}…, apiHost={apiHost ?? "default"}, maxRetries={maxRetries})");
var analytics = new Analytics(configBuilder);
+// Wait for SDK to initialize (settings fetch, pipeline start/stop cycle).
+// This prevents duplicate uploads from the pipeline restart during init.
+Thread.Sleep(2000);
+
// ── Process sequences ────────────────────────────────────────────────────────
int totalEvents = 0;
@@ -149,10 +166,6 @@
case "track":
{
- // Set userId state first if provided
- if (userId != null && analytics.UserId() != userId)
- analytics.Identify(userId);
-
string eventName = ev.TryGetProperty("event", out var enEl)
? enEl.GetString() ?? "Unknown"
: "Unknown";
@@ -163,9 +176,6 @@
case "page":
{
- if (userId != null && analytics.UserId() != userId)
- analytics.Identify(userId);
-
string title = ev.TryGetProperty("name", out var nameEl)
? nameEl.GetString() ?? ""
: "";
@@ -179,9 +189,6 @@
case "screen":
{
- if (userId != null && analytics.UserId() != userId)
- analytics.Identify(userId);
-
string title = ev.TryGetProperty("name", out var nameEl)
? nameEl.GetString() ?? ""
: "";
@@ -195,27 +202,15 @@
case "alias":
{
- // For alias: previousId becomes the current userId state, newId is the alias target.
- // The SDK Alias(newId) uses _userInfo._userId as previousId.
- string? previousId = ev.TryGetProperty("previousId", out var prevEl)
- ? prevEl.GetString()
- : null;
string newId = userId ?? (ev.TryGetProperty("newId", out var newIdEl)
? newIdEl.GetString() ?? ""
: "");
-
- if (previousId != null && analytics.UserId() != previousId)
- analytics.Identify(previousId);
-
analytics.Alias(newId);
break;
}
case "group":
{
- if (userId != null && analytics.UserId() != userId)
- analytics.Identify(userId);
-
string groupId = ev.TryGetProperty("groupId", out var gidEl)
? gidEl.GetString() ?? ""
: "";
@@ -234,14 +229,75 @@
}
}
+// ── Flush and poll until delivery completes ──────────────────────────────────
Console.Error.WriteLine($"[e2e-cli] Flushing {totalEvents} event(s)…");
+deliveryErrors.Clear();
+
+// The SDK's CountFlushPolicy (flushAt) auto-triggers uploads.
+// We trigger one explicit flush to handle cases where events haven't been flushed yet,
+// then poll and only trigger retries when pending files persist across cycles.
analytics.Flush();
-// Give the async pipeline time to upload
-Thread.Sleep(5000);
+// Poll until batch files are processed (uploaded or dropped).
+long deadlineMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + (timeoutSeconds * 1000L);
+bool everSeenPending = false;
+int pollInterval = 300;
+int pollCount = 0;
+int stableEmptyCount = 0;
+
+while (DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < deadlineMs)
+{
+ Thread.Sleep(pollInterval);
+ pollCount++;
+
+ var pending = analytics.PendingUploads()
+ .Where(s => !string.IsNullOrEmpty(s))
+ .ToList();
+
+ if (pending.Count > 0)
+ {
+ everSeenPending = true;
+ stableEmptyCount = 0;
+ deliveryErrors.Clear();
+ // Trigger a new upload cycle for retry
+ analytics.Flush();
+ }
+ else if (everSeenPending)
+ {
+ // Files gone — wait for a stable "empty" state to confirm upload completed
+ stableEmptyCount++;
+ if (stableEmptyCount >= 2)
+ break;
+ }
+
+ // Adaptive intervals
+ if (pollCount >= 10 && pollInterval < 1000) pollInterval = 1000;
+ else if (pollCount >= 5 && pollInterval < 500) pollInterval = 500;
+}
// ── Output result ─────────────────────────────────────────────────────────────
-bool success = errors.Count == 0;
+var remaining = analytics.PendingUploads()
+ .Where(s => !string.IsNullOrEmpty(s))
+ .ToList();
+
+bool success;
+string? error = null;
+
+if (remaining.Count > 0)
+{
+ success = false;
+ error = $"Delivery incomplete: {remaining.Count} batch file(s) still pending";
+}
+else if (deliveryErrors.Count > 0)
+{
+ success = false;
+ error = "Delivery failed: " + string.Join("; ", deliveryErrors);
+}
+else
+{
+ success = true;
+}
+
if (success)
{
Console.WriteLine($"{{\"success\":true,\"sentBatches\":1}}");
@@ -249,8 +305,7 @@
}
else
{
- string combinedErrors = string.Join("; ", errors);
- Console.WriteLine($"{{\"success\":false,\"sentBatches\":0,\"error\":\"{Escape(combinedErrors)}\"}}");
+ Console.WriteLine($"{{\"success\":false,\"sentBatches\":0,\"error\":\"{Escape(error ?? "unknown")}\"}}");
Environment.Exit(1);
}
@@ -261,7 +316,6 @@
if (!parent.TryGetProperty(key, out var el) || el.ValueKind == JsonValueKind.Null)
return null;
- // Serialise the JsonElement back to a JSON string, then parse with Segment's JsonUtility
string json = el.GetRawText();
try
{
@@ -312,3 +366,20 @@ public void OnExceptionThrown(Exception e)
_errors.Add(msg);
}
}
+
+// ── Pipeline provider that enables retry from construction ────────────────────
+
+class RetryEnabledPipelineProvider : Segment.Analytics.Utilities.IEventPipelineProvider
+{
+ private readonly HttpConfig _httpConfig;
+ public RetryEnabledPipelineProvider(HttpConfig httpConfig) => _httpConfig = httpConfig;
+
+ public Segment.Analytics.Utilities.IEventPipeline Create(Analytics analytics, string key)
+ {
+ return new EventPipeline(analytics, key,
+ analytics.Configuration.WriteKey,
+ analytics.Configuration.FlushPolicies,
+ analytics.Configuration.ApiHost,
+ _httpConfig);
+ }
+}
diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json
index eea1cca..91e0a5f 100644
--- a/e2e-cli/e2e-config.json
+++ b/e2e-cli/e2e-config.json
@@ -1,7 +1,9 @@
{
"sdk": "csharp",
- "test_suites": "basic",
- "auto_settings": false,
+ "test_suites": "basic,retry,retry-settings",
+ "auto_settings": true,
"patch": null,
- "env": {}
+ "env": {
+ "HTTP_CONFIG_SETTINGS": "true"
+ }
}