From f32944d1fdfc5de4d4e1ea818337cf11a74db780 Mon Sep 17 00:00:00 2001 From: David Ortinau Date: Fri, 20 Mar 2026 13:18:42 -0500 Subject: [PATCH] Fix InvalidCastException and PostgreSQL transaction abort in HTTP sync Bug 1: SyncAgentController.CompleteApplyBulkChangesAsync hard-casts values to JsonElement, which fails when values are already native .NET types (e.g. from MessagePack deserialization or newer System.Text.Json versions that may resolve object? differently). Fixed by using pattern matching (is JsonElement) and passing through native types without conversion. Applied the same fix to the client-side ConvertJsonValueToNetObject for robustness. Bug 2: PostgreSQLSyncProvider.ApplyChangesAsync catches DML exceptions but continues the transaction. Unlike SQLite, PostgreSQL enters an 'aborted' state (25P02) after any error within a transaction, causing all subsequent commands to fail. Fixed by wrapping each DML statement in a SAVEPOINT so failures can be rolled back without aborting the entire transaction. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Implementation/SyncProviderHttpClient.cs | 21 ++++++++++--------- .../SyncAgentController.cs | 14 +++++++++++-- .../PostgreSQLSyncProvider.cs | 9 ++++++++ 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs b/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs index 73fdfd9..bdfc39b 100644 --- a/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs +++ b/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs @@ -216,16 +216,17 @@ private static void ConvertJsonValueToNetObject(SyncChangeSet changeSet) { foreach (var itemValueEntry in item.Values.Where(_ => _.Key != "__OP").ToList()) { - item.Values[itemValueEntry.Key].Value = itemValueEntry.Value.Value == null ? null : - ConvertJsonValueToNetObject((JsonElement)itemValueEntry.Value.Value, itemValueEntry.Value.Type); - - //WARN: can't convert every Id to lowercase because sqlite is case sensitive! - //if (itemValueEntry.Value.Value != null && - // itemValueEntry.Value.Type == SyncItemValueType.String && - // (itemValueEntry.Key == "Id" || itemValueEntry.Key.EndsWith("Id"))) - //{ - // item.Values[itemValueEntry.Key].Value = itemValueEntry.Value.Value.ToString().ToLowerInvariant(); - //} + var rawValue = itemValueEntry.Value.Value; + if (rawValue == null) + { + item.Values[itemValueEntry.Key].Value = null; + } + else if (rawValue is JsonElement jsonElement) + { + item.Values[itemValueEntry.Key].Value = + ConvertJsonValueToNetObject(jsonElement, itemValueEntry.Value.Type); + } + // else: value is already a native .NET type — no conversion needed. } } } diff --git a/src/CoreSync.Http.Server/SyncAgentController.cs b/src/CoreSync.Http.Server/SyncAgentController.cs index 9826b54..98a889d 100644 --- a/src/CoreSync.Http.Server/SyncAgentController.cs +++ b/src/CoreSync.Http.Server/SyncAgentController.cs @@ -170,8 +170,18 @@ public async Task CompleteApplyBulkChangesAsync(Guid sessionId) { foreach (var itemValueEntry in item.Values.Where(_ => _.Key != "__OP").ToList()) { - item.Values[itemValueEntry.Key].Value = itemValueEntry.Value.Value == null ? null : - ConvertJsonElementToObject((JsonElement)itemValueEntry.Value.Value, itemValueEntry.Value.Type); + var rawValue = itemValueEntry.Value.Value; + if (rawValue == null) + { + item.Values[itemValueEntry.Key].Value = null; + } + else if (rawValue is JsonElement jsonElement) + { + item.Values[itemValueEntry.Key].Value = + ConvertJsonElementToObject(jsonElement, itemValueEntry.Value.Type); + } + // else: value is already a native .NET type (e.g. from MessagePack deserialization + // or newer System.Text.Json versions) — no conversion needed. } } diff --git a/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs b/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs index 2d1d4a5..b3fa4e6 100644 --- a/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs +++ b/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs @@ -84,12 +84,18 @@ public async Task ApplyChangesAsync([NotNull] SyncChangeSet changeSe try { + // Use a savepoint so that a failed statement does not abort + // the entire PostgreSQL transaction (error state 25P02). + await tr.SaveAsync("sync_item", cancellationToken); + affectedRows = await cmd.ExecuteNonQueryAsync(cancellationToken); if (affectedRows > 0) { _logger?.Trace($"[{_storeId}] Successfully applied {item}"); } + + await tr.ReleaseAsync("sync_item", cancellationToken); } catch (OperationCanceledException) { @@ -98,6 +104,9 @@ public async Task ApplyChangesAsync([NotNull] SyncChangeSet changeSe catch (Exception ex) { _logger?.Error($"Unable to {itemChangeType} item {item} to store for table {table}.{Environment.NewLine}{ex}{Environment.NewLine}Generated SQL:{Environment.NewLine}{cmd.CommandText}"); + // Rollback to the savepoint so the transaction can continue + // processing remaining items. + try { await tr.RollbackAsync("sync_item", cancellationToken); } catch { /* already rolled back */ } } if (affectedRows == 0)