Skip to content

Commit 58ca797

Browse files
olme04whyoleg
authored andcommitted
reformat code
1 parent ca4cd9f commit 58ca797

File tree

27 files changed

+322
-199
lines changed

27 files changed

+322
-199
lines changed

benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,12 @@ abstract class RSocketBenchmark<Payload : Any> {
113113
}
114114

115115
//Run every request in separate coroutine which will be dispatched on Default dispatcher (threads amount = cores amount)
116-
private inline fun parallel(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = runBlocking {
117-
(0..p).map {
118-
GlobalScope.async { block(bh) }
119-
}.awaitAll()
120-
}
116+
private inline fun parallel(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit =
117+
runBlocking {
118+
(0..p).map {
119+
GlobalScope.async { block(bh) }
120+
}.awaitAll()
121+
}
121122

122123
//Run every request in separate coroutine, but on single thread dispatcher:
123124
// - do request 1
@@ -128,9 +129,10 @@ abstract class RSocketBenchmark<Payload : Any> {
128129
// - receive result on request 2
129130
// - ....
130131
//working with requests is single threaded but concurrent
131-
private inline fun concurrent(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = runBlocking {
132-
(0..p).map {
133-
async { block(bh) }
134-
}.awaitAll()
135-
}
132+
private inline fun concurrent(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit =
133+
runBlocking {
134+
(0..p).map {
135+
async { block(bh) }
136+
}.awaitAll()
137+
}
136138
}

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
8181

8282
override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payloadCopy()).flowOn(requestStrategy)
8383

84-
override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadCopy(), payloadsFlow).flowOn(requestStrategy)
84+
override suspend fun doRequestChannel(): Flow<Payload> =
85+
client.requestChannel(payloadCopy(), payloadsFlow).flowOn(requestStrategy)
8586

8687
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public interface Connection : CoroutineScope {
3535
}
3636

3737
@OptIn(TransportApi::class)
38-
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T = receive().readFrame(pool).closeOnError(block)
38+
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T =
39+
receive().readFrame(pool).closeOnError(block)
3940

4041
@OptIn(TransportApi::class)
4142
internal suspend fun Connection.sendFrame(frame: Frame) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketRequestHandler.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public class RSocketRequestHandlerBuilder internal constructor() {
2727
private var fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null
2828
private var requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null
2929
private var requestStream: (suspend RSocket.(payload: Payload) -> Flow<Payload>)? = null
30-
private var requestChannel: (suspend RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload>)? = null
30+
private var requestChannel: (suspend RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload>)? =
31+
null
3132

3233
public fun metadataPush(block: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)) {
3334
check(metadataPush == null) { "Metadata Push handler already configured" }

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RequestStrategy.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import kotlin.coroutines.*
2424
private val DefaultStrategy: RequestStrategy = PrefetchStrategy(64, 16)
2525

2626
@ExperimentalStreamsApi
27-
internal fun CoroutineContext.requestStrategy(): RequestStrategy.Element = (get(RequestStrategy) ?: DefaultStrategy).provide()
27+
internal fun CoroutineContext.requestStrategy(): RequestStrategy.Element =
28+
(get(RequestStrategy) ?: DefaultStrategy).provide()
2829

2930
@ExperimentalStreamsApi
3031
public interface RequestStrategy : CoroutineContext.Element {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/InterceptorsBuilder.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,6 @@ internal class Interceptors(
5454
fun wrapRequester(requester: RSocket): RSocket = requesters.fold(requester) { r, i -> i.intercept(r) }
5555
fun wrapResponder(responder: RSocket): RSocket = responders.fold(responder) { r, i -> i.intercept(r) }
5656
fun wrapConnection(connection: Connection): Connection = connections.fold(connection) { c, i -> i.intercept(c) }
57-
fun wrapAcceptor(connection: ConnectionAcceptor): ConnectionAcceptor = acceptors.fold(connection) { c, i -> i.intercept(c) }
57+
fun wrapAcceptor(connection: ConnectionAcceptor): ConnectionAcceptor =
58+
acceptors.fold(connection) { c, i -> i.intercept(c) }
5859
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,5 @@ internal class MetadataPushFrame(
4545
}
4646
}
4747

48-
internal fun ByteReadPacket.readMetadataPush(pool: ObjectPool<ChunkBuffer>): MetadataPushFrame = MetadataPushFrame(readPacket(pool))
48+
internal fun ByteReadPacket.readMetadataPush(pool: ObjectPool<ChunkBuffer>): MetadataPushFrame =
49+
MetadataPushFrame(readPacket(pool))

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ internal inline fun <T : Closeable, R> T.closeOnError(block: (T) -> R): R {
3131
private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close
3232

3333
@Suppress("FunctionName")
34-
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)
34+
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> =
35+
Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)
3536

3637
internal fun <E : Closeable> SendChannel<E>.safeTrySend(element: E) {
3738
trySend(element).onFailure { element.close() }

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ internal class FrameSender(
4242
suspend fun sendMetadataPush(metadata: ByteReadPacket): Unit = prioritizer.send(MetadataPushFrame(metadata))
4343

4444
suspend fun sendCancel(id: Int): Unit = withContext(NonCancellable) { prioritizer.send(CancelFrame(id)) }
45-
suspend fun sendError(id: Int, throwable: Throwable): Unit = withContext(NonCancellable) { prioritizer.send(ErrorFrame(id, throwable)) }
45+
suspend fun sendError(id: Int, throwable: Throwable): Unit =
46+
withContext(NonCancellable) { prioritizer.send(ErrorFrame(id, throwable)) }
47+
4648
suspend fun sendRequestN(id: Int, n: Int): Unit = prioritizer.send(RequestNFrame(id, n))
4749

4850
suspend fun sendRequestPayload(type: FrameType, streamId: Int, payload: Payload, initialRequest: Int = 0) {
@@ -103,14 +105,24 @@ internal class FrameSender(
103105
val fType = if (first && type.isRequestType) type else FrameType.Payload
104106
val fragment = Payload(dataFragment, metadataFragment)
105107
val follows = metadata != null && metadata.isNotEmpty || data.isNotEmpty
106-
prioritizer.send(RequestFrame(fType, streamId, follows, (!follows && complete), !fType.isRequestType, initialRequest, fragment))
108+
prioritizer.send(
109+
RequestFrame(
110+
type = fType,
111+
streamId = streamId,
112+
follows = follows,
113+
complete = (!follows && complete),
114+
next = !fType.isRequestType,
115+
initialRequest = initialRequest,
116+
payload = fragment
117+
)
118+
)
107119
first = false
108120
remaining = fragmentSize
109121
} while (follows)
110122
}
111123

112124
private fun Payload.isFragmentable(hasInitialRequest: Boolean) = when (maxFragmentSize) {
113-
0 -> false
125+
0 -> false
114126
else -> when (val meta = metadata) {
115127
null -> data.remaining > maxFragmentSize - fragmentOffset - (if (hasInitialRequest) Int.SIZE_BYTES else 0)
116128
else -> data.remaining + meta.remaining > maxFragmentSize - fragmentOffsetWithMetadata - (if (hasInitialRequest) Int.SIZE_BYTES else 0)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Limiter.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import kotlinx.coroutines.*
2323
import kotlinx.coroutines.flow.*
2424
import kotlin.coroutines.*
2525

26-
internal suspend inline fun Flow<Payload>.collectLimiting(limiter: Limiter, crossinline action: suspend (value: Payload) -> Unit) {
26+
internal suspend inline fun Flow<Payload>.collectLimiting(
27+
limiter: Limiter,
28+
crossinline action: suspend (value: Payload) -> Unit
29+
) {
2730
collect { payload ->
2831
payload.closeOnError {
2932
limiter.useRequest()

0 commit comments

Comments
 (0)