diff --git a/.changeset/fix-isnull-predicate-subset.md b/.changeset/fix-isnull-predicate-subset.md new file mode 100644 index 000000000..80d843c77 --- /dev/null +++ b/.changeset/fix-isnull-predicate-subset.md @@ -0,0 +1,6 @@ +--- +'@tanstack/db': patch +'@tanstack/electric-db-collection': patch +--- + +Fix isNull predicate causing LiveQuery to never become ready when offline. Reorder predicate checks in `isWhereSubsetInternal` so OR superset handling runs before AND subset decomposition, allowing `and(eq, isNull)` to match structurally equal disjuncts. Also separate `forceDisconnectAndRefresh` error handling into its own try-catch with correct error attribution. diff --git a/packages/db/src/query/predicate-utils.ts b/packages/db/src/query/predicate-utils.ts index 96162e868..4483d44ae 100644 --- a/packages/db/src/query/predicate-utils.ts +++ b/packages/db/src/query/predicate-utils.ts @@ -87,6 +87,24 @@ function isWhereSubsetInternal( ) } + // Handle OR in subset: (A OR B) ⊆ C only if both A ⊆ C and B ⊆ C. + // Must be checked before OR superset so that or(A, B) ⊆ or(C, D) + // decomposes the subset first: A ⊆ or(C, D) AND B ⊆ or(C, D). + if (subset.type === `func` && subset.name === `or`) { + return subset.args.every((arg) => + isWhereSubsetInternal(arg as BasicExpression, superset), + ) + } + + // Handle OR in superset: subset ⊆ (A OR B) if subset ⊆ A or subset ⊆ B. + // Must be checked before decomposing AND subsets so that and(A, B) can + // match a structurally equal disjunct via areExpressionsEqual. + if (superset.type === `func` && superset.name === `or`) { + return superset.args.some((arg) => + isWhereSubsetInternal(subset, arg as BasicExpression), + ) + } + // Handle subset being an AND: (A AND B) implies both A and B if (subset.type === `func` && subset.name === `and`) { // For (A AND B) ⊆ C, since (A AND B) implies A, we check if any conjunct implies C @@ -111,22 +129,6 @@ function isWhereSubsetInternal( } } - // Handle OR in subset: (A OR B) is subset of C only if both A and B are subsets of C - if (subset.type === `func` && subset.name === `or`) { - return subset.args.every((arg) => - isWhereSubsetInternal(arg as BasicExpression, superset), - ) - } - - // Handle OR in superset: subset ⊆ (A OR B) if subset ⊆ A or subset ⊆ B - // (A OR B) as superset means data can satisfy A or B - // If subset is contained in any disjunct, it's contained in the union - if (superset.type === `func` && superset.name === `or`) { - return superset.args.some((arg) => - isWhereSubsetInternal(subset, arg as BasicExpression), - ) - } - // Handle comparison operators on the same field if (subset.type === `func` && superset.type === `func`) { const subsetFunc = subset as Func diff --git a/packages/db/tests/query/predicate-utils.test.ts b/packages/db/tests/query/predicate-utils.test.ts index 0cb14ec20..1f47eef23 100644 --- a/packages/db/tests/query/predicate-utils.test.ts +++ b/packages/db/tests/query/predicate-utils.test.ts @@ -345,6 +345,71 @@ describe(`isWhereSubset`, () => { }) }) + describe(`AND subset with OR superset`, () => { + it(`should recognize and(eq, isNull) as subset of or(and(eq, isNull), and(eq, isNull))`, () => { + const projectX = `4e164373-31b4-4b42-95c9-9c395cfb4916` + const projectY = `2fd4c147-2547-4b02-9554-9cd067187409` + + const queryX = and( + eq(ref(`project_id`), val(projectX)), + func(`isNull`, ref(`soft_deleted_at`)), + ) + const queryY = and( + eq(ref(`project_id`), val(projectY)), + func(`isNull`, ref(`soft_deleted_at`)), + ) + + const unionPredicate = or(queryX, queryY) + + expect(isWhereSubset(queryX, unionPredicate)).toBe(true) + expect(isWhereSubset(queryY, unionPredicate)).toBe(true) + }) + + it(`should recognize and(A, B) as subset of or(and(A, B), and(C, D))`, () => { + const subsetExpr = and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))) + const supersetExpr = or( + and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))), + and(eq(ref(`id`), val(2)), gt(ref(`age`), val(30))), + ) + expect(isWhereSubset(subsetExpr, supersetExpr)).toBe(true) + }) + + it(`should return false when and(A, B) matches no disjunct`, () => { + const subsetExpr = and(eq(ref(`id`), val(3)), gt(ref(`age`), val(20))) + const supersetExpr = or( + and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))), + and(eq(ref(`id`), val(2)), gt(ref(`age`), val(30))), + ) + expect(isWhereSubset(subsetExpr, supersetExpr)).toBe(false) + }) + }) + + describe(`isNull predicates`, () => { + it(`should return true for identical isNull expressions`, () => { + const a = func(`isNull`, ref(`deleted_at`)) + const b = func(`isNull`, ref(`deleted_at`)) + expect(isWhereSubset(a, b)).toBe(true) + }) + + it(`should return false for isNull on different fields`, () => { + const a = func(`isNull`, ref(`deleted_at`)) + const b = func(`isNull`, ref(`created_at`)) + expect(isWhereSubset(a, b)).toBe(false) + }) + + it(`should return true for and(eq, isNull) subset of identical and(eq, isNull)`, () => { + const subset = and( + eq(ref(`project_id`), val(`abc`)), + func(`isNull`, ref(`soft_deleted_at`)), + ) + const superset = and( + eq(ref(`project_id`), val(`abc`)), + func(`isNull`, ref(`soft_deleted_at`)), + ) + expect(isWhereSubset(subset, superset)).toBe(true) + }) + }) + describe(`different fields`, () => { it(`should return false for different fields with no relationship`, () => { expect( diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 261069db9..1197e7734 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -431,6 +431,25 @@ function createLoadSubsetDedupe>({ const { cursor, where, orderBy, limit } = opts + // When the stream is already up-to-date, it may be in a long-poll wait. + // Forcing a disconnect-and-refresh ensures requestSnapshot gets a response + // from a fresh server round-trip rather than waiting for the current poll to end. + // If the refresh fails (e.g., PauseLock held during subscriber processing in + // join pipelines), we fall through to requestSnapshot which still works. + if (stream.isUpToDate) { + try { + await stream.forceDisconnectAndRefresh() + } catch (error) { + if (handleSnapshotError(error, `forceDisconnectAndRefresh`)) { + return + } + debug( + `${logPrefix}forceDisconnectAndRefresh failed, proceeding to requestSnapshot: %o`, + error, + ) + } + } + try { if (cursor) { const whereCurrentOpts: LoadSubsetOptions = { diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index f48d2905e..8ed73a863 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -21,10 +21,13 @@ import type { StandardSchemaV1 } from '@standard-schema/spec' const mockSubscribe = vi.fn() const mockRequestSnapshot = vi.fn() const mockFetchSnapshot = vi.fn() +const mockForceDisconnectAndRefresh = vi.fn() const mockStream = { subscribe: mockSubscribe, requestSnapshot: mockRequestSnapshot, fetchSnapshot: mockFetchSnapshot, + forceDisconnectAndRefresh: mockForceDisconnectAndRefresh, + isUpToDate: false, } vi.mock(`@electric-sql/client`, async () => { @@ -56,6 +59,8 @@ describe(`Electric Integration`, () => { // Reset mock requestSnapshot mockRequestSnapshot.mockResolvedValue(undefined) + mockForceDisconnectAndRefresh.mockResolvedValue(undefined) + mockStream.isUpToDate = false // Create collection with Electric configuration const config = { @@ -2376,7 +2381,7 @@ describe(`Electric Integration`, () => { // In on-demand mode, calling loadSubset should request a snapshot await testCollection._sync.loadSubset({ limit: 10 }) - // Verify requestSnapshot was called + expect(mockForceDisconnectAndRefresh).not.toHaveBeenCalled() expect(mockRequestSnapshot).toHaveBeenCalledWith( expect.objectContaining({ limit: 10, @@ -2385,6 +2390,65 @@ describe(`Electric Integration`, () => { ) }) + it(`should refresh the stream before requesting on-demand snapshots when already up-to-date`, async () => { + vi.clearAllMocks() + + const config = { + id: `on-demand-refresh-before-snapshot-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + mockStream.isUpToDate = true + + await testCollection._sync.loadSubset({ limit: 10 }) + + expect(mockForceDisconnectAndRefresh).toHaveBeenCalledTimes(1) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + const refreshCall = + mockForceDisconnectAndRefresh.mock.invocationCallOrder[0]! + const snapshotCall = mockRequestSnapshot.mock.invocationCallOrder[0]! + expect(refreshCall).toBeLessThan(snapshotCall) + }) + + it(`should fall through to requestSnapshot when forceDisconnectAndRefresh fails`, async () => { + vi.clearAllMocks() + + const config = { + id: `on-demand-refresh-fallthrough-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + mockStream.isUpToDate = true + mockForceDisconnectAndRefresh.mockImplementationOnce(async () => { + throw new Error(`PauseLock held`) + }) + + await testCollection._sync.loadSubset({ limit: 10 }) + + expect(mockForceDisconnectAndRefresh).toHaveBeenCalledTimes(1) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + }) + it(`should fetch snapshots in progressive mode when loadSubset is called before sync completes`, async () => { vi.clearAllMocks()