diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 29f3ae023a..fa44b13aa2 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -9,6 +9,7 @@
+
@@ -20,6 +21,10 @@
+
+
+
+
@@ -52,6 +57,7 @@
+
@@ -92,4 +98,4 @@
-
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/.editorconfig b/src/ServiceControl.Audit.Persistence.Sql.Core/.editorconfig
new file mode 100644
index 0000000000..fc68ac3228
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/.editorconfig
@@ -0,0 +1,9 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
+
+# Disable style rules for auto-generated EF migrations
+[Migrations/**.cs]
+dotnet_diagnostic.IDE0065.severity = none
+generated_code = true
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/AuditSqlPersisterSettings.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/AuditSqlPersisterSettings.cs
new file mode 100644
index 0000000000..7893f9ecc3
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/AuditSqlPersisterSettings.cs
@@ -0,0 +1,18 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public abstract class AuditSqlPersisterSettings : PersistenceSettings
+{
+ protected AuditSqlPersisterSettings(
+ TimeSpan auditRetentionPeriod,
+ bool enableFullTextSearchOnBodies,
+ int maxBodySizeToStore)
+ : base(auditRetentionPeriod, enableFullTextSearchOnBodies, maxBodySizeToStore)
+ {
+ }
+
+ public required string ConnectionString { get; set; }
+ public int CommandTimeout { get; set; } = 30;
+ public bool EnableSensitiveDataLogging { get; set; } = false;
+ public int MinBodySizeForCompression { get; set; } = 4096;
+ public bool StoreMessageBodiesOnDisk { get; set; } = true;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs
new file mode 100644
index 0000000000..da04f5f6a4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/BaseAuditPersistence.cs
@@ -0,0 +1,30 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+using Azure.Storage.Blobs;
+using Implementation;
+using Implementation.UnitOfWork;
+using Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Audit.Auditing.BodyStorage;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+
+public abstract class BaseAuditPersistence
+{
+ protected static void RegisterDataStores(IServiceCollection services, AuditSqlPersisterSettings settings)
+ {
+ services.AddSingleton();
+ if (!string.IsNullOrEmpty(settings.MessageBodyStoragePath))
+ {
+ services.AddSingleton();
+ }
+ else
+ {
+ services.AddSingleton();
+ }
+ services.AddScoped();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton(TimeProvider.System);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/IAuditDatabaseMigrator.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/IAuditDatabaseMigrator.cs
new file mode 100644
index 0000000000..e987af8f5b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/IAuditDatabaseMigrator.cs
@@ -0,0 +1,6 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public interface IAuditDatabaseMigrator
+{
+ Task ApplyMigrations(CancellationToken cancellationToken = default);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/MinimumRequiredStorageState.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/MinimumRequiredStorageState.cs
new file mode 100644
index 0000000000..72fd05b84a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Abstractions/MinimumRequiredStorageState.cs
@@ -0,0 +1,6 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public class MinimumRequiredStorageState
+{
+ public bool CanIngestMore { get; set; } = true;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/DbContexts/AuditDbContextBase.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/DbContexts/AuditDbContextBase.cs
new file mode 100644
index 0000000000..36211fd95e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/DbContexts/AuditDbContextBase.cs
@@ -0,0 +1,40 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
+
+using Entities;
+using EntityConfigurations;
+using Microsoft.EntityFrameworkCore;
+
+public abstract class AuditDbContextBase : DbContext
+{
+ protected AuditDbContextBase(DbContextOptions options) : base(options)
+ {
+ }
+
+ public DbSet ProcessedMessages { get; set; }
+ public DbSet FailedAuditImports { get; set; }
+ public DbSet SagaSnapshots { get; set; }
+ public DbSet KnownEndpoints { get; set; }
+ public DbSet KnownEndpointsInsertOnly { get; set; }
+
+ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
+ {
+ optionsBuilder.EnableDetailedErrors();
+ }
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ base.OnModelCreating(modelBuilder);
+
+ modelBuilder.ApplyConfiguration(new ProcessedMessageConfiguration());
+ modelBuilder.ApplyConfiguration(new FailedAuditImportConfiguration());
+ modelBuilder.ApplyConfiguration(new SagaSnapshotConfiguration());
+ modelBuilder.ApplyConfiguration(new KnownEndpointConfiguration());
+ modelBuilder.ApplyConfiguration(new KnownEndpointInsertOnlyConfiguration());
+
+ OnModelCreatingProvider(modelBuilder);
+ }
+
+ protected virtual void OnModelCreatingProvider(ModelBuilder modelBuilder)
+ {
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/FailedAuditImportEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/FailedAuditImportEntity.cs
new file mode 100644
index 0000000000..8292a48cf2
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/FailedAuditImportEntity.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class FailedAuditImportEntity
+{
+ public Guid Id { get; set; }
+ public string MessageJson { get; set; } = null!;
+ public string? ExceptionInfo { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs
new file mode 100644
index 0000000000..21ef329ca7
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointEntity.cs
@@ -0,0 +1,13 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class KnownEndpointEntity
+{
+ public Guid Id { get; set; }
+ public string? Name { get; set; }
+
+ public Guid HostId { get; set; }
+
+ public string? Host { get; set; }
+
+ public DateTime LastSeen { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointInsertOnlyEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointInsertOnlyEntity.cs
new file mode 100644
index 0000000000..0f6fb46c5c
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/KnownEndpointInsertOnlyEntity.cs
@@ -0,0 +1,15 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class KnownEndpointInsertOnlyEntity
+{
+ public long Id { get; set; }
+ public Guid KnownEndpointId { get; set; }
+
+ public string? Name { get; set; }
+
+ public Guid HostId { get; set; }
+
+ public string? Host { get; set; }
+
+ public DateTime LastSeen { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/ProcessedMessageEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/ProcessedMessageEntity.cs
new file mode 100644
index 0000000000..cecc8d6715
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/ProcessedMessageEntity.cs
@@ -0,0 +1,35 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+public class ProcessedMessageEntity
+{
+ public long Id { get; set; }
+ public string UniqueMessageId { get; set; } = null!;
+
+ // JSON columns for complex nested data
+ public string HeadersJson { get; set; } = null!;
+
+ // Full-text search column (combines headers JSON and message body for indexing)
+ public string? SearchableContent { get; set; }
+
+ // Denormalized fields for efficient querying
+ public string? MessageId { get; set; }
+ public string? MessageType { get; set; }
+ public DateTime? TimeSent { get; set; }
+ public DateTime CreatedOn { get; set; }
+ public bool IsSystemMessage { get; set; }
+ public int Status { get; set; }
+ public string? ConversationId { get; set; }
+
+ // Endpoint details (denormalized from MessageMetadata)
+ public string? ReceivingEndpointName { get; set; }
+
+ // Performance metrics (stored as ticks for precision)
+ public long? CriticalTimeTicks { get; set; }
+ public long? ProcessingTimeTicks { get; set; }
+ public long? DeliveryTimeTicks { get; set; }
+
+ // Body storage info
+ public int BodySize { get; set; }
+ public string? BodyUrl { get; set; }
+ public bool BodyNotStored { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/SagaSnapshotEntity.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/SagaSnapshotEntity.cs
new file mode 100644
index 0000000000..4b158572bb
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Entities/SagaSnapshotEntity.cs
@@ -0,0 +1,18 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Entities;
+
+using ServiceControl.SagaAudit;
+
+public class SagaSnapshotEntity
+{
+ public long Id { get; set; }
+ public Guid SagaId { get; set; }
+ public string? SagaType { get; set; }
+ public DateTime StartTime { get; set; }
+ public DateTime FinishTime { get; set; }
+ public SagaStateChangeStatus Status { get; set; }
+ public string? StateAfterChange { get; set; }
+ public string? InitiatingMessageJson { get; set; }
+ public string? OutgoingMessagesJson { get; set; }
+ public string? Endpoint { get; set; }
+ public DateTime CreatedOn { get; set; }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/FailedAuditImportConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/FailedAuditImportConfiguration.cs
new file mode 100644
index 0000000000..21f1cca11f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/FailedAuditImportConfiguration.cs
@@ -0,0 +1,16 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class FailedAuditImportConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("FailedAuditImports");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).IsRequired();
+ builder.Property(e => e.MessageJson).IsRequired();
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs
new file mode 100644
index 0000000000..de2253de2d
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointConfiguration.cs
@@ -0,0 +1,21 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class KnownEndpointConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("KnownEndpoints");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).ValueGeneratedNever();
+ builder.Property(e => e.Name).IsRequired();
+ builder.Property(e => e.HostId).IsRequired();
+ builder.Property(e => e.Host).IsRequired();
+ builder.Property(e => e.LastSeen).IsRequired();
+
+ builder.HasIndex(e => e.LastSeen);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointInsertOnlyConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointInsertOnlyConfiguration.cs
new file mode 100644
index 0000000000..e2d10ff6ab
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/KnownEndpointInsertOnlyConfiguration.cs
@@ -0,0 +1,23 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class KnownEndpointInsertOnlyConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("KnownEndpointsInsertOnly");
+ builder.HasKey(e => e.Id);
+ builder.Property(e => e.Id).ValueGeneratedOnAdd();
+ builder.Property(e => e.KnownEndpointId).IsRequired();
+ builder.Property(e => e.Name).IsRequired();
+ builder.Property(e => e.HostId).IsRequired();
+ builder.Property(e => e.Host).IsRequired();
+ builder.Property(e => e.LastSeen).IsRequired();
+
+ builder.HasIndex(e => e.LastSeen);
+ builder.HasIndex(e => e.KnownEndpointId);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/ProcessedMessageConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/ProcessedMessageConfiguration.cs
new file mode 100644
index 0000000000..9ef6a92220
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/ProcessedMessageConfiguration.cs
@@ -0,0 +1,43 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class ProcessedMessageConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("ProcessedMessages");
+ builder.HasKey(e => new { e.Id, e.CreatedOn });
+ builder.Property(e => e.Id).ValueGeneratedOnAdd();
+ builder.Property(e => e.CreatedOn).IsRequired();
+ builder.Property(e => e.UniqueMessageId).HasMaxLength(200).IsRequired();
+
+ // JSON columns
+ builder.Property(e => e.HeadersJson).IsRequired();
+
+ // Full-text search column (combines header values + body text)
+ builder.Property(e => e.SearchableContent);
+
+ // Denormalized query fields
+ builder.Property(e => e.MessageId).HasMaxLength(200);
+ builder.Property(e => e.MessageType).HasMaxLength(500);
+ builder.Property(e => e.ConversationId).HasMaxLength(200);
+ builder.Property(e => e.ReceivingEndpointName).HasMaxLength(500);
+ builder.Property(e => e.BodyUrl).HasMaxLength(500);
+ builder.Property(e => e.TimeSent);
+ builder.Property(e => e.IsSystemMessage).IsRequired();
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.BodySize).IsRequired();
+ builder.Property(e => e.BodyNotStored).IsRequired();
+ builder.Property(e => e.CriticalTimeTicks);
+ builder.Property(e => e.ProcessingTimeTicks);
+ builder.Property(e => e.DeliveryTimeTicks);
+
+ builder.HasIndex(e => e.UniqueMessageId);
+ builder.HasIndex(e => e.ConversationId);
+ builder.HasIndex(e => e.MessageId);
+ builder.HasIndex(e => e.TimeSent);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/SagaSnapshotConfiguration.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/SagaSnapshotConfiguration.cs
new file mode 100644
index 0000000000..24f3e249b4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/EntityConfigurations/SagaSnapshotConfiguration.cs
@@ -0,0 +1,27 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.EntityConfigurations;
+
+using Entities;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+
+class SagaSnapshotConfiguration : IEntityTypeConfiguration
+{
+ public void Configure(EntityTypeBuilder builder)
+ {
+ builder.ToTable("SagaSnapshots");
+ builder.HasKey(e => new { e.Id, e.CreatedOn });
+ builder.Property(e => e.Id).ValueGeneratedOnAdd();
+ builder.Property(e => e.CreatedOn).IsRequired();
+ builder.Property(e => e.SagaId).IsRequired();
+ builder.Property(e => e.SagaType).IsRequired();
+ builder.Property(e => e.StartTime).IsRequired();
+ builder.Property(e => e.FinishTime);
+ builder.Property(e => e.Status).IsRequired();
+ builder.Property(e => e.StateAfterChange).IsRequired();
+ builder.Property(e => e.InitiatingMessageJson).IsRequired();
+ builder.Property(e => e.OutgoingMessagesJson).IsRequired();
+ builder.Property(e => e.Endpoint).IsRequired();
+
+ builder.HasIndex(e => e.SagaId);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/FullTextSearch/IAuditFullTextSearchProvider.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/FullTextSearch/IAuditFullTextSearchProvider.cs
new file mode 100644
index 0000000000..ef303ba399
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/FullTextSearch/IAuditFullTextSearchProvider.cs
@@ -0,0 +1,10 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.FullTextSearch;
+
+using Entities;
+
+public interface IAuditFullTextSearchProvider
+{
+ IQueryable ApplyFullTextSearch(
+ IQueryable query,
+ string searchTerms);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/AzureBlobBodyStoragePersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/AzureBlobBodyStoragePersistence.cs
new file mode 100644
index 0000000000..ca75116a7f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/AzureBlobBodyStoragePersistence.cs
@@ -0,0 +1,147 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Buffers;
+using System.IO.Compression;
+using Azure.Storage;
+using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
+using ServiceControl.Audit.Persistence.Sql.Core.Abstractions;
+
+public class AzureBlobBodyStoragePersistence : IBodyStoragePersistence
+{
+ const string FormatVersion = "1";
+ readonly AuditSqlPersisterSettings settings;
+ readonly BlobContainerClient blobContainerClient;
+
+ public AzureBlobBodyStoragePersistence(AuditSqlPersisterSettings settings)
+ {
+ this.settings = settings;
+
+ var blobClient = new BlobServiceClient(settings.MessageBodyStorageConnectionString);
+ blobContainerClient = blobClient.GetBlobContainerClient("audit-bodies");
+ }
+
+ public async Task WriteBodyAsync(string bodyId, DateTime createdOn, ReadOnlyMemory body, string contentType, CancellationToken cancellationToken = default)
+ {
+ var datePrefix = createdOn.ToString("yyyy-MM-dd-HH");
+ var blob = blobContainerClient.GetBlobClient($"{datePrefix}/{bodyId}");
+ var shouldCompress = body.Length >= settings.MinBodySizeForCompression;
+
+ BinaryData data;
+ byte[]? rentedBuffer = null;
+
+ try
+ {
+ if (shouldCompress)
+ {
+ var maxCompressedSize = BrotliEncoder.GetMaxCompressedLength(body.Length);
+ rentedBuffer = ArrayPool.Shared.Rent(maxCompressedSize);
+
+ if (!BrotliEncoder.TryCompress(body.Span, rentedBuffer, out var bytesWritten, quality: 1, window: 22))
+ {
+ // Compression failed, fall back to uncompressed
+ data = BinaryData.FromBytes(body);
+ shouldCompress = false;
+ }
+ else
+ {
+ data = BinaryData.FromBytes(new ReadOnlyMemory(rentedBuffer, 0, bytesWritten));
+ }
+ }
+ else
+ {
+ data = BinaryData.FromBytes(body);
+ }
+
+ var options = new BlobUploadOptions
+ {
+ TransferValidation = new UploadTransferValidationOptions
+ {
+ ChecksumAlgorithm = StorageChecksumAlgorithm.Auto
+ },
+ Metadata = new Dictionary
+ {
+ { "FormatVersion", FormatVersion },
+ { "ContentType", Uri.EscapeDataString(contentType) },
+ { "BodySize", body.Length.ToString() },
+ { "IsCompressed", shouldCompress.ToString() }
+ }
+ };
+
+ await blob.UploadAsync(data, options, cancellationToken);
+ }
+ finally
+ {
+ if (rentedBuffer != null)
+ {
+ ArrayPool.Shared.Return(rentedBuffer);
+ }
+ }
+ }
+
+ public async Task ReadBodyAsync(string bodyId, DateTime createdOn, CancellationToken cancellationToken = default)
+ {
+ var datePrefix = createdOn.ToString("yyyy-MM-dd-HH");
+ var blob = blobContainerClient.GetBlobClient($"{datePrefix}/{bodyId}");
+
+ try
+ {
+ var response = await blob.DownloadContentAsync(cancellationToken);
+ var properties = response.Value;
+ var metadata = properties.Details.Metadata;
+
+ // Check format version
+ if (metadata.TryGetValue("FormatVersion", out var version) && version != FormatVersion)
+ {
+ throw new InvalidOperationException($"Unsupported blob format version: {version}");
+ }
+
+ var contentType = metadata.TryGetValue("ContentType", out var ct) ? Uri.UnescapeDataString(ct) : "application/octet-stream";
+ var bodySize = metadata.TryGetValue("BodySize", out var sizeStr) && int.TryParse(sizeStr, out var size) ? size : 0;
+ var isCompressed = metadata.TryGetValue("IsCompressed", out var compressedStr) && bool.TryParse(compressedStr, out var compressed) && compressed;
+ var etag = properties.Details.ETag.ToString();
+
+ Stream stream;
+ if (isCompressed)
+ {
+ var compressedData = properties.Content.ToMemory();
+ var decompressedBuffer = new byte[bodySize];
+
+ if (!BrotliDecoder.TryDecompress(compressedData.Span, decompressedBuffer, out var bytesWritten) || bytesWritten != bodySize)
+ {
+ throw new InvalidOperationException($"Failed to decompress body for {bodyId}");
+ }
+
+ stream = new MemoryStream(decompressedBuffer, writable: false);
+ }
+ else
+ {
+ stream = properties.Content.ToStream();
+ }
+
+ return new MessageBodyFileResult
+ {
+ Stream = stream,
+ ContentType = contentType,
+ BodySize = bodySize,
+ Etag = etag
+ };
+ }
+ catch (Azure.RequestFailedException ex) when (ex.Status == 404)
+ {
+ return null;
+ }
+ }
+
+ public Task DeleteBodiesForHour(DateTime hour, CancellationToken cancellationToken = default)
+ {
+ // var hourPrefix = hour.ToString("yyyy-MM-dd-HH") + "/";
+
+ // await foreach (var blobItem in blobContainerClient.GetBlobsAsync(BlobTraits.None, BlobStates.None, hourPrefix, cancellationToken))
+ // {
+ // await blobContainerClient.DeleteBlobIfExistsAsync(blobItem.Name, cancellationToken: cancellationToken);
+ // }
+
+ return Task.CompletedTask;
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/BodyStorageFetcher.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/BodyStorageFetcher.cs
new file mode 100644
index 0000000000..a54b4a6f46
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/BodyStorageFetcher.cs
@@ -0,0 +1,44 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation;
+
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.Audit.Auditing.BodyStorage;
+using ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
+using ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+class BodyStorageFetcher(IBodyStoragePersistence storagePersistence, AuditDbContextBase dbContext) : IBodyStorage
+{
+ public async Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
+ {
+ throw new NotImplementedException();
+ }
+
+ public async Task TryFetch(string bodyId, CancellationToken cancellationToken)
+ {
+ // Look up CreatedOn from the database to locate the correct hourly folder
+ var createdOn = await dbContext.ProcessedMessages
+ .Where(m => m.UniqueMessageId == bodyId)
+ .Select(m => m.CreatedOn)
+ .FirstOrDefaultAsync(cancellationToken);
+
+ if (createdOn == default)
+ {
+ return new StreamResult { HasResult = false };
+ }
+
+ var result = await storagePersistence.ReadBodyAsync(bodyId, createdOn, cancellationToken);
+
+ if (result == null)
+ {
+ return new StreamResult { HasResult = false };
+ }
+
+ return new StreamResult
+ {
+ HasResult = true,
+ Stream = result.Stream,
+ ContentType = result.ContentType,
+ BodySize = result.BodySize,
+ Etag = result.Etag
+ };
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFAuditDataStore.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFAuditDataStore.cs
new file mode 100644
index 0000000000..8b805955b8
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFAuditDataStore.cs
@@ -0,0 +1,39 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation;
+
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Auditing.MessagesView;
+using ServiceControl.Audit.Infrastructure;
+using ServiceControl.Audit.Monitoring;
+using ServiceControl.SagaAudit;
+
+class EFAuditDataStore : IAuditDataStore
+{
+ static readonly QueryStatsInfo EmptyStats = new(string.Empty, 0);
+
+ public Task>> QueryKnownEndpoints(CancellationToken cancellationToken)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
+ => Task.FromResult(QueryResult.Empty());
+
+ public Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+
+ public Task GetMessageBody(string messageId, CancellationToken cancellationToken)
+ => Task.FromResult(MessageBodyView.NoContent());
+
+ public Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
+ => Task.FromResult(new QueryResult>([], EmptyStats));
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFFailedAuditStorage.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFFailedAuditStorage.cs
new file mode 100644
index 0000000000..fdc6e2f4b1
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/EFFailedAuditStorage.cs
@@ -0,0 +1,17 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation;
+
+using ServiceControl.Audit.Auditing;
+
+class EFFailedAuditStorage : IFailedAuditStorage
+{
+ public Task SaveFailedAuditImport(FailedAuditImport message)
+ => Task.CompletedTask;
+
+ public Task ProcessFailedMessages(
+ Func, CancellationToken, Task> onMessage,
+ CancellationToken cancellationToken)
+ => Task.CompletedTask;
+
+ public Task GetFailedAuditsCount()
+ => Task.FromResult(0);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/FileSystemBodyStoragePersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/FileSystemBodyStoragePersistence.cs
new file mode 100644
index 0000000000..f72e53a14f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/FileSystemBodyStoragePersistence.cs
@@ -0,0 +1,181 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.IO.Compression;
+using Abstractions;
+
+public class FileSystemBodyStoragePersistence(AuditSqlPersisterSettings settings) : IBodyStoragePersistence
+{
+ const int FormatVersion = 1;
+
+ public async Task WriteBodyAsync(
+ string bodyId,
+ DateTime createdOn,
+ ReadOnlyMemory body,
+ string contentType,
+ CancellationToken cancellationToken = default)
+ {
+ var dateFolder = createdOn.ToString("yyyy-MM-dd-HH");
+ var filePath = Path.Combine(settings.MessageBodyStoragePath, dateFolder, $"{bodyId}.body");
+
+ // Bodies are immutable - skip if file already exists
+ if (File.Exists(filePath))
+ {
+ return;
+ }
+
+ // Ensure directory exists
+ var directory = Path.GetDirectoryName(filePath);
+ if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory))
+ {
+ Directory.CreateDirectory(directory);
+ }
+
+ // Write to temp file first for atomic operation
+ var tempFilePath = filePath + ".tmp";
+
+ try
+ {
+ var fileStream = new FileStream(
+ tempFilePath,
+ FileMode.Create,
+ FileAccess.Write,
+ FileShare.None,
+ bufferSize: 4096,
+ useAsync: true);
+
+ await using (fileStream.ConfigureAwait(false))
+ {
+ using var writer = new BinaryWriter(fileStream, System.Text.Encoding.UTF8, leaveOpen: true);
+
+ var shouldCompress = body.Length >= settings.MinBodySizeForCompression;
+
+ // Write header
+ writer.Write(FormatVersion);
+ writer.Write(contentType);
+ writer.Write(body.Length); // Original uncompressed size
+ writer.Write(shouldCompress);
+ writer.Write(Guid.NewGuid().ToString()); // Generate ETag
+
+ // Flush the header before writing body
+ writer.Flush();
+
+ // Write body (compressed or not)
+ if (shouldCompress)
+ {
+ var brotliStream = new BrotliStream(fileStream, CompressionLevel.Fastest, leaveOpen: true);
+ await using (brotliStream.ConfigureAwait(false))
+ {
+ await brotliStream.WriteAsync(body, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ await fileStream.WriteAsync(body, cancellationToken).ConfigureAwait(false);
+ }
+
+ await fileStream.FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ // Atomic rename
+ File.Move(tempFilePath, filePath, overwrite: false);
+ }
+ catch
+ {
+ // Clean up temp file if it exists
+ if (File.Exists(tempFilePath))
+ {
+ try
+ {
+ File.Delete(tempFilePath);
+ }
+ catch
+ {
+ // Ignore cleanup errors
+ }
+ }
+ throw;
+ }
+ }
+
+ public Task ReadBodyAsync(string bodyId, DateTime createdOn, CancellationToken cancellationToken = default)
+ {
+ var dateFolder = createdOn.ToString("yyyy-MM-dd-HH");
+ var filePath = Path.Combine(settings.MessageBodyStoragePath, dateFolder, $"{bodyId}.body");
+
+ if (!File.Exists(filePath))
+ {
+ return Task.FromResult(null);
+ }
+
+ try
+ {
+ var fileStream = new FileStream(
+ filePath,
+ FileMode.Open,
+ FileAccess.Read,
+ FileShare.Read,
+ bufferSize: 4096,
+ useAsync: true);
+
+ var reader = new BinaryReader(fileStream, System.Text.Encoding.UTF8, leaveOpen: true);
+
+ // Read header
+ var formatVersion = reader.ReadInt32();
+ if (formatVersion != FormatVersion)
+ {
+ fileStream.Dispose();
+ throw new InvalidOperationException($"Unsupported body file format version: {formatVersion}");
+ }
+
+ var contentType = reader.ReadString();
+ var bodySize = reader.ReadInt32();
+ var isCompressed = reader.ReadBoolean();
+ var etag = reader.ReadString();
+
+ // Create appropriate stream wrapper for body data
+ Stream bodyStream = fileStream;
+ if (isCompressed)
+ {
+ bodyStream = new BrotliStream(fileStream, CompressionMode.Decompress, leaveOpen: false);
+ }
+
+ var result = new MessageBodyFileResult
+ {
+ Stream = bodyStream,
+ ContentType = contentType,
+ BodySize = bodySize,
+ Etag = etag
+ };
+
+ return Task.FromResult(result);
+ }
+ catch (FileNotFoundException)
+ {
+ return Task.FromResult(null);
+ }
+ catch (IOException ex)
+ {
+ throw new InvalidOperationException($"Failed to read body file for {bodyId}", ex);
+ }
+ }
+
+ public Task DeleteBodiesForHour(DateTime hour, CancellationToken cancellationToken = default)
+ {
+ var dateFolder = hour.ToString("yyyy-MM-dd-HH");
+ var directoryPath = Path.Combine(settings.MessageBodyStoragePath, dateFolder);
+
+ try
+ {
+ if (Directory.Exists(directoryPath))
+ {
+ Directory.Delete(directoryPath, recursive: true);
+ }
+ }
+ catch (DirectoryNotFoundException)
+ {
+ // Already gone
+ }
+
+ return Task.CompletedTask;
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWork.cs
new file mode 100644
index 0000000000..d6a69ebdb4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWork.cs
@@ -0,0 +1,242 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using System.Net.Mime;
+using System.Text;
+using System.Text.Json;
+using Abstractions;
+using DbContexts;
+using Entities;
+using Infrastructure;
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Monitoring;
+using ServiceControl.Audit.Persistence.Infrastructure;
+using ServiceControl.Audit.Persistence.Monitoring;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+using ServiceControl.SagaAudit;
+
+class AuditIngestionUnitOfWork(
+ AuditDbContextBase dbContext,
+ IBodyStoragePersistence bodyPersistence,
+ AuditSqlPersisterSettings settings
+ )
+ : IAuditIngestionUnitOfWork
+{
+ readonly List bodyStorageTasks = [];
+ // Large object heap starts above 85000 bytes
+ const int LargeObjectHeapThreshold = 85_000;
+ static readonly Encoding Utf8 = new UTF8Encoding(true, true);
+
+ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body = default, CancellationToken cancellationToken = default)
+ {
+ var createdOn = TruncateToHour(DateTime.UtcNow);
+
+ var entity = new ProcessedMessageEntity
+ {
+ CreatedOn = createdOn,
+ UniqueMessageId = processedMessage.UniqueMessageId,
+ // We probably should split the keys and values into separate columns, so that we can index only the values for FTS. See comment below about combining headers and body for FTS indexing.
+ HeadersJson = JsonSerializer.Serialize(processedMessage.Headers, ProcessedMessageJsonContext.Default.DictionaryStringString),
+
+ // Denormalized fields
+ MessageId = GetMetadata(processedMessage.MessageMetadata, "MessageId"),
+ MessageType = GetMetadata(processedMessage.MessageMetadata, "MessageType"),
+ TimeSent = GetMetadata(processedMessage.MessageMetadata, "TimeSent"),
+ IsSystemMessage = GetMetadata(processedMessage.MessageMetadata, "IsSystemMessage"),
+ Status = (int)(GetMetadata(processedMessage.MessageMetadata, "IsRetried") ? MessageStatus.ResolvedSuccessfully : MessageStatus.Successful),
+ ConversationId = GetMetadata(processedMessage.MessageMetadata, "ConversationId"),
+
+ // Endpoint details
+ ReceivingEndpointName = GetEndpointName(processedMessage.MessageMetadata, "ReceivingEndpoint"),
+
+ // Performance metrics
+ CriticalTimeTicks = GetMetadata(processedMessage.MessageMetadata, "CriticalTime")?.Ticks,
+ ProcessingTimeTicks = GetMetadata(processedMessage.MessageMetadata, "ProcessingTime")?.Ticks,
+ DeliveryTimeTicks = GetMetadata(processedMessage.MessageMetadata, "DeliveryTime")?.Ticks,
+
+ // Here we are combining header values with body text for full-text search indexing.
+ // This should be modified so that we don't do this combining, and there fore we don't always need to store the body in external storage.
+ SearchableContent = settings.EnableFullTextSearchOnBodies ? BuildSearchableContent(processedMessage.Headers, body) : null,
+ BodySize = body.Length,
+ BodyUrl = body.IsEmpty ? null : $"/messages/{processedMessage.Id}/body",
+ BodyNotStored = !settings.StoreMessageBodiesOnDisk || body.Length > settings.MaxBodySizeToStore
+ };
+
+ dbContext.ProcessedMessages.Add(entity);
+
+ //Store body if below threshold and storage is enabled
+ if (settings.StoreMessageBodiesOnDisk && !body.IsEmpty && body.Length < settings.MaxBodySizeToStore)
+ {
+ var contentType = GetContentType(processedMessage.Headers, MediaTypeNames.Text.Plain);
+
+ // Queue body storage to run in parallel, awaited in DisposeAsync
+ bodyStorageTasks.Add(bodyPersistence.WriteBodyAsync(processedMessage.UniqueMessageId, createdOn, body, contentType, cancellationToken));
+ }
+ }
+
+ public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default)
+ {
+ var entity = new SagaSnapshotEntity
+ {
+ CreatedOn = TruncateToHour(DateTime.UtcNow),
+ SagaId = sagaSnapshot.SagaId,
+ SagaType = sagaSnapshot.SagaType,
+ StartTime = sagaSnapshot.StartTime,
+ FinishTime = sagaSnapshot.FinishTime,
+ Endpoint = sagaSnapshot.Endpoint,
+ Status = sagaSnapshot.Status,
+ InitiatingMessageJson = JsonSerializer.Serialize(sagaSnapshot.InitiatingMessage, SagaSnapshotJsonContext.Default.InitiatingMessage),
+ OutgoingMessagesJson = JsonSerializer.Serialize(sagaSnapshot.OutgoingMessages, SagaSnapshotJsonContext.Default.ListResultingMessage),
+ StateAfterChange = sagaSnapshot.StateAfterChange,
+ };
+
+ dbContext.SagaSnapshots.Add(entity);
+
+ return Task.CompletedTask;
+ }
+
+ public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default)
+ {
+ var entity = new KnownEndpointInsertOnlyEntity
+ {
+ KnownEndpointId = DeterministicGuid.MakeId(knownEndpoint.Name, knownEndpoint.HostId.ToString()),
+ Name = knownEndpoint.Name,
+ HostId = knownEndpoint.HostId,
+ Host = knownEndpoint.Host,
+ LastSeen = knownEndpoint.LastSeen
+ };
+
+ dbContext.KnownEndpointsInsertOnly.Add(entity);
+
+ return Task.CompletedTask;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ try
+ {
+ // Wait for all body storage operations to complete first
+ await Task.WhenAll(bodyStorageTasks);
+ await dbContext.SaveChangesAsync();
+ }
+ finally
+ {
+ await dbContext.DisposeAsync();
+ }
+ }
+
+ static string GetContentType(IReadOnlyDictionary headers, string defaultContentType)
+ => headers.TryGetValue(Headers.ContentType, out var contentType) ? contentType : defaultContentType;
+
+ static T? GetMetadata(Dictionary metadata, string key)
+ {
+ if (metadata.TryGetValue(key, out var value))
+ {
+ if (value is T typedValue)
+ {
+ return typedValue;
+ }
+
+ // Handle JSON deserialized types
+ if (value is JsonElement jsonElement)
+ {
+ return DeserializeJsonElement(jsonElement);
+ }
+ }
+ return default;
+ }
+
+ static T? DeserializeJsonElement(JsonElement element)
+ {
+ try
+ {
+ return element.Deserialize(JsonSerializationOptions.Default);
+ }
+ catch
+ {
+ return default;
+ }
+ }
+
+ static string? GetEndpointName(Dictionary metadata, string key)
+ {
+ if (metadata.TryGetValue(key, out var value))
+ {
+ if (value is EndpointDetails endpoint)
+ {
+ return endpoint.Name;
+ }
+
+ if (value is JsonElement jsonElement)
+ {
+ try
+ {
+ var endpoint2 = jsonElement.Deserialize(JsonSerializationOptions.Default);
+ return endpoint2?.Name;
+ }
+ catch
+ {
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ static string BuildSearchableContent(Dictionary headers, ReadOnlyMemory body)
+ {
+ // Combine header values (not keys) with body text for FTS indexing
+ var headerValues = string.Join(" ", headers.Values);
+
+ var bodyString = GetBodyAsText(headers, body);
+ if (string.IsNullOrWhiteSpace(bodyString))
+ {
+ return headerValues;
+ }
+
+ return headerValues + " " + bodyString;
+ }
+
+ static string? GetBodyAsText(Dictionary headers, ReadOnlyMemory body)
+ {
+ if (body.IsEmpty)
+ {
+ return null;
+ }
+
+ var avoidsLargeObjectHeap = body.Length < LargeObjectHeapThreshold;
+ var isBinary = IsBinaryContent(headers);
+
+ if (avoidsLargeObjectHeap && !isBinary)
+ {
+ try
+ {
+ var bodyString = Utf8.GetString(body.Span);
+ if (!string.IsNullOrWhiteSpace(bodyString))
+ {
+ return bodyString;
+ }
+ }
+ catch
+ {
+ // If it won't decode to text, don't index it
+ }
+ }
+
+ return null;
+ }
+
+ static bool IsBinaryContent(Dictionary headers)
+ {
+ if (headers.TryGetValue(Headers.ContentType, out var contentType))
+ {
+ return contentType.Contains("octet-stream") ||
+ contentType.Contains("application/x-") ||
+ contentType.Contains("image/") ||
+ contentType.Contains("audio/") ||
+ contentType.Contains("video/");
+ }
+ return false;
+ }
+
+ static DateTime TruncateToHour(DateTime dt) => new(dt.Year, dt.Month, dt.Day, dt.Hour, 0, 0, dt.Kind);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWorkFactory.cs
new file mode 100644
index 0000000000..6a9b8737f4
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/AuditIngestionUnitOfWorkFactory.cs
@@ -0,0 +1,25 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using Abstractions;
+using DbContexts;
+using Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+
+class AuditIngestionUnitOfWorkFactory(
+ IServiceProvider serviceProvider,
+ MinimumRequiredStorageState storageState,
+ IBodyStoragePersistence storagePersistence)
+ : IAuditIngestionUnitOfWorkFactory
+{
+ public ValueTask StartNew(int batchSize, CancellationToken cancellationToken)
+ {
+ var scope = serviceProvider.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+ var settings = scope.ServiceProvider.GetRequiredService();
+ var unitOfWork = new AuditIngestionUnitOfWork(dbContext, storagePersistence, settings);
+ return ValueTask.FromResult(unitOfWork);
+ }
+
+ public bool CanIngestMore() => storageState.CanIngestMore;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/ProcessedMessageJsonContext.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/ProcessedMessageJsonContext.cs
new file mode 100644
index 0000000000..e790b614d6
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/ProcessedMessageJsonContext.cs
@@ -0,0 +1,11 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using System.Text.Json.Serialization;
+
+[JsonSourceGenerationOptions(
+ PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
+ WriteIndented = false)]
+[JsonSerializable(typeof(Dictionary))]
+partial class ProcessedMessageJsonContext : JsonSerializerContext
+{
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/SagaSnapshotJsonContext.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/SagaSnapshotJsonContext.cs
new file mode 100644
index 0000000000..6dbac3c2c2
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Implementation/UnitOfWork/SagaSnapshotJsonContext.cs
@@ -0,0 +1,13 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Implementation.UnitOfWork;
+
+using System.Text.Json.Serialization;
+using ServiceControl.SagaAudit;
+
+[JsonSourceGenerationOptions(
+ PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
+ WriteIndented = false)]
+[JsonSerializable(typeof(InitiatingMessage))]
+[JsonSerializable(typeof(List))]
+partial class SagaSnapshotJsonContext : JsonSerializerContext
+{
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IBodyStoragePersistence.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IBodyStoragePersistence.cs
new file mode 100644
index 0000000000..f0832e428e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IBodyStoragePersistence.cs
@@ -0,0 +1,8 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+public interface IBodyStoragePersistence
+{
+ Task WriteBodyAsync(string bodyId, DateTime createdOn, ReadOnlyMemory body, string contentType, CancellationToken cancellationToken = default);
+ Task ReadBodyAsync(string bodyId, DateTime createdOn, CancellationToken cancellationToken = default);
+ Task DeleteBodiesForHour(DateTime hour, CancellationToken cancellationToken = default);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IPartitionManager.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IPartitionManager.cs
new file mode 100644
index 0000000000..6d69c3994e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/IPartitionManager.cs
@@ -0,0 +1,21 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using DbContexts;
+
+public interface IPartitionManager
+{
+ ///
+ /// Creates hourly partitions from through + .
+ ///
+ Task EnsurePartitionsExist(AuditDbContextBase dbContext, DateTime currentHour, int hoursAhead, CancellationToken ct);
+
+ ///
+ /// Drops the partition for the specified hour for both ProcessedMessages and SagaSnapshots.
+ ///
+ Task DropPartition(AuditDbContextBase dbContext, DateTime partitionHour, CancellationToken ct);
+
+ ///
+ /// Returns hour-precision timestamps of all partitions older than .
+ ///
+ Task> GetExpiredPartitions(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken ct);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/InsertOnlyTableReconciler.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/InsertOnlyTableReconciler.cs
new file mode 100644
index 0000000000..f226a513d3
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/InsertOnlyTableReconciler.cs
@@ -0,0 +1,67 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using DbContexts;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+public abstract class InsertOnlyTableReconciler(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory,
+ string serviceName) : BackgroundService
+ where TInsertOnly : class
+ where TTarget : class
+{
+ protected const int BatchSize = 1000;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation("Starting {ServiceName}", serviceName);
+
+ try
+ {
+ await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
+
+ using PeriodicTimer timer = new(TimeSpan.FromSeconds(30), timeProvider);
+
+ do
+ {
+ try
+ {
+ await Reconcile(stoppingToken);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ logger.LogError(ex, "Error during {ServiceName} reconciliation", serviceName);
+ }
+ } while (await timer.WaitForNextTickAsync(stoppingToken));
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ logger.LogInformation("Stopping {ServiceName}", serviceName);
+ }
+ }
+
+ async Task Reconcile(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ using var scope = serviceScopeFactory.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+
+ await using var transaction = await dbContext.Database.BeginTransactionAsync(stoppingToken);
+ var rowsAffected = await ReconcileBatch(dbContext, stoppingToken);
+ await transaction.CommitAsync(stoppingToken);
+
+ if (rowsAffected < BatchSize)
+ {
+ break;
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
+ }
+ }
+
+ protected abstract Task ReconcileBatch(AuditDbContextBase dbContext, CancellationToken stoppingToken);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/JsonSerializationOptions.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/JsonSerializationOptions.cs
new file mode 100644
index 0000000000..5e6269310f
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/JsonSerializationOptions.cs
@@ -0,0 +1,12 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Text.Json;
+
+static class JsonSerializationOptions
+{
+ public static readonly JsonSerializerOptions Default = new()
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ WriteIndented = false
+ };
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/MessageBodyFileResult.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/MessageBodyFileResult.cs
new file mode 100644
index 0000000000..65cf7b6597
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/MessageBodyFileResult.cs
@@ -0,0 +1,9 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+public class MessageBodyFileResult
+{
+ public Stream Stream { get; set; } = null!;
+ public string ContentType { get; set; } = null!;
+ public int BodySize { get; set; }
+ public string Etag { get; set; } = null!;
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs
new file mode 100644
index 0000000000..56a5d5b2ab
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionCleaner.cs
@@ -0,0 +1,110 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Data.Common;
+using System.Diagnostics;
+using Abstractions;
+using DbContexts;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+public abstract class RetentionCleaner(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory,
+ AuditSqlPersisterSettings settings,
+ IBodyStoragePersistence bodyPersistence,
+ IPartitionManager partitionManager,
+ RetentionMetrics metrics) : BackgroundService
+{
+ const int HoursAhead = 6;
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation("Starting {ServiceName}", nameof(RetentionCleaner));
+
+ try
+ {
+ await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
+
+ using PeriodicTimer timer = new(TimeSpan.FromHours(1), timeProvider);
+
+ do
+ {
+ try
+ {
+ await Clean(stoppingToken);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ logger.LogError(ex, "Failed to run retention cleaner");
+ }
+ } while (await timer.WaitForNextTickAsync(stoppingToken));
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ logger.LogInformation("Stopping {ServiceName}", nameof(RetentionCleaner));
+ }
+ }
+
+ async Task Clean(CancellationToken stoppingToken)
+ {
+ // Use a dedicated connection for the distributed lock so it is not affected
+ // by connection drops or resets on the main DbContext during cleanup operations
+ await using var lockConnection = CreateConnection();
+ await lockConnection.OpenAsync(stoppingToken);
+
+ using var scope = serviceScopeFactory.CreateScope();
+ var dbContext = scope.ServiceProvider.GetRequiredService();
+
+ var stopwatch = Stopwatch.StartNew();
+ // Round up to whole hours since partitions are hourly
+ var retentionPeriod = TimeSpan.FromHours(Math.Ceiling(settings.AuditRetentionPeriod.TotalHours));
+ var cutoff = timeProvider.GetUtcNow().UtcDateTime - retentionPeriod;
+ var now = timeProvider.GetUtcNow().UtcDateTime;
+
+ using var cycleMetrics = metrics.BeginCleanupCycle();
+
+ if (!await TryAcquireLock(lockConnection, stoppingToken))
+ {
+ logger.LogDebug("Another instance is running retention cleanup, skipping this cycle");
+ metrics.RecordLockSkipped();
+ return;
+ }
+
+ try
+ {
+ // Ensure partitions exist for upcoming hours
+ await partitionManager.EnsurePartitionsExist(dbContext, now, HoursAhead, stoppingToken);
+
+ // Find and drop expired partitions
+ var expiredPartitions = await partitionManager.GetExpiredPartitions(dbContext, cutoff, stoppingToken);
+
+ foreach (var hour in expiredPartitions)
+ {
+ // Delete body storage for this hour first
+ await bodyPersistence.DeleteBodiesForHour(hour, stoppingToken);
+
+ // Drop the database partition
+ await partitionManager.DropPartition(dbContext, hour, stoppingToken);
+
+ metrics.RecordPartitionDropped();
+
+ logger.LogInformation("Dropped partition for {Hour}", hour.ToString("yyyy-MM-dd HH:00"));
+ }
+
+ cycleMetrics.Complete();
+
+ logger.LogInformation("Retention cleanup dropped {Partitions} partition(s) in {Elapsed}",
+ expiredPartitions.Count, stopwatch.Elapsed.ToString(@"hh\:mm\:ss"));
+ }
+ finally
+ {
+ await ReleaseLock(lockConnection, stoppingToken);
+ }
+ }
+
+ protected abstract DbConnection CreateConnection();
+ protected abstract Task TryAcquireLock(DbConnection lockConnection, CancellationToken stoppingToken);
+ protected abstract Task ReleaseLock(DbConnection lockConnection, CancellationToken stoppingToken);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionMetrics.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionMetrics.cs
new file mode 100644
index 0000000000..01e0e74f9c
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/RetentionMetrics.cs
@@ -0,0 +1,78 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+
+public class RetentionMetrics
+{
+ public const string MeterName = "Particular.ServiceControl.Audit";
+
+ public static readonly string CleanupDurationInstrumentName = $"{InstrumentPrefix}.cleanup_duration";
+ public static readonly string PartitionsDroppedInstrumentName = $"{InstrumentPrefix}.partitions_dropped_total";
+
+ public RetentionMetrics(IMeterFactory meterFactory)
+ {
+ var meter = meterFactory.Create(MeterName, MeterVersion);
+
+ cleanupDuration = meter.CreateHistogram(CleanupDurationInstrumentName, unit: "s", description: "Retention cleanup cycle duration");
+ partitionsDropped = meter.CreateCounter(PartitionsDroppedInstrumentName, description: "Total partitions dropped by retention cleanup");
+ consecutiveFailureGauge = meter.CreateObservableGauge($"{InstrumentPrefix}.consecutive_failures_total", () => consecutiveFailures, description: "Consecutive retention cleanup failures");
+ lockSkippedCounter = meter.CreateCounter($"{InstrumentPrefix}.lock_skipped_total", description: "Number of times cleanup was skipped due to another instance holding the lock");
+ }
+
+ public CleanupCycleMetrics BeginCleanupCycle() => new(cleanupDuration, RecordCycleOutcome);
+
+ public void RecordPartitionDropped() => partitionsDropped.Add(1);
+
+ public void RecordLockSkipped() => lockSkippedCounter.Add(1);
+
+ void RecordCycleOutcome(bool success)
+ {
+ if (success)
+ {
+ consecutiveFailures = 0;
+ }
+ else
+ {
+ consecutiveFailures++;
+ }
+ }
+
+ long consecutiveFailures;
+
+ readonly Histogram cleanupDuration;
+ readonly Counter partitionsDropped;
+#pragma warning disable IDE0052
+ readonly ObservableGauge consecutiveFailureGauge;
+#pragma warning restore IDE0052
+ readonly Counter lockSkippedCounter;
+
+ const string MeterVersion = "0.1.0";
+ const string InstrumentPrefix = "sc.audit.retention";
+}
+
+public class CleanupCycleMetrics : IDisposable
+{
+ readonly Histogram cleanupDuration;
+ readonly Action recordOutcome;
+ readonly Stopwatch stopwatch = Stopwatch.StartNew();
+
+ bool completed;
+
+ internal CleanupCycleMetrics(Histogram cleanupDuration, Action recordOutcome)
+ {
+ this.cleanupDuration = cleanupDuration;
+ this.recordOutcome = recordOutcome;
+ }
+
+ public void Complete() => completed = true;
+
+ public void Dispose()
+ {
+ var result = completed ? "success" : "failed";
+ var tags = new TagList { { "result", result } };
+
+ cleanupDuration.Record(stopwatch.Elapsed.TotalSeconds, tags);
+ recordOutcome(completed);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/SequentialGuidGenerator.cs b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/SequentialGuidGenerator.cs
new file mode 100644
index 0000000000..0bfbccfd4a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/Infrastructure/SequentialGuidGenerator.cs
@@ -0,0 +1,51 @@
+namespace ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+///
+/// Generates sequential GUIDs for database primary keys to minimize page fragmentation
+/// and improve insert performance while maintaining security benefits of GUIDs.
+///
+///
+/// This implementation creates time-ordered GUIDs similar to .NET 9's Guid.CreateVersion7()
+/// but compatible with .NET 8. The GUIDs are ordered by timestamp to reduce B-tree page splits
+/// in clustered indexes, which significantly improves insert performance compared to random GUIDs.
+///
+/// Benefits:
+/// - Database agnostic (works with SQL Server, PostgreSQL, MySQL, SQLite)
+/// - Sequential ordering reduces page fragmentation
+/// - Better insert performance than random GUIDs
+/// - Can easily migrate to Guid.CreateVersion7() when upgrading to .NET 9+
+/// - No external dependencies
+///
+/// Security:
+/// - Still cryptographically secure (uses Guid.NewGuid() as base)
+/// - Not guessable (unlike sequential integers)
+/// - Safe to expose in APIs
+///
+public static class SequentialGuidGenerator
+{
+ ///
+ /// Generate a sequential GUID with timestamp-based ordering for optimal database performance.
+ ///
+ /// A new GUID with sequential characteristics.
+ public static Guid NewSequentialGuid()
+ {
+ var guidBytes = Guid.NewGuid().ToByteArray();
+ var now = DateTime.UtcNow;
+
+ // Get timestamp in milliseconds since Unix epoch (similar to Version 7 GUIDs)
+ var timestamp = (long)(now - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
+ var timestampBytes = BitConverter.GetBytes(timestamp);
+
+ // Reverse if little-endian to get big-endian byte order for proper sorting
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(timestampBytes);
+ }
+
+ // Replace last 6 bytes with timestamp for sequential ordering
+ // This placement works well with SQL Server's GUID comparison semantics
+ Array.Copy(timestampBytes, 2, guidBytes, 10, 6);
+
+ return new Guid(guidBytes);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.Core/ServiceControl.Audit.Persistence.Sql.Core.csproj b/src/ServiceControl.Audit.Persistence.Sql.Core/ServiceControl.Audit.Persistence.Sql.Core.csproj
new file mode 100644
index 0000000000..573a686553
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.Core/ServiceControl.Audit.Persistence.Sql.Core.csproj
@@ -0,0 +1,21 @@
+
+
+
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/.editorconfig b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/.editorconfig
new file mode 100644
index 0000000000..fc68ac3228
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/.editorconfig
@@ -0,0 +1,9 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
+
+# Disable style rules for auto-generated EF migrations
+[Migrations/**.cs]
+dotnet_diagnostic.IDE0065.severity = none
+generated_code = true
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/KnownEndpointsReconciler.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/KnownEndpointsReconciler.cs
new file mode 100644
index 0000000000..542bbd826c
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/KnownEndpointsReconciler.cs
@@ -0,0 +1,46 @@
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
+
+using Core.DbContexts;
+using Core.Entities;
+using Core.Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+class KnownEndpointsReconciler(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory)
+ : InsertOnlyTableReconciler(
+ logger, timeProvider, serviceScopeFactory, nameof(KnownEndpointsReconciler))
+{
+ protected override async Task ReconcileBatch(AuditDbContextBase dbContext, CancellationToken stoppingToken)
+ {
+ var sql = @"
+ WITH lock_check AS (
+ SELECT pg_try_advisory_xact_lock(hashtext('known_endpoints_sync')) AS acquired
+ ),
+ deleted AS (
+ DELETE FROM known_endpoints_insert_only
+ WHERE (SELECT acquired FROM lock_check)
+ AND ctid IN (
+ SELECT ctid FROM known_endpoints_insert_only LIMIT @batchSize
+ )
+ RETURNING known_endpoint_id, name, host_id, host, last_seen
+ ),
+ aggregated AS (
+ SELECT DISTINCT ON (known_endpoint_id) known_endpoint_id, name, host_id, host, last_seen
+ FROM deleted
+ ORDER BY known_endpoint_id, last_seen DESC
+ )
+ INSERT INTO known_endpoints (id, name, host_id, host, last_seen)
+ SELECT known_endpoint_id, name, host_id, host, last_seen
+ FROM aggregated
+ ON CONFLICT (id) DO UPDATE SET
+ last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);
+ ";
+
+ var rowsAffected = await dbContext.Database.ExecuteSqlRawAsync(sql, [new Npgsql.NpgsqlParameter("@batchSize", BatchSize)], stoppingToken);
+ return rowsAffected;
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/PostgreSqlPartitionManager.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/PostgreSqlPartitionManager.cs
new file mode 100644
index 0000000000..f9107de18b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/PostgreSqlPartitionManager.cs
@@ -0,0 +1,89 @@
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
+
+using Microsoft.EntityFrameworkCore;
+using ServiceControl.Audit.Persistence.Sql.Core.DbContexts;
+using ServiceControl.Audit.Persistence.Sql.Core.Infrastructure;
+
+// Partition/table names cannot be parameterized in SQL; all values come from internal constants and date formatting
+#pragma warning disable EF1002, EF1003
+public class PostgreSqlPartitionManager : IPartitionManager
+{
+ static readonly (string ParentTable, string Prefix)[] PartitionedTables =
+ [
+ ("processed_messages", "processed_messages"),
+ ("saga_snapshots", "saga_snapshots")
+ ];
+
+ public async Task EnsurePartitionsExist(AuditDbContextBase dbContext, DateTime currentHour, int hoursAhead, CancellationToken ct)
+ {
+ var truncatedHour = TruncateToHour(currentHour);
+ var targetHour = truncatedHour.AddHours(hoursAhead);
+
+ for (var hour = truncatedHour; hour <= targetHour; hour = hour.AddHours(1))
+ {
+ var nextHour = hour.AddHours(1);
+ var hourSuffix = hour.ToString("yyyyMMddHH");
+ var hourStr = hour.ToString("yyyy-MM-dd HH:00:00");
+ var nextHourStr = nextHour.ToString("yyyy-MM-dd HH:00:00");
+
+ foreach (var (parentTable, prefix) in PartitionedTables)
+ {
+ var partitionName = prefix + "_" + hourSuffix;
+
+ await dbContext.Database.ExecuteSqlRawAsync(
+ "CREATE TABLE IF NOT EXISTS " + partitionName +
+ " PARTITION OF " + parentTable +
+ " FOR VALUES FROM ('" + hourStr + "') TO ('" + nextHourStr + "')", ct);
+ }
+ }
+ }
+
+ public async Task DropPartition(AuditDbContextBase dbContext, DateTime partitionHour, CancellationToken ct)
+ {
+ var hourSuffix = TruncateToHour(partitionHour).ToString("yyyyMMddHH");
+
+ foreach (var (parentTable, prefix) in PartitionedTables)
+ {
+ var partitionName = prefix + "_" + hourSuffix;
+
+ await dbContext.Database.ExecuteSqlRawAsync(
+ "ALTER TABLE " + parentTable + " DETACH PARTITION " + partitionName, ct);
+
+ await dbContext.Database.ExecuteSqlRawAsync(
+ "DROP TABLE " + partitionName, ct);
+ }
+ }
+
+ public async Task> GetExpiredPartitions(AuditDbContextBase dbContext, DateTime cutoff, CancellationToken ct)
+ {
+ var truncatedCutoff = TruncateToHour(cutoff);
+
+ var partitionNames = await dbContext.Database
+ .SqlQueryRaw(
+ "SELECT c.relname AS Value " +
+ "FROM pg_class c " +
+ "INNER JOIN pg_inherits i ON c.oid = i.inhrelid " +
+ "INNER JOIN pg_class parent ON i.inhparent = parent.oid " +
+ "WHERE parent.relname = 'processed_messages' " +
+ "AND c.relkind = 'r' " +
+ "ORDER BY c.relname")
+ .ToListAsync(ct);
+
+ var result = new List();
+
+ foreach (var name in partitionNames)
+ {
+ // Parse hour from partition name: processed_messages_yyyyMMddHH
+ var datePart = name.Replace("processed_messages_", "");
+ if (DateTime.TryParseExact(datePart, "yyyyMMddHH", null, System.Globalization.DateTimeStyles.None, out var hour)
+ && hour < truncatedCutoff)
+ {
+ result.Add(hour);
+ }
+ }
+
+ return result;
+ }
+
+ static DateTime TruncateToHour(DateTime dt) => new(dt.Year, dt.Month, dt.Day, dt.Hour, 0, 0, dt.Kind);
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/RetentionCleaner.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/RetentionCleaner.cs
new file mode 100644
index 0000000000..1689afe28a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Infrastructure/RetentionCleaner.cs
@@ -0,0 +1,40 @@
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Infrastructure;
+
+using System.Data.Common;
+using Core.Abstractions;
+using Core.Infrastructure;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Npgsql;
+
+class RetentionCleaner(
+ ILogger logger,
+ TimeProvider timeProvider,
+ IServiceScopeFactory serviceScopeFactory,
+ AuditSqlPersisterSettings settings,
+ IBodyStoragePersistence bodyPersistence,
+ IPartitionManager partitionManager,
+ RetentionMetrics metrics)
+ : Core.Infrastructure.RetentionCleaner(logger, timeProvider, serviceScopeFactory, settings, bodyPersistence, partitionManager, metrics)
+{
+ readonly string connectionString = settings.ConnectionString;
+
+ protected override DbConnection CreateConnection() => new NpgsqlConnection(connectionString);
+
+ protected override async Task TryAcquireLock(DbConnection lockConnection, CancellationToken stoppingToken)
+ {
+ await using var cmd = lockConnection.CreateCommand();
+ cmd.CommandText = "SELECT pg_try_advisory_lock(hashtext('retention_cleaner'))";
+
+ var result = await cmd.ExecuteScalarAsync(stoppingToken);
+ return result is true;
+ }
+
+ protected override async Task ReleaseLock(DbConnection lockConnection, CancellationToken stoppingToken)
+ {
+ await using var cmd = lockConnection.CreateCommand();
+ cmd.CommandText = "SELECT pg_advisory_unlock(hashtext('retention_cleaner'))";
+
+ await cmd.ExecuteNonQueryAsync(stoppingToken);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.Designer.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.Designer.cs
new file mode 100644
index 0000000000..d0457f3931
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.Designer.cs
@@ -0,0 +1,281 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using ServiceControl.Audit.Persistence.Sql.PostgreSQL;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ [DbContext(typeof(PostgreSqlAuditDbContext))]
+ [Migration("20260214031455_InitialCreate")]
+ partial class InitialCreate
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.FailedAuditImportEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("ExceptionInfo")
+ .HasColumnType("text")
+ .HasColumnName("exception_info");
+
+ b.Property("MessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("message_json");
+
+ b.HasKey("Id");
+
+ b.ToTable("failed_audit_imports", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointEntity", b =>
+ {
+ b.Property("Id")
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointInsertOnlyEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("KnownEndpointId")
+ .HasColumnType("uuid")
+ .HasColumnName("known_endpoint_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("KnownEndpointId");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints_insert_only", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.ProcessedMessageEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("BodyNotStored")
+ .HasColumnType("boolean")
+ .HasColumnName("body_not_stored");
+
+ b.Property("BodySize")
+ .HasColumnType("integer")
+ .HasColumnName("body_size");
+
+ b.Property("BodyUrl")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("body_url");
+
+ b.Property("ConversationId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("conversation_id");
+
+ b.Property("CriticalTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("critical_time_ticks");
+
+ b.Property("DeliveryTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("delivery_time_ticks");
+
+ b.Property("HeadersJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("headers_json");
+
+ b.Property("IsSystemMessage")
+ .HasColumnType("boolean")
+ .HasColumnName("is_system_message");
+
+ b.Property("MessageId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("message_id");
+
+ b.Property("MessageType")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("message_type");
+
+ b.Property("ProcessingTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("processing_time_ticks");
+
+ b.Property("ReceivingEndpointName")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("receiving_endpoint_name");
+
+ b.Property("SearchableContent")
+ .HasColumnType("text")
+ .HasColumnName("searchable_content");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.Property("TimeSent")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("time_sent");
+
+ b.Property("UniqueMessageId")
+ .IsRequired()
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("unique_message_id");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("ConversationId");
+
+ b.HasIndex("MessageId");
+
+ b.HasIndex("TimeSent");
+
+ b.HasIndex("UniqueMessageId");
+
+ b.ToTable("processed_messages", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.SagaSnapshotEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("Endpoint")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("endpoint");
+
+ b.Property("FinishTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("finish_time");
+
+ b.Property("InitiatingMessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("initiating_message_json");
+
+ b.Property("OutgoingMessagesJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("outgoing_messages_json");
+
+ b.Property("SagaId")
+ .HasColumnType("uuid")
+ .HasColumnName("saga_id");
+
+ b.Property("SagaType")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("saga_type");
+
+ b.Property("StartTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("start_time");
+
+ b.Property("StateAfterChange")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("state_after_change");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("SagaId");
+
+ b.ToTable("saga_snapshots", (string)null);
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.cs
new file mode 100644
index 0000000000..3780d85378
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031455_InitialCreate.cs
@@ -0,0 +1,171 @@
+using System;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ ///
+ public partial class InitialCreate : Migration
+ {
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.CreateTable(
+ name: "failed_audit_imports",
+ columns: table => new
+ {
+ id = table.Column(type: "uuid", nullable: false),
+ message_json = table.Column(type: "text", nullable: false),
+ exception_info = table.Column(type: "text", nullable: true)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_failed_audit_imports", x => x.id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "known_endpoints",
+ columns: table => new
+ {
+ id = table.Column(type: "uuid", nullable: false),
+ name = table.Column(type: "text", nullable: false),
+ host_id = table.Column(type: "uuid", nullable: false),
+ host = table.Column(type: "text", nullable: false),
+ last_seen = table.Column(type: "timestamp with time zone", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_known_endpoints", x => x.id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "known_endpoints_insert_only",
+ columns: table => new
+ {
+ id = table.Column(type: "bigint", nullable: false)
+ .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
+ known_endpoint_id = table.Column(type: "uuid", nullable: false),
+ name = table.Column(type: "text", nullable: false),
+ host_id = table.Column(type: "uuid", nullable: false),
+ host = table.Column(type: "text", nullable: false),
+ last_seen = table.Column(type: "timestamp with time zone", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_known_endpoints_insert_only", x => x.id);
+ });
+
+ migrationBuilder.CreateTable(
+ name: "processed_messages",
+ columns: table => new
+ {
+ id = table.Column(type: "bigint", nullable: false)
+ .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
+ created_on = table.Column(type: "timestamp with time zone", nullable: false),
+ unique_message_id = table.Column(type: "character varying(200)", maxLength: 200, nullable: false),
+ headers_json = table.Column(type: "text", nullable: false),
+ searchable_content = table.Column(type: "text", nullable: true),
+ message_id = table.Column(type: "character varying(200)", maxLength: 200, nullable: true),
+ message_type = table.Column(type: "character varying(500)", maxLength: 500, nullable: true),
+ time_sent = table.Column(type: "timestamp with time zone", nullable: true),
+ is_system_message = table.Column(type: "boolean", nullable: false),
+ status = table.Column(type: "integer", nullable: false),
+ conversation_id = table.Column(type: "character varying(200)", maxLength: 200, nullable: true),
+ receiving_endpoint_name = table.Column(type: "character varying(500)", maxLength: 500, nullable: true),
+ critical_time_ticks = table.Column(type: "bigint", nullable: true),
+ processing_time_ticks = table.Column(type: "bigint", nullable: true),
+ delivery_time_ticks = table.Column(type: "bigint", nullable: true),
+ body_size = table.Column(type: "integer", nullable: false),
+ body_url = table.Column(type: "character varying(500)", maxLength: 500, nullable: true),
+ body_not_stored = table.Column(type: "boolean", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_processed_messages", x => new { x.id, x.created_on });
+ });
+
+ migrationBuilder.CreateTable(
+ name: "saga_snapshots",
+ columns: table => new
+ {
+ id = table.Column(type: "bigint", nullable: false)
+ .Annotation("Npgsql:ValueGenerationStrategy", NpgsqlValueGenerationStrategy.IdentityByDefaultColumn),
+ created_on = table.Column(type: "timestamp with time zone", nullable: false),
+ saga_id = table.Column(type: "uuid", nullable: false),
+ saga_type = table.Column(type: "text", nullable: false),
+ start_time = table.Column(type: "timestamp with time zone", nullable: false),
+ finish_time = table.Column(type: "timestamp with time zone", nullable: false),
+ status = table.Column(type: "integer", nullable: false),
+ state_after_change = table.Column(type: "text", nullable: false),
+ initiating_message_json = table.Column(type: "text", nullable: false),
+ outgoing_messages_json = table.Column(type: "text", nullable: false),
+ endpoint = table.Column(type: "text", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("PK_saga_snapshots", x => new { x.id, x.created_on });
+ });
+
+ migrationBuilder.CreateIndex(
+ name: "IX_known_endpoints_last_seen",
+ table: "known_endpoints",
+ column: "last_seen");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_known_endpoints_insert_only_known_endpoint_id",
+ table: "known_endpoints_insert_only",
+ column: "known_endpoint_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_known_endpoints_insert_only_last_seen",
+ table: "known_endpoints_insert_only",
+ column: "last_seen");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_conversation_id",
+ table: "processed_messages",
+ column: "conversation_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_message_id",
+ table: "processed_messages",
+ column: "message_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_time_sent",
+ table: "processed_messages",
+ column: "time_sent");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_processed_messages_unique_message_id",
+ table: "processed_messages",
+ column: "unique_message_id");
+
+ migrationBuilder.CreateIndex(
+ name: "IX_saga_snapshots_saga_id",
+ table: "saga_snapshots",
+ column: "saga_id");
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.DropTable(
+ name: "failed_audit_imports");
+
+ migrationBuilder.DropTable(
+ name: "known_endpoints");
+
+ migrationBuilder.DropTable(
+ name: "known_endpoints_insert_only");
+
+ migrationBuilder.DropTable(
+ name: "processed_messages");
+
+ migrationBuilder.DropTable(
+ name: "saga_snapshots");
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.Designer.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.Designer.cs
new file mode 100644
index 0000000000..cd7c4bcb6a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.Designer.cs
@@ -0,0 +1,281 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using ServiceControl.Audit.Persistence.Sql.PostgreSQL;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ [DbContext(typeof(PostgreSqlAuditDbContext))]
+ [Migration("20260214031511_AddPartitioning")]
+ partial class AddPartitioning
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.FailedAuditImportEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("ExceptionInfo")
+ .HasColumnType("text")
+ .HasColumnName("exception_info");
+
+ b.Property("MessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("message_json");
+
+ b.HasKey("Id");
+
+ b.ToTable("failed_audit_imports", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointEntity", b =>
+ {
+ b.Property("Id")
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointInsertOnlyEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("KnownEndpointId")
+ .HasColumnType("uuid")
+ .HasColumnName("known_endpoint_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("name");
+
+ b.HasKey("Id");
+
+ b.HasIndex("KnownEndpointId");
+
+ b.HasIndex("LastSeen");
+
+ b.ToTable("known_endpoints_insert_only", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.ProcessedMessageEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("BodyNotStored")
+ .HasColumnType("boolean")
+ .HasColumnName("body_not_stored");
+
+ b.Property("BodySize")
+ .HasColumnType("integer")
+ .HasColumnName("body_size");
+
+ b.Property("BodyUrl")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("body_url");
+
+ b.Property("ConversationId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("conversation_id");
+
+ b.Property("CriticalTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("critical_time_ticks");
+
+ b.Property("DeliveryTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("delivery_time_ticks");
+
+ b.Property("HeadersJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("headers_json");
+
+ b.Property("IsSystemMessage")
+ .HasColumnType("boolean")
+ .HasColumnName("is_system_message");
+
+ b.Property("MessageId")
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("message_id");
+
+ b.Property("MessageType")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("message_type");
+
+ b.Property("ProcessingTimeTicks")
+ .HasColumnType("bigint")
+ .HasColumnName("processing_time_ticks");
+
+ b.Property("ReceivingEndpointName")
+ .HasMaxLength(500)
+ .HasColumnType("character varying(500)")
+ .HasColumnName("receiving_endpoint_name");
+
+ b.Property("SearchableContent")
+ .HasColumnType("text")
+ .HasColumnName("searchable_content");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.Property("TimeSent")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("time_sent");
+
+ b.Property("UniqueMessageId")
+ .IsRequired()
+ .HasMaxLength(200)
+ .HasColumnType("character varying(200)")
+ .HasColumnName("unique_message_id");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("ConversationId");
+
+ b.HasIndex("MessageId");
+
+ b.HasIndex("TimeSent");
+
+ b.HasIndex("UniqueMessageId");
+
+ b.ToTable("processed_messages", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.SagaSnapshotEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("bigint")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("CreatedOn")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("created_on");
+
+ b.Property("Endpoint")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("endpoint");
+
+ b.Property("FinishTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("finish_time");
+
+ b.Property("InitiatingMessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("initiating_message_json");
+
+ b.Property("OutgoingMessagesJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("outgoing_messages_json");
+
+ b.Property("SagaId")
+ .HasColumnType("uuid")
+ .HasColumnName("saga_id");
+
+ b.Property("SagaType")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("saga_type");
+
+ b.Property("StartTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("start_time");
+
+ b.Property("StateAfterChange")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("state_after_change");
+
+ b.Property("Status")
+ .HasColumnType("integer")
+ .HasColumnName("status");
+
+ b.HasKey("Id", "CreatedOn");
+
+ b.HasIndex("SagaId");
+
+ b.ToTable("saga_snapshots", (string)null);
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.cs
new file mode 100644
index 0000000000..d30c186b5b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031511_AddPartitioning.cs
@@ -0,0 +1,40 @@
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ ///
+ public partial class AddPartitioning : Migration
+ {
+ static readonly string[] Tables = ["processed_messages", "saga_snapshots"];
+
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ foreach (var table in Tables)
+ {
+ migrationBuilder.Sql($"""
+ CREATE TABLE {table}_tmp (LIKE {table} INCLUDING ALL);
+ DROP TABLE {table};
+ CREATE TABLE {table} (LIKE {table}_tmp INCLUDING ALL) PARTITION BY RANGE (created_on);
+ DROP TABLE {table}_tmp;
+ """);
+ }
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ foreach (var table in Tables)
+ {
+ migrationBuilder.Sql($"""
+ CREATE TABLE {table}_tmp (LIKE {table} INCLUDING ALL);
+ DROP TABLE {table};
+ CREATE TABLE {table} (LIKE {table}_tmp INCLUDING ALL);
+ DROP TABLE {table}_tmp;
+ """);
+ }
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031534_AddFullTextSearch.Designer.cs b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031534_AddFullTextSearch.Designer.cs
new file mode 100644
index 0000000000..6f6a3a343d
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.Sql.PostgreSQL/Migrations/20260214031534_AddFullTextSearch.Designer.cs
@@ -0,0 +1,281 @@
+//
+using System;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+using ServiceControl.Audit.Persistence.Sql.PostgreSQL;
+
+#nullable disable
+
+namespace ServiceControl.Audit.Persistence.Sql.PostgreSQL.Migrations
+{
+ [DbContext(typeof(PostgreSqlAuditDbContext))]
+ [Migration("20260214031534_AddFullTextSearch")]
+ partial class AddFullTextSearch
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "10.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.FailedAuditImportEntity", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("ExceptionInfo")
+ .HasColumnType("text")
+ .HasColumnName("exception_info");
+
+ b.Property("MessageJson")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("message_json");
+
+ b.HasKey("Id");
+
+ b.ToTable("failed_audit_imports", (string)null);
+ });
+
+ modelBuilder.Entity("ServiceControl.Audit.Persistence.Sql.Core.Entities.KnownEndpointEntity", b =>
+ {
+ b.Property("Id")
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Host")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("host");
+
+ b.Property("HostId")
+ .HasColumnType("uuid")
+ .HasColumnName("host_id");
+
+ b.Property("LastSeen")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("last_seen");
+
+ b.Property