Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/quick-tables-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/electric-db-collection': patch
---

Fix duplicate key error when overlapping subset queries return the same row with different values.

When multiple subset queries return the same row (e.g., different WHERE clauses that both match the same record), the server sends `insert` operations for each response. If the row's data changed between requests (e.g., timestamp field updated), this caused a `DuplicateKeySyncError`. The adapter now tracks synced keys and converts subsequent inserts to updates.
30 changes: 28 additions & 2 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,12 @@ function createElectricSync<T extends Row<unknown>>(
syncMode === `progressive` && !hasReceivedUpToDate
const bufferedMessages: Array<Message<T>> = [] // Buffer change messages during initial sync

// Track keys that have been synced to handle overlapping subset queries.
// When multiple subset queries return the same row, the server sends `insert`
// for each response. We convert subsequent inserts to updates to avoid
// duplicate key errors when the row's data has changed between requests.
const syncedKeys = new Set<string | number>()

/**
* Process a change message: handle tags and write the mutation
*/
Expand All @@ -1249,14 +1255,28 @@ function createElectricSync<T extends Row<unknown>>(
const rowId = collection.getKeyFromItem(changeMessage.value)
const operation = changeMessage.headers.operation

if (operation === `delete`) {
// Track synced keys and handle overlapping subset queries.
// When multiple subset queries return the same row, the server sends
// `insert` for each response. We convert subsequent inserts to updates
// to avoid duplicate key errors when the row's data has changed.
const isDelete = operation === `delete`
const isDuplicateInsert =
operation === `insert` && syncedKeys.has(rowId)

if (isDelete) {
syncedKeys.delete(rowId)
} else {
syncedKeys.add(rowId)
}

if (isDelete) {
clearTagsForRow(rowId)
} else if (hasTags) {
processTagsForChangeMessage(tags, removedTags, rowId)
}

write({
type: changeMessage.headers.operation,
type: isDuplicateInsert ? `update` : operation,
value: changeMessage.value,
// Include the primary key and relation info in the metadata
metadata: {
Expand Down Expand Up @@ -1392,6 +1412,9 @@ function createElectricSync<T extends Row<unknown>>(
// Clear tag tracking state
clearTagTrackingState()

// Clear synced keys tracking since we're starting fresh
syncedKeys.clear()

// Reset the loadSubset deduplication state since we're starting fresh
// This ensures that previously loaded predicates don't prevent refetching after truncate
loadSubsetDedupe?.reset()
Expand Down Expand Up @@ -1419,6 +1442,9 @@ function createElectricSync<T extends Row<unknown>>(
// Clear tag tracking state for atomic swap
clearTagTrackingState()

// Clear synced keys tracking for atomic swap
syncedKeys.clear()

// Apply all buffered change messages and extract txids/snapshots
for (const bufferedMsg of bufferedMessages) {
if (isChangeMessage(bufferedMsg)) {
Expand Down
316 changes: 316 additions & 0 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2844,6 +2844,322 @@ describe(`Electric Integration`, () => {
})
})

// Tests for overlapping subset queries with duplicate keys
describe(`Overlapping subset queries with duplicate keys`, () => {
it(`should convert duplicate inserts to updates when overlapping subset queries return the same row with different values`, () => {
// This test reproduces the issue where:
// 1. Multiple subset queries return the same row (e.g., different WHERE clauses that both match the same record)
// 2. The server sends `insert` operations for each response
// 3. If the row's data changed between requests (e.g., timestamp field updated), this caused a DuplicateKeySyncError

const config = {
id: `duplicate-insert-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

// First subset query returns a row
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1`, updated_at: `2024-01-01T00:00:00Z` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

// Verify initial data is present
expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({
id: 1,
name: `User 1`,
updated_at: `2024-01-01T00:00:00Z`,
})

// Second subset query returns the SAME row but with a different timestamp
// This would throw DuplicateKeySyncError without the fix because:
// 1. The key already exists in syncedData
// 2. The value is different (timestamp changed)
// 3. Without the Electric adapter converting insert->update, sync.ts throws
expect(() => {
subscriber([
{
key: `1`,
value: {
id: 1,
name: `User 1`,
updated_at: `2024-01-01T00:00:01Z`,
}, // Different timestamp!
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])
}).not.toThrow()

// The row should be updated with the new value
expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({
id: 1,
name: `User 1`,
updated_at: `2024-01-01T00:00:01Z`,
})
})

it(`should handle multiple duplicate inserts across several batches`, () => {
const config = {
id: `multiple-duplicate-inserts-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

// First batch - initial inserts
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1`, version: 1 },
headers: { operation: `insert` },
},
{
key: `2`,
value: { id: 2, name: `User 2`, version: 1 },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

expect(testCollection.size).toBe(2)
expect(testCollection.get(1)).toEqual({
id: 1,
name: `User 1`,
version: 1,
})
expect(testCollection.get(2)).toEqual({
id: 2,
name: `User 2`,
version: 1,
})

// Second batch - overlapping subset query returns same rows with different values
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1`, version: 2 }, // version changed
headers: { operation: `insert` },
},
{
key: `2`,
value: { id: 2, name: `User 2`, version: 2 }, // version changed
headers: { operation: `insert` },
},
{
key: `3`,
value: { id: 3, name: `User 3`, version: 1 }, // new row
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

// All rows should be present with updated values
expect(testCollection.size).toBe(3)
expect(testCollection.get(1)).toEqual({
id: 1,
name: `User 1`,
version: 2,
})
expect(testCollection.get(2)).toEqual({
id: 2,
name: `User 2`,
version: 2,
})
expect(testCollection.get(3)).toEqual({
id: 3,
name: `User 3`,
version: 1,
})
})

it(`should reset synced keys tracking on must-refetch`, () => {
const config = {
id: `must-refetch-reset-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

// Initial sync
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

expect(testCollection.has(1)).toBe(true)

// Trigger must-refetch (clears collection and syncedKeys tracking)
subscriber([
{
headers: { control: `must-refetch` },
},
])

// After must-refetch, sending the same key as insert should work
// because syncedKeys tracking was cleared
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1 After Refetch` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({
id: 1,
name: `User 1 After Refetch`,
})
})

it(`should handle delete followed by insert of the same key`, () => {
const config = {
id: `delete-then-insert-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

// Initial insert
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

expect(testCollection.has(1)).toBe(true)

// Delete the row
subscriber([
{
key: `1`,
value: { id: 1 },
headers: { operation: `delete` },
},
{
headers: { control: `up-to-date` },
},
])

expect(testCollection.has(1)).toBe(false)

// Re-insert the same key - should work because delete cleared the syncedKeys tracking
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1 Recreated` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({ id: 1, name: `User 1 Recreated` })
})

it(`should handle duplicate inserts within the same batch`, () => {
const config = {
id: `same-batch-duplicate-test`,
shapeOptions: {
url: `http://test-url`,
params: {
table: `test_table`,
},
},
getKey: (item: Row) => item.id as number,
startSync: true,
}

const testCollection = createCollection(electricCollectionOptions(config))

// Single batch with duplicate inserts for the same key
// This can happen when multiple subset responses are batched together
subscriber([
{
key: `1`,
value: { id: 1, name: `User 1`, version: 1 },
headers: { operation: `insert` },
},
{
key: `1`,
value: { id: 1, name: `User 1`, version: 2 }, // Same key, different value
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
},
])

// Should have the latest value
expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({
id: 1,
name: `User 1`,
version: 2,
})
})
})

// Tests for commit and ready behavior with snapshot-end and up-to-date messages
describe(`Commit and ready behavior`, () => {
it(`should ignore snapshot-end before first up-to-date in progressive mode`, () => {
Expand Down
Loading