diff --git a/CHANGELOG.md b/CHANGELOG.md index a97931b1..d5fca5b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ - Sync options: `newClientImplementation` is now the default. - Make `androidx.sqlite:sqlite-bundled` an API dependency of `:core` to avoid toolchain warnings. +- Add `DispatchStrategy` API for customizing how database operations are dispatched to coroutine contexts. + By default, operations use `Dispatchers.IO`, but you can now provide a custom `CoroutineDispatcher` or + a fully custom `DispatchFunction` for complete control over the execution context. + + ```kotlin + // Use default (Dispatchers.IO) + PowerSyncDatabase(factory, schema) + + // Use a specific dispatcher + PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Dispatcher(Dispatchers.Default)) + + // Use a custom function + PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Custom(myCustomFunction)) + ``` ## 1.8.1 @@ -389,4 +403,4 @@ params = params * Replaced default Logger with [Kermit Logger](https://kermit.touchlab.co/) which allows users to more easily use and/or change Logger settings * Add `retryDelay` and `crudThrottle` options when setting up database connection -* Changed `_viewNameOverride` to `viewNameOverride` +* Changed `_viewNameOverride` to `viewNameOverride` \ No newline at end of file diff --git a/common/src/commonMain/kotlin/com/powersync/DispatchStrategy.kt b/common/src/commonMain/kotlin/com/powersync/DispatchStrategy.kt new file mode 100644 index 00000000..3b90965a --- /dev/null +++ b/common/src/commonMain/kotlin/com/powersync/DispatchStrategy.kt @@ -0,0 +1,109 @@ +package com.powersync + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.withContext + +/** + * Function interface for dispatching database operations to a specific coroutine context. + * + * By default, operations are dispatched to [Dispatchers.IO]. Custom implementations + * can be provided to control the execution context of database operations. + * + * This interface supports the `operator invoke` syntax, allowing you to call it like: + * ``` + * dispatchFunction { /* your code */ } + * ``` + * + * **Design Note:** This must be an interface (not a function type) because Kotlin does not + * support function types with generic type parameters. Since the dispatch function needs to + * accept and return generic types ``, an interface with an `operator invoke` method is + * the appropriate solution. This allows the same convenient syntax as function types while + * supporting generics. + * + * @see DispatchStrategy for dispatch strategy options + */ +public interface DispatchFunction { + /** + * Dispatches the given block to the appropriate coroutine context. + * + * @param block The suspend function to execute in the dispatch context. + * @return The result of executing the block. + */ + public suspend operator fun invoke(block: suspend () -> R): R +} + +/** + * Strategy for dispatching database operations to a specific coroutine context. + * + * This sealed class allows you to specify how database operations should be dispatched: + * - [Default]: Use the default dispatcher ([Dispatchers.IO]) + * - [Dispatcher]: Use a specific [CoroutineDispatcher] + * - [Custom]: Use a custom [DispatchFunction] for full control + * + * Each variant provides a [dispatchFunction] that implements the actual dispatching logic. + * + * Example usage: + * ``` + * // Use default (Dispatchers.IO) - this is the default if not specified + * PowerSyncDatabase(factory, schema) + * // or explicitly: + * PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Default) + * + * // Use a specific dispatcher + * PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Dispatcher(Dispatchers.Default)) + * + * // Use a custom function + * PowerSyncDatabase(factory, schema, dispatchStrategy = DispatchStrategy.Custom(myCustomFunction)) + * ``` + * + * @see DispatchFunction for the dispatch function interface + */ +public sealed class DispatchStrategy { + /** + * Returns the [DispatchFunction] that implements the dispatching logic for this strategy. + */ + public abstract val dispatchFunction: DispatchFunction + + /** + * Use the default dispatcher ([Dispatchers.IO]) for database operations. + * + * This is the recommended default for most use cases, as it provides + * a dedicated thread pool for I/O-bound operations. + */ + public object Default : DispatchStrategy() { + override val dispatchFunction: DispatchFunction = + Dispatcher(Dispatchers.IO).dispatchFunction + } + + /** + * Use a specific [CoroutineDispatcher] for database operations. + * + * This allows you to use any coroutine dispatcher, such as: + * - [Dispatchers.Default] for CPU-bound work + * - [Dispatchers.Main] for UI operations + * - A custom dispatcher for your specific needs + * + * @property dispatcher The coroutine dispatcher to use. + */ + public data class Dispatcher( + val dispatcher: CoroutineDispatcher, + ) : DispatchStrategy() { + override val dispatchFunction: DispatchFunction = + object : DispatchFunction { + override suspend fun invoke(block: suspend () -> R): R = withContext(dispatcher) { block() } + } + } + + /** + * Use a custom [DispatchFunction] for full control over dispatching. + * + * @property function The custom dispatch function to use. + */ + public data class Custom( + val function: DispatchFunction, + ) : DispatchStrategy() { + override val dispatchFunction: DispatchFunction = function + } +} diff --git a/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 42271ebe..f93a0499 100644 --- a/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -252,9 +252,10 @@ public interface PowerSyncDatabase : Queries { schema: Schema, identifier: String, logger: Logger, + dispatchStrategy: DispatchStrategy = DispatchStrategy.Default, ): PowerSyncDatabase { val group = ActiveDatabaseGroup.referenceDatabase(logger, identifier) - return openedWithGroup(pool, scope, schema, logger, group) + return openedWithGroup(pool, scope, schema, logger, group, dispatchStrategy) } /** @@ -268,11 +269,13 @@ public interface PowerSyncDatabase : Queries { schema: Schema, scope: CoroutineScope, logger: Logger? = null, + dispatchStrategy: DispatchStrategy = DispatchStrategy.Default, ): PowerSyncDatabase { val logger = generateLogger(logger) // Since this returns a fresh in-memory database every time, use a fresh group to avoid warnings about the // same database being opened multiple times. - val collection = ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test") + val collection = + ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test") return openedWithGroup( SingleConnectionPool(factory.openInMemoryConnection()), @@ -280,6 +283,7 @@ public interface PowerSyncDatabase : Queries { schema, logger, collection, + dispatchStrategy, ) } @@ -289,6 +293,7 @@ public interface PowerSyncDatabase : Queries { schema: Schema, logger: Logger, group: Pair, + dispatchStrategy: DispatchStrategy = DispatchStrategy.Default, ): PowerSyncDatabase = PowerSyncDatabaseImpl( schema, @@ -296,6 +301,7 @@ public interface PowerSyncDatabase : Queries { pool, logger, group, + dispatchStrategy, ) } } diff --git a/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt b/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt index 824b6e04..26560e54 100644 --- a/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt +++ b/common/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt @@ -25,6 +25,7 @@ public fun PowerSyncDatabase( dbFilename: String = DEFAULT_DB_FILENAME, scope: CoroutineScope = GlobalScope, logger: Logger? = null, + dispatchStrategy: DispatchStrategy = DispatchStrategy.Default, /** * Optional database file directory path. * This parameter is ignored for iOS. @@ -40,10 +41,10 @@ public fun PowerSyncDatabase( scope = scope, logger = generatedLogger, dbDirectory = dbDirectory, + dispatchStrategy = dispatchStrategy, ) } -@OptIn(ExperimentalPowerSyncAPI::class) internal fun createPowerSyncDatabaseImpl( factory: PersistentConnectionFactory, schema: Schema, @@ -51,6 +52,7 @@ internal fun createPowerSyncDatabaseImpl( scope: CoroutineScope, logger: Logger, dbDirectory: String?, + dispatchStrategy: DispatchStrategy = DispatchStrategy.Default, ): PowerSyncDatabaseImpl { val identifier = dbDirectory + dbFilename val activeDatabaseGroup = ActiveDatabaseGroup.referenceDatabase(logger, identifier) @@ -72,5 +74,6 @@ internal fun createPowerSyncDatabaseImpl( schema, logger, activeDatabaseGroup, + dispatchStrategy, ) as PowerSyncDatabaseImpl } diff --git a/common/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/common/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index bf23a5af..afb009dd 100644 --- a/common/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/common/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -1,6 +1,7 @@ package com.powersync.db import co.touchlab.kermit.Logger +import com.powersync.DispatchStrategy import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException @@ -61,6 +62,7 @@ internal class PowerSyncDatabaseImpl( pool: SQLiteConnectionPool, val logger: Logger, private val activeDatabaseGroup: Pair, + dispatchStrategy: DispatchStrategy, ) : PowerSyncDatabase { companion object { internal val streamConflictMessage = @@ -79,7 +81,7 @@ internal class PowerSyncDatabaseImpl( private val resource = activeDatabaseGroup.first private val streams = StreamTracker(this) - private val internalDb = InternalDatabaseImpl(pool, logger) + private val internalDb = InternalDatabaseImpl(pool, logger, dispatchStrategy = dispatchStrategy) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) @@ -391,7 +393,7 @@ internal class PowerSyncDatabaseImpl( override suspend fun readTransaction(callback: ThrowableTransactionCallback): R { waitReady() - return internalDb.writeTransaction(callback) + return internalDb.readTransaction(callback) } override suspend fun writeLock(callback: ThrowableLockCallback): R { diff --git a/common/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/common/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index d1185c8b..6d1135ee 100644 --- a/common/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/common/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -1,6 +1,8 @@ package com.powersync.db.internal import co.touchlab.kermit.Logger +import com.powersync.DispatchFunction +import com.powersync.DispatchStrategy import com.powersync.ExperimentalPowerSyncAPI import com.powersync.db.SqlCursor import com.powersync.db.ThrowableLockCallback @@ -11,8 +13,6 @@ import com.powersync.db.runWrapped import com.powersync.utils.AtomicMutableSet import com.powersync.utils.JsonUtil import com.powersync.utils.throttle -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.emitAll @@ -20,17 +20,14 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onSubscription import kotlinx.coroutines.flow.transform -import kotlinx.coroutines.withContext import kotlin.time.Duration.Companion.milliseconds @OptIn(ExperimentalPowerSyncAPI::class) internal class InternalDatabaseImpl( private val pool: SQLiteConnectionPool, private val logger: Logger, + dispatchStrategy: DispatchStrategy, ) : InternalDatabase { - // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. - private val dbContext = Dispatchers.IO - override suspend fun execute( sql: String, parameters: List?, @@ -39,8 +36,10 @@ internal class InternalDatabaseImpl( context.execute(sql, parameters) } + private val dispatch: DispatchFunction = dispatchStrategy.dispatchFunction + override suspend fun updateSchema(schemaJson: String) { - withContext(dbContext) { + dispatch { runWrapped { pool.withAllConnections { writer, readers -> writer.runTransaction { tx -> @@ -167,7 +166,7 @@ internal class InternalDatabaseImpl( */ @OptIn(ExperimentalPowerSyncAPI::class) private suspend fun internalReadLock(callback: suspend (SQLiteConnectionLease) -> R): R = - withContext(dbContext) { + dispatch { runWrapped { useConnection(true) { connection -> callback(connection) @@ -189,7 +188,7 @@ internal class InternalDatabaseImpl( @OptIn(ExperimentalPowerSyncAPI::class) private suspend fun internalWriteLock(callback: suspend (SQLiteConnectionLease) -> R): R = - withContext(dbContext) { + dispatch { pool.write { writer -> runWrapped { callback(writer) diff --git a/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/pool/SwiftSQLiteConnectionPool.kt b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/pool/SwiftSQLiteConnectionPool.kt index d5988732..077ba81c 100644 --- a/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/pool/SwiftSQLiteConnectionPool.kt +++ b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/pool/SwiftSQLiteConnectionPool.kt @@ -1,6 +1,8 @@ package com.powersync.pool import co.touchlab.kermit.Logger +import com.powersync.DispatchFunction +import com.powersync.DispatchStrategy import com.powersync.PowerSyncDatabase import com.powersync.db.driver.SQLiteConnectionLease import com.powersync.db.driver.SQLiteConnectionPool @@ -104,4 +106,13 @@ public fun openPowerSyncWithPool( schema = schema, identifier = identifier, logger = logger, + dispatchStrategy = + DispatchStrategy.Custom( + object : DispatchFunction { + override suspend fun invoke(block: suspend () -> R): R { + // We leave the dispatching up to the pool + return block() + } + }, + ), )