Skip to content

Commit a2668e4

Browse files
committed
Polishing
1 parent 0bf7663 commit a2668e4

19 files changed

+105
-98
lines changed

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,6 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
298298
* @param builder the builder of {@link MySqlConnectionConfiguration}.
299299
* @param mapper the {@link OptionMapper} of {@code options}.
300300
*/
301-
@SuppressWarnings("unchecked")
302301
private static void setupHost(MySqlConnectionConfiguration.Builder builder, OptionMapper mapper) {
303302
mapper.requires(HOST).asString()
304303
.to(builder::host);

src/main/java/io/asyncer/r2dbc/mysql/client/Client.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* An abstraction that wraps the networking part of exchanging methods.
4242
*/
4343
public interface Client {
44+
4445
InternalLogger logger = InternalLoggerFactory.getInstance(Client.class);
4546

4647
/**

src/main/java/io/asyncer/r2dbc/mysql/client/ClientExceptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2121

2222
/**
23-
* An utility considers generic exceptions of {@link Client}.
23+
* A utility considers generic exceptions to {@link Client}.
2424
*/
2525
final class ClientExceptions {
2626

src/main/java/io/asyncer/r2dbc/mysql/client/DefaultHostnameVerifier.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ final class DefaultHostnameVerifier implements HostnameVerifier {
4747

4848
static final DefaultHostnameVerifier INSTANCE = new DefaultHostnameVerifier();
4949

50-
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHostnameVerifier.class);
50+
private static final InternalLogger logger =
51+
InternalLoggerFactory.getInstance(DefaultHostnameVerifier.class);
5152

5253
private static final boolean LOG_DEBUG = logger.isDebugEnabled();
5354

src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ final class ReactorNettyClient implements Client {
6969
private static final int ST_CLOSED = 2;
7070

7171
private static final AtomicIntegerFieldUpdater<ReactorNettyClient> STATE_UPDATER =
72-
AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state");
72+
AtomicIntegerFieldUpdater.newUpdater(ReactorNettyClient.class, "state");
7373

7474
private volatile int state = ST_CONNECTED;
7575

@@ -80,7 +80,7 @@ final class ReactorNettyClient implements Client {
8080
private final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer();
8181

8282
private final Sinks.Many<ServerMessage> responseProcessor =
83-
Sinks.many().multicast().onBackpressureBuffer(512, false);
83+
Sinks.many().multicast().onBackpressureBuffer(512, false);
8484

8585
private final RequestQueue requestQueue = new RequestQueue();
8686

@@ -89,7 +89,7 @@ final class ReactorNettyClient implements Client {
8989
requireNonNull(context, "context must not be null");
9090
requireNonNull(ssl, "ssl must not be null");
9191
require(responseProcessor.asFlux() instanceof Subscriber,
92-
"responseProcessor(" + responseProcessor + ") must be a Subscriber");
92+
"responseProcessor(" + responseProcessor + ") must be a Subscriber");
9393

9494
this.connection = connection;
9595
this.context = context;
@@ -155,11 +155,11 @@ public <T> Flux<T> exchange(ClientMessage request,
155155
}
156156

157157
Flux<T> responses = OperatorUtils.discardOnCancel(
158-
responseProcessor.asFlux()
159-
.doOnSubscribe(ignored -> emitNextRequest(request))
160-
.handle(handler)
161-
.doOnTerminate(requestQueue))
162-
.doOnDiscard(ReferenceCounted.class, ReferenceCounted::release);
158+
responseProcessor.asFlux()
159+
.doOnSubscribe(ignored -> emitNextRequest(request))
160+
.handle(handler)
161+
.doOnTerminate(requestQueue)
162+
).doOnDiscard(ReferenceCounted.class, ReferenceCounted::release);
163163

164164
requestQueue.submit(RequestTask.wrap(request, sink, responses));
165165
}).flatMapMany(Function.identity());
@@ -181,17 +181,16 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
181181
}
182182

183183
Flux<T> responses = responseProcessor
184-
.asFlux()
185-
.doOnSubscribe(ignored -> exchangeable.subscribe(
186-
this::emitNextRequest,
187-
e ->
188-
requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST))
189-
)
190-
.handle(exchangeable)
191-
.doOnTerminate(() -> {
192-
exchangeable.dispose();
193-
requestQueue.run();
194-
});
184+
.asFlux()
185+
.doOnSubscribe(ignored -> exchangeable.subscribe(
186+
this::emitNextRequest,
187+
e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST)
188+
))
189+
.handle(exchangeable)
190+
.doOnTerminate(() -> {
191+
exchangeable.dispose();
192+
requestQueue.run();
193+
});
195194

196195
requestQueue.submit(RequestTask.wrap(exchangeable, sink, OperatorUtils.discardOnCancel(responses)
197196
.doOnDiscard(ReferenceCounted.class, ReferenceCounted::release)
@@ -201,36 +200,32 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
201200

202201
@Override
203202
public Mono<Void> close() {
204-
return Mono
205-
.<Mono<Void>>create(sink -> {
206-
if (state == ST_CLOSED) {
207-
logger.debug("Close request ignored (connection already closed)");
208-
sink.success();
209-
return;
210-
}
203+
return Mono.<Mono<Void>>create(sink -> {
204+
if (state == ST_CLOSED) {
205+
logger.debug("Close request ignored (connection already closed)");
206+
sink.success();
207+
return;
208+
}
209+
210+
logger.debug("Close request accepted");
211211

212-
logger.debug("Close request accepted");
213-
214-
requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
215-
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);
216-
217-
if (result != Sinks.EmitResult.OK) {
218-
logger.error("Exit message sending failed due to {}, force closing", result);
219-
} else {
220-
if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) {
221-
logger.debug("Exit message sent");
222-
} else {
223-
logger.debug("Exit message sent (duplicated / connection already closed)");
224-
}
225-
}
226-
})));
227-
})
228-
.flatMap(Function.identity())
229-
.onErrorResume(e -> {
230-
logger.error("Exit message sending failed, force closing", e);
231-
return Mono.empty();
232-
})
233-
.then(forceClose());
212+
requestQueue.submit(RequestTask.wrap(sink, Mono.fromRunnable(() -> {
213+
Sinks.EmitResult result = requests.tryEmitNext(ExitMessage.INSTANCE);
214+
215+
if (result != Sinks.EmitResult.OK) {
216+
logger.error("Exit message sending failed due to {}, force closing", result);
217+
} else {
218+
if (STATE_UPDATER.compareAndSet(this, ST_CONNECTED, ST_CLOSING)) {
219+
logger.debug("Exit message sent");
220+
} else {
221+
logger.debug("Exit message sent (duplicated / connection already closed)");
222+
}
223+
}
224+
})));
225+
}).flatMap(Function.identity()).onErrorResume(e -> {
226+
logger.error("Exit message sending failed, force closing", e);
227+
return Mono.empty();
228+
}).then(forceClose());
234229
}
235230

236231
@Override

src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,19 +79,19 @@ static <T> RequestTask<T> wrap(ClientMessage message, MonoSink<T> sink, T suppli
7979
task = new RequestTask<>(null, sink, supplier);
8080

8181
}
82-
sink.onCancel(() -> task.cancel0());
82+
sink.onCancel(task::cancel0);
8383
return task;
8484
}
8585

8686
static <T> RequestTask<T> wrap(Flux<? extends ClientMessage> messages, MonoSink<T> sink, T supplier) {
8787
final RequestTask<T> task = new RequestTask<>(new DisposableFlux(messages), sink, supplier);
88-
sink.onCancel(() -> task.cancel0());
88+
sink.onCancel(task::cancel0);
8989
return task;
9090
}
9191

9292
static <T> RequestTask<T> wrap(MonoSink<T> sink, T supplier) {
9393
final RequestTask<T> task = new RequestTask<>(null, sink, supplier);
94-
sink.onCancel(() -> task.cancel0());
94+
sink.onCancel(task::cancel0);
9595
return task;
9696
}
9797

src/main/java/io/asyncer/r2dbc/mysql/codec/AbstractPrimitiveCodec.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.asyncer.r2dbc.mysql.codec;
1818

1919
import io.asyncer.r2dbc.mysql.MySqlColumnMetadata;
20-
import io.netty.buffer.ByteBufAllocator;
2120

2221
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.require;
2322

src/main/java/io/asyncer/r2dbc/mysql/collation/CharsetTargets.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ final class CharsetTargets {
7878
* Looks like JVM not support "Cp895" (also KAMENICKY, KEYBCS2), but it has some close to "Cp852".
7979
* <p>
8080
* See also:
81-
* <ul><li>https://en.wikipedia.org/wiki/Kamenick%C3%BD_encoding</li><li>
82-
* https://en.wikipedia.org/wiki/Code_page_852</li></ul>
81+
* <ul><li><a href="https://en.wikipedia.org/wiki/Kamenick%C3%BD_encoding">Kamenick Encoding</a></li><li>
82+
* <a href="https://en.wikipedia.org/wiki/Code_page_852">Cp 852</a></li></ul>
8383
*/
8484
static final CharsetTarget KEYBCS2 = CP852;
8585

src/main/java/io/asyncer/r2dbc/mysql/internal/util/DiscardOnCancelSubscriber.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ public final void cancel() {
9898
}
9999

100100
@Override
101-
@SuppressWarnings("rawtypes")
102101
public final Object scanUnsafe(Attr key) {
103102
if (key == Attr.PARENT) {
104103
return this.s;
@@ -113,7 +112,6 @@ public final Object scanUnsafe(Attr key) {
113112
}
114113
}
115114

116-
@SuppressWarnings("unchecked")
117115
static <T> CoreSubscriber<T> create(CoreSubscriber<? super T> s, boolean fuseable) {
118116
if (fuseable) {
119117
if (s instanceof Fuseable.ConditionalSubscriber) {

src/main/java/io/asyncer/r2dbc/mysql/internal/util/FluxDiscardOnCancel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* client code expects to start a request/response conversation without any previous response state.
2929
* <p>
3030
* This is a slightly altered version of R2DBC SQL Server's implementation:
31-
* https://github.com/r2dbc/r2dbc-mssql
31+
* <a href="https://github.com/r2dbc/r2dbc-mssql">r2dbc-mssql</a>
3232
*/
3333
final class FluxDiscardOnCancel<T> extends FluxOperator<T, T> {
3434

0 commit comments

Comments
 (0)