Skip to content

Commit 897064a

Browse files
mostroverkhovrobertroeser
authored andcommitted
add stream error handlers to avoid global undeliverable error exceptions, which close app process if no appropriate global handler is set (#14)
1 parent 7eaf8de commit 897064a

File tree

3 files changed

+37
-68
lines changed

3 files changed

+37
-68
lines changed

rsocket-core/src/main/java/io/rsocket/android/RSocketClient.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import io.reactivex.Completable
2424
import io.reactivex.Flowable
2525
import io.reactivex.Single
2626
import io.reactivex.disposables.Disposable
27+
import io.reactivex.functions.Action
28+
import io.reactivex.functions.Consumer
2729
import io.reactivex.processors.FlowableProcessor
2830
import io.reactivex.processors.PublishProcessor
2931
import io.reactivex.processors.UnicastProcessor
@@ -67,33 +69,29 @@ internal class RSocketClient @JvmOverloads constructor(
6769
val ackTimeoutMs = ackTimeout.toMillis
6870

6971
this.keepAliveSendSub = completeOnStart
70-
.andThen(Flowable.interval(tickPeriod.toMillis,TimeUnit.MILLISECONDS))
72+
.andThen(Flowable.interval(tickPeriod.toMillis, TimeUnit.MILLISECONDS))
7173
.doOnSubscribe({ _ -> timeLastTickSentMs = System.currentTimeMillis() })
7274
.concatMap({ _ -> sendKeepAlive(ackTimeoutMs, missedAcks).toFlowable<Long>() })
73-
.doOnError({ t:Throwable ->
75+
.subscribe({},
76+
{ t: Throwable ->
7477
errorConsumer(t)
75-
connection.close().subscribe()
78+
connection.close().subscribe({}, errorConsumer)
7679
})
77-
.subscribe()
7880
}
7981

8082
connection
8183
.onClose()
8284
.doFinally { cleanup() }
83-
.doOnError(errorConsumer)
84-
.subscribe()
85+
.subscribe({}, errorConsumer)
8586

8687
connection
8788
.send(sendProcessor)
88-
.doOnError { handleSendProcessorError(it) }
89-
.subscribe()
89+
.subscribe({}, { handleSendProcessorError(it) })
9090

9191
connection
9292
.receive()
9393
.doOnSubscribe { started.onComplete() }
94-
.doOnNext { handleIncomingFrames(it) }
95-
.doOnError(errorConsumer)
96-
.subscribe()
94+
.subscribe({ handleIncomingFrames(it) },errorConsumer)
9795
}
9896

9997
private fun handleSendProcessorError(t: Throwable) {
@@ -292,10 +290,12 @@ internal class RSocketClient @JvmOverloads constructor(
292290

293291
requestFrames
294292
.doOnNext { sendProcessor.onNext(it) }
295-
.doOnError { t ->
293+
.subscribe(
294+
{},
295+
{ t ->
296296
errorConsumer(t)
297297
receiver.onError(CancellationException("Disposed"))
298-
}.subscribe()
298+
})
299299
} else {
300300
sendOneFrame(Frame.RequestN.from(streamId, l))
301301
}
@@ -318,7 +318,7 @@ internal class RSocketClient @JvmOverloads constructor(
318318
}
319319
}
320320

321-
protected fun cleanup() {
321+
private fun cleanup() {
322322

323323
var subscribers: Collection<Subscriber<Payload>>
324324
var publishers: Collection<LimitableRequestPublisher<*>>

rsocket-core/src/main/java/io/rsocket/android/RSocketServer.kt

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ import org.reactivestreams.Subscription
5555

5656
connection
5757
.send(sendProcessor)
58-
.doOnError { handleSendProcessorError(it) }
59-
.subscribe()
58+
.subscribe(
59+
{},
60+
{ handleSendProcessorError(it) })
6061

6162
this.receiveDisposable = connection
6263
.receive()
@@ -68,18 +69,20 @@ import org.reactivestreams.Subscription
6869
Completable.complete()
6970
}).toFlowable<Void>()
7071
}
71-
.doOnError(errorConsumer)
7272
.ignoreElements()
73-
.subscribe()
73+
.subscribe(
74+
{},
75+
errorConsumer)
7476

7577
this.connection
7678
.onClose()
77-
.doOnError(errorConsumer)
7879
.doFinally {
7980
cleanup()
8081
receiveDisposable.dispose()
8182
}
82-
.subscribe()
83+
.subscribe(
84+
{},
85+
errorConsumer)
8386
}
8487

8588
private fun handleSendProcessorError(t: Throwable) {
@@ -157,7 +160,7 @@ import org.reactivestreams.Subscription
157160
cleanUpSendingSubscriptions()
158161
cleanUpChannelProcessors()
159162

160-
requestHandler.close().subscribe()
163+
requestHandler.close().subscribe({}, errorConsumer)
161164
}
162165

163166
@Synchronized private fun cleanUpSendingSubscriptions() {
@@ -269,9 +272,8 @@ import org.reactivestreams.Subscription
269272
})
270273
.concatWith(Flowable.just(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)))
271274
.onErrorResumeNext { t: Throwable -> Flowable.just(Frame.Error.from(streamId, t)) }
272-
.doOnNext({ sendProcessor.onNext(it) })
273275
.doFinally { removeSubscription(streamId) }
274-
.subscribe()
276+
.subscribe({ sendProcessor.onNext(it) })
275277

276278
return Completable.complete()
277279
}
@@ -363,25 +365,8 @@ import org.reactivestreams.Subscription
363365
}
364366
}
365367

366-
private class Subs(val s: Subscription) : DisposableSubscription() {
367-
@Volatile private var isDisposed = false
368-
369-
override fun isDisposed(): Boolean = isDisposed
370-
371-
override fun dispose() {
372-
isDisposed = true
373-
cancel()
374-
}
375-
376-
override fun cancel() = s.cancel()
377-
378-
override fun request(n: Long) = s.request(n)
379-
}
380-
381368
companion object {
382369
fun disposable(d: Disposable): DisposableSubscription = Disp(d)
383-
384-
fun subscription(s: Subscription): DisposableSubscription = Subs(s)
385370
}
386371
}
387372

rsocket-core/src/main/java/io/rsocket/android/internal/ClientServerInputMultiplexer.kt

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -75,30 +75,22 @@ class ClientServerInputMultiplexer(val source: DuplexConnection, plugins: Plugin
7575
}
7676
type
7777
}
78-
.subscribe { group ->
78+
.subscribe({ group ->
7979
when (group.key) {
8080
Type.STREAM_ZERO -> streamZero.onNext(group)
8181
Type.SERVER -> server.onNext(group)
8282
Type.CLIENT -> client.onNext(group)
8383
}
84-
}
84+
}, { /*noop - errors are handled by demuxed frame streams*/ })
8585
}
8686

87-
fun asServerConnection(): DuplexConnection {
88-
return serverConnection
89-
}
87+
fun asServerConnection(): DuplexConnection = serverConnection
9088

91-
fun asClientConnection(): DuplexConnection {
92-
return clientConnection
93-
}
89+
fun asClientConnection(): DuplexConnection = clientConnection
9490

95-
fun asStreamZeroConnection(): DuplexConnection {
96-
return streamZeroConnection
97-
}
91+
fun asStreamZeroConnection(): DuplexConnection = streamZeroConnection
9892

99-
fun close(): Completable {
100-
return source.close()
101-
}
93+
fun close(): Completable = source.close()
10294

10395
private class InternalDuplexConnection(private val source: DuplexConnection,
10496
p: Flowable<Flowable<Frame>>) : DuplexConnection {
@@ -122,25 +114,17 @@ class ClientServerInputMultiplexer(val source: DuplexConnection, plugins: Plugin
122114

123115
override fun receive(): Flowable<Frame> {
124116
return processor.flatMapPublisher { f ->
125-
if (debugEnabled) {
126-
return@flatMapPublisher f.doOnNext { frame -> LOGGER.debug("receiving -> " + frame.toString()) }
127-
} else {
128-
return@flatMapPublisher f
129-
}
117+
if (debugEnabled)
118+
f.doOnNext { frame -> LOGGER.debug("receiving -> " + frame.toString()) }
119+
else f
130120
}
131121
}
132122

133-
override fun close(): Completable {
134-
return source.close()
135-
}
123+
override fun close(): Completable = source.close()
136124

137-
override fun onClose(): Completable {
138-
return source.onClose()
139-
}
125+
override fun onClose(): Completable = source.onClose()
140126

141-
override fun availability(): Double {
142-
return source.availability()
143-
}
127+
override fun availability(): Double = source.availability()
144128
}
145129

146130
companion object {

0 commit comments

Comments
 (0)