Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Sync options: `newClientImplementation` is now the default.
- Make `androidx.sqlite:sqlite-bundled` an API dependency of `:core` to avoid toolchain warnings.
- On Apple platforms, use a websocket protocol as a workaround to clients not supporting backpressure in HTTP response
streams.

## 1.8.1

Expand Down
2 changes: 2 additions & 0 deletions common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ kotlin {
implementation(libs.kotlinx.datetime)
implementation(libs.stately.concurrency)
implementation(libs.configuration.annotations)
implementation(libs.rsocket.core)
implementation(libs.rsocket.transport.websocket)
api(libs.ktor.client.core)
api(libs.kermit)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.powersync.internal

@RequiresOptIn(message = "This API is internal to PowerSync and should never be used outside of the SDK.")
@Retention(AnnotationRetention.BINARY)
@Target(
AnnotationTarget.CLASS,
AnnotationTarget.FUNCTION,
AnnotationTarget.CONSTRUCTOR,
AnnotationTarget.PROPERTY,
)
public annotation class InternalPowerSyncAPI
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.powersync.internal

import io.ktor.client.engine.HttpClientEngine
import io.ktor.client.engine.HttpClientEngineConfig
import kotlin.concurrent.atomics.AtomicReference
import kotlin.concurrent.atomics.ExperimentalAtomicApi

/**
* A hook installed by the `:core` and `:internal:PowerSyncKotlin` projects.
*
* The hook is responsible for determining whether a given [HttpClientEngine] (expressed through
* [HttpClientEngineConfig] because the former is not always public) is known not to support backpressure.
* In particular, this is the case for the `Darwin` HTTP engine.
*
* When an engine is marked to not support backpressure handling, we will use a custom protocol with explicit
* flow control instead of relying on HTTP response streams.
*/
@OptIn(ExperimentalAtomicApi::class)
@InternalPowerSyncAPI
public val httpClientIsKnownToNotSupportBackpressure: AtomicReference<((HttpClientEngineConfig) -> Boolean)?> = AtomicReference(null)

@OptIn(ExperimentalAtomicApi::class, InternalPowerSyncAPI::class)
internal val HttpClientEngineConfig.isKnownToNotSupportBackpressure: Boolean
get() {
val check = httpClientIsKnownToNotSupportBackpressure.load() ?: return false
return check(this)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.powersync.sync

import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.internal.isKnownToNotSupportBackpressure
import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT
import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import io.ktor.client.plugins.DefaultRequest
import io.ktor.client.plugins.HttpClientPlugin
import io.ktor.client.plugins.HttpTimeout
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.request.headers
import io.ktor.util.AttributeKey
import kotlin.experimental.ExperimentalObjCRefinement
import kotlin.native.HiddenFromObjC

/**
* This API is experimental and may change in future releases.
*
* Configures a [HttpClient] for PowerSync sync operations.
* Configures required plugins and default request headers.
*
* This is currently only necessary when using a [SyncClientConfiguration.ExistingClient] for PowerSync
* network requests.
*
* Example usage:
*
* ```kotlin
* val client = HttpClient() {
* configureSyncHttpClient()
* // Your own config here
* }
* ```
*/
@OptIn(ExperimentalObjCRefinement::class)
@HiddenFromObjC
@ExperimentalPowerSyncAPI
public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userAgent()) {
install(HttpTimeout) {
socketTimeoutMillis = SOCKET_TIMEOUT
}
install(ContentNegotiation)
install(WebSocketIfNecessaryPlugin)

install(DefaultRequest) {
headers {
append("User-Agent", userAgent)
}
}
}

/**
* A client plugin that installs WebSocket support and configures it only if the HTTP client implementation is known not
* to support backpressure properly (since that is the only case in which we need RSocket over WebSockets).
*/
internal object WebSocketIfNecessaryPlugin : HttpClientPlugin<Unit, WebSockets> {
override val key: AttributeKey<WebSockets>
get() = WebSockets.key

val needsRSocketKey = AttributeKey<Boolean>("NeedsRSocketSupport")

override fun prepare(block: Unit.() -> Unit): WebSockets =
WebSockets.prepare {
// RSocket Frames (Header + Payload) MUST be limited to 16,777,215 bytes, regardless of whether the utilized
// transport protocol requires the Frame Length field: https://github.com/rsocket/rsocket/blob/master/Protocol.md#max-frame-size
maxFrameSize = 16_777_215 + 1024 // + some extra, you never know
}

override fun install(
plugin: WebSockets,
scope: HttpClient,
) {
if (scope.engineConfig.isKnownToNotSupportBackpressure) {
WebSockets.install(plugin, scope)
scope.attributes.put(needsRSocketKey, true)
} else {
scope.attributes.put(needsRSocketKey, false)
}
}
}
148 changes: 148 additions & 0 deletions common/src/commonMain/kotlin/com/powersync/sync/RSocketSupport.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package com.powersync.sync

import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.bucket.PowerSyncControlArguments
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.utils.JsonUtil
import io.ktor.client.HttpClient
import io.ktor.client.plugins.websocket.webSocketSession
import io.ktor.http.URLBuilder
import io.ktor.http.URLProtocol
import io.ktor.http.takeFrom
import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.keepalive.KeepAlive
import io.rsocket.kotlin.payload.PayloadMimeType
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.payload.metadata
import io.rsocket.kotlin.transport.RSocketClientTarget
import io.rsocket.kotlin.transport.RSocketConnection
import io.rsocket.kotlin.transport.RSocketTransportApi
import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.io.readByteArray
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.seconds

/**
* Connects to the RSocket endpoint for receiving sync lines.
*
* Note that we reconstruct the transport layer for RSocket by opening a WebSocket connection
* manually instead of using the high-level RSocket Ktor integration.
* The reason is that every request to the sync service needs its own metadata and data payload
* (e.g. to transmit the token), but the Ktor integration only supports setting a single payload for
* the entire client.
*/
@OptIn(RSocketTransportApi::class, ExperimentalPowerSyncAPI::class)
internal fun HttpClient.rSocketSyncStream(
userAgent: String,
req: JsonElement,
credentials: PowerSyncCredentials,
): Flow<PowerSyncControlArguments> =
flow {
val flowContext = currentCoroutineContext()

val websocketUri =
URLBuilder(credentials.endpointUri("sync/stream")).apply {
protocol =
when (protocolOrNull) {
URLProtocol.HTTP -> URLProtocol.WS
else -> URLProtocol.WSS
}
}

// Note: We're using a custom connector here because we need to set options for each request
// without creating a new HTTP client each time. The recommended approach would be to add an
// RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for
// all connections (bad because we need a short-lived token in there).
// https://github.com/rsocket/rsocket-kotlin/issues/311
val target =
object : RSocketClientTarget {
@RSocketTransportApi
override suspend fun connectClient(): RSocketConnection {
val ws =
webSocketSession {
url.takeFrom(websocketUri)
}
return KtorWebSocketConnection(ws)
}

override val coroutineContext: CoroutineContext
get() = flowContext
}

val connector =
RSocketConnector {
connectionConfig {
payloadMimeType =
PayloadMimeType(
metadata = "application/json",
data = "application/json",
)

setupPayload {
buildPayload {
data("{}")
metadata(
JsonUtil.json.encodeToString(
ConnectionSetupMetadata(
token = "Bearer ${credentials.token}",
userAgent = userAgent,
),
),
)
}
}

keepAlive = KeepAlive(interval = 20.0.seconds, maxLifetime = 30.0.seconds)
}
}

val rSocket = connector.connect(target)
emit(PowerSyncControlArguments.ConnectionEstablished)
val syncStream =
rSocket.requestStream(
buildPayload {
data(JsonUtil.json.encodeToString(req))
metadata(JsonUtil.json.encodeToString(RequestStreamMetadata("/sync/stream")))
},
)

emitAll(
syncStream
.map {
PowerSyncControlArguments.BinaryLine(it.data.readByteArray())
}.flowOn(Dispatchers.IO),
)
emit(PowerSyncControlArguments.ResponseStreamEnd)
}

/**
* The metadata payload we need to use when connecting with RSocket.
*
* This corresponds to `RSocketContextMeta` on the sync service.
*/
@Serializable
private class ConnectionSetupMetadata(
val token: String,
@SerialName("user_agent")
val userAgent: String,
)

/**
* The metadata payload we send for the `REQUEST_STREAM` frame.
*/
@Serializable
private class RequestStreamMetadata(
val path: String,
)
Loading