Skip to content

Commit e56eafd

Browse files
Support EXECUTOR option
1 parent d2b1a15 commit e56eafd

File tree

4 files changed

+96
-11
lines changed

4 files changed

+96
-11
lines changed

src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
import io.r2dbc.spi.IsolationLevel;
2929
import io.r2dbc.spi.R2dbcException;
3030
import io.r2dbc.spi.Statement;
31+
import oracle.r2dbc.OracleR2dbcOptions;
3132
import org.reactivestreams.Publisher;
3233
import reactor.core.publisher.Mono;
3334

3435
import javax.sql.DataSource;
3536
import java.time.Duration;
3637
import java.util.Optional;
38+
import java.util.concurrent.Executor;
39+
import java.util.concurrent.ForkJoinPool;
3740

3841
/**
3942
* <p>
@@ -102,6 +105,12 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
102105
/** JDBC data source that this factory uses to open connections */
103106
private final DataSource dataSource;
104107

108+
/**
109+
* Executor configured by {@link oracle.r2dbc.OracleR2dbcOptions#EXECUTOR},
110+
* or a default one if none was configured.
111+
*/
112+
private final Executor executor;
113+
105114
/**
106115
* <p>
107116
* Timeout applied to the execution of {@link Statement}s created by
@@ -189,6 +198,19 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
189198
: Duration.parse(timeout.toString()))
190199
.orElse(Duration.ZERO);
191200

201+
Object executor = options.getValue(OracleR2dbcOptions.EXECUTOR);
202+
if (executor == null) {
203+
this.executor = ForkJoinPool.commonPool();
204+
}
205+
else if (executor instanceof Executor) {
206+
this.executor = (Executor) executor;
207+
}
208+
else {
209+
throw new IllegalArgumentException(
210+
"Value of " + OracleR2dbcOptions.EXECUTOR
211+
+ " is not an instance of Executor: " + executor.getClass());
212+
}
213+
192214
}
193215

194216
/**
@@ -221,7 +243,7 @@ public Publisher<Connection> create() {
221243
// to a particular connection.
222244
ReactiveJdbcAdapter adapter = ReactiveJdbcAdapter.getOracleAdapter();
223245

224-
return Mono.fromDirect(adapter.publishConnection(dataSource))
246+
return Mono.fromDirect(adapter.publishConnection(dataSource, executor))
225247
.flatMap(conn -> {
226248
OracleConnectionImpl connection =
227249
new OracleConnectionImpl(conn, adapter);

src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.Set;
5858
import java.util.concurrent.CompletableFuture;
5959
import java.util.concurrent.ConcurrentLinkedDeque;
60+
import java.util.concurrent.Executor;
6061
import java.util.concurrent.Flow;
6162
import java.util.concurrent.atomic.AtomicBoolean;
6263
import java.util.concurrent.atomic.AtomicInteger;
@@ -108,7 +109,10 @@
108109
* {@link UsingConnectionSubscriber} for more details.
109110
* </li>
110111
* </ul><p>
111-
* A instance of this class is obtained by invoking {@link #getInstance()}.
112+
* A instance of this class is obtained by invoking {@link #getInstance()}. A
113+
* new instance should be created each time a JDBC {@code Connection} is
114+
* created, and that instance should be used to execute database calls with
115+
* that {@code Connection} only.
112116
* </p><p>
113117
* All JDBC type parameters supplied to the methods of this class must
114118
* {@linkplain Wrapper#isWrapperFor(Class) wrap} an Oracle JDBC interface
@@ -137,7 +141,7 @@ final class OracleReactiveJdbcAdapter implements ReactiveJdbcAdapter {
137141
* does or how it should be configured.
138142
*/
139143
private static final Set<Option<CharSequence>>
140-
SUPPORTED_CONNECTION_PROPERTY_OPTIONS = Set.of(
144+
JDBC_CONNECTION_PROPERTY_OPTIONS = Set.of(
141145

142146
// Support TNS_ADMIN (tnsnames.ora, ojdbc.properties).
143147
OracleR2dbcOptions.TNS_ADMIN,
@@ -489,10 +493,9 @@ private static void configureStandardOptions(
489493

490494
/**
491495
* Configures an {@code oracleDataSource} with the values of extended R2DBC
492-
* {@code Options}. Extended options are those declared as
493-
* {@link #SUPPORTED_CONNECTION_PROPERTY_OPTIONS} in this class. The values
494-
* of these options are used to configure the {@code oracleDataSource} as
495-
* specified in the javadoc of
496+
* {@code Options}. Extended options are those declared in
497+
* {@link OracleR2dbcOptions}. The values of these options are used to
498+
* configure the {@code oracleDataSource} as specified in the javadoc of
496499
* {@link #createDataSource(ConnectionFactoryOptions)}
497500
* @param oracleDataSource An data source to configure
498501
* @param options R2DBC options. Not null.
@@ -509,8 +512,8 @@ private static void configureExtendedOptions(
509512
OracleConnection.CONNECTION_PROPERTY_TNS_ADMIN, tnsAdmin.toString()));
510513
}
511514

512-
// Apply any extended options as connection properties
513-
for (Option<CharSequence> option : SUPPORTED_CONNECTION_PROPERTY_OPTIONS) {
515+
// Apply any JDBC connection property options
516+
for (Option<CharSequence> option : JDBC_CONNECTION_PROPERTY_OPTIONS) {
514517
// Using Object as the value type allows options to be set as types like
515518
// Boolean or Integer. These types make sense for numeric or boolean
516519
// connection property values, such as statement cache size, or enable x.
@@ -662,11 +665,12 @@ private static void setPropertyIfAbsent(
662665
*/
663666
@Override
664667
public Publisher<? extends Connection> publishConnection(
665-
DataSource dataSource) {
668+
DataSource dataSource, Executor executor) {
666669
OracleDataSource oracleDataSource = unwrapOracleDataSource(dataSource);
667670
return Mono.from(adaptFlowPublisher(() ->
668671
oracleDataSource
669672
.createConnectionBuilder()
673+
.executorOracle(executor)
670674
.buildConnectionPublisherOracle()))
671675
.onErrorMap(R2dbcException.class, error ->
672676
error.getErrorCode() == 18714 // ORA-18714 : Login timeout expired

src/main/java/oracle/r2dbc/impl/ReactiveJdbcAdapter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.sql.PreparedStatement;
3939
import java.sql.ResultSet;
4040
import java.sql.SQLException;
41+
import java.util.concurrent.Executor;
4142
import java.util.function.Function;
4243

4344
/**
@@ -124,6 +125,8 @@ static ReactiveJdbcAdapter getOracleAdapter() throws R2dbcException {
124125
* Publishes a single JDBC {@link Connection} to a single subscriber. The
125126
* connection is established as if by invoking
126127
* {@link DataSource#getConnection()} on the specified {@code dataSource}.
128+
* The {@code Connection} is configured to execute asynchronous tasks using
129+
* the provided {@code executor}.
127130
* </p><p>
128131
* The {@code dataSource} is retained by the returned publisher, meaning
129132
* that any mutable state of the {@code dataSource} can affect the behavior
@@ -144,10 +147,12 @@ static ReactiveJdbcAdapter getOracleAdapter() throws R2dbcException {
144147
* </p>
145148
* @param dataSource JDBC data source that is configured to establish a
146149
* connection. Not null.
150+
* @param executor Executor to use for executing asynchronous tasks. Not null.
147151
* @return A publisher that emits a JDBC connection. Not null.
148152
* @throws R2dbcException If a database access error occurs.
149153
*/
150-
Publisher<? extends Connection> publishConnection(DataSource dataSource)
154+
Publisher<? extends Connection> publishConnection(
155+
DataSource dataSource, Executor executor)
151156
throws R2dbcException;
152157

153158
/**

src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import io.r2dbc.spi.Result;
3030
import oracle.jdbc.OracleConnection;
3131
import oracle.jdbc.datasource.OracleDataSource;
32+
import oracle.r2dbc.OracleR2dbcOptions;
3233
import org.junit.jupiter.api.Test;
34+
import reactor.core.publisher.Flux;
3335
import reactor.core.publisher.Mono;
3436

3537
import java.io.IOException;
@@ -41,11 +43,17 @@
4143
import java.sql.SQLException;
4244
import java.time.Duration;
4345
import java.util.Properties;
46+
import java.util.Set;
4447
import java.util.concurrent.CompletableFuture;
4548
import java.util.concurrent.CompletionException;
4649
import java.util.concurrent.ExecutionException;
50+
import java.util.concurrent.Executor;
51+
import java.util.concurrent.ExecutorService;
52+
import java.util.concurrent.Executors;
4753
import java.util.concurrent.TimeUnit;
4854
import java.util.concurrent.TimeoutException;
55+
import java.util.concurrent.atomic.AtomicInteger;
56+
import java.util.stream.Collectors;
4957

5058
import static io.r2dbc.spi.ConnectionFactoryOptions.CONNECT_TIMEOUT;
5159
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
@@ -415,6 +423,52 @@ public void testStatementTimeout() {
415423

416424
}
417425

426+
/**
427+
* Verifies the {@link oracle.r2dbc.OracleR2dbcOptions#EXECUTOR} option
428+
*/
429+
@Test
430+
public void testExecutorOption() {
431+
432+
// Create a custom executor that increments a count when Runnables are
433+
// submitted, and then delegates to a single threaded executor.
434+
AtomicInteger count = new AtomicInteger(0);
435+
ExecutorService singleThread = Executors.newSingleThreadExecutor();
436+
Executor testExecutor = runnable -> {
437+
count.incrementAndGet();
438+
singleThread.execute(runnable);
439+
};
440+
441+
// Create a connection that is configured to use the custom executor
442+
Connection connection = awaitOne(ConnectionFactories.get(
443+
ConnectionFactoryOptions.builder()
444+
.option(OracleR2dbcOptions.EXECUTOR, testExecutor)
445+
.option(DRIVER, "oracle")
446+
.option(HOST, host())
447+
.option(PORT, port())
448+
.option(DATABASE, serviceName())
449+
.option(USER, user())
450+
.option(PASSWORD, password())
451+
.build())
452+
.create());
453+
454+
try {
455+
// Make some asynchronous database calls and expect the executor's
456+
// count to be incremented
457+
awaitOne(Set.of(0, 1, 2, 3), Flux.merge(
458+
connection.createStatement("SELECT 0 FROM sys.dual").execute(),
459+
connection.createStatement("SELECT 1 FROM sys.dual").execute(),
460+
connection.createStatement("SELECT 2 FROM sys.dual").execute(),
461+
connection.createStatement("SELECT 3 FROM sys.dual").execute())
462+
.flatMap(result ->
463+
result.map(row -> row.get(0, Integer.class)))
464+
.collect(Collectors.toSet()));
465+
assertTrue(count.get() != 0);
466+
}
467+
finally {
468+
tryAwaitNone(connection.close());
469+
}
470+
}
471+
418472
/**
419473
* Verifies that an attempt to connect with a {@code listeningChannel}
420474
* results in an {@link R2dbcTimeoutException}.

0 commit comments

Comments
 (0)