diff --git a/.changeset/dirty-papayas-attend.md b/.changeset/dirty-papayas-attend.md new file mode 100644 index 000000000..f46948972 --- /dev/null +++ b/.changeset/dirty-papayas-attend.md @@ -0,0 +1,5 @@ +--- +'@tanstack/offline-transactions': minor +--- + +Enrich the OfflineConfig interface with an onlineDetector option and Check online status before executing transactions. diff --git a/packages/offline-transactions/README.md b/packages/offline-transactions/README.md index 334125a60..4238160d4 100644 --- a/packages/offline-transactions/README.md +++ b/packages/offline-transactions/README.md @@ -5,6 +5,7 @@ Offline-first transaction capabilities for TanStack DB that provides durable per ## Features - **Outbox Pattern**: Persist mutations before dispatch for zero data loss +- **Offline Detection**: Skip retries when offline, auto-resume when connectivity restored - **Automatic Retry**: Exponential backoff with jitter for failed transactions - **Multi-tab Coordination**: Leader election ensures safe storage access - **FIFO Sequential Processing**: Transactions execute one at a time in creation order @@ -98,9 +99,39 @@ interface OfflineConfig { beforeRetry?: (transactions: OfflineTransaction[]) => OfflineTransaction[] onUnknownMutationFn?: (name: string, tx: OfflineTransaction) => void onLeadershipChange?: (isLeader: boolean) => void + onlineDetector?: OnlineDetector +} + +interface OnlineDetector { + subscribe: (callback: () => void) => () => void + notifyOnline: () => void + isOnline: () => boolean + dispose: () => void } ``` +### onlineDetector + +By default, `onlineDetector` is `undefined` and the system will use the built-in `DefaultOnlineDetector`. + +**How it works:** +- Provides an `isOnline()` method to check connectivity status before executing transactions + +**Transactions are skipped when offline** +- Avoid unnecessary retry attempts +- Allows subscribers to be notified when connectivity is restored, triggering pending transaction execution + +**DefaultOnlineDetector behavior:** +- Uses the browser's `navigator.onLine` API to detect online/offline state +- Automatically triggers transaction execution on these events: + - `online` event (browser detects network connection) + - `visibilitychange` event (when tab becomes visible) + +**Manual trigger:** +- `notifyOnline()` method can be used to manually trigger transaction execution +- Only succeeds if `isOnline()` returns `true` + + ### OfflineExecutor #### Properties @@ -113,7 +144,7 @@ interface OfflineConfig { - `waitForTransactionCompletion(id)` - Wait for a specific transaction to complete - `removeFromOutbox(id)` - Manually remove transaction from outbox - `peekOutbox()` - View all pending transactions -- `notifyOnline()` - Manually trigger retry execution +- `notifyOnline()` - Manually trigger transaction execution (only succeeds if online) - `dispose()` - Clean up resources ### Error Handling @@ -137,6 +168,55 @@ const mutationFn = async ({ transaction }) => { ## Advanced Usage +### Custom Online Detector + +By default, the executor uses the browser's `navigator.onLine` API to detect connectivity. You can provide a custom detector for more sophisticated detection logic: + +```typescript +class CustomOnlineDetector implements OnlineDetector { + private listeners = new Set<() => void>() + private online = true + + constructor() { + // Poll your API endpoint to check connectivity + setInterval(async () => { + try { + await fetch('/api/health', { method: 'HEAD' }) + const wasOffline = !this.online + this.online = true + if (wasOffline) { + this.notifyOnline() + } + } catch { + this.online = false + } + }, 60000) + } + + isOnline(): boolean { + return this.online + } + + subscribe(callback: () => void): () => void { + this.listeners.add(callback) + return () => this.listeners.delete(callback) + } + + notifyOnline(): void { + this.listeners.forEach((cb) => cb()) + } + + dispose(): void { + this.listeners.clear() + } +} + +const executor = startOfflineExecutor({ + onlineDetector: new CustomOnlineDetector(), + // ... other config +}) +``` + ### Custom Storage Adapter ```typescript diff --git a/packages/offline-transactions/src/OfflineExecutor.ts b/packages/offline-transactions/src/OfflineExecutor.ts index 8ae8bd375..8e5886c61 100644 --- a/packages/offline-transactions/src/OfflineExecutor.ts +++ b/packages/offline-transactions/src/OfflineExecutor.ts @@ -30,6 +30,7 @@ import type { OfflineConfig, OfflineMode, OfflineTransaction, + OnlineDetector, StorageAdapter, StorageDiagnostic, } from './types' @@ -44,7 +45,7 @@ export class OfflineExecutor { private scheduler: KeyScheduler private executor: TransactionExecutor | null private leaderElection: LeaderElection | null - private onlineDetector: DefaultOnlineDetector + private onlineDetector: OnlineDetector private isLeaderState = false private unsubscribeOnline: (() => void) | null = null private unsubscribeLeadership: (() => void) | null = null @@ -71,7 +72,11 @@ export class OfflineExecutor { constructor(config: OfflineConfig) { this.config = config this.scheduler = new KeyScheduler() - this.onlineDetector = new DefaultOnlineDetector() + + // Initialize onlineDetector based on config + // undefined = use DefaultOnlineDetector (default) + // custom = user-provided detector + this.onlineDetector = config.onlineDetector ?? new DefaultOnlineDetector() // Initialize as pending - will be set by async initialization this.storage = null @@ -259,6 +264,7 @@ export class OfflineExecutor { this.outbox, this.config, this, + this.onlineDetector, ) this.leaderElection = this.createLeaderElection() @@ -485,7 +491,7 @@ export class OfflineExecutor { return this.executor.getRunningCount() } - getOnlineDetector(): DefaultOnlineDetector { + getOnlineDetector(): OnlineDetector { return this.onlineDetector } diff --git a/packages/offline-transactions/src/connectivity/OnlineDetector.ts b/packages/offline-transactions/src/connectivity/OnlineDetector.ts index 9ebd587bf..14a1deb50 100644 --- a/packages/offline-transactions/src/connectivity/OnlineDetector.ts +++ b/packages/offline-transactions/src/connectivity/OnlineDetector.ts @@ -70,6 +70,10 @@ export class DefaultOnlineDetector implements OnlineDetector { } notifyOnline(): void { + if (!this.isOnline()) { + console.info('notifyOnline called while offline, skipping notification') + return + } this.notifyListeners() } diff --git a/packages/offline-transactions/src/executor/TransactionExecutor.ts b/packages/offline-transactions/src/executor/TransactionExecutor.ts index 441a87d88..540670cc6 100644 --- a/packages/offline-transactions/src/executor/TransactionExecutor.ts +++ b/packages/offline-transactions/src/executor/TransactionExecutor.ts @@ -3,7 +3,11 @@ import { NonRetriableError } from '../types' import { withNestedSpan } from '../telemetry/tracer' import type { KeyScheduler } from './KeyScheduler' import type { OutboxManager } from '../outbox/OutboxManager' -import type { OfflineConfig, OfflineTransaction } from '../types' +import type { + OfflineConfig, + OfflineTransaction, + OnlineDetector, +} from '../types' const HANDLED_EXECUTION_ERROR = Symbol(`HandledExecutionError`) @@ -16,18 +20,21 @@ export class TransactionExecutor { private executionPromise: Promise | null = null private offlineExecutor: any // Reference to OfflineExecutor for signaling private retryTimer: ReturnType | null = null + private onlineDetector: OnlineDetector constructor( scheduler: KeyScheduler, outbox: OutboxManager, config: OfflineConfig, offlineExecutor: any, + onlineDetector: OnlineDetector, ) { this.scheduler = scheduler this.outbox = outbox this.config = config this.retryPolicy = new DefaultRetryPolicy(10, config.jitter ?? true) this.offlineExecutor = offlineExecutor + this.onlineDetector = onlineDetector } async execute(transaction: OfflineTransaction): Promise { @@ -54,6 +61,11 @@ export class TransactionExecutor { private async runExecution(): Promise { const maxConcurrency = this.config.maxConcurrency ?? 3 + // Check online status before executing transactions + if (!this.onlineDetector.isOnline()) { + return + } + while (this.scheduler.getPendingCount() > 0) { const batch = this.scheduler.getNextBatch(maxConcurrency) @@ -223,6 +235,7 @@ export class TransactionExecutor { filteredTransactions = this.config.beforeRetry(transactions) } + for (const transaction of filteredTransactions) { this.scheduler.schedule(transaction) } diff --git a/packages/offline-transactions/src/types.ts b/packages/offline-transactions/src/types.ts index 8cf18cc88..670955c48 100644 --- a/packages/offline-transactions/src/types.ts +++ b/packages/offline-transactions/src/types.ts @@ -88,7 +88,7 @@ export interface StorageDiagnostic { } export interface OfflineConfig { - // eslint-disable-next-line @typescript-eslint/no-explicit-any + collections: Record> mutationFns: Record storage?: StorageAdapter @@ -101,6 +101,7 @@ export interface OfflineConfig { onLeadershipChange?: (isLeader: boolean) => void onStorageFailure?: (diagnostic: StorageDiagnostic) => void leaderElection?: LeaderElection + onlineDetector?: OnlineDetector } export interface StorageAdapter { @@ -126,6 +127,8 @@ export interface LeaderElection { export interface OnlineDetector { subscribe: (callback: () => void) => () => void notifyOnline: () => void + isOnline: () => boolean + dispose: () => void } export interface CreateOfflineTransactionOptions { diff --git a/packages/offline-transactions/tests/harness.ts b/packages/offline-transactions/tests/harness.ts index 7f8dc82ad..50670cf9f 100644 --- a/packages/offline-transactions/tests/harness.ts +++ b/packages/offline-transactions/tests/harness.ts @@ -5,6 +5,7 @@ import type { LeaderElection, OfflineConfig, OfflineMutationFnParams, + OnlineDetector, StorageAdapter, } from '../src/types' @@ -92,6 +93,36 @@ class FakeLeaderElection implements LeaderElection { } } +export class FakeOnlineDetector implements OnlineDetector { + private listeners = new Set<() => void>() + online = true + + isOnline(): boolean { + return this.online + } + + subscribe(callback: () => void): () => void { + this.listeners.add(callback) + return () => { + this.listeners.delete(callback) + } + } + + notifyOnline(): void { + for (const listener of this.listeners) { + try { + listener() + } catch (error) { + console.warn(`FakeOnlineDetector listener error:`, error) + } + } + } + + dispose(): void { + this.listeners.clear() + } +} + type TestMutationFn = ( params: OfflineMutationFnParams & { attempt: number }, ) => Promise @@ -243,6 +274,7 @@ export function createTestOfflineEnvironment( onUnknownMutationFn: options.config?.onUnknownMutationFn, onLeadershipChange: options.config?.onLeadershipChange, leaderElection: options.config?.leaderElection ?? leader, + onlineDetector: options.config?.onlineDetector, } const executor = startOfflineExecutor(config) diff --git a/packages/offline-transactions/tests/offline-e2e.test.ts b/packages/offline-transactions/tests/offline-e2e.test.ts index b4f9442df..c348fa297 100644 --- a/packages/offline-transactions/tests/offline-e2e.test.ts +++ b/packages/offline-transactions/tests/offline-e2e.test.ts @@ -1,6 +1,10 @@ import { describe, expect, it } from 'vitest' import { NonRetriableError } from '../src/types' -import { FakeStorageAdapter, createTestOfflineEnvironment } from './harness' +import { + FakeOnlineDetector, + FakeStorageAdapter, + createTestOfflineEnvironment, +} from './harness' import type { TestItem } from './harness' import type { OfflineMutationFnParams } from '../src/types' import type { PendingMutation } from '@tanstack/db' @@ -428,6 +432,7 @@ describe(`offline executor end-to-end`, () => { // Give the executor a moment to react to leadership change await flushMicrotasks() + // Should not be offline enabled when not leader expect(env.executor.isOfflineEnabled).toBe(false) @@ -466,4 +471,149 @@ describe(`offline executor end-to-end`, () => { env.executor.dispose() }) + + it(`accepts a custom OnlineDetector via config`, async () => { + const customDetector = new FakeOnlineDetector() + + const env = createTestOfflineEnvironment({ + config: { + onlineDetector: customDetector, + }, + }) + + await env.waitForLeader() + + + // Verify the executor uses the custom detector + expect(env.executor.getOnlineDetector()).toBe(customDetector) + + env.executor.dispose() + }) + + it(`uses custom OnlineDetector instead of default`, async () => { + const customDetector = new FakeOnlineDetector() + + // Set detector to offline initially + customDetector.online = false + + const env = createTestOfflineEnvironment({ + config: { + onlineDetector: customDetector, + }, + }) + + await env.waitForLeader() + + // Create a transaction while offline + const offlineTx = env.executor.createOfflineTransaction({ + mutationFnName: env.mutationFnName, + autoCommit: false, + }) + + offlineTx.mutate(() => { + env.collection.insert({ + id: `custom-detector-item`, + value: `test`, + completed: false, + updatedAt: new Date(), + }) + }) + + const commitPromise = offlineTx.commit() + await flushMicrotasks() + + // Transaction should not execute because custom detector reports offline + expect(env.mutationCalls.length).toBe(0) + + // Verify transaction is in outbox + let outboxEntries = await env.executor.peekOutbox() + expect(outboxEntries.length).toBe(1) + + // Set detector back online and notify + customDetector.online = true + customDetector.notifyOnline() + + // Wait for transaction to execute + await waitUntil(() => env.mutationCalls.length >= 1) + + await commitPromise + + // Verify transaction completed + outboxEntries = await env.executor.peekOutbox() + expect(outboxEntries).toEqual([]) + expect(env.serverState.get(`custom-detector-item`)?.value).toBe(`test`) + + env.executor.dispose() + }) + + it(`queues transaction created offline, then executes when going online`, async () => { + const customDetector = new FakeOnlineDetector() + + // Start offline + customDetector.online = false + + const env = createTestOfflineEnvironment({ + config: { + onlineDetector: customDetector, + }, + }) + + await env.waitForLeader() + + // Create and commit transaction while offline + const offlineTx = env.executor.createOfflineTransaction({ + mutationFnName: env.mutationFnName, + autoCommit: false, + }) + + const now = new Date() + offlineTx.mutate(() => { + env.collection.insert({ + id: `offline-queued`, + value: `will-sync-later`, + completed: false, + updatedAt: now, + }) + }) + + const commitPromise = offlineTx.commit() + await flushMicrotasks() + + // Transaction should not have executed yet (offline) + expect(env.mutationCalls.length).toBe(0) + + // Transaction should be in outbox (queued) + let outboxEntries = await env.executor.peekOutbox() + expect(outboxEntries.length).toBe(1) + expect(outboxEntries[0].id).toBe(offlineTx.id) + + // Local state should have the optimistic update + expect(env.collection.get(`offline-queued`)?.value).toBe(`will-sync-later`) + + // Server should not have it yet + expect(env.serverState.get(`offline-queued`)).toBeUndefined() + + // Go back online + customDetector.online = true + customDetector.notifyOnline() + + // Wait for the transaction to execute + await waitUntil(() => env.mutationCalls.length >= 1) + + // Commit promise should resolve + await commitPromise + + // Verify transaction executed successfully + expect(env.mutationCalls.length).toBe(1) + + // Transaction should be removed from outbox + outboxEntries = await env.executor.peekOutbox() + expect(outboxEntries).toEqual([]) + + // Both local and server should have the data + expect(env.collection.get(`offline-queued`)?.value).toBe(`will-sync-later`) + expect(env.serverState.get(`offline-queued`)?.value).toBe(`will-sync-later`) + + env.executor.dispose() + }) })