From 0d6001b1b0e00e88d7261e2021158d280735ecd6 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 24 Jun 2026 13:50:50 +0200 Subject: [PATCH 1/3] =?UTF-8?q?=E2=9C=A8=20Reduce=20SQL=20Server=20queue?= =?UTF-8?q?=20length=20monitoring=20query=20volume?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the per-queue length probe (one IF EXISTS + max-min(RowVersion) statement per tracked queue, every 200ms) with a single catalog-view query per catalog that reads approximate row counts from sys.partitions. This reads no queue table data, takes no locks, and needs only SELECT on the queue tables. Also make the poll interval configurable via the QueueLengthQueryDelayInterval connection string part (default 200ms), and add optional adaptive back-off up to QueueLengthQueryMaxDelayInterval while all monitored queues are empty (disabled by default; base interval is always used while any queue has work, preserving the fix for #4556). --- .../QueueLengthQueryTests.cs | 128 ++++++++++++++++++ .../ConnectionStringExtensions.cs | 35 +++++ .../QueueLengthProvider.cs | 111 +++++++++++---- ...ServiceControl.Transports.SqlServer.csproj | 4 + .../SqlTable.cs | 54 ++++++++ 5 files changed, 304 insertions(+), 28 deletions(-) create mode 100644 src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs diff --git a/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs b/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs new file mode 100644 index 0000000000..15f178537b --- /dev/null +++ b/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs @@ -0,0 +1,128 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Linq; +using NUnit.Framework; +using Transports.SqlServer; + +[TestFixture] +class QueueLengthQueryTests +{ + [Test] + public void BuildBulkLengthQuery_emits_a_single_catalog_view_query_for_all_tables() + { + var tables = new[] + { + SqlTable.Parse("Sales", "dbo"), + SqlTable.Parse("Billing", "dbo"), + }; + + var query = SqlTable.BuildBulkLengthQuery(catalog: null, tables); + + // One statement, read from the catalog views — no per-table IF EXISTS, no scan of the queue tables. + Assert.That(query, Does.Contain("sys.partitions")); + Assert.That(query, Does.Contain("p.index_id IN (0, 1)")); + Assert.That(query, Does.Not.Contain("INFORMATION_SCHEMA")); + Assert.That(query, Does.Not.Contain("RowVersion")); + + // Both tracked tables are covered by the single query. + Assert.That(query, Does.Contain("t.name = 'Sales'")); + Assert.That(query, Does.Contain("t.name = 'Billing'")); + + // Exactly one SELECT that returns rows (the leading SET is not a result-producing statement). + Assert.That(System.Text.RegularExpressions.Regex.Matches(query, "SELECT").Count, Is.EqualTo(1)); + } + + [Test] + public void BuildBulkLengthQuery_prefixes_the_catalog_when_supplied() + { + var tables = new[] { SqlTable.Parse("Sales@dbo@MyCatalog", "dbo") }; + + var query = SqlTable.BuildBulkLengthQuery(catalog: "MyCatalog", tables); + + Assert.That(query, Does.Contain("[MyCatalog].sys.partitions")); + Assert.That(query, Does.Contain("[MyCatalog].sys.tables")); + Assert.That(query, Does.Contain("[MyCatalog].sys.schemas")); + } + + [Test] + public void RemoveQueueLengthQueryDelayInterval_parses_and_strips_the_custom_part() + { + const string connectionString = "Data Source=.;Initial Catalog=nsb;Integrated Security=true;QueueLengthQueryDelayInterval=1000"; + + var cleaned = connectionString.RemoveQueueLengthQueryDelayInterval(out var interval); + + Assert.That(interval, Is.EqualTo(TimeSpan.FromSeconds(1))); + // The custom key must be stripped so SqlConnection never sees the unknown keyword. + Assert.That(cleaned, Does.Not.Contain("QueueLengthQueryDelayInterval").IgnoreCase); + Assert.That(cleaned, Does.Contain("Initial Catalog=nsb").IgnoreCase); + } + + [Test] + public void RemoveQueueLengthQueryDelayInterval_defaults_to_null_when_absent() + { + const string connectionString = "Data Source=.;Initial Catalog=nsb;Integrated Security=true"; + + connectionString.RemoveQueueLengthQueryDelayInterval(out var interval); + + Assert.That(interval, Is.Null); + } + + [Test] + public void RemoveQueueLengthQueryDelayInterval_throws_on_non_numeric_value() + { + const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryDelayInterval=soon"; + + Assert.That(() => connectionString.RemoveQueueLengthQueryDelayInterval(out _), + Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval")); + } + + [Test] + public void RemoveQueueLengthQueryMaxDelayInterval_parses_and_strips_the_custom_part() + { + const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryMaxDelayInterval=60000"; + + var cleaned = connectionString.RemoveQueueLengthQueryMaxDelayInterval(out var interval); + + Assert.That(interval, Is.EqualTo(TimeSpan.FromSeconds(60))); + Assert.That(cleaned, Does.Not.Contain("QueueLengthQueryMaxDelayInterval").IgnoreCase); + } + + static readonly TimeSpan Base = TimeSpan.FromMilliseconds(200); + static readonly TimeSpan Max = TimeSpan.FromSeconds(60); + + [Test] + public void NextDelay_snaps_back_to_base_when_any_queue_has_work() + { + // Even fully backed off, a single non-empty queue returns to full speed immediately. + Assert.That(QueueLengthProvider.NextDelay(Max, Base, Max, maxObservedLength: 1), Is.EqualTo(Base)); + } + + [Test] + public void NextDelay_doubles_while_idle() + { + Assert.That(QueueLengthProvider.NextDelay(Base, Base, Max, maxObservedLength: 0), + Is.EqualTo(TimeSpan.FromMilliseconds(400))); + } + + [Test] + public void NextDelay_caps_at_max_while_idle() + { + Assert.That(QueueLengthProvider.NextDelay(TimeSpan.FromSeconds(40), Base, Max, maxObservedLength: 0), + Is.EqualTo(Max)); + } + + [Test] + public void NextDelay_never_drops_below_base() + { + Assert.That(QueueLengthProvider.NextDelay(TimeSpan.FromMilliseconds(50), Base, Max, maxObservedLength: 0), + Is.EqualTo(Base)); + } + + [Test] + public void NextDelay_with_max_equal_to_base_disables_backoff() + { + // Operator did not opt in (max == base) -> cadence is constant at base, even while idle. + Assert.That(QueueLengthProvider.NextDelay(Base, Base, Base, maxObservedLength: 0), Is.EqualTo(Base)); + } +} diff --git a/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs b/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs index 277e82596f..539864df64 100644 --- a/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs +++ b/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs @@ -1,5 +1,6 @@ namespace ServiceControl.Transports.SqlServer { + using System; using System.Data.Common; static class ConnectionStringExtensions @@ -11,6 +12,38 @@ public static string RemoveCustomConnectionStringParts(this string connectionStr .RemoveCustomConnectionStringPart(subscriptionsTableName, out subscriptionTable); } + // Extracts the optional, ServiceControl-specific 'QueueLengthQueryDelayInterval' (milliseconds) from + // the connection string and removes it so it is never handed to SqlConnection (which would reject the + // unknown keyword). Mirrors the existing convention used by the Azure Service Bus transport. + // This is the BASE interval used while any monitored queue has messages. + public static string RemoveQueueLengthQueryDelayInterval(this string connectionString, out TimeSpan? interval) => + connectionString.RemoveIntervalMilliseconds(queueLengthQueryDelayInterval, out interval); + + // Extracts the optional 'QueueLengthQueryMaxDelayInterval' (milliseconds) — the upper bound the adaptive + // back-off ramps to while every monitored queue is empty. Equal to the base interval => back-off disabled. + public static string RemoveQueueLengthQueryMaxDelayInterval(this string connectionString, out TimeSpan? interval) => + connectionString.RemoveIntervalMilliseconds(queueLengthQueryMaxDelayInterval, out interval); + + static string RemoveIntervalMilliseconds(this string connectionString, string key, out TimeSpan? interval) + { + interval = null; + + var builder = new DbConnectionStringBuilder { ConnectionString = connectionString }; + + if (builder.TryGetValue(key, out var value)) + { + if (!int.TryParse(value.ToString(), out var milliseconds)) + { + throw new Exception($"Can't parse '{value}' as a valid {key} (expected an integer number of milliseconds)."); + } + + interval = TimeSpan.FromMilliseconds(milliseconds); + builder.Remove(key); + } + + return builder.ConnectionString; + } + public static string RemoveCustomConnectionStringPart(this string connectionString, string partName, out string schema) { var builder = new DbConnectionStringBuilder @@ -30,5 +63,7 @@ public static string RemoveCustomConnectionStringPart(this string connectionStri const string queueSchemaName = "Queue Schema"; const string subscriptionsTableName = "Subscriptions Table"; + const string queueLengthQueryDelayInterval = "QueueLengthQueryDelayInterval"; + const string queueLengthQueryMaxDelayInterval = "QueueLengthQueryMaxDelayInterval"; } } \ No newline at end of file diff --git a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs index e89fe36d25..a8b3659588 100644 --- a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs @@ -14,10 +14,21 @@ class QueueLengthProvider : AbstractQueueLengthProvider public QueueLengthProvider(TransportSettings settings, Action store, ILogger logger) : base(settings, store) { connectionString = ConnectionString - .RemoveCustomConnectionStringParts(out var customSchema, out _); + .RemoveCustomConnectionStringParts(out var customSchema, out _) + .RemoveQueueLengthQueryDelayInterval(out var configuredInterval) + .RemoveQueueLengthQueryMaxDelayInterval(out var configuredMaxInterval); + + baseDelay = configuredInterval ?? DefaultQueryDelayInterval; + // Default the back-off ceiling to the base interval => adaptive back-off OFF unless an operator + // opts in with a larger QueueLengthQueryMaxDelayInterval. Never let it fall below the base. + maxDelay = configuredMaxInterval is { } max && max > baseDelay ? max : baseDelay; + currentDelay = baseDelay; defaultSchema = customSchema ?? "dbo"; this.logger = logger; + + logger.LogInformation("SQL queue length query interval: base {BaseDelay}, max {MaxDelay} (adaptive back-off {State})", + baseDelay, maxDelay, maxDelay > baseDelay ? "enabled" : "disabled"); } public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) { @@ -42,11 +53,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - await Task.Delay(QueryDelayInterval, stoppingToken); + await Task.Delay(currentDelay, stoppingToken); await QueryTableSizes(stoppingToken); UpdateQueueLengthStore(); + + // Adapt the cadence: full speed while any queue has work, exponential back-off while the + // whole system is idle. Backing off only when EVERY queue is empty keeps the fix for + // issue #4556 intact — the false-zero "sawtooth" only affects non-empty queues, and those + // are always sampled at the base interval here. + var maxObservedLength = tableSizes.IsEmpty ? 0 : tableSizes.Values.Max(); + currentDelay = NextDelay(currentDelay, baseDelay, maxDelay, maxObservedLength); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { @@ -59,6 +77,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } } + // Pure cadence policy (no I/O) so it can be unit tested without a database or the polling loop. + internal static TimeSpan NextDelay(TimeSpan current, TimeSpan baseDelay, TimeSpan maxDelay, long maxObservedLength) + { + if (maxObservedLength > 0) + { + return baseDelay; // work present -> snap back to full speed + } + + var doubled = TimeSpan.FromTicks(current.Ticks * 2); + if (doubled < baseDelay) + { + doubled = baseDelay; + } + + return doubled > maxDelay ? maxDelay : doubled; + } + void UpdateQueueLengthStore() { var nowTicks = DateTime.UtcNow.Ticks; @@ -79,60 +114,80 @@ void UpdateQueueLengthStore() async Task QueryTableSizes(CancellationToken cancellationToken) { - var chunks = tableSizes - .Select((i, index) => new - { - i, - index - }) - .GroupBy(p => p.index / QueryChunkSize) - .Select(grp => grp.Select(g => g.i).ToArray()) - .ToList(); + // sys.partitions is per-database, so group the tracked tables by catalog and issue one + // bulk query per distinct catalog. In the common single-catalog setup this is ONE query + // per poll for the whole system, regardless of how many endpoints are monitored. + var byCatalog = tableSizes.Keys.GroupBy(t => t.Catalog); await using var connection = new SqlConnection(connectionString); await connection.OpenAsync(cancellationToken); - foreach (var chunk in chunks) + foreach (var catalogGroup in byCatalog) { - await UpdateChunk(connection, chunk, cancellationToken); + await QueryCatalog(connection, catalogGroup.Key, catalogGroup.ToArray(), cancellationToken); } } - async Task UpdateChunk(SqlConnection connection, KeyValuePair[] chunk, CancellationToken cancellationToken) + async Task QueryCatalog(SqlConnection connection, string catalog, SqlTable[] tables, CancellationToken cancellationToken) { - var query = string.Join(Environment.NewLine, chunk.Select(c => c.Key.LengthQuery)); + var query = SqlTable.BuildBulkLengthQuery(catalog, tables); + + // (schema, name) -> length, for matching the result rows back to the tracked tables. + var results = new Dictionary<(string, string), int>(SchemaNameComparer); - await using var command = new SqlCommand(query, connection); - await using var reader = await command.ExecuteReaderAsync(cancellationToken); - foreach (var chunkPair in chunk) + await using (var command = new SqlCommand(query, connection)) + await using (var reader = await command.ExecuteReaderAsync(cancellationToken)) { - await reader.ReadAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var schema = reader.GetString(0); + var name = reader.GetString(1); + var length = reader.GetInt64(2); // SUM(p.rows) is bigint - var queueLength = reader.GetInt32(0); + results[(schema, name)] = length > int.MaxValue ? int.MaxValue : (int)length; + } + } - if (queueLength == -1) + foreach (var table in tables) + { + if (results.TryGetValue((table.Schema, table.Name), out var length)) { - logger.LogWarning("Table {TableName} does not exist", chunkPair.Key); + tableSizes[table] = length; } else { - tableSizes.TryUpdate(chunkPair.Key, queueLength, chunkPair.Value); + // No catalog row for this table -> the queue table does not (yet) exist. + logger.LogWarning("Table {TableName} does not exist", table); } - - await reader.NextResultAsync(cancellationToken); } } + static readonly IEqualityComparer<(string, string)> SchemaNameComparer = + new SchemaNameEqualityComparer(); + + sealed class SchemaNameEqualityComparer : IEqualityComparer<(string, string)> + { + public bool Equals((string, string) x, (string, string) y) => + StringComparer.OrdinalIgnoreCase.Equals(x.Item1, y.Item1) && + StringComparer.OrdinalIgnoreCase.Equals(x.Item2, y.Item2); + + public int GetHashCode((string, string) obj) => + HashCode.Combine( + StringComparer.OrdinalIgnoreCase.GetHashCode(obj.Item1), + StringComparer.OrdinalIgnoreCase.GetHashCode(obj.Item2)); + } + readonly ConcurrentDictionary tableNames = new ConcurrentDictionary(); readonly ConcurrentDictionary tableSizes = new ConcurrentDictionary(); readonly string connectionString; readonly string defaultSchema; + readonly TimeSpan baseDelay; + readonly TimeSpan maxDelay; + TimeSpan currentDelay; readonly ILogger logger; - static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200); - - const int QueryChunkSize = 10; + static readonly TimeSpan DefaultQueryDelayInterval = TimeSpan.FromMilliseconds(200); } } diff --git a/src/ServiceControl.Transports.SqlServer/ServiceControl.Transports.SqlServer.csproj b/src/ServiceControl.Transports.SqlServer/ServiceControl.Transports.SqlServer.csproj index fb0fd8ff89..9065f6306e 100644 --- a/src/ServiceControl.Transports.SqlServer/ServiceControl.Transports.SqlServer.csproj +++ b/src/ServiceControl.Transports.SqlServer/ServiceControl.Transports.SqlServer.csproj @@ -14,6 +14,10 @@ + + + + diff --git a/src/ServiceControl.Transports.SqlServer/SqlTable.cs b/src/ServiceControl.Transports.SqlServer/SqlTable.cs index 0e629b1762..4924abf6c6 100644 --- a/src/ServiceControl.Transports.SqlServer/SqlTable.cs +++ b/src/ServiceControl.Transports.SqlServer/SqlTable.cs @@ -1,6 +1,8 @@ #nullable enable namespace ServiceControl.Transports.SqlServer { + using System.Collections.Generic; + using System.Linq; class SqlTable { @@ -10,6 +12,13 @@ class SqlTable var unquotedName = NameHelper.Unquote(name); var quotedName = NameHelper.Quote(name); var quotedSchema = NameHelper.Quote(schema); + + // Unquoted identifiers, exposed so the bulk catalog-view query (see QueueLengthProvider) + // can group tables by catalog and match rows from sys.schemas / sys.tables back to the + // tracked tables without parsing the composed full name. + Name = unquotedName; + Schema = unquotedSchema; + Catalog = catalog == null ? null : NameHelper.Unquote(catalog); //HINT: The query approximates queue length value based on max and min // of RowVersion IDENTITY(1,1) column. There are couple of scenarios // that might lead to the approximation being off. More details here: @@ -42,11 +51,56 @@ SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {_ } readonly string _fullTableName; + + // Unquoted identifier parts, used to group/match against the catalog views in the bulk query. + public string Name { get; } + public string Schema { get; } + public string? Catalog { get; } + + // Legacy per-table length query. Retained as a documented fallback and for comparison; + // the default code path now uses the single bulk catalog-view query in QueueLengthProvider. public string LengthQuery { get; } public override string ToString() => _fullTableName; + // Builds a SINGLE query that returns the (approximate) length of every supplied table in one + // catalog, read entirely from the system catalog views (sys.partitions/sys.tables/sys.schemas). + // + // Why this is better than the per-table LengthQuery: + // * One statement covers N queues instead of N statements (the customer in case 00105882 saw + // thousands of statements/min; this collapses them to one per catalog per poll). + // * sys.partitions reports the row count maintained by the engine, so it never reads, scans or + // locks the queue tables themselves — the IF EXISTS guard and the max-min RowVersion scan are + // both gone. Metadata visibility means SELECT permission on the queue tables is enough; no + // VIEW DATABASE STATE is required (unlike the dm_db_partition_stats DMV). + // * The queue tables are heaps (index_id 0) with non-clustered indexes only, so index_id IN (0,1) + // yields the table's row count. + // + // The result is still an approximation — comparable to the existing max-min(RowVersion) estimate, + // which itself over-counts identity gaps — which is acceptable for the queue-length monitoring graph. + public static string BuildBulkLengthQuery(string? catalog, IReadOnlyCollection tables) + { + var prefix = catalog == null ? string.Empty : $"{NameHelper.Quote(catalog)}."; + + var predicate = string.Join( + "\n OR ", + tables.Select(t => $"(s.name = '{Escape(t.Schema)}' AND t.name = '{Escape(t.Name)}')")); + + return $""" + SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; + SELECT s.name AS TableSchema, t.name AS TableName, SUM(p.rows) AS [RowCount] + FROM {prefix}sys.partitions p + INNER JOIN {prefix}sys.tables t ON t.object_id = p.object_id + INNER JOIN {prefix}sys.schemas s ON s.schema_id = t.schema_id + WHERE p.index_id IN (0, 1) + AND ({predicate}) + GROUP BY s.name, t.name; + """; + } + + static string Escape(string identifier) => identifier.Replace("'", "''"); + public static SqlTable Parse(string address, string defaultSchema) { var parts = address.Split('@'); From 6317004f677bdddf4e1b8c40529ad6627270dbe8 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Thu, 25 Jun 2026 10:19:14 +0200 Subject: [PATCH 2/3] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Address=20PR=20review:?= =?UTF-8?q?=20Unquoted*=20naming,=20primary=20ctor,=20concurrent=20pacing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SqlTable: rename Name/Schema/Catalog to Unquoted* to make the quoted-vs-unquoted contract explicit (review feedback), and convert to a primary constructor. The now-dead per-table LengthQuery's duplicated if/else is collapsed into BuildFullTableName/BuildLengthQuery helpers. QueueLengthProvider: pace the poll interval concurrently with the query so the effective cadence is max(interval, queryDuration) instead of interval + queryDuration. The additive query-time term was what let the cadence drift past the 1s monitoring bucket and starve buckets of samples (#4556 false-zero sawtooth). With drift removed, restore sane defaults — base 1s (matches the finest monitoring bucket) ramping to a 10s idle ceiling — superseding the #4557 200ms oversampling workaround. Adaptive back-off is now ON by default. --- .../ConnectionStringExtensions.cs | 3 +- .../QueueLengthProvider.cs | 33 ++++++-- .../SqlTable.cs | 80 ++++++++----------- 3 files changed, 59 insertions(+), 57 deletions(-) diff --git a/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs b/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs index 539864df64..296b59b987 100644 --- a/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs +++ b/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs @@ -20,7 +20,8 @@ public static string RemoveQueueLengthQueryDelayInterval(this string connectionS connectionString.RemoveIntervalMilliseconds(queueLengthQueryDelayInterval, out interval); // Extracts the optional 'QueueLengthQueryMaxDelayInterval' (milliseconds) — the upper bound the adaptive - // back-off ramps to while every monitored queue is empty. Equal to the base interval => back-off disabled. + // back-off ramps to while every monitored queue is empty. When omitted a default ceiling applies; set it + // equal to the base interval to disable back-off. public static string RemoveQueueLengthQueryMaxDelayInterval(this string connectionString, out TimeSpan? interval) => connectionString.RemoveIntervalMilliseconds(queueLengthQueryMaxDelayInterval, out interval); diff --git a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs index a8b3659588..66679e59b7 100644 --- a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs @@ -19,9 +19,14 @@ public QueueLengthProvider(TransportSettings settings, Action adaptive back-off OFF unless an operator - // opts in with a larger QueueLengthQueryMaxDelayInterval. Never let it fall below the base. - maxDelay = configuredMaxInterval is { } max && max > baseDelay ? max : baseDelay; + // Adaptive back-off is ON by default: while every monitored queue is idle the cadence ramps from + // the base interval up to this ceiling. An operator can widen or effectively disable it (set equal + // to the base) via QueueLengthQueryMaxDelayInterval. Never let it fall below the base. + maxDelay = configuredMaxInterval ?? DefaultQueryMaxDelayInterval; + if (maxDelay < baseDelay) + { + maxDelay = baseDelay; + } currentDelay = baseDelay; defaultSchema = customSchema ?? "dbo"; @@ -53,7 +58,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { - await Task.Delay(currentDelay, stoppingToken); + // Pace concurrently with the query so the effective cadence is max(interval, queryDuration) + // instead of interval + queryDuration. The additive query-time term is what let the actual + // cadence drift past the 1s monitoring bucket and starve buckets of samples, producing the + // #4556 false-zero "sawtooth". Starting the timer before awaiting the query is what makes + // them overlap; a slow query (> interval) simply paces at its own duration with no catch-up. + var pacing = Task.Delay(currentDelay, stoppingToken); await QueryTableSizes(stoppingToken); @@ -62,9 +72,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) // Adapt the cadence: full speed while any queue has work, exponential back-off while the // whole system is idle. Backing off only when EVERY queue is empty keeps the fix for // issue #4556 intact — the false-zero "sawtooth" only affects non-empty queues, and those - // are always sampled at the base interval here. + // are always sampled at the base interval here. The reassignment applies next iteration; + // the already-started pacing task captured the previous interval. var maxObservedLength = tableSizes.IsEmpty ? 0 : tableSizes.Values.Max(); currentDelay = NextDelay(currentDelay, baseDelay, maxDelay, maxObservedLength); + + await pacing; } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { @@ -117,7 +130,7 @@ async Task QueryTableSizes(CancellationToken cancellationToken) // sys.partitions is per-database, so group the tracked tables by catalog and issue one // bulk query per distinct catalog. In the common single-catalog setup this is ONE query // per poll for the whole system, regardless of how many endpoints are monitored. - var byCatalog = tableSizes.Keys.GroupBy(t => t.Catalog); + var byCatalog = tableSizes.Keys.GroupBy(t => t.UnquotedCatalog); await using var connection = new SqlConnection(connectionString); await connection.OpenAsync(cancellationToken); @@ -150,7 +163,7 @@ async Task QueryCatalog(SqlConnection connection, string catalog, SqlTable[] tab foreach (var table in tables) { - if (results.TryGetValue((table.Schema, table.Name), out var length)) + if (results.TryGetValue((table.UnquotedSchema, table.UnquotedName), out var length)) { tableSizes[table] = length; } @@ -188,6 +201,10 @@ public int GetHashCode((string, string) obj) => readonly ILogger logger; - static readonly TimeSpan DefaultQueryDelayInterval = TimeSpan.FromMilliseconds(200); + // Base interval matches the finest monitoring bucket (1-minute history / 60 = 1s per point). With the + // concurrent pacing above this yields ~one sample per bucket, so the old 200ms oversampling workaround + // for #4556 is no longer needed. + static readonly TimeSpan DefaultQueryDelayInterval = TimeSpan.FromSeconds(1); + static readonly TimeSpan DefaultQueryMaxDelayInterval = TimeSpan.FromSeconds(10); } } diff --git a/src/ServiceControl.Transports.SqlServer/SqlTable.cs b/src/ServiceControl.Transports.SqlServer/SqlTable.cs index 4924abf6c6..e6e41a1012 100644 --- a/src/ServiceControl.Transports.SqlServer/SqlTable.cs +++ b/src/ServiceControl.Transports.SqlServer/SqlTable.cs @@ -4,62 +4,46 @@ namespace ServiceControl.Transports.SqlServer using System.Collections.Generic; using System.Linq; - class SqlTable + class SqlTable(string name, string schema, string? catalog) { - SqlTable(string name, string schema, string? catalog) + // Unquoted identifier parts, exposed so the bulk catalog-view query (see QueueLengthProvider) + // can group tables by catalog and match rows from sys.schemas / sys.tables back to the + // tracked tables without parsing the composed full name. + public string UnquotedName { get; } = NameHelper.Unquote(name); + public string UnquotedSchema { get; } = NameHelper.Unquote(schema); + public string? UnquotedCatalog { get; } = catalog == null ? null : NameHelper.Unquote(catalog); + + readonly string _fullTableName = BuildFullTableName(name, schema, catalog); + + // Legacy per-table length query. Retained as a documented fallback and for comparison; + // the default code path now uses the single bulk catalog-view query in QueueLengthProvider. + public string LengthQuery { get; } = BuildLengthQuery(name, schema, catalog); + + static string BuildFullTableName(string name, string schema, string? catalog) => + catalog == null + ? $"{NameHelper.Quote(schema)}.{NameHelper.Quote(name)}" + : $"{NameHelper.Quote(catalog)}.{NameHelper.Quote(schema)}.{NameHelper.Quote(name)}"; + + static string BuildLengthQuery(string name, string schema, string? catalog) { - var unquotedSchema = NameHelper.Unquote(schema); - var unquotedName = NameHelper.Unquote(name); - var quotedName = NameHelper.Quote(name); - var quotedSchema = NameHelper.Quote(schema); - - // Unquoted identifiers, exposed so the bulk catalog-view query (see QueueLengthProvider) - // can group tables by catalog and match rows from sys.schemas / sys.tables back to the - // tracked tables without parsing the composed full name. - Name = unquotedName; - Schema = unquotedSchema; - Catalog = catalog == null ? null : NameHelper.Unquote(catalog); //HINT: The query approximates queue length value based on max and min // of RowVersion IDENTITY(1,1) column. There are couple of scenarios // that might lead to the approximation being off. More details here: // https://docs.microsoft.com/en-us/sql/t-sql/statements/create-table-transact-sql-identity-property?view=sql-server-ver15#remarks // // Min and Max values return NULL when no rows are found. - if (catalog == null) - { - _fullTableName = $"{quotedSchema}.{quotedName}"; - - LengthQuery = $""" - IF (EXISTS (SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{unquotedSchema}' AND TABLE_NAME = '{unquotedName}')) - SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {_fullTableName} WITH (nolock) - ELSE - SELECT -1; - """; - } - else - { - var quotedCatalog = NameHelper.Quote(catalog); - _fullTableName = $"{quotedCatalog}.{quotedSchema}.{quotedName}"; - - LengthQuery = $""" - IF (EXISTS (SELECT TABLE_NAME FROM {quotedCatalog}.INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{unquotedSchema}' AND TABLE_NAME = '{unquotedName}')) - SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {_fullTableName} WITH (nolock) - ELSE - SELECT -1; - """; - } - } - - readonly string _fullTableName; - - // Unquoted identifier parts, used to group/match against the catalog views in the bulk query. - public string Name { get; } - public string Schema { get; } - public string? Catalog { get; } + var unquotedSchema = NameHelper.Unquote(schema); + var unquotedName = NameHelper.Unquote(name); + var fullTableName = BuildFullTableName(name, schema, catalog); + var catalogPrefix = catalog == null ? string.Empty : $"{NameHelper.Quote(catalog)}."; - // Legacy per-table length query. Retained as a documented fallback and for comparison; - // the default code path now uses the single bulk catalog-view query in QueueLengthProvider. - public string LengthQuery { get; } + return $""" + IF (EXISTS (SELECT TABLE_NAME FROM {catalogPrefix}INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{unquotedSchema}' AND TABLE_NAME = '{unquotedName}')) + SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {fullTableName} WITH (nolock) + ELSE + SELECT -1; + """; + } public override string ToString() => _fullTableName; @@ -85,7 +69,7 @@ public static string BuildBulkLengthQuery(string? catalog, IReadOnlyCollection $"(s.name = '{Escape(t.Schema)}' AND t.name = '{Escape(t.Name)}')")); + tables.Select(t => $"(s.name = '{Escape(t.UnquotedSchema)}' AND t.name = '{Escape(t.UnquotedName)}')")); return $""" SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; From a03711bdb0c862b5af91576c988f16e55af85c86 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Fri, 26 Jun 2026 10:13:27 +0200 Subject: [PATCH 3/3] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Address=20review:=20th?= =?UTF-8?q?rottle=20error=20loop,=20immediate=20back-off=20snap-back,=20NO?= =?UTF-8?q?LOCK,=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pace after the query (subtracting iteration time) instead of pre-starting Task.Delay: a fast persistent failure no longer spins the loop with zero delay, and a back-off snap-back to base takes effect on the current wait instead of one backed-off wait later. - Reject non-positive QueueLengthQuery(Max)DelayInterval up front (a negative value previously threw inside Task.Delay mid-loop; zero meant no pacing). - Record 0 for a not-found table so a dropped queue no longer pins the cadence at base. - Statement-scoped WITH (NOLOCK) on the catalog views instead of a connection-scoped SET TRANSACTION ISOLATION LEVEL that would persist on the pooled connection. - Guard BuildBulkLengthQuery against an empty table set (would emit invalid `AND ()`). - Remove now-dead per-table LengthQuery/BuildLengthQuery. --- .../QueueLengthQueryTests.cs | 38 +++++++++++++- .../ConnectionStringExtensions.cs | 4 +- .../QueueLengthProvider.cs | 50 ++++++++++++++----- .../SqlTable.cs | 49 ++++++------------ 4 files changed, 91 insertions(+), 50 deletions(-) diff --git a/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs b/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs index 15f178537b..b274eab39a 100644 --- a/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs +++ b/src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs @@ -25,14 +25,30 @@ public void BuildBulkLengthQuery_emits_a_single_catalog_view_query_for_all_table Assert.That(query, Does.Not.Contain("INFORMATION_SCHEMA")); Assert.That(query, Does.Not.Contain("RowVersion")); + // Dirty reads are scoped to this statement via NOLOCK hints, not a connection-scoped SET that + // would leak READ UNCOMMITTED onto the pooled connection. + Assert.That(query, Does.Contain("WITH (NOLOCK)")); + Assert.That(query, Does.Not.Contain("ISOLATION LEVEL")); + // Both tracked tables are covered by the single query. Assert.That(query, Does.Contain("t.name = 'Sales'")); Assert.That(query, Does.Contain("t.name = 'Billing'")); - // Exactly one SELECT that returns rows (the leading SET is not a result-producing statement). + // Exactly one result-producing statement. Assert.That(System.Text.RegularExpressions.Regex.Matches(query, "SELECT").Count, Is.EqualTo(1)); } + [Test] + public void BuildBulkLengthQuery_with_no_tables_stays_valid_and_returns_no_rows() + { + var query = SqlTable.BuildBulkLengthQuery(catalog: null, []); + + // No predicate terms must not produce the invalid `AND ()`; a constant-false predicate keeps the + // query syntactically valid while matching nothing. + Assert.That(query, Does.Not.Contain("AND ()")); + Assert.That(query, Does.Contain("1 = 0")); + } + [Test] public void BuildBulkLengthQuery_prefixes_the_catalog_when_supplied() { @@ -77,6 +93,26 @@ public void RemoveQueueLengthQueryDelayInterval_throws_on_non_numeric_value() Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval")); } + [Test] + public void RemoveQueueLengthQueryDelayInterval_throws_on_negative_value() + { + const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryDelayInterval=-5"; + + // A negative interval would flow through to Task.Delay and throw mid-loop; reject it up front. + Assert.That(() => connectionString.RemoveQueueLengthQueryDelayInterval(out _), + Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval")); + } + + [Test] + public void RemoveQueueLengthQueryDelayInterval_throws_on_zero_value() + { + const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryDelayInterval=0"; + + // Zero would poll with no pacing at all; reject it. + Assert.That(() => connectionString.RemoveQueueLengthQueryDelayInterval(out _), + Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval")); + } + [Test] public void RemoveQueueLengthQueryMaxDelayInterval_parses_and_strips_the_custom_part() { diff --git a/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs b/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs index 296b59b987..b96b85aa59 100644 --- a/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs +++ b/src/ServiceControl.Transports.SqlServer/ConnectionStringExtensions.cs @@ -33,9 +33,9 @@ static string RemoveIntervalMilliseconds(this string connectionString, string ke if (builder.TryGetValue(key, out var value)) { - if (!int.TryParse(value.ToString(), out var milliseconds)) + if (!int.TryParse(value.ToString(), out var milliseconds) || milliseconds <= 0) { - throw new Exception($"Can't parse '{value}' as a valid {key} (expected an integer number of milliseconds)."); + throw new Exception($"Can't parse '{value}' as a valid {key} (expected a positive integer number of milliseconds)."); } interval = TimeSpan.FromMilliseconds(milliseconds); diff --git a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs index 66679e59b7..b2cdfd653f 100644 --- a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -56,15 +57,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { + // Time the whole iteration so the pacing delay below can subtract the work already done this + // cycle. The effective cadence is then max(currentDelay, iterationDuration) — the query + // overlaps the wait instead of being added to it. That additive query-time term is what let + // the cadence drift past the 1s monitoring bucket and starve buckets of samples, producing the + // #4556 false-zero "sawtooth"; a slow query (> interval) simply paces at its own duration. + var iterationStart = Stopwatch.GetTimestamp(); + try { - // Pace concurrently with the query so the effective cadence is max(interval, queryDuration) - // instead of interval + queryDuration. The additive query-time term is what let the actual - // cadence drift past the 1s monitoring bucket and starve buckets of samples, producing the - // #4556 false-zero "sawtooth". Starting the timer before awaiting the query is what makes - // them overlap; a slow query (> interval) simply paces at its own duration with no catch-up. - var pacing = Task.Delay(currentDelay, stoppingToken); - await QueryTableSizes(stoppingToken); UpdateQueueLengthStore(); @@ -72,21 +73,37 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) // Adapt the cadence: full speed while any queue has work, exponential back-off while the // whole system is idle. Backing off only when EVERY queue is empty keeps the fix for // issue #4556 intact — the false-zero "sawtooth" only affects non-empty queues, and those - // are always sampled at the base interval here. The reassignment applies next iteration; - // the already-started pacing task captured the previous interval. + // are always sampled at the base interval here. Computed after the query (not before) so a + // snap-back to the base interval takes effect on THIS iteration's pacing rather than one + // backed-off wait later. var maxObservedLength = tableSizes.IsEmpty ? 0 : tableSizes.Values.Max(); currentDelay = NextDelay(currentDelay, baseDelay, maxDelay, maxObservedLength); - - await pacing; } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { - // no-op + break; } catch (Exception e) { logger.LogError(e, "Error querying sql queue sizes"); } + + // Pace AFTER the try so a failed query is still throttled by the interval. Otherwise a fast, + // persistent failure (bad login, denied SELECT) would spin the loop with no delay, hammering + // the server and flooding the log. currentDelay is unchanged on the failure path, so the last + // good cadence is reused. + var remaining = currentDelay - Stopwatch.GetElapsedTime(iterationStart); + if (remaining > TimeSpan.Zero) + { + try + { + await Task.Delay(remaining, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + } } } @@ -165,11 +182,18 @@ async Task QueryCatalog(SqlConnection connection, string catalog, SqlTable[] tab { if (results.TryGetValue((table.UnquotedSchema, table.UnquotedName), out var length)) { + // Indexer rather than TryUpdate: record the freshly read length unconditionally. A + // concurrent TrackEndpointInputQueue that removed this key could be resurrected here, but + // there is no untrack path today so the entry would persist regardless — kept simple. tableSizes[table] = length; } else { - // No catalog row for this table -> the queue table does not (yet) exist. + // No catalog row for this table -> the queue table does not (yet) exist. Record 0 rather + // than leaving a stale value: maxObservedLength (the back-off trigger) is a Max over + // tableSizes, so a lingering non-zero here would pin the cadence at the base interval and + // permanently defeat the adaptive back-off once a tracked queue is dropped. + tableSizes[table] = 0; logger.LogWarning("Table {TableName} does not exist", table); } } diff --git a/src/ServiceControl.Transports.SqlServer/SqlTable.cs b/src/ServiceControl.Transports.SqlServer/SqlTable.cs index e6e41a1012..70cc88dd2e 100644 --- a/src/ServiceControl.Transports.SqlServer/SqlTable.cs +++ b/src/ServiceControl.Transports.SqlServer/SqlTable.cs @@ -15,43 +15,18 @@ class SqlTable(string name, string schema, string? catalog) readonly string _fullTableName = BuildFullTableName(name, schema, catalog); - // Legacy per-table length query. Retained as a documented fallback and for comparison; - // the default code path now uses the single bulk catalog-view query in QueueLengthProvider. - public string LengthQuery { get; } = BuildLengthQuery(name, schema, catalog); - static string BuildFullTableName(string name, string schema, string? catalog) => catalog == null ? $"{NameHelper.Quote(schema)}.{NameHelper.Quote(name)}" : $"{NameHelper.Quote(catalog)}.{NameHelper.Quote(schema)}.{NameHelper.Quote(name)}"; - static string BuildLengthQuery(string name, string schema, string? catalog) - { - //HINT: The query approximates queue length value based on max and min - // of RowVersion IDENTITY(1,1) column. There are couple of scenarios - // that might lead to the approximation being off. More details here: - // https://docs.microsoft.com/en-us/sql/t-sql/statements/create-table-transact-sql-identity-property?view=sql-server-ver15#remarks - // - // Min and Max values return NULL when no rows are found. - var unquotedSchema = NameHelper.Unquote(schema); - var unquotedName = NameHelper.Unquote(name); - var fullTableName = BuildFullTableName(name, schema, catalog); - var catalogPrefix = catalog == null ? string.Empty : $"{NameHelper.Quote(catalog)}."; - - return $""" - IF (EXISTS (SELECT TABLE_NAME FROM {catalogPrefix}INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{unquotedSchema}' AND TABLE_NAME = '{unquotedName}')) - SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {fullTableName} WITH (nolock) - ELSE - SELECT -1; - """; - } - public override string ToString() => _fullTableName; // Builds a SINGLE query that returns the (approximate) length of every supplied table in one // catalog, read entirely from the system catalog views (sys.partitions/sys.tables/sys.schemas). // - // Why this is better than the per-table LengthQuery: + // Why this is better than the previous per-table IF EXISTS + max-min(RowVersion) probe: // * One statement covers N queues instead of N statements (the customer in case 00105882 saw // thousands of statements/min; this collapses them to one per catalog per poll). // * sys.partitions reports the row count maintained by the engine, so it never reads, scans or @@ -67,16 +42,22 @@ public static string BuildBulkLengthQuery(string? catalog, IReadOnlyCollection $"(s.name = '{Escape(t.UnquotedSchema)}' AND t.name = '{Escape(t.UnquotedName)}')")); - + // With no tables the predicate would be empty, producing the invalid `AND ()`. Emit a + // constant-false predicate instead so the query stays valid and simply returns no rows. + var predicate = tables.Count == 0 + ? "1 = 0" + : string.Join( + "\n OR ", + tables.Select(t => $"(s.name = '{Escape(t.UnquotedSchema)}' AND t.name = '{Escape(t.UnquotedName)}')")); + + // NOLOCK hints (statement-scoped) rather than SET TRANSACTION ISOLATION LEVEL READ + // UNCOMMITTED: the latter is connection-scoped and would persist on the pooled physical + // connection. The hint keeps the dirty-read scope on this single catalog-view read. return $""" - SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; SELECT s.name AS TableSchema, t.name AS TableName, SUM(p.rows) AS [RowCount] - FROM {prefix}sys.partitions p - INNER JOIN {prefix}sys.tables t ON t.object_id = p.object_id - INNER JOIN {prefix}sys.schemas s ON s.schema_id = t.schema_id + FROM {prefix}sys.partitions p WITH (NOLOCK) + INNER JOIN {prefix}sys.tables t WITH (NOLOCK) ON t.object_id = p.object_id + INNER JOIN {prefix}sys.schemas s WITH (NOLOCK) ON s.schema_id = t.schema_id WHERE p.index_id IN (0, 1) AND ({predicate}) GROUP BY s.name, t.name;