Skip to content

Commit 2456e49

Browse files
authored
fixing clean up method in client - missing synchronization (#425)
1 parent a218427 commit 2456e49

File tree

1 file changed

+27
-16
lines changed

1 file changed

+27
-16
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616

1717
package io.rsocket;
1818

19-
import static io.rsocket.util.ExceptionUtil.noStacktrace;
20-
2119
import io.netty.buffer.Unpooled;
2220
import io.netty.util.collection.IntObjectHashMap;
2321
import io.rsocket.exceptions.ConnectionException;
2422
import io.rsocket.exceptions.Exceptions;
2523
import io.rsocket.internal.LimitableRequestPublisher;
2624
import io.rsocket.util.PayloadImpl;
25+
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
27+
import reactor.core.Disposable;
28+
import reactor.core.publisher.*;
29+
30+
import javax.annotation.Nullable;
2731
import java.nio.channels.ClosedChannelException;
2832
import java.time.Duration;
2933
import java.util.Collection;
@@ -32,11 +36,8 @@
3236
import java.util.function.Consumer;
3337
import java.util.function.Function;
3438
import java.util.function.Supplier;
35-
import javax.annotation.Nullable;
36-
import org.reactivestreams.Publisher;
37-
import org.reactivestreams.Subscriber;
38-
import reactor.core.Disposable;
39-
import reactor.core.publisher.*;
39+
40+
import static io.rsocket.util.ExceptionUtil.noStacktrace;
4041

4142
/** Client Side of a RSocket socket. Sends {@link Frame}s to a {@link RSocketServer} */
4243
class RSocketClient implements RSocket {
@@ -419,29 +420,39 @@ private boolean contains(int streamId) {
419420
}
420421

421422
protected void cleanup() {
422-
senders.forEach(
423-
(integer, limitableRequestPublisher) ->
424-
cleanUpLimitableRequestPublisher(limitableRequestPublisher));
425-
426-
receivers.forEach((integer, subscriber) -> cleanUpSubscriber(subscriber));
427-
428-
synchronized (this) {
423+
Collection<Subscriber<Payload>> subscribers;
424+
Collection<LimitableRequestPublisher> publishers;
425+
synchronized (RSocketClient.this) {
426+
subscribers = receivers.values();
427+
publishers = senders.values();
428+
429429
senders.clear();
430430
receivers.clear();
431431
}
432432

433+
subscribers.forEach(this::cleanUpSubscriber);
434+
publishers.forEach(this::cleanUpLimitableRequestPublisher);
435+
433436
if (null != keepAliveSendSub) {
434437
keepAliveSendSub.dispose();
435438
}
436439
}
437440

438441
private synchronized void cleanUpLimitableRequestPublisher(
439442
LimitableRequestPublisher<?> limitableRequestPublisher) {
440-
limitableRequestPublisher.cancel();
443+
try {
444+
limitableRequestPublisher.cancel();
445+
} catch (Throwable t) {
446+
errorConsumer.accept(t);
447+
}
441448
}
442449

443450
private synchronized void cleanUpSubscriber(Subscriber<?> subscriber) {
444-
subscriber.onError(CLOSED_CHANNEL_EXCEPTION);
451+
try {
452+
subscriber.onError(CLOSED_CHANNEL_EXCEPTION);
453+
} catch (Throwable t) {
454+
errorConsumer.accept(t);
455+
}
445456
}
446457

447458
private void handleIncomingFrames(Frame frame) {

0 commit comments

Comments
 (0)