@@ -3,26 +3,31 @@ import {
33 createBaseLogger ,
44 createLogger ,
55 DEFAULT_CRUD_UPLOAD_THROTTLE_MS ,
6- DEFAULT_STREAMING_SYNC_OPTIONS ,
76 SqliteBucketStorage ,
87 SyncStatus
98} from '@powersync/common' ;
109import {
10+ OpenAsyncDatabaseConnection ,
1111 SharedWebStreamingSyncImplementation ,
1212 SharedWebStreamingSyncImplementationOptions ,
13+ WASqliteConnection ,
1314 WebRemote
1415} from '@powersync/web' ;
15-
16- import { beforeAll , describe , expect , it , vi } from 'vitest' ;
16+ import * as Comlink from 'comlink' ;
17+ import { beforeAll , describe , expect , it , onTestFinished , vi } from 'vitest' ;
18+ import { LockedAsyncDatabaseAdapter } from '../src/db/adapters/LockedAsyncDatabaseAdapter' ;
1719import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter' ;
20+ import { WorkerWrappedAsyncDatabaseConnection } from '../src/db/adapters/WorkerWrappedAsyncDatabaseConnection' ;
1821import { TestConnector } from './utils/MockStreamOpenFactory' ;
1922import { generateTestDb , testSchema } from './utils/testDb' ;
2023
24+ const DB_FILENAME = 'test-multiple-instances.db' ;
25+
2126describe ( 'Multiple Instances' , { sequential : true } , ( ) => {
2227 const openDatabase = ( ) =>
2328 generateTestDb ( {
2429 database : {
25- dbFilename : `test-multiple-instances.db`
30+ dbFilename : DB_FILENAME
2631 } ,
2732 schema : testSchema
2833 } ) ;
@@ -99,6 +104,106 @@ describe('Multiple Instances', { sequential: true }, () => {
99104 await createAsset ( powersync2 ) ;
100105 } ) ;
101106
107+ it ( 'should handled interrupted transactions' , { timeout : Infinity } , async ( ) => {
108+ //Create a shared PowerSync database. We'll just use this for internally managing connections.
109+ const powersync = openDatabase ( ) ;
110+ await powersync . init ( ) ;
111+
112+ // Now get a shared connection to the same database
113+ const webAdapter = powersync . database as WebDBAdapter ;
114+
115+ // Allow us to share the connection. This is what shared sync workers will use.
116+ const shared = await webAdapter . shareConnection ( ) ;
117+ const config = webAdapter . getConfiguration ( ) ;
118+ const opener = Comlink . wrap < OpenAsyncDatabaseConnection > ( shared . port ) ;
119+
120+ // Open up a shared connection
121+ const initialSharedConnection = ( await opener ( config ) ) as Comlink . Remote < WASqliteConnection > ;
122+ onTestFinished ( async ( ) => {
123+ await initialSharedConnection . close ( ) ;
124+ } ) ;
125+
126+ // This will simulate another subsequent shared connection
127+ const subsequentSharedConnection = ( await opener ( config ) ) as Comlink . Remote < WASqliteConnection > ;
128+ onTestFinished ( async ( ) => {
129+ await subsequentSharedConnection . close ( ) ;
130+ } ) ;
131+
132+ // In the beginning, we should not be in a transaction
133+ const isAutoCommit = await initialSharedConnection . isAutoCommit ( ) ;
134+ // Should be true initially
135+ expect ( isAutoCommit ) . true ;
136+
137+ // Now we'll simulate the locked connections which are used by the shared sync worker
138+ const wrappedInitialSharedConnection = new WorkerWrappedAsyncDatabaseConnection ( {
139+ baseConnection : initialSharedConnection ,
140+ identifier : DB_FILENAME ,
141+ remoteCanCloseUnexpectedly : true ,
142+ remote : opener
143+ } ) ;
144+
145+ // Wrap the second connection in a locked adapter, this simulates the actual use case
146+ const lockedInitialConnection = new LockedAsyncDatabaseAdapter ( {
147+ name : DB_FILENAME ,
148+ openConnection : async ( ) => wrappedInitialSharedConnection
149+ } ) ;
150+
151+ // Allows us to unblock a transaction which is awaiting a promise
152+ let unblockTransaction : ( ( ) => void ) | undefined ;
153+
154+ // Start a transaction that will be interrupted
155+ const transactionPromise = lockedInitialConnection . writeTransaction ( async ( tx ) => {
156+ // Transaction should be started now
157+
158+ // Wait till we are unblocked. Keep this transaction open.
159+ await new Promise < void > ( ( resolve ) => {
160+ unblockTransaction = resolve ;
161+ } ) ;
162+
163+ // This should throw if the db was closed
164+ await tx . get ( 'SELECT 1' ) ;
165+ } ) ;
166+
167+ // Wait for the transaction to have started
168+ await vi . waitFor ( ( ) => expect ( unblockTransaction ) . toBeDefined ( ) , { timeout : 2000 } ) ;
169+
170+ // Since we're in a transaction from above
171+ expect ( await initialSharedConnection . isAutoCommit ( ) ) . false ;
172+
173+ // The in-use connection should be closed now
174+ // This simulates a tab being closed.
175+ await wrappedInitialSharedConnection . close ( ) ;
176+ wrappedInitialSharedConnection . markRemoteClosed ( ) ;
177+
178+ // The transaction should be unblocked now
179+ unblockTransaction ?.( ) ;
180+
181+ // Since we closed while in the transaction, the execution call should have thrown
182+ await expect ( transactionPromise ) . rejects . toThrow ( 'Called operation on closed remote' ) ;
183+
184+ // It will still be false until we request a new hold
185+ // Requesting a new hold will cleanup the previous transaction.
186+ expect ( await subsequentSharedConnection . isAutoCommit ( ) ) . false ;
187+
188+ // Allows us to simulate a new locked shared connection.
189+ const lockedSubsequentConnection = new LockedAsyncDatabaseAdapter ( {
190+ name : DB_FILENAME ,
191+ openConnection : async ( ) =>
192+ new WorkerWrappedAsyncDatabaseConnection ( {
193+ baseConnection : subsequentSharedConnection ,
194+ identifier : DB_FILENAME ,
195+ remoteCanCloseUnexpectedly : true ,
196+ remote : opener
197+ } )
198+ } ) ;
199+
200+ // Starting a new transaction should work cleanup the old and work as expected
201+ await lockedSubsequentConnection . writeTransaction ( async ( tx ) => {
202+ await tx . get ( 'SELECT 1' ) ;
203+ expect ( await subsequentSharedConnection . isAutoCommit ( ) ) . false ;
204+ } ) ;
205+ } ) ;
206+
102207 it ( 'should watch table changes between instances' , async ( ) => {
103208 const db1 = openDatabase ( ) ;
104209 const db2 = openDatabase ( ) ;
0 commit comments