From bf94dafcf3738427f85b83cc4b0b08d16a8d98a2 Mon Sep 17 00:00:00 2001 From: Igor Barakaiev Date: Mon, 22 Dec 2025 16:18:12 -0800 Subject: [PATCH 1/3] fix: handle overlapping subset queries returning same row with different values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When multiple subset queries return the same row (e.g., different WHERE clauses that both match the same record), the server sends `insert` operations for each response. If the row's data changed between requests (e.g., timestamp field updated), this caused a DuplicateKeySyncError because TanStack DB's sync layer throws when inserting an existing key with a different value. This fix tracks synced keys in the Electric adapter and converts subsequent `insert` operations to `update` for keys that have already been synced. The tracked keys are cleared on truncate/must-refetch to stay in sync with the collection state. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .changeset/quick-tables-glow.md | 7 +++++ .../electric-db-collection/src/electric.ts | 30 +++++++++++++++++-- 2 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 .changeset/quick-tables-glow.md diff --git a/.changeset/quick-tables-glow.md b/.changeset/quick-tables-glow.md new file mode 100644 index 000000000..d7174503c --- /dev/null +++ b/.changeset/quick-tables-glow.md @@ -0,0 +1,7 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Fix duplicate key error when overlapping subset queries return the same row with different values. + +When multiple subset queries return the same row (e.g., different WHERE clauses that both match the same record), the server sends `insert` operations for each response. If the row's data changed between requests (e.g., timestamp field updated), this caused a `DuplicateKeySyncError`. The adapter now tracks synced keys and converts subsequent inserts to updates. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 477e37641..d0ddab654 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1233,6 +1233,12 @@ function createElectricSync>( syncMode === `progressive` && !hasReceivedUpToDate const bufferedMessages: Array> = [] // Buffer change messages during initial sync + // Track keys that have been synced to handle overlapping subset queries. + // When multiple subset queries return the same row, the server sends `insert` + // for each response. We convert subsequent inserts to updates to avoid + // duplicate key errors when the row's data has changed between requests. + const syncedKeys = new Set() + /** * Process a change message: handle tags and write the mutation */ @@ -1249,14 +1255,28 @@ function createElectricSync>( const rowId = collection.getKeyFromItem(changeMessage.value) const operation = changeMessage.headers.operation - if (operation === `delete`) { + // Track synced keys and handle overlapping subset queries. + // When multiple subset queries return the same row, the server sends + // `insert` for each response. We convert subsequent inserts to updates + // to avoid duplicate key errors when the row's data has changed. + const isDelete = operation === `delete` + const isDuplicateInsert = + operation === `insert` && syncedKeys.has(rowId) + + if (isDelete) { + syncedKeys.delete(rowId) + } else { + syncedKeys.add(rowId) + } + + if (isDelete) { clearTagsForRow(rowId) } else if (hasTags) { processTagsForChangeMessage(tags, removedTags, rowId) } write({ - type: changeMessage.headers.operation, + type: isDuplicateInsert ? `update` : operation, value: changeMessage.value, // Include the primary key and relation info in the metadata metadata: { @@ -1392,6 +1412,9 @@ function createElectricSync>( // Clear tag tracking state clearTagTrackingState() + // Clear synced keys tracking since we're starting fresh + syncedKeys.clear() + // Reset the loadSubset deduplication state since we're starting fresh // This ensures that previously loaded predicates don't prevent refetching after truncate loadSubsetDedupe?.reset() @@ -1419,6 +1442,9 @@ function createElectricSync>( // Clear tag tracking state for atomic swap clearTagTrackingState() + // Clear synced keys tracking for atomic swap + syncedKeys.clear() + // Apply all buffered change messages and extract txids/snapshots for (const bufferedMsg of bufferedMessages) { if (isChangeMessage(bufferedMsg)) { From 54cf61c5433560046e042c9dd57e532411bc9caa Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 29 Dec 2025 15:04:10 +0000 Subject: [PATCH 2/3] add tests --- .../tests/electric.test.ts | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 87d5567e8..9f76258dd 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2757,6 +2757,295 @@ describe(`Electric Integration`, () => { }) }) + // Tests for overlapping subset queries with duplicate keys + describe(`Overlapping subset queries with duplicate keys`, () => { + it(`should convert duplicate inserts to updates when overlapping subset queries return the same row with different values`, () => { + // This test reproduces the issue where: + // 1. Multiple subset queries return the same row (e.g., different WHERE clauses that both match the same record) + // 2. The server sends `insert` operations for each response + // 3. If the row's data changed between requests (e.g., timestamp field updated), this caused a DuplicateKeySyncError + + const config = { + id: `duplicate-insert-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // First subset query returns a row + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1`, updated_at: `2024-01-01T00:00:00Z` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Verify initial data is present + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ + id: 1, + name: `User 1`, + updated_at: `2024-01-01T00:00:00Z`, + }) + + // Second subset query returns the SAME row but with a different timestamp + // This would throw DuplicateKeySyncError without the fix because: + // 1. The key already exists in syncedData + // 2. The value is different (timestamp changed) + // 3. Without the Electric adapter converting insert->update, sync.ts throws + expect(() => { + subscriber([ + { + key: `1`, + value: { + id: 1, + name: `User 1`, + updated_at: `2024-01-01T00:00:01Z`, + }, // Different timestamp! + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + }).not.toThrow() + + // The row should be updated with the new value + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ + id: 1, + name: `User 1`, + updated_at: `2024-01-01T00:00:01Z`, + }) + }) + + it(`should handle multiple duplicate inserts across several batches`, () => { + const config = { + id: `multiple-duplicate-inserts-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // First batch - initial inserts + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1`, version: 1 }, + headers: { operation: `insert` }, + }, + { + key: `2`, + value: { id: 2, name: `User 2`, version: 1 }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.size).toBe(2) + expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1`, version: 1 }) + expect(testCollection.get(2)).toEqual({ id: 2, name: `User 2`, version: 1 }) + + // Second batch - overlapping subset query returns same rows with different values + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1`, version: 2 }, // version changed + headers: { operation: `insert` }, + }, + { + key: `2`, + value: { id: 2, name: `User 2`, version: 2 }, // version changed + headers: { operation: `insert` }, + }, + { + key: `3`, + value: { id: 3, name: `User 3`, version: 1 }, // new row + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // All rows should be present with updated values + expect(testCollection.size).toBe(3) + expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1`, version: 2 }) + expect(testCollection.get(2)).toEqual({ id: 2, name: `User 2`, version: 2 }) + expect(testCollection.get(3)).toEqual({ id: 3, name: `User 3`, version: 1 }) + }) + + it(`should reset synced keys tracking on must-refetch`, () => { + const config = { + id: `must-refetch-reset-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Initial sync + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.has(1)).toBe(true) + + // Trigger must-refetch (clears collection and syncedKeys tracking) + subscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // After must-refetch, sending the same key as insert should work + // because syncedKeys tracking was cleared + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1 After Refetch` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1 After Refetch` }) + }) + + it(`should handle delete followed by insert of the same key`, () => { + const config = { + id: `delete-then-insert-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Initial insert + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.has(1)).toBe(true) + + // Delete the row + subscriber([ + { + key: `1`, + value: { id: 1 }, + headers: { operation: `delete` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.has(1)).toBe(false) + + // Re-insert the same key - should work because delete cleared the syncedKeys tracking + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1 Recreated` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1 Recreated` }) + }) + + it(`should handle duplicate inserts within the same batch`, () => { + const config = { + id: `same-batch-duplicate-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Single batch with duplicate inserts for the same key + // This can happen when multiple subset responses are batched together + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1`, version: 1 }, + headers: { operation: `insert` }, + }, + { + key: `1`, + value: { id: 1, name: `User 1`, version: 2 }, // Same key, different value + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Should have the latest value + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1`, version: 2 }) + }) + }) + // Tests for commit and ready behavior with snapshot-end and up-to-date messages describe(`Commit and ready behavior`, () => { it(`should ignore snapshot-end before first up-to-date in progressive mode`, () => { From 70209e6a3b473c0f8aa68fbcbab13d0bdf945d60 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 29 Dec 2025 15:05:03 +0000 Subject: [PATCH 3/3] ci: apply automated fixes --- .../tests/electric.test.ts | 41 +++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 9f76258dd..b0edbed89 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2863,8 +2863,16 @@ describe(`Electric Integration`, () => { ]) expect(testCollection.size).toBe(2) - expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1`, version: 1 }) - expect(testCollection.get(2)).toEqual({ id: 2, name: `User 2`, version: 1 }) + expect(testCollection.get(1)).toEqual({ + id: 1, + name: `User 1`, + version: 1, + }) + expect(testCollection.get(2)).toEqual({ + id: 2, + name: `User 2`, + version: 1, + }) // Second batch - overlapping subset query returns same rows with different values subscriber([ @@ -2890,9 +2898,21 @@ describe(`Electric Integration`, () => { // All rows should be present with updated values expect(testCollection.size).toBe(3) - expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1`, version: 2 }) - expect(testCollection.get(2)).toEqual({ id: 2, name: `User 2`, version: 2 }) - expect(testCollection.get(3)).toEqual({ id: 3, name: `User 3`, version: 1 }) + expect(testCollection.get(1)).toEqual({ + id: 1, + name: `User 1`, + version: 2, + }) + expect(testCollection.get(2)).toEqual({ + id: 2, + name: `User 2`, + version: 2, + }) + expect(testCollection.get(3)).toEqual({ + id: 3, + name: `User 3`, + version: 1, + }) }) it(`should reset synced keys tracking on must-refetch`, () => { @@ -2945,7 +2965,10 @@ describe(`Electric Integration`, () => { ]) expect(testCollection.has(1)).toBe(true) - expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1 After Refetch` }) + expect(testCollection.get(1)).toEqual({ + id: 1, + name: `User 1 After Refetch`, + }) }) it(`should handle delete followed by insert of the same key`, () => { @@ -3042,7 +3065,11 @@ describe(`Electric Integration`, () => { // Should have the latest value expect(testCollection.has(1)).toBe(true) - expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1`, version: 2 }) + expect(testCollection.get(1)).toEqual({ + id: 1, + name: `User 1`, + version: 2, + }) }) })