Skip to content

Commit 6198f50

Browse files
Fix cursor leaks and verify the fix when testing
1 parent 395c281 commit 6198f50

File tree

6 files changed

+175
-45
lines changed

6 files changed

+175
-45
lines changed

src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,10 @@ private static void configureExtendedOptions(
511511

512512
// Apply any extended options as connection properties
513513
for (Option<CharSequence> option : SUPPORTED_CONNECTION_PROPERTY_OPTIONS) {
514-
CharSequence value = options.getValue(option);
514+
// Using Object as the value type allows options to be set as types like
515+
// Boolean or Integer. These types make sense for numeric or boolean
516+
// connection property values, such as statement cache size, or enable x.
517+
Object value = options.getValue(option);
515518
if (value != null) {
516519
runOrHandleSQLException(() ->
517520
oracleDataSource.setConnectionProperty(

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import static oracle.r2dbc.impl.OracleR2dbcExceptions.getOrHandleSQLException;
3737
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
38+
import static oracle.r2dbc.impl.OracleR2dbcExceptions.runOrHandleSQLException;
3839

3940
/**
4041
* <p>
@@ -90,9 +91,13 @@ <T> Publisher<T> publishRows(
9091
}
9192

9293
/**
94+
* <p>
9395
* Creates a {@code Result} that either publishes a {@code ResultSet} of
9496
* row data from a query, or publishes an update count as an empty stream.
95-
*
97+
* </p><p>
98+
* The {@link java.sql.Statement} that created the {@code resultSet} is closed
99+
* when the returned result is fully consumed.
100+
* </p>
96101
* @param adapter Adapts {@code ResultSet} API calls into reactive streams.
97102
* Not null.
98103
* @param resultSet Row data to publish
@@ -105,19 +110,29 @@ public static Result createQueryResult(
105110

106111
@Override
107112
Publisher<Integer> publishUpdateCount() {
113+
runOrHandleSQLException(() ->
114+
resultSet.getStatement().close());
108115
return Mono.empty();
109116
}
110117

111118
@Override
112119
<T> Publisher<T> publishRows(
113120
BiFunction<Row, RowMetadata, ? extends T> mappingFunction) {
114121

122+
// Obtain a reference to the statement before the ResultSet is
123+
// logically closed by its row publisher. The statement is closed when
124+
// the publisher terminates.
125+
java.sql.Statement jdbcStatement =
126+
getOrHandleSQLException(resultSet::getStatement);
127+
115128
OracleRowMetadataImpl metadata = new OracleRowMetadataImpl(
116129
getOrHandleSQLException(resultSet::getMetaData));
117130

118-
return Flux.from(adapter.publishRows(resultSet, jdbcRow ->
131+
return Flux.<T>from(adapter.publishRows(resultSet, jdbcRow ->
119132
mappingFunction.apply(
120-
new OracleRowImpl(jdbcRow, metadata, adapter), metadata)));
133+
new OracleRowImpl(jdbcRow, metadata, adapter), metadata)))
134+
.doFinally(signalType ->
135+
runOrHandleSQLException(jdbcStatement::close));
121136
}
122137
};
123138
}
@@ -128,10 +143,15 @@ <T> Publisher<T> publishRows(
128143
* {@link PreparedStatement#getGeneratedKeys()} {@code ResultSet}, or
129144
* publishes an {@code updateCount}.
130145
* </p><p>
146+
* The {@link java.sql.Statement} that created the {@code ResultSet} is closed
147+
* when the {@code Publisher} returned by this method emits a
148+
* {@code Result}.
149+
* </p><p>
131150
* For compliance with R2DBC standards, a {@code Row} of generated column
132151
* values will remain valid after the {@code Connection} that created them
133152
* is closed. This behavior is verified by version 0.8.2 of
134-
* {@code io.r2dbc.spi.test.TestKit#returnGeneratedValues()}.
153+
* {@code io.r2dbc.spi.test.TestKit#returnGeneratedValues()}. The {@code Rows}
154+
* of generated value
135155
* </p>
136156
*
137157
* @implNote
@@ -167,16 +187,25 @@ public static Publisher<Result> createGeneratedValuesResult(
167187

168188
// Avoid invoking ResultSet.getMetaData() on an empty ResultSet, it may
169189
// throw a SQLException
170-
if (! getOrHandleSQLException(values::isBeforeFirst))
190+
if (! getOrHandleSQLException(values::isBeforeFirst)) {
191+
runOrHandleSQLException(() -> values.getStatement().close());
171192
return Mono.just(createUpdateCountResult(updateCount));
193+
}
172194

173195
// Obtain metadata before the ResultSet is closed by publishRows(...)
174196
OracleRowMetadataImpl metadata =
175197
new OracleRowMetadataImpl(getOrHandleSQLException(values::getMetaData));
176198

199+
// Obtain a reference to the statement before the ResultSet is
200+
// logically closed by its row publisher. The statement is closed when
201+
// the publisher terminates.
202+
java.sql.Statement jdbcStatement =
203+
getOrHandleSQLException(values::getStatement);
204+
177205
return Flux.from(adapter.publishRows(
178206
values, ReactiveJdbcAdapter.JdbcRow::copy))
179207
.collectList()
208+
.doFinally(signalType -> runOrHandleSQLException(jdbcStatement::close))
180209
.map(cachedRows -> new OracleResultImpl() {
181210

182211
@Override

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Queue;
4242
import java.util.concurrent.atomic.AtomicBoolean;
4343
import java.util.function.BiFunction;
44+
import java.util.function.Function;
4445

4546
import static oracle.r2dbc.impl.OracleR2dbcExceptions.getOrHandleSQLException;
4647
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
@@ -546,8 +547,8 @@ private Publisher<Result> createResultPublisher(
546547

547548
/**
548549
* <p>
549-
* Executes a JDBC statement with a single, non-batched, set of parameters.
550-
* If the execution returns a {@link java.sql.ResultSet} then the
550+
* Executes a JDBC statement with a single, non-batched, set of {@code
551+
* bindValues}. If the execution results in a {@link java.sql.ResultSet} then the
551552
* {@code jdbcStatement} is closed after the {@code ResultSet} is fully
552553
* consumed by {@link Result#map(BiFunction)}. Otherwise, if the execution
553554
* only produces an update count, then the {@code jdbcStatement} is closed
@@ -569,7 +570,6 @@ private Publisher<Result> executeSingle(
569570
return Mono.from(adapter.publishSQLExecution(jdbcStatement))
570571
.map(isResultSet -> {
571572
if (isResultSet) {
572-
runOrHandleSQLException(jdbcStatement::closeOnCompletion);
573573
return OracleResultImpl.createQueryResult(
574574
adapter, getOrHandleSQLException(jdbcStatement::getResultSet));
575575
}
@@ -614,22 +614,27 @@ private Publisher<Result> executeBatch(
614614

615615
// Execute the batch. The execution won't return a ResultSet, so the JDBC
616616
// statement is closed immediately after the execution completes.
617-
runOrHandleSQLException(jdbcStatement::closeOnCompletion);
618617
return Flux.from(adapter.publishBatchUpdate(jdbcStatement))
619618
.map(updateCount ->
620-
OracleResultImpl.createUpdateCountResult(Math.toIntExact(updateCount)));
619+
OracleResultImpl.createUpdateCountResult(Math.toIntExact(updateCount)))
620+
.doFinally(signalType -> runOrHandleSQLException(jdbcStatement::close));
621621
}
622622

623623
/**
624624
* <p>
625-
* Executes a key generating {@code jdbcStatement} for each set
626-
* of bind values in a {@code batch}. The {@code jdbcStatement} is closed
627-
* after all executions have completed. If any execution results in an
628-
* error, subsequent executions are skipped.
625+
* Executes a JDBC statement with a single, non-batched, set of {@code
626+
* bindValues}. If the execution results in a {@link java.sql.ResultSet} then
627+
* the {@code jdbcStatement} is closed after the {@code ResultSet} is fully
628+
* consumed by {@link Result#map(BiFunction)}. If the execution results in
629+
* an update count and/or values generated by DML, then the {@code
630+
* jdbcStatement} is closed after the returned {@code Publisher} emits a
631+
* {@code Result} with all generated values cached, such that
632+
* {@link Result#map(BiFunction)} may be called after the database connection
633+
* has been closed.
629634
* </p><p>
630-
* The returned {@code Publisher} emits a {@code Result} with an update
631-
* count and generated values for each execution of the {@code
632-
* jdbcStatement}.
635+
* The returned publisher initiates SQL execution <i>the first time</i> a
636+
* subscriber subscribes, before the subscriber emits a {@code request}
637+
* signal.
633638
* </p>
634639
*
635640
* @implNote The 21c Oracle JDBC Driver does not support batch DML when
@@ -638,25 +643,23 @@ private Publisher<Result> executeBatch(
638643
*
639644
* @param jdbcStatement A JDBC statement
640645
* @param bindValues A set of bind values
641-
* @return A publisher that emits the {@code Results} of executing the
642-
* JDBC statement for each set of bind values in the {@code batch}
646+
* @return A publisher that emits the {@code Result} of executing the
647+
* JDBC statement.
643648
*/
644649
private Publisher<Result> executeGeneratingValues(
645650
PreparedStatement jdbcStatement, Object[] bindValues) {
646651

647652
setJdbcBindValues(bindValues, jdbcStatement);
648653
return Mono.from(adapter.publishSQLExecution(jdbcStatement))
649-
.flatMap(isResultSet -> {
650-
runOrHandleSQLException(jdbcStatement::closeOnCompletion);
651-
return isResultSet
654+
.flatMap(isResultSet ->
655+
isResultSet
652656
? Mono.just(OracleResultImpl.createQueryResult(
653657
adapter,
654658
getOrHandleSQLException(jdbcStatement::getResultSet)))
655659
: Mono.from(OracleResultImpl.createGeneratedValuesResult(
656660
adapter,
657661
getOrHandleSQLException(jdbcStatement::getUpdateCount),
658-
getOrHandleSQLException(jdbcStatement::getGeneratedKeys)));
659-
});
662+
getOrHandleSQLException(jdbcStatement::getGeneratedKeys))));
660663
}
661664

662665
/**

src/test/java/oracle/r2dbc/DatabaseConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.r2dbc.spi.ConnectionFactories;
2626
import io.r2dbc.spi.ConnectionFactory;
2727
import io.r2dbc.spi.ConnectionFactoryOptions;
28+
import io.r2dbc.spi.Option;
29+
import oracle.jdbc.OracleConnection;
2830
import oracle.r2dbc.util.SharedConnectionFactory;
2931
import org.reactivestreams.Publisher;
3032

@@ -216,6 +218,11 @@ public static int databaseVersion() {
216218
.option(ConnectionFactoryOptions.DATABASE, SERVICE_NAME)
217219
.option(ConnectionFactoryOptions.USER, USER)
218220
.option(ConnectionFactoryOptions.PASSWORD, PASSWORD)
221+
// Disable statement caching in order to verify cursor closing;
222+
// Cached statements don't close their cursors
223+
.option(Option.valueOf(
224+
OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE),
225+
0)
219226
.build());
220227

221228
SHARED_CONNECTION_FACTORY = new SharedConnectionFactory(

src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ public void testSetAutoCommit() {
559559
selectInSessionB);
560560
}
561561
finally {
562-
awaitNone(sessionA.close());
562+
awaitNone(sessionB.close());
563563
}
564564
}
565565
finally {

0 commit comments

Comments
 (0)