Skip to content

Commit 4632234

Browse files
authored
fixing format (#451)
1 parent ad372da commit 4632234

File tree

32 files changed

+183
-179
lines changed

32 files changed

+183
-179
lines changed

rsocket-core/src/jmh/java/io/rsocket/FragmentationPerf.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,15 @@ public void setup(Blackhole bh) {
4343
ByteBuffer data = createRandomBytes(1 << 18);
4444
ByteBuffer metadata = createRandomBytes(1 << 18);
4545
largeFrame =
46-
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
46+
Frame.Request.from(
47+
1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
4748
largeFrameFragmenter = new FrameFragmenter(1024);
4849

4950
data = createRandomBytes(16);
5051
metadata = createRandomBytes(16);
5152
smallFrame =
52-
Frame.Request.from(1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
53+
Frame.Request.from(
54+
1, FrameType.REQUEST_RESPONSE, DefaultPayload.create(data, metadata), 1);
5355
smallFrameFragmenter = new FrameFragmenter(2);
5456
smallFramesIterable =
5557
smallFrameFragmenter

rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.rsocket.RSocketFactory.Start;
2020
import io.rsocket.perfutil.TestDuplexConnection;
2121
import io.rsocket.util.DefaultPayload;
22-
2322
import java.nio.ByteBuffer;
2423
import java.nio.charset.StandardCharsets;
2524
import org.openjdk.jmh.annotations.Benchmark;

rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public ConnectionSetupPayload retain(int increment) {
9595
}
9696

9797
public abstract ConnectionSetupPayload touch();
98+
9899
public abstract ConnectionSetupPayload touch(Object hint);
99100

100101
private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload {
@@ -106,11 +107,7 @@ private static final class DefaultConnectionSetupPayload extends ConnectionSetup
106107
private final int flags;
107108

108109
public DefaultConnectionSetupPayload(
109-
String metadataMimeType,
110-
String dataMimeType,
111-
ByteBuf data,
112-
ByteBuf metadata,
113-
int flags) {
110+
String metadataMimeType, String dataMimeType, ByteBuf data, ByteBuf metadata, int flags) {
114111
this.metadataMimeType = metadataMimeType;
115112
this.dataMimeType = dataMimeType;
116113
this.data = data;

rsocket-core/src/main/java/io/rsocket/Frame.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ public static Frame from(
273273
String metadataMimeType,
274274
String dataMimeType,
275275
Payload payload) {
276-
final ByteBuf metadata = payload.hasMetadata() ? payload.sliceMetadata() : Unpooled.EMPTY_BUFFER;
276+
final ByteBuf metadata =
277+
payload.hasMetadata() ? payload.sliceMetadata() : Unpooled.EMPTY_BUFFER;
277278
final ByteBuf data = payload.sliceData();
278279

279280
final Frame frame = RECYCLER.get();

rsocket-core/src/main/java/io/rsocket/Payload.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
package io.rsocket;
1717

1818
import io.netty.buffer.ByteBuf;
19-
20-
import java.nio.ByteBuffer;
21-
import java.nio.charset.StandardCharsets;
22-
2319
import io.netty.util.ReferenceCounted;
2420
import io.netty.util.ResourceLeakDetector;
21+
import java.nio.ByteBuffer;
22+
import java.nio.charset.StandardCharsets;
2523

2624
/** Payload of a {@link Frame}. */
2725
public interface Payload extends ReferenceCounted {
@@ -47,30 +45,26 @@ public interface Payload extends ReferenceCounted {
4745
*/
4846
ByteBuf sliceData();
4947

50-
/**
51-
* Increases the reference count by {@code 1}.
52-
*/
48+
/** Increases the reference count by {@code 1}. */
5349
@Override
5450
Payload retain();
5551

56-
/**
57-
* Increases the reference count by the specified {@code increment}.
58-
*/
52+
/** Increases the reference count by the specified {@code increment}. */
5953
@Override
6054
Payload retain(int increment);
6155

6256
/**
63-
* Records the current access location of this object for debugging purposes.
64-
* If this object is determined to be leaked, the information recorded by this operation will be provided to you
65-
* via {@link ResourceLeakDetector}. This method is a shortcut to {@link #touch(Object) touch(null)}.
57+
* Records the current access location of this object for debugging purposes. If this object is
58+
* determined to be leaked, the information recorded by this operation will be provided to you via
59+
* {@link ResourceLeakDetector}. This method is a shortcut to {@link #touch(Object) touch(null)}.
6660
*/
6761
@Override
6862
Payload touch();
6963

7064
/**
71-
* Records the current access location of this object with an additional arbitrary information for debugging
72-
* purposes. If this object is determined to be leaked, the information recorded by this operation will be
73-
* provided to you via {@link ResourceLeakDetector}.
65+
* Records the current access location of this object with an additional arbitrary information for
66+
* debugging purposes. If this object is determined to be leaked, the information recorded by this
67+
* operation will be provided to you via {@link ResourceLeakDetector}.
7468
*/
7569
@Override
7670
Payload touch(Object hint);
@@ -90,4 +84,4 @@ default String getMetadataUtf8() {
9084
default String getDataUtf8() {
9185
return sliceData().toString(StandardCharsets.UTF_8);
9286
}
93-
}
87+
}

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.rsocket.exceptions.Exceptions;
2525
import io.rsocket.internal.LimitableRequestPublisher;
2626
import io.rsocket.internal.UnboundedProcessor;
27-
2827
import java.nio.channels.ClosedChannelException;
2928
import java.time.Duration;
3029
import java.util.Collection;
@@ -64,7 +63,8 @@ class RSocketClient implements RSocket {
6463
Function<Frame, ? extends Payload> frameDecoder,
6564
Consumer<Throwable> errorConsumer,
6665
StreamIdSupplier streamIdSupplier) {
67-
this(connection, frameDecoder, errorConsumer, streamIdSupplier, Duration.ZERO, Duration.ZERO, 0);
66+
this(
67+
connection, frameDecoder, errorConsumer, streamIdSupplier, Duration.ZERO, Duration.ZERO, 0);
6868
}
6969

7070
RSocketClient(
@@ -372,11 +372,13 @@ public Flux<Payload> get() {
372372
public Frame apply(Payload payload) {
373373
final Frame requestFrame;
374374
if (firstPayload.compareAndSet(true, false)) {
375-
requestFrame = Frame.Request.from(
376-
streamId, requestType, payload, l);
375+
requestFrame =
376+
Frame.Request.from(
377+
streamId, requestType, payload, l);
377378
} else {
378-
requestFrame = Frame.PayloadFrame.from(
379-
streamId, FrameType.NEXT, payload);
379+
requestFrame =
380+
Frame.PayloadFrame.from(
381+
streamId, FrameType.NEXT, payload);
380382
}
381383
payload.release();
382384
return requestFrame;

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,11 @@
2828
import io.rsocket.transport.ClientTransport;
2929
import io.rsocket.transport.ServerTransport;
3030
import io.rsocket.util.DefaultPayload;
31+
import io.rsocket.util.EmptyPayload;
3132
import java.time.Duration;
3233
import java.util.function.Consumer;
3334
import java.util.function.Function;
3435
import java.util.function.Supplier;
35-
36-
import io.rsocket.util.EmptyPayload;
3736
import reactor.core.publisher.Mono;
3837

3938
/** Factory for creating RSocket clients and servers. */
@@ -244,7 +243,10 @@ public Mono<RSocket> start() {
244243
.doOnNext(
245244
rSocket ->
246245
new RSocketServer(
247-
multiplexer.asServerConnection(), rSocket, frameDecoder, errorConsumer))
246+
multiplexer.asServerConnection(),
247+
rSocket,
248+
frameDecoder,
249+
errorConsumer))
248250
.then(finalConnection.sendOne(setupFrame))
249251
.then(wrappedRSocketClient);
250252
});
@@ -359,7 +361,8 @@ private Mono<? extends Void> processSetupFrame(
359361
sender -> acceptor.get().accept(setupPayload, sender).map(plugins::applyServer))
360362
.map(
361363
handler ->
362-
new RSocketServer(multiplexer.asClientConnection(), handler, frameDecoder, errorConsumer))
364+
new RSocketServer(
365+
multiplexer.asClientConnection(), handler, frameDecoder, errorConsumer))
363366
.then();
364367
}
365368
}

rsocket-core/src/main/java/io/rsocket/RSocketServer.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.rsocket.exceptions.ApplicationException;
2727
import io.rsocket.internal.LimitableRequestPublisher;
2828
import io.rsocket.internal.UnboundedProcessor;
29-
3029
import java.util.Collection;
3130
import java.util.function.Consumer;
3231
import java.util.function.Function;
@@ -314,7 +313,8 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
314313
if (payload.hasMetadata()) {
315314
flags = Frame.setFlag(flags, FLAGS_M);
316315
}
317-
final Frame frame = Frame.PayloadFrame.from(streamId, FrameType.NEXT_COMPLETE, payload, flags);
316+
final Frame frame =
317+
Frame.PayloadFrame.from(streamId, FrameType.NEXT_COMPLETE, payload, flags);
318318
payload.release();
319319
return frame;
320320
})
@@ -330,11 +330,12 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
330330

331331
private Mono<Void> handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
332332
response
333-
.map(payload -> {
334-
final Frame frame = Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload);
335-
payload.release();
336-
return frame;
337-
})
333+
.map(
334+
payload -> {
335+
final Frame frame = Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload);
336+
payload.release();
337+
return frame;
338+
})
338339
.transform(
339340
frameFlux -> {
340341
LimitableRequestPublisher<Frame> frames = LimitableRequestPublisher.wrap(frameFlux);

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java

Lines changed: 62 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,71 +10,71 @@
1010
import reactor.core.publisher.Operators;
1111

1212
public final class SwitchTransform<T, R> extends Flux<R> {
13-
14-
final Publisher<? extends T> source;
13+
14+
final Publisher<? extends T> source;
15+
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
16+
17+
public SwitchTransform(
18+
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
19+
this.source = Objects.requireNonNull(source, "source");
20+
this.transformer = Objects.requireNonNull(transformer, "transformer");
21+
}
22+
23+
@Override
24+
public void subscribe(CoreSubscriber<? super R> actual) {
25+
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
26+
}
27+
28+
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
29+
@SuppressWarnings("rawtypes")
30+
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
31+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");
32+
33+
final CoreSubscriber<? super R> actual;
1534
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
16-
17-
public SwitchTransform(
18-
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
19-
this.source = Objects.requireNonNull(source, "source");
20-
this.transformer = Objects.requireNonNull(transformer, "transformer");
35+
final UnboundedProcessor<T> processor = new UnboundedProcessor<>();
36+
Subscription s;
37+
volatile int once;
38+
39+
SwitchTransformSubscriber(
40+
CoreSubscriber<? super R> actual,
41+
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
42+
this.actual = actual;
43+
this.transformer = transformer;
2144
}
22-
45+
2346
@Override
24-
public void subscribe(CoreSubscriber<? super R> actual) {
25-
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
47+
public void onSubscribe(Subscription s) {
48+
if (Operators.validate(this.s, s)) {
49+
this.s = s;
50+
processor.onSubscribe(s);
51+
}
2652
}
27-
28-
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
29-
@SuppressWarnings("rawtypes")
30-
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
31-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");
32-
33-
final CoreSubscriber<? super R> actual;
34-
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
35-
final UnboundedProcessor<T> processor = new UnboundedProcessor<>();
36-
Subscription s;
37-
volatile int once;
38-
39-
SwitchTransformSubscriber(
40-
CoreSubscriber<? super R> actual,
41-
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
42-
this.actual = actual;
43-
this.transformer = transformer;
44-
}
45-
46-
@Override
47-
public void onSubscribe(Subscription s) {
48-
if (Operators.validate(this.s, s)) {
49-
this.s = s;
50-
processor.onSubscribe(s);
51-
}
52-
}
53-
54-
@Override
55-
public void onNext(T t) {
56-
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
57-
try {
58-
Publisher<? extends R> result =
59-
Objects.requireNonNull(
60-
transformer.apply(t, processor), "The transformer returned a null value");
61-
Flux.from(result).subscribe(actual);
62-
} catch (Throwable e) {
63-
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
64-
return;
65-
}
66-
}
67-
processor.onNext(t);
68-
}
69-
70-
@Override
71-
public void onError(Throwable t) {
72-
processor.onError(t);
73-
}
74-
75-
@Override
76-
public void onComplete() {
77-
processor.onComplete();
53+
54+
@Override
55+
public void onNext(T t) {
56+
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
57+
try {
58+
Publisher<? extends R> result =
59+
Objects.requireNonNull(
60+
transformer.apply(t, processor), "The transformer returned a null value");
61+
Flux.from(result).subscribe(actual);
62+
} catch (Throwable e) {
63+
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
64+
return;
7865
}
66+
}
67+
processor.onNext(t);
68+
}
69+
70+
@Override
71+
public void onError(Throwable t) {
72+
processor.onError(t);
73+
}
74+
75+
@Override
76+
public void onComplete() {
77+
processor.onComplete();
7978
}
80-
}
79+
}
80+
}

rsocket-core/src/main/java/io/rsocket/lease/Lease.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package io.rsocket.lease;
1818

19-
import javax.annotation.Nullable;
2019
import java.nio.ByteBuffer;
20+
import javax.annotation.Nullable;
2121

2222
/** A contract for RSocket lease, which is sent by a request acceptor and is time bound. */
2323
public interface Lease {

0 commit comments

Comments
 (0)