Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dotnet/src/SmooAI.Observability/SmooAI.Observability.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
</PropertyGroup>

<ItemGroup>
<!--
Outbound webhook delivery (Transport.cs) goes through SmooAI.Fetch — the
polyglot resilient fetch (Polly retry, per-request timeout, circuit
breaking) — instead of a raw HttpClient.
-->
<PackageReference Include="SmooAI.Fetch" Version="3.3.10" />
<PackageReference Include="OpenTelemetry" Version="1.16.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.16.0" />
<!--
Expand Down
108 changes: 96 additions & 12 deletions dotnet/src/SmooAI.Observability/Transport.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Net.Http;
using System.Text;
using SmooAI.Fetch;

namespace SmooAI.Observability;

Expand All @@ -22,6 +23,14 @@ public sealed class TransportOptions

/// <summary>
/// Per-request timeout. Default 10s. The transport constructor applies this
/// value to the SmooFetch options on the default delivery path, and to
/// <see cref="HttpClient.Timeout"/> when a caller-supplied client is used.
/// </summary>
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Retries applied <em>within a single flush attempt</em> by the resilient
/// <see cref="SmooFetch"/> transport (transient exceptions + 429/5xx). This is
/// distinct from the transport's own batch-requeue, which only kicks in once a
/// flush has exhausted these retries and still failed. Default 2.
/// </summary>
/// <remarks>
/// A positive value selects an exponential-backoff retry policy with that many
/// retries; zero or a negative value disables SmooFetch retries entirely. The
/// setting is only consulted on the default (SmooFetch) path, not when a
/// caller-supplied <see cref="HttpClient"/> is used.
/// </remarks>
public int MaxRetries { get; set; } = 2;
}

/// <summary>
Expand All @@ -30,13 +39,22 @@ public sealed class TransportOptions
/// retries a failed batch by pushing it back to the front of the queue. Errors
/// are swallowed — observability must never throw into the host application.
///
/// Outbound delivery goes through <see cref="SmooFetch"/> (the SmooAI resilient
/// fetch — Polly-backed retry, per-request timeout, circuit breaking) by default.
/// A caller-supplied <see cref="HttpClient"/> is honored as an escape hatch and
/// used directly (no SmooFetch resilience layer), for tests or hosts that need a
/// fully custom pipeline.
///
/// Port of the TS <c>Transport</c> (without the browser <c>sendBeacon</c> path,
/// which has no .NET analogue).
/// </summary>
public sealed class Transport : IDisposable, IAsyncDisposable
{
private readonly TransportOptions _options;
private readonly HttpClient _httpClient;
// Exactly one of these is set: _fetch by default, _httpClient when the caller
// supplies a custom client via the escape hatch.
private readonly SmooFetch? _fetch;
private readonly HttpClient? _httpClient;
private readonly bool _ownsHttpClient;
private readonly Queue<ObservabilityEvent> _queue = new();
private readonly object _gate = new();
Expand All @@ -45,8 +63,10 @@ public sealed class Transport : IDisposable, IAsyncDisposable
private bool _disposed;

/// <summary>
/// Create a transport. If <paramref name="httpClient"/> is null a private
/// <see cref="HttpClient"/> is created and disposed with this transport.
/// Create a transport. By default delivery uses an internally-managed,
/// resilient <see cref="SmooFetch"/>. If <paramref name="httpClient"/> is
/// supplied it is used directly as an escape hatch — the caller owns its
/// disposal and is responsible for any resilience behavior.
/// </summary>
public Transport(TransportOptions options, HttpClient? httpClient = null)
{
Expand All @@ -55,9 +75,36 @@ public Transport(TransportOptions options, HttpClient? httpClient = null)
{
throw new ArgumentException("Transport requires a DSN.", nameof(options));
}
_ownsHttpClient = httpClient is null;
_httpClient = httpClient ?? new HttpClient();
_httpClient.Timeout = _options.RequestTimeout;

if (httpClient is null)
{
// Default path: resilient SmooFetch with the transport's timeout +
// retry config and a circuit breaker so a hard-down ingest endpoint
// stops hammering after repeated failures.
_fetch = SmooFetch.Create(o =>
{
o.Timeout = _options.RequestTimeout;
o.RetryPolicy = _options.MaxRetries > 0
? RetryPolicy.ExponentialBackoff(_options.MaxRetries)
: RetryPolicy.None;
// Stop hammering a hard-down ingest endpoint: open after 5
// consecutive failures for 30s. A tripped breaker throws, which
// PostAsync catches and reports as a failed delivery, so the
// batch is requeued rather than lost.
o.CircuitBreaker = new CircuitBreakerOptions(
FailureThreshold: 5,
OpenDuration: TimeSpan.FromSeconds(30));
});
_ownsHttpClient = false;
}
else
{
// Escape hatch: use the caller's client directly; only its Timeout is overridden below.
_httpClient = httpClient;
_httpClient.Timeout = _options.RequestTimeout;
_ownsHttpClient = false;
}

// Disarmed timer; armed on demand when the first event is enqueued.
_timer = new Timer(_ => _ = FlushAsync(), null, Timeout.Infinite, Timeout.Infinite);
}
Expand Down Expand Up @@ -123,10 +170,10 @@ public async Task FlushAsync()
{
var payload = new IngestPayload { Type = "error", Events = batch };
var json = ObservabilityJson.Serialize(payload);
using var content = new StringContent(json, Encoding.UTF8, "application/json");
using var response = await _httpClient.PostAsync(_options.Dsn, content).ConfigureAwait(false);
// Non-2xx is a delivery failure: requeue for the next attempt.
if (!response.IsSuccessStatusCode)
var delivered = await PostAsync(json).ConfigureAwait(false);
// Non-2xx (or transport failure) is a delivery failure: requeue for
// the next attempt.
if (!delivered)
{
RestoreBatch(batch);
}
Expand All @@ -149,6 +196,43 @@ public async Task FlushAsync()
}
}

/// <summary>
/// POST the serialized payload via SmooFetch (default) or the escape-hatch
/// <see cref="HttpClient"/>.
/// </summary>
/// <param name="json">Pre-serialized ingest payload, sent as UTF-8 <c>application/json</c>.</param>
/// <returns>
/// <c>true</c> when the endpoint answered with a 2xx status; <c>false</c> for any
/// other status or for any failure raised while building or sending the request.
/// </returns>
/// <remarks>
/// Never throws on either path: every exception is converted into a failed
/// delivery so the caller can requeue the batch.
/// </remarks>
private async Task<bool> PostAsync(string json)
{
    try
    {
        if (_fetch is not null)
        {
            // SmooFetch serializes typed bodies itself, but we need the exact
            // ObservabilityJson wire bytes (camelCase + omit-nulls), so build the
            // request with pre-serialized content and use the low-level SendAsync
            // (which applies retry/timeout/circuit-breaking but does NOT throw on
            // non-2xx — we inspect the status ourselves).
            // NOTE(review): assumes SmooFetch.SendAsync can replay this request
            // across its retries — confirm, since a plain HttpClient refuses to
            // send the same HttpRequestMessage instance twice.
            using var request = new HttpRequestMessage(HttpMethod.Post, _options.Dsn)
            {
                Content = new StringContent(json, Encoding.UTF8, "application/json"),
            };
            using var response = await _fetch.SendAsync(request).ConfigureAwait(false);
            return response.IsSuccessStatusCode;
        }

        // Escape hatch: raw HttpClient, no SmooFetch resilience.
        using var content = new StringContent(json, Encoding.UTF8, "application/json");
        using var raw = await _httpClient!.PostAsync(_options.Dsn, content).ConfigureAwait(false);
        return raw.IsSuccessStatusCode;
    }
    catch (Exception)
    {
        // Covers exhausted retries / an open circuit breaker on the SmooFetch
        // path, and connection failures, timeouts or a malformed DSN on either
        // path. All of them are reported as a failed delivery.
        return false;
    }
}

/// <summary>Current queue depth — exposed for tests.</summary>
public int QueueSize
{
Expand Down Expand Up @@ -227,7 +311,7 @@ public void Dispose()
_timer.Dispose();
if (_ownsHttpClient)
{
_httpClient.Dispose();
_httpClient?.Dispose();
}
}

Expand All @@ -250,7 +334,7 @@ public async ValueTask DisposeAsync()
await _timer.DisposeAsync().ConfigureAwait(false);
if (_ownsHttpClient)
{
_httpClient.Dispose();
_httpClient?.Dispose();
}
}
}
103 changes: 103 additions & 0 deletions dotnet/tests/SmooAI.Observability.Tests/LoopbackServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using System.Net;

namespace SmooAI.Observability.Tests;

/// <summary>
/// Minimal in-process HTTP server for exercising the default SmooFetch transport
/// path (which owns a real <see cref="HttpClient"/> and cannot take a stub
/// handler). Listens on a free loopback port, records each request body, and
/// replies with a caller-chosen status code.
/// </summary>
/// <remarks>
/// Requests are served one at a time by a single background accept loop that is
/// started in the constructor and ends when the listener is stopped by
/// <see cref="Dispose"/>.
/// </remarks>
public sealed class LoopbackServer : IDisposable
{
    // A probed port can be taken by another process between release and bind,
    // so binding is retried a few times on a fresh port before giving up.
    private const int MaxBindAttempts = 5;

    private readonly HttpListener _listener;
    private readonly Func<string, HttpStatusCode> _responder;
    private int _requestCount;

    // 0 while running, 1 once Dispose has run; makes Dispose idempotent and
    // doubles as the accept loop's stop flag.
    private int _disposed;

    // Written by the accept loop, read by the test thread.
    private volatile string _lastBody = string.Empty;

    /// <summary>Start listening on a free loopback port and begin accepting requests.</summary>
    /// <param name="responder">Maps each received request body to the status code to reply with.</param>
    /// <exception cref="ArgumentNullException"><paramref name="responder"/> is null.</exception>
    public LoopbackServer(Func<string, HttpStatusCode> responder)
    {
        _responder = responder ?? throw new ArgumentNullException(nameof(responder));
        _listener = StartOnFreePort(out var port);
        Url = $"http://127.0.0.1:{port}/ingest";
        _ = Task.Run(AcceptLoopAsync);
    }

    /// <summary>Full URL (including path) to POST to.</summary>
    public string Url { get; }

    /// <summary>Number of requests received so far.</summary>
    public int RequestCount => Volatile.Read(ref _requestCount);

    /// <summary>Body of the most recently received request.</summary>
    public string LastBody => _lastBody;

    /// <summary>Reserve a free port, release it, and return a URL that will refuse connections.</summary>
    public static string ReserveUnusedUrl() => $"http://127.0.0.1:{GetFreePort()}/ingest";

    /// <summary>
    /// Stop the listener. Safe to call more than once; only the first call has
    /// any effect.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            _listener.Stop();
            _listener.Close();
        }
        catch
        {
            // ignore
        }
    }

    /// <summary>
    /// Accept requests until the listener is stopped: read the body, record it,
    /// and reply with the status chosen by the responder.
    /// </summary>
    private async Task AcceptLoopAsync()
    {
        while (Volatile.Read(ref _disposed) == 0)
        {
            HttpListenerContext context;
            try
            {
                context = await _listener.GetContextAsync().ConfigureAwait(false);
            }
            catch
            {
                return; // listener stopped
            }

            try
            {
                using var reader = new StreamReader(context.Request.InputStream, context.Request.ContentEncoding);
                var body = await reader.ReadToEndAsync().ConfigureAwait(false);

                // Publish the body before bumping the counter so a reader that
                // observes the new count also observes the matching body.
                _lastBody = body;
                Interlocked.Increment(ref _requestCount);

                context.Response.StatusCode = (int)_responder(body);
                context.Response.Close();
            }
            catch
            {
                // A failing responder or a broken connection must not kill the
                // loop; drop this exchange and keep serving.
                try
                {
                    context.Response.Abort();
                }
                catch
                {
                    // ignore
                }
            }
        }
    }

    /// <summary>
    /// Bind an <see cref="HttpListener"/> to a free loopback port, retrying on a
    /// new port if the probed one was taken before the listener could start.
    /// </summary>
    private static HttpListener StartOnFreePort(out int port)
    {
        for (var attempt = 1; ; attempt++)
        {
            port = GetFreePort();
            var listener = new HttpListener();
            listener.Prefixes.Add($"http://127.0.0.1:{port}/");
            try
            {
                listener.Start();
                return listener;
            }
            catch (HttpListenerException)
            {
                listener.Close();
                if (attempt >= MaxBindAttempts)
                {
                    throw;
                }
            }
        }
    }

    /// <summary>Ask the OS for an ephemeral loopback port, then release it.</summary>
    private static int GetFreePort()
    {
        var probe = new System.Net.Sockets.TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            probe.Stop();
        }
    }
}
56 changes: 56 additions & 0 deletions dotnet/tests/SmooAI.Observability.Tests/TransportTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,62 @@ public void Constructor_RejectsEmptyDsn()
Assert.Throws<ArgumentException>(() => new Transport(new TransportOptions { Dsn = "" }));
}

// --- Default transport (SmooFetch) path ----------------------------------
//
// The default Transport (no caller-supplied HttpClient) delivers through
// SmooFetch, which owns its own real HttpClient and cannot accept a stub
// handler. These tests exercise it end-to-end against a loopback HTTP server
// so the SmooFetch retry/timeout/circuit-breaking pipeline is the code under
// test, not a mock.

[Fact]
public async Task DefaultTransport_DeliversBatchViaSmooFetch()
{
    // Arrange: a healthy ingest endpoint and a transport built without a
    // caller-supplied HttpClient, so delivery takes the default path.
    using var ingest = new LoopbackServer(_ => HttpStatusCode.OK);
    var options = new TransportOptions { Dsn = ingest.Url };
    await using var sut = new Transport(options);

    // Act: queue two events and flush them as one batch.
    foreach (var eventId in new[] { "a", "b" })
    {
        sut.Enqueue(NewEvent(eventId));
    }
    await sut.FlushAsync();

    // Assert: exactly one POST carrying the batch, and nothing left queued.
    await WaitFor(() => ingest.RequestCount >= 1);
    Assert.Equal(1, ingest.RequestCount);
    var wireBody = ingest.LastBody;
    Assert.Contains("\"type\":\"error\"", wireBody);
    Assert.Contains("\"eventId\":\"a\"", wireBody);
    Assert.Equal(0, sut.QueueSize);
}

[Fact]
public async Task DefaultTransport_RequeuesAfterRetriesExhausted()
{
    // Arrange: the endpoint answers 500 to everything, and the transport is
    // allowed a single in-flush retry.
    using var failingIngest = new LoopbackServer(_ => HttpStatusCode.InternalServerError);
    var options = new TransportOptions { Dsn = failingIngest.Url, MaxRetries = 1 };
    await using var sut = new Transport(options);

    // Act
    sut.Enqueue(NewEvent("a"));
    await sut.FlushAsync();

    // Assert: the event is back on the queue once the retries are spent...
    Assert.Equal(1, sut.QueueSize);

    // ...and the server saw at least the first attempt plus the one retry.
    var seen = failingIngest.RequestCount;
    Assert.True(seen >= 2, $"expected >=2 requests, saw {seen}");
}

[Fact]
public async Task DefaultTransport_RequeuesWhenEndpointUnreachable()
{
    // Arrange: point the transport at a port that was reserved and released,
    // so nothing is listening and connections are refused. Retries are off
    // and the timeout is short to keep the test quick.
    var options = new TransportOptions
    {
        Dsn = LoopbackServer.ReserveUnusedUrl(),
        MaxRetries = 0,
        RequestTimeout = TimeSpan.FromSeconds(2),
    };
    await using var sut = new Transport(options);

    // Act: the flush has to complete without throwing.
    sut.Enqueue(NewEvent("a"));
    await sut.FlushAsync();

    // Assert: the undelivered event is still queued.
    Assert.Equal(1, sut.QueueSize);
}

private static async Task WaitFor(Func<bool> condition, int timeoutMs = 2000)
{
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
Expand Down
Loading