Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions src/ServiceControl.Transports.SqlServer.Tests/QueueLengthQueryTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
namespace ServiceControl.Transport.Tests;

using System;
using System.Linq;
using NUnit.Framework;
using Transports.SqlServer;

[TestFixture]
class QueueLengthQueryTests
{
[Test]
public void BuildBulkLengthQuery_emits_a_single_catalog_view_query_for_all_tables()
{
var tables = new[]
{
SqlTable.Parse("Sales", "dbo"),
SqlTable.Parse("Billing", "dbo"),
};

var query = SqlTable.BuildBulkLengthQuery(catalog: null, tables);

// One statement, read from the catalog views — no per-table IF EXISTS, no scan of the queue tables.
Assert.That(query, Does.Contain("sys.partitions"));
Assert.That(query, Does.Contain("p.index_id IN (0, 1)"));
Assert.That(query, Does.Not.Contain("INFORMATION_SCHEMA"));
Assert.That(query, Does.Not.Contain("RowVersion"));

// Dirty reads are scoped to this statement via NOLOCK hints, not a connection-scoped SET that
// would leak READ UNCOMMITTED onto the pooled connection.
Assert.That(query, Does.Contain("WITH (NOLOCK)"));
Assert.That(query, Does.Not.Contain("ISOLATION LEVEL"));

// Both tracked tables are covered by the single query.
Assert.That(query, Does.Contain("t.name = 'Sales'"));
Assert.That(query, Does.Contain("t.name = 'Billing'"));

// Exactly one result-producing statement.
Assert.That(System.Text.RegularExpressions.Regex.Matches(query, "SELECT").Count, Is.EqualTo(1));
}

[Test]
public void BuildBulkLengthQuery_with_no_tables_stays_valid_and_returns_no_rows()
{
var query = SqlTable.BuildBulkLengthQuery(catalog: null, []);

// No predicate terms must not produce the invalid `AND ()`; a constant-false predicate keeps the
// query syntactically valid while matching nothing.
Assert.That(query, Does.Not.Contain("AND ()"));
Assert.That(query, Does.Contain("1 = 0"));
}

[Test]
public void BuildBulkLengthQuery_prefixes_the_catalog_when_supplied()
{
var tables = new[] { SqlTable.Parse("Sales@dbo@MyCatalog", "dbo") };

var query = SqlTable.BuildBulkLengthQuery(catalog: "MyCatalog", tables);

Assert.That(query, Does.Contain("[MyCatalog].sys.partitions"));
Assert.That(query, Does.Contain("[MyCatalog].sys.tables"));
Assert.That(query, Does.Contain("[MyCatalog].sys.schemas"));
}

[Test]
public void RemoveQueueLengthQueryDelayInterval_parses_and_strips_the_custom_part()
{
const string connectionString = "Data Source=.;Initial Catalog=nsb;Integrated Security=true;QueueLengthQueryDelayInterval=1000";

var cleaned = connectionString.RemoveQueueLengthQueryDelayInterval(out var interval);

Assert.That(interval, Is.EqualTo(TimeSpan.FromSeconds(1)));
// The custom key must be stripped so SqlConnection never sees the unknown keyword.
Assert.That(cleaned, Does.Not.Contain("QueueLengthQueryDelayInterval").IgnoreCase);
Assert.That(cleaned, Does.Contain("Initial Catalog=nsb").IgnoreCase);
}

[Test]
public void RemoveQueueLengthQueryDelayInterval_defaults_to_null_when_absent()
{
const string connectionString = "Data Source=.;Initial Catalog=nsb;Integrated Security=true";

connectionString.RemoveQueueLengthQueryDelayInterval(out var interval);

Assert.That(interval, Is.Null);
}

[Test]
public void RemoveQueueLengthQueryDelayInterval_throws_on_non_numeric_value()
{
const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryDelayInterval=soon";

Assert.That(() => connectionString.RemoveQueueLengthQueryDelayInterval(out _),
Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval"));
}

[Test]
public void RemoveQueueLengthQueryDelayInterval_throws_on_negative_value()
{
const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryDelayInterval=-5";

// A negative interval would flow through to Task.Delay and throw mid-loop; reject it up front.
Assert.That(() => connectionString.RemoveQueueLengthQueryDelayInterval(out _),
Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval"));
}

[Test]
public void RemoveQueueLengthQueryDelayInterval_throws_on_zero_value()
{
const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryDelayInterval=0";

// Zero would poll with no pacing at all; reject it.
Assert.That(() => connectionString.RemoveQueueLengthQueryDelayInterval(out _),
Throws.Exception.With.Message.Contains("QueueLengthQueryDelayInterval"));
}

[Test]
public void RemoveQueueLengthQueryMaxDelayInterval_parses_and_strips_the_custom_part()
{
const string connectionString = "Data Source=.;Initial Catalog=nsb;QueueLengthQueryMaxDelayInterval=60000";

var cleaned = connectionString.RemoveQueueLengthQueryMaxDelayInterval(out var interval);

Assert.That(interval, Is.EqualTo(TimeSpan.FromSeconds(60)));
Assert.That(cleaned, Does.Not.Contain("QueueLengthQueryMaxDelayInterval").IgnoreCase);
}

static readonly TimeSpan Base = TimeSpan.FromMilliseconds(200);
static readonly TimeSpan Max = TimeSpan.FromSeconds(60);

[Test]
public void NextDelay_snaps_back_to_base_when_any_queue_has_work()
{
// Even fully backed off, a single non-empty queue returns to full speed immediately.
Assert.That(QueueLengthProvider.NextDelay(Max, Base, Max, maxObservedLength: 1), Is.EqualTo(Base));
}

[Test]
public void NextDelay_doubles_while_idle()
{
Assert.That(QueueLengthProvider.NextDelay(Base, Base, Max, maxObservedLength: 0),
Is.EqualTo(TimeSpan.FromMilliseconds(400)));
}

[Test]
public void NextDelay_caps_at_max_while_idle()
{
Assert.That(QueueLengthProvider.NextDelay(TimeSpan.FromSeconds(40), Base, Max, maxObservedLength: 0),
Is.EqualTo(Max));
}

[Test]
public void NextDelay_never_drops_below_base()
{
Assert.That(QueueLengthProvider.NextDelay(TimeSpan.FromMilliseconds(50), Base, Max, maxObservedLength: 0),
Is.EqualTo(Base));
}

[Test]
public void NextDelay_with_max_equal_to_base_disables_backoff()
{
// Operator did not opt in (max == base) -> cadence is constant at base, even while idle.
Assert.That(QueueLengthProvider.NextDelay(Base, Base, Base, maxObservedLength: 0), Is.EqualTo(Base));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ServiceControl.Transports.SqlServer
{
using System;
using System.Data.Common;

static class ConnectionStringExtensions
Expand All @@ -11,6 +12,39 @@ public static string RemoveCustomConnectionStringParts(this string connectionStr
.RemoveCustomConnectionStringPart(subscriptionsTableName, out subscriptionTable);
}

// Extracts the optional, ServiceControl-specific 'QueueLengthQueryDelayInterval' (milliseconds) from
// the connection string and removes it so it is never handed to SqlConnection (which would reject the
// unknown keyword). Mirrors the existing convention used by the Azure Service Bus transport.
// This is the BASE interval used while any monitored queue has messages.
public static string RemoveQueueLengthQueryDelayInterval(this string connectionString, out TimeSpan? interval) =>
connectionString.RemoveIntervalMilliseconds(queueLengthQueryDelayInterval, out interval);

// Extracts the optional 'QueueLengthQueryMaxDelayInterval' (milliseconds) — the upper bound the adaptive
// back-off ramps to while every monitored queue is empty. When omitted a default ceiling applies; set it
// equal to the base interval to disable back-off.
public static string RemoveQueueLengthQueryMaxDelayInterval(this string connectionString, out TimeSpan? interval) =>
connectionString.RemoveIntervalMilliseconds(queueLengthQueryMaxDelayInterval, out interval);

static string RemoveIntervalMilliseconds(this string connectionString, string key, out TimeSpan? interval)
{
interval = null;

var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };

if (builder.TryGetValue(key, out var value))
{
if (!int.TryParse(value.ToString(), out var milliseconds) || milliseconds <= 0)
{
throw new Exception($"Can't parse '{value}' as a valid {key} (expected a positive integer number of milliseconds).");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new Exception($"Can't parse '{value}' as a valid {key} (expected a positive integer number of milliseconds).");
throw new Exception($"Can't parse '{value}' as a valid {key} (expected an integer number of milliseconds greater than zero).");

}

interval = TimeSpan.FromMilliseconds(milliseconds);
builder.Remove(key);
}

return builder.ConnectionString;
}

public static string RemoveCustomConnectionStringPart(this string connectionString, string partName, out string schema)
{
var builder = new DbConnectionStringBuilder
Expand All @@ -30,5 +64,7 @@ public static string RemoveCustomConnectionStringPart(this string connectionStri

const string queueSchemaName = "Queue Schema";
const string subscriptionsTableName = "Subscriptions Table";
const string queueLengthQueryDelayInterval = "QueueLengthQueryDelayInterval";
const string queueLengthQueryMaxDelayInterval = "QueueLengthQueryMaxDelayInterval";
}
}
Loading
Loading