From 29040ab1fc719306f2e0b27546d031a89afd2e56 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Dec 2025 11:02:39 +0100 Subject: [PATCH 1/6] Use RSocket for clients without backpressure support --- common/build.gradle.kts | 2 + .../powersync/sync/UserAgent.commonJava.kt | 5 + .../com/powersync/sync/RSocketSupport.kt | 145 ++++++++++++++++++ .../com/powersync/sync/StreamingSync.kt | 44 ++++-- .../kotlin/com/powersync/sync/SyncOptions.kt | 12 +- .../kotlin/com/powersync/sync/UserAgent.kt | 2 + .../com/powersync/sync/UserAgent.native.kt | 10 ++ gradle/libs.versions.toml | 5 +- 8 files changed, 206 insertions(+), 19 deletions(-) create mode 100644 common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt create mode 100644 common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt diff --git a/common/build.gradle.kts b/common/build.gradle.kts index 400b7322..88318210 100644 --- a/common/build.gradle.kts +++ b/common/build.gradle.kts @@ -165,6 +165,8 @@ kotlin { implementation(libs.kotlinx.datetime) implementation(libs.stately.concurrency) implementation(libs.configuration.annotations) + implementation(libs.rsocket.core) + implementation(libs.rsocket.transport.websocket) api(libs.ktor.client.core) api(libs.kermit) } diff --git a/common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt b/common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt new file mode 100644 index 00000000..4ce5a513 --- /dev/null +++ b/common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt @@ -0,0 +1,5 @@ +package com.powersync.sync + +internal actual fun defaultClientImplementationSupportsBackpressure(): Boolean { + return true +} diff --git a/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt b/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt new file mode 100644 index 00000000..009a10c5 --- /dev/null +++ b/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt @@ -0,0 +1,145 @@ +package com.powersync.sync + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.bucket.PowerSyncControlArguments +import com.powersync.connectors.PowerSyncCredentials +import com.powersync.utils.JsonUtil +import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.webSocketSession +import io.ktor.http.URLBuilder +import io.ktor.http.URLProtocol +import io.ktor.http.takeFrom +import io.rsocket.kotlin.core.RSocketConnector +import io.rsocket.kotlin.keepalive.KeepAlive +import io.rsocket.kotlin.payload.PayloadMimeType +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data +import io.rsocket.kotlin.payload.metadata +import io.rsocket.kotlin.transport.RSocketClientTarget +import io.rsocket.kotlin.transport.RSocketConnection +import io.rsocket.kotlin.transport.RSocketTransportApi +import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.io.readByteArray +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement +import kotlin.coroutines.CoroutineContext +import kotlin.time.Duration.Companion.seconds + +/** + * Connects to the RSocket endpoint for receiving sync lines. + * + * Note that we reconstruct the transport layer for RSocket by opening a WebSocket connection + * manually instead of using the high-level RSocket Ktor integration. + * The reason is that every request to the sync service needs its own metadata and data payload + * (e.g. to transmit the token), but the Ktor integration only supports setting a single payload for + * the entire client. + */ +@OptIn(RSocketTransportApi::class, ExperimentalPowerSyncAPI::class) +internal fun HttpClient.rSocketSyncStream( + userAgent: String, + req: JsonElement, + credentials: PowerSyncCredentials, +): Flow = + flow { + val flowContext = currentCoroutineContext() + + val websocketUri = + URLBuilder(credentials.endpointUri("sync/stream")).apply { + protocol = + when (protocolOrNull) { + URLProtocol.HTTP -> URLProtocol.WS + else -> URLProtocol.WSS + } + } + + // Note: We're using a custom connector here because we need to set options for each request + // without creating a new HTTP client each time. The recommended approach would be to add an + // RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for + // all connections (bad because we need a short-lived token in there). + // https://github.com/rsocket/rsocket-kotlin/issues/311 + val target = + object : RSocketClientTarget { + @RSocketTransportApi + override suspend fun connectClient(): RSocketConnection { + val ws = + webSocketSession { + url.takeFrom(websocketUri) + } + return KtorWebSocketConnection(ws) + } + + override val coroutineContext: CoroutineContext + get() = flowContext + } + + val connector = + RSocketConnector { + connectionConfig { + payloadMimeType = + PayloadMimeType( + metadata = "application/json", + data = "application/json", + ) + + setupPayload { + buildPayload { + data("{}") + metadata( + JsonUtil.json.encodeToString( + ConnectionSetupMetadata( + token = "Bearer ${credentials.token}", + userAgent = userAgent, + ), + ), + ) + } + } + + keepAlive = KeepAlive(interval = 20.0.seconds, maxLifetime = 30.0.seconds) + } + } + + val rSocket = connector.connect(target) + emit(PowerSyncControlArguments.ConnectionEstablished) + val syncStream = + rSocket.requestStream( + buildPayload { + data(JsonUtil.json.encodeToString(req)) + metadata(JsonUtil.json.encodeToString(RequestStreamMetadata("/sync/stream"))) + }, + ) + + emitAll(syncStream.map { + PowerSyncControlArguments.BinaryLine(it.data.readByteArray()) + }.flowOn(Dispatchers.IO)) + emit(PowerSyncControlArguments.ResponseStreamEnd) + } + +/** + * The metadata payload we need to use when connecting with RSocket. + * + * This corresponds to `RSocketContextMeta` on the sync service. + */ +@Serializable +private class ConnectionSetupMetadata( + val token: String, + @SerialName("user_agent") + val userAgent: String, +) + +/** + * The metadata payload we send for the `REQUEST_STREAM` frame. + */ +@Serializable +private class RequestStreamMetadata( + val path: String, +) diff --git a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index 10997a34..4a1e782c 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -94,7 +94,11 @@ public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userA socketTimeoutMillis = SOCKET_TIMEOUT } install(ContentNegotiation) - install(WebSockets) + install(WebSockets) { + // RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized + // transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size + maxFrameSize = 16_777_215 + 1024 // + some extra, you never know + } install(DefaultRequest) { headers { @@ -130,10 +134,7 @@ internal class StreamingSyncClient( when (val config = options.clientConfiguration) { is SyncClientConfiguration.ExtendedConfig -> createClient(options.userAgent, config.block) - is SyncClientConfiguration.ExistingClient -> config.client - - null -> createClient(options.userAgent) } private fun createClient( @@ -324,21 +325,36 @@ internal class StreamingSyncClient( } } - private fun receiveTextOrBinaryLines(req: JsonElement): Flow = - flow { - connectToSyncEndpoint(req, supportBson = false) { isBson, response -> - emit(PowerSyncControlArguments.ConnectionEstablished) - val body = response.body() + private fun receiveTextOrBinaryLines(req: JsonElement): Flow { + // If we can use streamed HTTP responses that respect backpressure, prefer to do that. + return if (options.clientConfiguration.supportsBackpressure) { + flow { + connectToSyncEndpoint(req, supportBson = false) { isBson, response -> + emit(PowerSyncControlArguments.ConnectionEstablished) + val body = response.body() - if (isBson) { - emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) }) - } else { - emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) }) + if (isBson) { + emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) }) + } else { + emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) }) + } + + emit(PowerSyncControlArguments.ResponseStreamEnd) } + } + } else { + // Use RSocket as a fallback to ensure we have backpressure on platforms that don't support it natively. + flow { + val credentials = requireNotNull(connector.getCredentialsCached()) { "Not logged in" } - emit(PowerSyncControlArguments.ResponseStreamEnd) + emitAll(httpClient.rSocketSyncStream( + userAgent = options.userAgent, + credentials = credentials, + req = req, + )) } } + } private suspend fun streamingSyncIteration(): SyncIterationResult = coroutineScope { diff --git a/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index 3cad9022..b73b2613 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -1,10 +1,8 @@ package com.powersync.sync -import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig -import kotlin.experimental.ExperimentalObjCRefinement import kotlin.native.HiddenFromObjC /** @@ -12,13 +10,18 @@ import kotlin.native.HiddenFromObjC * the HTTP client used to connect to the PowerSync service. */ public sealed class SyncClientConfiguration { + internal abstract val supportsBackpressure: Boolean + /** * Extends the default Ktor [HttpClient] configuration with the provided block. */ @HiddenFromObjC public class ExtendedConfig( public val block: HttpClientConfig<*>.() -> Unit, - ) : SyncClientConfiguration() + ) : SyncClientConfiguration() { + override val supportsBackpressure: Boolean + get() = defaultClientImplementationSupportsBackpressure() + } /** * Provides an existing [HttpClient] instance to use for connecting to the PowerSync service. @@ -30,6 +33,7 @@ public sealed class SyncClientConfiguration { @HiddenFromObjC public class ExistingClient( public val client: HttpClient, + override val supportsBackpressure: Boolean = true ) : SyncClientConfiguration() } @@ -52,7 +56,7 @@ public class SyncOptions( /** * Allows configuring the [HttpClient] used for connecting to the PowerSync service. */ - public val clientConfiguration: SyncClientConfiguration? = null, + public val clientConfiguration: SyncClientConfiguration = SyncClientConfiguration.ExtendedConfig {}, /** * Whether streams that have been defined with `auto_subscribe: true` should be synced even * when they don't have an explicit subscription. diff --git a/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt b/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt index 3f19a300..85a3bd67 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt @@ -1,3 +1,5 @@ package com.powersync.sync internal expect fun userAgent(): String + +internal expect fun defaultClientImplementationSupportsBackpressure(): Boolean diff --git a/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt b/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt index eff15b7f..3933a881 100644 --- a/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt +++ b/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt @@ -6,3 +6,13 @@ import kotlin.experimental.ExperimentalNativeApi @OptIn(ExperimentalNativeApi::class) internal actual fun userAgent(): String = "PowerSync Kotlin SDK v$LIBRARY_VERSION (running on ${Platform.cpuArchitecture.name} ${Platform.osFamily.name})" + +@OptIn(ExperimentalNativeApi::class) +internal actual fun defaultClientImplementationSupportsBackpressure(): Boolean { + return when (Platform.osFamily) { + // The NSURLSession API does not properly support backpressure for streaming HTTP responses. While it is + // possible to call suspend() and resume() on NSURLSessionTask, those are ignored sometimes. + OsFamily.IOS, OsFamily.WATCHOS, OsFamily.TVOS, OsFamily.MACOSX -> false + else -> true + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1ff0962d..4a625dbd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,9 +16,10 @@ coroutines = "1.10.2" kotlinx-datetime = "0.7.1" serialization = "1.9.0" kotlinx-io = "0.8.0" -ktor = "3.2.3" +ktor = "3.3.3" uuid = "0.8.4" powersync-core = "0.4.8" +rsocket = "0.20.0" turbine = "1.2.1" kotest = "5.9.1" # we can't upgrade to 6.x because that requires Java 11 or above (we need Java 8 support) @@ -95,6 +96,8 @@ ktor-serialization-json = { module = "io.ktor:ktor-serialization-kotlinx-json", kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "serialization" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" } kotlinx-coroutines-swing = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-swing", version.ref = "coroutines" } +rsocket-core = { module = "io.rsocket.kotlin:rsocket-core", version.ref = "rsocket" } +rsocket-transport-websocket = { module = "io.rsocket.kotlin:rsocket-transport-ktor-websocket-internal", version.ref = "rsocket" } stately-concurrency = { module = "co.touchlab:stately-concurrency", version.ref = "stately" } supabase-client = { module = "io.github.jan-tennert.supabase:postgrest-kt", version.ref = "supabase" } From 3ae508b7db8a299af745cf809f2c25e7700ea6c3 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Dec 2025 11:45:38 +0100 Subject: [PATCH 2/6] Opt-in on demand --- .../internal/InternalPowerSyncAPI.kt | 11 +++ .../internal/PlatformSpecificQuirks.kt | 16 ++++ .../com/powersync/sync/StreamingSync.kt | 64 ++------------ .../kotlin/com/powersync/sync/SyncOptions.kt | 7 +- .../kotlin/com/powersync/sync/UserAgent.kt | 2 - .../powersync/sync/configureSyncHttpClient.kt | 83 +++++++++++++++++++ .../com/powersync/sync/UserAgent.native.kt | 10 --- .../powersync/DatabaseDriverFactory.native.kt | 15 ++++ .../src/appleMain/kotlin/com/powersync/SDK.kt | 13 +++ 9 files changed, 146 insertions(+), 75 deletions(-) create mode 100644 common/src/commonMain/kotlin/com/powersync/internal/InternalPowerSyncAPI.kt create mode 100644 common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt create mode 100644 common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt diff --git a/common/src/commonMain/kotlin/com/powersync/internal/InternalPowerSyncAPI.kt b/common/src/commonMain/kotlin/com/powersync/internal/InternalPowerSyncAPI.kt new file mode 100644 index 00000000..a78ad368 --- /dev/null +++ b/common/src/commonMain/kotlin/com/powersync/internal/InternalPowerSyncAPI.kt @@ -0,0 +1,11 @@ +package com.powersync.internal + +@RequiresOptIn(message = "This API is internal to PowerSync and should never be used outside of the SDK.") +@Retention(AnnotationRetention.BINARY) +@Target( + AnnotationTarget.CLASS, + AnnotationTarget.FUNCTION, + AnnotationTarget.CONSTRUCTOR, + AnnotationTarget.PROPERTY, +) +public annotation class InternalPowerSyncAPI diff --git a/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt b/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt new file mode 100644 index 00000000..4e3c49c8 --- /dev/null +++ b/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt @@ -0,0 +1,16 @@ +package com.powersync.internal + +import io.ktor.client.engine.HttpClientEngineConfig +import kotlin.concurrent.atomics.AtomicReference +import kotlin.concurrent.atomics.ExperimentalAtomicApi + +@OptIn(ExperimentalAtomicApi::class) +@InternalPowerSyncAPI +public val httpClientIsKnownToNotSupportBackpressure: AtomicReference<((HttpClientEngineConfig) -> Boolean)?> = AtomicReference(null) + +@OptIn(ExperimentalAtomicApi::class, InternalPowerSyncAPI::class) +internal val HttpClientEngineConfig.isKnownToNotSupportBackpressure: Boolean + get() { + val check = httpClientIsKnownToNotSupportBackpressure.load() ?: return false + return check(this) + } diff --git a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index 4a1e782c..dd58d00e 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -16,15 +16,11 @@ import com.powersync.db.SubscriptionGroup import com.powersync.db.crud.CrudEntry import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig import io.ktor.client.call.body -import io.ktor.client.plugins.DefaultRequest -import io.ktor.client.plugins.HttpTimeout -import io.ktor.client.plugins.contentnegotiation.ContentNegotiation -import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.pluginOrNull import io.ktor.client.request.accept import io.ktor.client.request.get import io.ktor.client.request.headers @@ -64,48 +60,8 @@ import kotlinx.io.readIntLe import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement -import kotlin.experimental.ExperimentalObjCRefinement -import kotlin.native.HiddenFromObjC import kotlin.time.Clock -/** - * This API is experimental and may change in future releases. - * - * Configures a [HttpClient] for PowerSync sync operations. - * Configures required plugins and default request headers. - * - * This is currently only necessary when using a [SyncClientConfiguration.ExistingClient] for PowerSync - * network requests. - * - * Example usage: - * - * ```kotlin - * val client = HttpClient() { - * configureSyncHttpClient() - * // Your own config here - * } - * ``` - */ -@OptIn(ExperimentalObjCRefinement::class) -@HiddenFromObjC -@ExperimentalPowerSyncAPI -public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userAgent()) { - install(HttpTimeout) { - socketTimeoutMillis = SOCKET_TIMEOUT - } - install(ContentNegotiation) - install(WebSockets) { - // RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized - // transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size - maxFrameSize = 16_777_215 + 1024 // + some extra, you never know - } - - install(DefaultRequest) { - headers { - append("User-Agent", userAgent) - } - } -} @OptIn(ExperimentalPowerSyncAPI::class) internal class StreamingSyncClient( @@ -132,19 +88,13 @@ internal class StreamingSyncClient( private val httpClient: HttpClient = when (val config = options.clientConfiguration) { - is SyncClientConfiguration.ExtendedConfig -> - createClient(options.userAgent, config.block) + is SyncClientConfiguration.ExtendedConfig -> HttpClient { + configureSyncHttpClient(options.userAgent) + config.block(this) + } is SyncClientConfiguration.ExistingClient -> config.client } - private fun createClient( - userAgent: String, - additionalConfig: HttpClientConfig<*>.() -> Unit = {}, - ) = HttpClient { - configureSyncHttpClient(userAgent) - additionalConfig() - } - fun invalidateCredentials() { connector.invalidateCredentials() } @@ -326,8 +276,8 @@ internal class StreamingSyncClient( } private fun receiveTextOrBinaryLines(req: JsonElement): Flow { - // If we can use streamed HTTP responses that respect backpressure, prefer to do that. - return if (options.clientConfiguration.supportsBackpressure) { + return if (httpClient.pluginOrNull(DoesNotSupportBackpressureMarker) == null) { + // If we can use streamed HTTP responses that respect backpressure, prefer to do that. flow { connectToSyncEndpoint(req, supportBson = false) { isBson, response -> emit(PowerSyncControlArguments.ConnectionEstablished) diff --git a/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index b73b2613..49d0c02f 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -10,7 +10,6 @@ import kotlin.native.HiddenFromObjC * the HTTP client used to connect to the PowerSync service. */ public sealed class SyncClientConfiguration { - internal abstract val supportsBackpressure: Boolean /** * Extends the default Ktor [HttpClient] configuration with the provided block. @@ -18,10 +17,7 @@ public sealed class SyncClientConfiguration { @HiddenFromObjC public class ExtendedConfig( public val block: HttpClientConfig<*>.() -> Unit, - ) : SyncClientConfiguration() { - override val supportsBackpressure: Boolean - get() = defaultClientImplementationSupportsBackpressure() - } + ) : SyncClientConfiguration() /** * Provides an existing [HttpClient] instance to use for connecting to the PowerSync service. @@ -33,7 +29,6 @@ public sealed class SyncClientConfiguration { @HiddenFromObjC public class ExistingClient( public val client: HttpClient, - override val supportsBackpressure: Boolean = true ) : SyncClientConfiguration() } diff --git a/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt b/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt index 85a3bd67..3f19a300 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/UserAgent.kt @@ -1,5 +1,3 @@ package com.powersync.sync internal expect fun userAgent(): String - -internal expect fun defaultClientImplementationSupportsBackpressure(): Boolean diff --git a/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt b/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt new file mode 100644 index 00000000..38df6119 --- /dev/null +++ b/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt @@ -0,0 +1,83 @@ +package com.powersync.sync + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.internal.isKnownToNotSupportBackpressure +import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT +import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig +import io.ktor.client.plugins.DefaultRequest +import io.ktor.client.plugins.HttpClientPlugin +import io.ktor.client.plugins.HttpTimeout +import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.request.headers +import io.ktor.util.AttributeKey +import kotlin.experimental.ExperimentalObjCRefinement +import kotlin.native.HiddenFromObjC + +/** + * This API is experimental and may change in future releases. + * + * Configures a [HttpClient] for PowerSync sync operations. + * Configures required plugins and default request headers. + * + * This is currently only necessary when using a [SyncClientConfiguration.ExistingClient] for PowerSync + * network requests. + * + * Example usage: + * + * ```kotlin + * val client = HttpClient() { + * configureSyncHttpClient() + * // Your own config here + * } + * ``` + */ +@OptIn(ExperimentalObjCRefinement::class) +@HiddenFromObjC +@ExperimentalPowerSyncAPI +public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userAgent()) { + val config = this@configureSyncHttpClient + var supportsBackpressure = true + + config.engine { + if (isKnownToNotSupportBackpressure) { + supportsBackpressure = false + } + } + + install(HttpTimeout) { + socketTimeoutMillis = SOCKET_TIMEOUT + } + install(ContentNegotiation) + if (!supportsBackpressure) { + install(DoesNotSupportBackpressureMarker) + + install(WebSockets) { + // RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized + // transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size + maxFrameSize = 16_777_215 + 1024 // + some extra, you never know + } + } + + install(DefaultRequest) { + headers { + append("User-Agent", userAgent) + } + } +} + +internal object DoesNotSupportBackpressureMarker: HttpClientPlugin { + override val key: AttributeKey = AttributeKey("DoesNotSupportBackpressureMarker") + + override fun prepare(block: Unit.() -> Unit): DoesNotSupportBackpressureMarker { + return DoesNotSupportBackpressureMarker + } + + override fun install( + plugin: DoesNotSupportBackpressureMarker, + scope: HttpClient + ) { + // Nothing to do, marker plugin. + } +} diff --git a/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt b/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt index 3933a881..eff15b7f 100644 --- a/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt +++ b/common/src/nativeMain/kotlin/com/powersync/sync/UserAgent.native.kt @@ -6,13 +6,3 @@ import kotlin.experimental.ExperimentalNativeApi @OptIn(ExperimentalNativeApi::class) internal actual fun userAgent(): String = "PowerSync Kotlin SDK v$LIBRARY_VERSION (running on ${Platform.cpuArchitecture.name} ${Platform.osFamily.name})" - -@OptIn(ExperimentalNativeApi::class) -internal actual fun defaultClientImplementationSupportsBackpressure(): Boolean { - return when (Platform.osFamily) { - // The NSURLSession API does not properly support backpressure for streaming HTTP responses. While it is - // possible to call suspend() and resume() on NSURLSessionTask, those are ignored sometimes. - OsFamily.IOS, OsFamily.WATCHOS, OsFamily.TVOS, OsFamily.MACOSX -> false - else -> true - } -} diff --git a/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt b/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt index b8a6e035..411cd694 100644 --- a/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt +++ b/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt @@ -1,10 +1,25 @@ package com.powersync import com.powersync.db.NativeConnectionFactory +import com.powersync.internal.InternalPowerSyncAPI +import com.powersync.internal.httpClientIsKnownToNotSupportBackpressure +import io.ktor.client.engine.HttpClientEngineConfig +import io.ktor.client.engine.darwin.DarwinClientEngineConfig +import kotlin.concurrent.atomics.ExperimentalAtomicApi +@OptIn(ExperimentalAtomicApi::class, InternalPowerSyncAPI::class) @Suppress(names = ["EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING"]) public actual class DatabaseDriverFactory : NativeConnectionFactory() { + init { + // Hack: Install apple-specific httpClientIsKnownToNotSupportBackpressure hook. + httpClientIsKnownToNotSupportBackpressure.compareAndSet(null, ::appleClientKnownNotSupportBackpressure) + } + actual override fun resolveDefaultDatabasePath(dbFilename: String): String = appleDefaultDatabasePath(dbFilename) } +private fun appleClientKnownNotSupportBackpressure(config: HttpClientEngineConfig): Boolean { + return config is DarwinClientEngineConfig +} + internal actual val inMemoryDriver: InMemoryConnectionFactory = DatabaseDriverFactory() diff --git a/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt index af504a24..1a0313b0 100644 --- a/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt +++ b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt @@ -7,6 +7,8 @@ import androidx.sqlite.execSQL import com.powersync.bucket.StreamPriority import com.powersync.db.NativeConnectionFactory import com.powersync.db.crud.CrudTransaction +import com.powersync.internal.InternalPowerSyncAPI +import com.powersync.internal.httpClientIsKnownToNotSupportBackpressure import com.powersync.sync.SyncClientConfiguration import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatusData @@ -14,16 +16,23 @@ import com.powersync.sync.SyncStream import com.powersync.sync.SyncStreamDescription import com.powersync.sync.SyncStreamStatus import com.powersync.sync.SyncStreamSubscription +import io.ktor.client.engine.HttpClientEngineConfig +import io.ktor.client.engine.darwin.DarwinClientEngineConfig import io.ktor.client.plugins.logging.LogLevel import io.ktor.client.plugins.logging.Logging import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.map +import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.time.Duration.Companion.seconds import io.ktor.client.plugins.logging.Logger as KtorLogger +@OptIn(ExperimentalAtomicApi::class, InternalPowerSyncAPI::class) public fun sqlite3DatabaseFactory(initialStatements: List): PersistentConnectionFactory { + // Hack: Install apple-specific httpClientIsKnownToNotSupportBackpressure hook. + httpClientIsKnownToNotSupportBackpressure.compareAndSet(null, ::appleClientKnownNotSupportBackpressure) + @OptIn(ExperimentalPowerSyncAPI::class) return object : NativeConnectionFactory() { override fun resolveDefaultDatabasePath(dbFilename: String): String = appleDefaultDatabasePath(dbFilename) @@ -47,6 +56,10 @@ public fun sqlite3DatabaseFactory(initialStatements: List): PersistentCo } } +private fun appleClientKnownNotSupportBackpressure(config: HttpClientEngineConfig): Boolean { + return config is DarwinClientEngineConfig +} + /** * Helper class designed to bridge SKIEE methods and allow them to throw * `PowerSyncException`. This is necessary because these exceptions cannot From ccfe730aa9c6e608daeac5bfe49a4a8d16eecf29 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Dec 2025 11:56:11 +0100 Subject: [PATCH 3/6] Use optional plugin hack --- .../powersync/sync/UserAgent.commonJava.kt | 5 -- .../com/powersync/sync/StreamingSync.kt | 4 +- .../powersync/sync/configureSyncHttpClient.kt | 50 +++++++++---------- 3 files changed, 26 insertions(+), 33 deletions(-) delete mode 100644 common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt diff --git a/common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt b/common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt deleted file mode 100644 index 4ce5a513..00000000 --- a/common/src/commonJava/kotlin/com/powersync/sync/UserAgent.commonJava.kt +++ /dev/null @@ -1,5 +0,0 @@ -package com.powersync.sync - -internal actual fun defaultClientImplementationSupportsBackpressure(): Boolean { - return true -} diff --git a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index dd58d00e..1e7e3a27 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -276,7 +276,9 @@ internal class StreamingSyncClient( } private fun receiveTextOrBinaryLines(req: JsonElement): Flow { - return if (httpClient.pluginOrNull(DoesNotSupportBackpressureMarker) == null) { + val needsRSocket = httpClient.attributes[WebSocketIfNecessaryPlugin.needsRSocketKey] + + return if (!needsRSocket) { // If we can use streamed HTTP responses that respect backpressure, prefer to do that. flow { connectToSyncEndpoint(req, supportBson = false) { isBson, response -> diff --git a/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt b/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt index 38df6119..0add75ba 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt @@ -37,28 +37,11 @@ import kotlin.native.HiddenFromObjC @HiddenFromObjC @ExperimentalPowerSyncAPI public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userAgent()) { - val config = this@configureSyncHttpClient - var supportsBackpressure = true - - config.engine { - if (isKnownToNotSupportBackpressure) { - supportsBackpressure = false - } - } - install(HttpTimeout) { socketTimeoutMillis = SOCKET_TIMEOUT } install(ContentNegotiation) - if (!supportsBackpressure) { - install(DoesNotSupportBackpressureMarker) - - install(WebSockets) { - // RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized - // transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size - maxFrameSize = 16_777_215 + 1024 // + some extra, you never know - } - } + install(WebSocketIfNecessaryPlugin) install(DefaultRequest) { headers { @@ -67,17 +50,30 @@ public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userA } } -internal object DoesNotSupportBackpressureMarker: HttpClientPlugin { - override val key: AttributeKey = AttributeKey("DoesNotSupportBackpressureMarker") +/** + * A client plugin that installs WebSocket support and configures it only if the HTTP client implementation is known not + * to support backpressure properly (since that is the only case in which we need RSocket over WebSockets). + */ +internal object WebSocketIfNecessaryPlugin: HttpClientPlugin { + override val key: AttributeKey + get() = WebSockets.key - override fun prepare(block: Unit.() -> Unit): DoesNotSupportBackpressureMarker { - return DoesNotSupportBackpressureMarker + val needsRSocketKey = AttributeKey("NeedsRSocketSupport") + + override fun prepare(block: Unit.() -> Unit): WebSockets { + return WebSockets.prepare { + // RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized + // transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size + maxFrameSize = 16_777_215 + 1024 // + some extra, you never know + } } - override fun install( - plugin: DoesNotSupportBackpressureMarker, - scope: HttpClient - ) { - // Nothing to do, marker plugin. + override fun install(plugin: WebSockets, scope: HttpClient) { + if (scope.engineConfig.isKnownToNotSupportBackpressure) { + WebSockets.install(plugin, scope) + scope.attributes.put(needsRSocketKey, true) + } else { + scope.attributes.put(needsRSocketKey, false) + } } } From 9507d0bee496a794fb5e591797e1369ac5dd9eb6 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Dec 2025 12:26:13 +0100 Subject: [PATCH 4/6] Update dependencies --- demos/supabase-todolist/shared/build.gradle.kts | 2 +- gradle/libs.versions.toml | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/demos/supabase-todolist/shared/build.gradle.kts b/demos/supabase-todolist/shared/build.gradle.kts index 28c4fc1e..2a6801ad 100644 --- a/demos/supabase-todolist/shared/build.gradle.kts +++ b/demos/supabase-todolist/shared/build.gradle.kts @@ -27,7 +27,7 @@ kotlin { ios.deploymentTarget = "14.1" podfile = project.file("../iosApp/Podfile") pod("powersync-sqlite-core") { - version = "0.4.0" + version = "0.4.9" linkOnly = true } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4a625dbd..cce7ecd4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -18,7 +18,7 @@ serialization = "1.9.0" kotlinx-io = "0.8.0" ktor = "3.3.3" uuid = "0.8.4" -powersync-core = "0.4.8" +powersync-core = "0.4.9" rsocket = "0.20.0" turbine = "1.2.1" kotest = "5.9.1" # we can't upgrade to 6.x because that requires Java 11 or above (we need Java 8 support) @@ -27,12 +27,12 @@ stately = "2.1.0" supabase = "3.2.2" junit = "4.13.2" -compose = "1.8.2" # This is for the multiplatform compose -androidCompose = "2025.08.00" -compose-preview = "1.9.0" -compose-lifecycle = "2.9.2" +compose = "1.9.3" # This is for the multiplatform compose +androidCompose = "2025.11.01" +compose-preview = "1.9.5" +compose-lifecycle = "2.9.6" androidxSqlite = "2.6.0" -androidxSplashscreen = "1.0.1" +androidxSplashscreen = "1.2.0" room = "2.8.0" sqldelight = "2.1.0" From ce6c4a689ff561d6f6bcd549121bf96e7ddcf370 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Dec 2025 13:39:06 +0100 Subject: [PATCH 5/6] Reformat --- .../internal/PlatformSpecificQuirks.kt | 11 ++++++++++ ...tpClient.kt => ConfigureSyncHttpClient.kt} | 12 +++++----- .../com/powersync/sync/RSocketSupport.kt | 9 +++++--- .../com/powersync/sync/StreamingSync.kt | 22 ++++++++++--------- .../kotlin/com/powersync/sync/SyncOptions.kt | 1 - .../powersync/DatabaseDriverFactory.native.kt | 4 +--- .../src/appleMain/kotlin/com/powersync/SDK.kt | 4 +--- 7 files changed, 38 insertions(+), 25 deletions(-) rename common/src/commonMain/kotlin/com/powersync/sync/{configureSyncHttpClient.kt => ConfigureSyncHttpClient.kt} (90%) diff --git a/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt b/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt index 4e3c49c8..d8d6136d 100644 --- a/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt +++ b/common/src/commonMain/kotlin/com/powersync/internal/PlatformSpecificQuirks.kt @@ -1,9 +1,20 @@ package com.powersync.internal +import io.ktor.client.engine.HttpClientEngine import io.ktor.client.engine.HttpClientEngineConfig import kotlin.concurrent.atomics.AtomicReference import kotlin.concurrent.atomics.ExperimentalAtomicApi +/** + * A hook installed by the `:core` and `:internal:PowerSyncKotlin` projects. + * + * The hook is responsible for determining whether a given [HttpClientEngine] (expressed through + * [HttpClientEngineConfig] because the former is not always public) is known not to support backpressure. + * In particular, this is the case for the `Darwin` HTTP engine. + * + * When an engine is marked to not support backpressure handling, we will use a custom protocol with explicit + * flow control instead of relying on HTTP response streams. + */ @OptIn(ExperimentalAtomicApi::class) @InternalPowerSyncAPI public val httpClientIsKnownToNotSupportBackpressure: AtomicReference<((HttpClientEngineConfig) -> Boolean)?> = AtomicReference(null) diff --git a/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt b/common/src/commonMain/kotlin/com/powersync/sync/ConfigureSyncHttpClient.kt similarity index 90% rename from common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt rename to common/src/commonMain/kotlin/com/powersync/sync/ConfigureSyncHttpClient.kt index 0add75ba..3adde9d7 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/configureSyncHttpClient.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/ConfigureSyncHttpClient.kt @@ -54,21 +54,23 @@ public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userA * A client plugin that installs WebSocket support and configures it only if the HTTP client implementation is known not * to support backpressure properly (since that is the only case in which we need RSocket over WebSockets). */ -internal object WebSocketIfNecessaryPlugin: HttpClientPlugin { +internal object WebSocketIfNecessaryPlugin : HttpClientPlugin { override val key: AttributeKey get() = WebSockets.key val needsRSocketKey = AttributeKey("NeedsRSocketSupport") - override fun prepare(block: Unit.() -> Unit): WebSockets { - return WebSockets.prepare { + override fun prepare(block: Unit.() -> Unit): WebSockets = + WebSockets.prepare { // RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized // transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size maxFrameSize = 16_777_215 + 1024 // + some extra, you never know } - } - override fun install(plugin: WebSockets, scope: HttpClient) { + override fun install( + plugin: WebSockets, + scope: HttpClient, + ) { if (scope.engineConfig.isKnownToNotSupportBackpressure) { WebSockets.install(plugin, scope) scope.attributes.put(needsRSocketKey, true) diff --git a/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt b/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt index 009a10c5..fa378a6a 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt @@ -118,9 +118,12 @@ internal fun HttpClient.rSocketSyncStream( }, ) - emitAll(syncStream.map { - PowerSyncControlArguments.BinaryLine(it.data.readByteArray()) - }.flowOn(Dispatchers.IO)) + emitAll( + syncStream + .map { + PowerSyncControlArguments.BinaryLine(it.data.readByteArray()) + }.flowOn(Dispatchers.IO), + ) emit(PowerSyncControlArguments.ResponseStreamEnd) } diff --git a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index 1e7e3a27..0052d4c9 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -62,7 +62,6 @@ import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement import kotlin.time.Clock - @OptIn(ExperimentalPowerSyncAPI::class) internal class StreamingSyncClient( private val bucketStorage: BucketStorage, @@ -88,10 +87,11 @@ internal class StreamingSyncClient( private val httpClient: HttpClient = when (val config = options.clientConfiguration) { - is SyncClientConfiguration.ExtendedConfig -> HttpClient { - configureSyncHttpClient(options.userAgent) - config.block(this) - } + is SyncClientConfiguration.ExtendedConfig -> + HttpClient { + configureSyncHttpClient(options.userAgent) + config.block(this) + } is SyncClientConfiguration.ExistingClient -> config.client } @@ -299,11 +299,13 @@ internal class StreamingSyncClient( flow { val credentials = requireNotNull(connector.getCredentialsCached()) { "Not logged in" } - emitAll(httpClient.rSocketSyncStream( - userAgent = options.userAgent, - credentials = credentials, - req = req, - )) + emitAll( + httpClient.rSocketSyncStream( + userAgent = options.userAgent, + credentials = credentials, + req = req, + ), + ) } } } diff --git a/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index 49d0c02f..eaf550ad 100644 --- a/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/common/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -10,7 +10,6 @@ import kotlin.native.HiddenFromObjC * the HTTP client used to connect to the PowerSync service. */ public sealed class SyncClientConfiguration { - /** * Extends the default Ktor [HttpClient] configuration with the provided block. */ diff --git a/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt b/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt index 411cd694..d7f72d23 100644 --- a/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt +++ b/core/src/appleMain/kotlin/com/powersync/DatabaseDriverFactory.native.kt @@ -18,8 +18,6 @@ public actual class DatabaseDriverFactory : NativeConnectionFactory() { actual override fun resolveDefaultDatabasePath(dbFilename: String): String = appleDefaultDatabasePath(dbFilename) } -private fun appleClientKnownNotSupportBackpressure(config: HttpClientEngineConfig): Boolean { - return config is DarwinClientEngineConfig -} +private fun appleClientKnownNotSupportBackpressure(config: HttpClientEngineConfig): Boolean = config is DarwinClientEngineConfig internal actual val inMemoryDriver: InMemoryConnectionFactory = DatabaseDriverFactory() diff --git a/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt index 1a0313b0..eb2bd273 100644 --- a/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt +++ b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt @@ -56,9 +56,7 @@ public fun sqlite3DatabaseFactory(initialStatements: List): PersistentCo } } -private fun appleClientKnownNotSupportBackpressure(config: HttpClientEngineConfig): Boolean { - return config is DarwinClientEngineConfig -} +private fun appleClientKnownNotSupportBackpressure(config: HttpClientEngineConfig): Boolean = config is DarwinClientEngineConfig /** * Helper class designed to bridge SKIEE methods and allow them to throw From 075a391606e447a074d7d969626013210d3883de Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 1 Dec 2025 13:43:02 +0100 Subject: [PATCH 6/6] Add changelog entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a97931b1..fb6d2dab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Sync options: `newClientImplementation` is now the default. - Make `androidx.sqlite:sqlite-bundled` an API dependency of `:core` to avoid toolchain warnings. +- On Apple platforms, use a websocket protocol as a workaround to clients not supporting backpressure in HTTP response + streams. ## 1.8.1