diff --git a/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs b/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs index 73fdfd9..bdfc39b 100644 --- a/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs +++ b/src/CoreSync.Http.Client/Implementation/SyncProviderHttpClient.cs @@ -216,16 +216,17 @@ private static void ConvertJsonValueToNetObject(SyncChangeSet changeSet) { foreach (var itemValueEntry in item.Values.Where(_ => _.Key != "__OP").ToList()) { - item.Values[itemValueEntry.Key].Value = itemValueEntry.Value.Value == null ? null : - ConvertJsonValueToNetObject((JsonElement)itemValueEntry.Value.Value, itemValueEntry.Value.Type); - - //WARN: can't convert every Id to lowercase because sqlite is case sensitive! - //if (itemValueEntry.Value.Value != null && - // itemValueEntry.Value.Type == SyncItemValueType.String && - // (itemValueEntry.Key == "Id" || itemValueEntry.Key.EndsWith("Id"))) - //{ - // item.Values[itemValueEntry.Key].Value = itemValueEntry.Value.Value.ToString().ToLowerInvariant(); - //} + var rawValue = itemValueEntry.Value.Value; + if (rawValue == null) + { + item.Values[itemValueEntry.Key].Value = null; + } + else if (rawValue is JsonElement jsonElement) + { + item.Values[itemValueEntry.Key].Value = + ConvertJsonValueToNetObject(jsonElement, itemValueEntry.Value.Type); + } + // else: value is already a native .NET type — no conversion needed. } } } diff --git a/src/CoreSync.Http.Server/SyncAgentController.cs b/src/CoreSync.Http.Server/SyncAgentController.cs index 9826b54..98a889d 100644 --- a/src/CoreSync.Http.Server/SyncAgentController.cs +++ b/src/CoreSync.Http.Server/SyncAgentController.cs @@ -170,8 +170,18 @@ public async Task CompleteApplyBulkChangesAsync(Guid sessionId) { foreach (var itemValueEntry in item.Values.Where(_ => _.Key != "__OP").ToList()) { - item.Values[itemValueEntry.Key].Value = itemValueEntry.Value.Value == null ? null : - ConvertJsonElementToObject((JsonElement)itemValueEntry.Value.Value, itemValueEntry.Value.Type); + var rawValue = itemValueEntry.Value.Value; + if (rawValue == null) + { + item.Values[itemValueEntry.Key].Value = null; + } + else if (rawValue is JsonElement jsonElement) + { + item.Values[itemValueEntry.Key].Value = + ConvertJsonElementToObject(jsonElement, itemValueEntry.Value.Type); + } + // else: value is already a native .NET type (e.g. from MessagePack deserialization + // or newer System.Text.Json versions) — no conversion needed. } } diff --git a/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs b/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs index 2d1d4a5..b3fa4e6 100644 --- a/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs +++ b/src/CoreSync.PostgreSQL/PostgreSQLSyncProvider.cs @@ -84,12 +84,18 @@ public async Task ApplyChangesAsync([NotNull] SyncChangeSet changeSe try { + // Use a savepoint so that a failed statement does not abort + // the entire PostgreSQL transaction (error state 25P02). + await tr.SaveAsync("sync_item", cancellationToken); + affectedRows = await cmd.ExecuteNonQueryAsync(cancellationToken); if (affectedRows > 0) { _logger?.Trace($"[{_storeId}] Successfully applied {item}"); } + + await tr.ReleaseAsync("sync_item", cancellationToken); } catch (OperationCanceledException) { @@ -98,6 +104,9 @@ public async Task ApplyChangesAsync([NotNull] SyncChangeSet changeSe catch (Exception ex) { _logger?.Error($"Unable to {itemChangeType} item {item} to store for table {table}.{Environment.NewLine}{ex}{Environment.NewLine}Generated SQL:{Environment.NewLine}{cmd.CommandText}"); + // Rollback to the savepoint so the transaction can continue + // processing remaining items. + try { await tr.RollbackAsync("sync_item", cancellationToken); } catch { /* already rolled back */ } } if (affectedRows == 0)