Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@ package com.facebook.react.devsupport

import android.os.Handler
import android.os.Looper
import com.facebook.common.logging.FLog
import com.facebook.jni.HybridData
import com.facebook.proguard.annotations.DoNotStrip
import com.facebook.proguard.annotations.DoNotStripAny
import com.facebook.react.common.annotations.VisibleForTesting
import com.facebook.react.devsupport.inspector.DevSupportHttpClient
import com.facebook.soloader.SoLoader
import java.io.Closeable
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.ArrayDeque
import java.util.Queue
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
Expand Down Expand Up @@ -66,7 +72,7 @@ internal class CxxInspectorPackagerConnection(
*/
@DoNotStripAny
private interface IWebSocket : Closeable {
fun send(message: String)
fun send(chunk: ByteBuffer)

/**
* Close the WebSocket connection. NOTE: There is no close() method in the C++ interface.
Expand All @@ -75,6 +81,95 @@ internal class CxxInspectorPackagerConnection(
override fun close()
}

/**
* A simple WebSocket wrapper that prevents having more than 16MiB of messages queued
* simultaneously. This is done to stop OkHttp from closing the WebSocket connection.
*
* https://github.com/facebook/react-native/issues/39651.
* https://github.com/square/okhttp/blob/4e7dbec1ea6c9cf8d80422ac9d44b9b185c749a3/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/ws/RealWebSocket.kt#L684.
*/
private class InspectorPackagerWebSocketImpl(
private val nativeWebSocket: WebSocket,
private val handler: Handler,
) : IWebSocket {
private val messageQueue: Queue<Pair<String, Int>> = ArrayDeque()
private val queueLock = Any()
private val drainRunnable =
object : Runnable {
override fun run() {
FLog.d(TAG, "Attempting to drain the message queue after ${drainDelayMs}ms")
tryDrainQueue()
}
}

/**
* We are providing a String to OkHttp's WebSocket, because there is no guarantee that all CDP
* clients will support binary data format.
*/
override fun send(chunk: ByteBuffer) {
synchronized(queueLock) {
val messageSize = chunk.capacity()
val message = StandardCharsets.UTF_8.decode(chunk).toString()
val currentQueueSize = nativeWebSocket.queueSize()

if (currentQueueSize + messageSize > MAX_QUEUE_SIZE) {
FLog.d(TAG, "Reached queue size limit. Queueing the message.")
messageQueue.offer(Pair(message, messageSize))
scheduleDrain()
} else {
if (messageQueue.isEmpty()) {
nativeWebSocket.send(message)
} else {
messageQueue.offer(Pair(message, messageSize))
tryDrainQueue()
}
}
}
}

override fun close() {
synchronized(queueLock) {
handler.removeCallbacks(drainRunnable)
messageQueue.clear()
nativeWebSocket.close(1000, "End of session")
}
}

private fun tryDrainQueue() {
synchronized(queueLock) {
while (messageQueue.isNotEmpty()) {
val (nextMessage, nextMessageSize) = messageQueue.peek() ?: break
val currentQueueSize = nativeWebSocket.queueSize()

if (currentQueueSize + nextMessageSize <= MAX_QUEUE_SIZE) {
messageQueue.poll()
if (!nativeWebSocket.send(nextMessage)) {
// The WebSocket is closing, closed, or cancelled.
handler.removeCallbacks(drainRunnable)
messageQueue.clear()

break
}
} else {
scheduleDrain()
break
}
}
}
}

private fun scheduleDrain() {
FLog.d(TAG, "Scheduled a task to drain messages queue.")
handler.removeCallbacks(drainRunnable)
handler.postDelayed(drainRunnable, drainDelayMs)
}

companion object {
private val TAG: String = InspectorPackagerWebSocketImpl::class.java.simpleName
private const val drainDelayMs: Long = 100
}
}

/** Java implementation of the C++ InspectorPackagerConnectionDelegate interface. */
private class DelegateImpl {
private val httpClient = DevSupportHttpClient.websocketClient
Expand Down Expand Up @@ -124,15 +219,8 @@ internal class CxxInspectorPackagerConnection(
}
},
)
return object : IWebSocket {
override fun send(message: String) {
webSocket.send(message)
}

override fun close() {
webSocket.close(1000, "End of session")
}
}
return InspectorPackagerWebSocketImpl(webSocket, handler)
}

@DoNotStrip
Expand All @@ -146,6 +234,8 @@ internal class CxxInspectorPackagerConnection(
SoLoader.loadLibrary("react_devsupportjni")
}

@VisibleForTesting internal const val MAX_QUEUE_SIZE = 16L * 1024 * 1024 // 16MiB

@JvmStatic
private external fun initHybrid(
url: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ internal object InspectorNetworkHelper {
response.body().use { responseBody ->
if (responseBody != null) {
val inputStream = responseBody.byteStream()
val chunkSize = 1024
val chunkSize = 8 * 1024 // 8Kb
val buffer = ByteArray(chunkSize)
var bytesRead: Int

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,44 @@
* LICENSE file in the root directory of this source tree.
*/

#include <fbjni/ByteBuffer.h>

#include "JCxxInspectorPackagerConnectionWebSocket.h"

using namespace facebook::jni;
using namespace facebook::react::jsinspector_modern;

namespace facebook::react::jsinspector_modern {

namespace {

local_ref<JByteBuffer::javaobject> getReadOnlyByteBufferFromStringView(
std::string_view sv) {
auto buffer = JByteBuffer::wrapBytes(
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(sv.data())),
sv.size());

/**
* Return a read-only buffer that shares the underlying contents.
* This guards from accidential mutations on the Java side, since we did
* casting above.
*
* https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#asReadOnlyBuffer--
*/
static auto method =
buffer->javaClassStatic()->getMethod<JByteBuffer::javaobject()>(
"asReadOnlyBuffer");
return method(buffer);
}

} // namespace

void JCxxInspectorPackagerConnectionWebSocket::send(std::string_view message) {
static auto method =
javaClassStatic()->getMethod<void(const std::string&)>("send");
method(self(), std::string(message));
javaClassStatic()->getMethod<void(local_ref<JByteBuffer::javaobject>)>(
"send");
auto byteBuffer = getReadOnlyByteBufferFromStringView(message);
method(self(), byteBuffer);
}

void JCxxInspectorPackagerConnectionWebSocket::close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.facebook.react.devsupport

import okhttp3.internal.ws.RealWebSocket
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test

class CxxInspectorPackagerConnectionTest {

@Test
fun testMaxQueueSizeEquality() {
val okHttpRealWebSocketClass = RealWebSocket::class.java
val okHttpMaxQueueSizeField = okHttpRealWebSocketClass.getDeclaredField("MAX_QUEUE_SIZE")
okHttpMaxQueueSizeField.isAccessible = true

val okHttpMaxQueueSize = okHttpMaxQueueSizeField.getLong(null)
assertThat(okHttpMaxQueueSize).isNotNull

assertThat(okHttpMaxQueueSize)
.isEqualTo(CxxInspectorPackagerConnection.Companion.MAX_QUEUE_SIZE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ const uint16_t TRACE_EVENT_CHUNK_SIZE = 1000;
/**
* The maximum number of ProfileChunk trace events
* that will be sent in a single CDP Tracing.dataCollected message.
* TODO(T219394401): Increase the size once we manage the queue on OkHTTP
side
* properly and avoid WebSocket disconnections when sending a message larger
* than 16MB.
*/
const uint16_t PROFILE_TRACE_EVENT_CHUNK_SIZE = 1;
const uint16_t PROFILE_TRACE_EVENT_CHUNK_SIZE = 10;

} // namespace

Expand Down
Loading