Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/ServiceControl.Transports.RabbitMQ/NoOpQueueLengthProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace ServiceControl.Transports.RabbitMQ
{
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

// Used when DisableBrokerRequirementChecks=true and the RabbitMQ Management API is not available
sealed class NoOpQueueLengthProvider(ILogger<NoOpQueueLengthProvider> logger) : IProvideQueueLength
{
public void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack)
{
}

public Task StartAsync(CancellationToken cancellationToken)
{
logger.LogWarning("Queue length monitoring is disabled because RabbitMQ broker requirement checks are disabled via the connection string. Queue length data will not be available.");
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,24 @@ protected override RabbitMQTransport CreateTransport(TransportSettings transport
}

protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
=> services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
{
if (!RabbitMQTransportExtensions.HasBrokerRequirementChecksDisabled(transportSettings.ConnectionString))
{
services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
}
}

protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
if (RabbitMQTransportExtensions.HasBrokerRequirementChecksDisabled(transportSettings.ConnectionString))
{
services.AddSingleton<IProvideQueueLength, NoOpQueueLengthProvider>();
}
else
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
}

services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,24 @@ protected override RabbitMQTransport CreateTransport(TransportSettings transport
}

protected override void AddTransportForPrimaryCore(IServiceCollection services, TransportSettings transportSettings)
=> services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
{
if (!RabbitMQTransportExtensions.HasBrokerRequirementChecksDisabled(transportSettings.ConnectionString))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if that's false? I mean what uses the IBrokerThroughputQuery and is that dependency optional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
services.AddSingleton<IBrokerThroughputQuery, RabbitMQQuery>();
}
}

protected sealed override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings)
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
if (RabbitMQTransportExtensions.HasBrokerRequirementChecksDisabled(transportSettings.ConnectionString))
{
services.AddSingleton<IProvideQueueLength, NoOpQueueLengthProvider>();
}
else
{
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
}

services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
namespace ServiceControl.Transports.RabbitMQ;
#nullable enable
namespace ServiceControl.Transports.RabbitMQ;

using System;
using System.Collections.Generic;
Expand All @@ -8,17 +9,27 @@

static class RabbitMQTransportExtensions
{
public static bool HasBrokerRequirementChecksDisabled(string connectionString)
{
var dictionary = ParseConnectionString(connectionString);
if (dictionary is null)
{
return false;
}

return dictionary.TryGetValue("DisableBrokerRequirementChecks", out var value)
&& bool.TryParse(value, out var disabled)
&& disabled;
}

public static void ApplySettingsFromConnectionString(this RabbitMQTransport transport, string connectionString)
{
if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase))
var dictionary = ParseConnectionString(connectionString);
if (dictionary is null)
{
return;
}

var dictionary = new DbConnectionStringBuilder { ConnectionString = connectionString }
.OfType<KeyValuePair<string, object>>()
.ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase);

if (dictionary.TryGetValue("ValidateDeliveryLimits", out var validateDeliveryLimitsString))
{
_ = bool.TryParse(validateDeliveryLimitsString, out var validateDeliveryLimits);
Expand All @@ -42,5 +53,26 @@ public static void ApplySettingsFromConnectionString(this RabbitMQTransport tran
_ = bool.TryParse(useExternalAuthMechanismString, out var useExternalAuthMechanism);
transport.UseExternalAuthMechanism = useExternalAuthMechanism;
}

if (dictionary.TryGetValue("DisableBrokerRequirementChecks", out var disableBrokerRequirementChecksString)
&& bool.TryParse(disableBrokerRequirementChecksString, out var disableBrokerRequirementChecks)
&& disableBrokerRequirementChecks)
{
transport.DisabledBrokerRequirementChecks =
BrokerRequirementChecks.Version310OrNewer | BrokerRequirementChecks.StreamsEnabled;
transport.ValidateDeliveryLimits = false;
}
}

static Dictionary<string, string?>? ParseConnectionString(string connectionString)
{
if (connectionString.StartsWith("amqp", StringComparison.OrdinalIgnoreCase))
{
return null;
}

return new DbConnectionStringBuilder { ConnectionString = connectionString }
.OfType<KeyValuePair<string, object>>()
.ToDictionary(pair => pair.Key, pair => pair.Value.ToString(), StringComparer.OrdinalIgnoreCase);
}
}