Skip to content

Commit a82b0c7

Browse files
Fix cursor closing
1 parent 2b30311 commit a82b0c7

File tree

6 files changed

+167
-52
lines changed

6 files changed

+167
-52
lines changed

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -454,18 +454,17 @@ <T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
454454
private static final class ResultSetResult extends OracleResultImpl {
455455

456456
private final ResultSet resultSet;
457+
private final RowMetadataImpl metadata;
457458
private final ReactiveJdbcAdapter adapter;
458459

459460
private ResultSetResult(ResultSet resultSet, ReactiveJdbcAdapter adapter) {
460461
this.resultSet = resultSet;
462+
this.metadata = createRowMetadata(fromJdbc(resultSet::getMetaData));
461463
this.adapter = adapter;
462464
}
463465

464466
@Override
465467
<T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
466-
RowMetadataImpl metadata =
467-
createRowMetadata(fromJdbc(resultSet::getMetaData));
468-
469468
return adapter.publishRows(resultSet, jdbcReadable ->
470469
mappingFunction.apply(
471470
new RowSegmentImpl(createRow(jdbcReadable, metadata, adapter))));
@@ -605,7 +604,9 @@ <T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
605604
// Invoke publishSegments(Class, Function) rather than
606605
// publishSegments(Function) to update the state of the result; Namely,
607606
// the state that has the onConsumed Publisher emit a terminal signal.
608-
.concatWith(result.publishSegments(Segment.class, mappingFunction));
607+
.concatWith(result != null
608+
? result.publishSegments(Segment.class,mappingFunction)
609+
: Mono.empty());
609610
}
610611
}
611612

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 105 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.r2dbc.spi.Result;
2929
import io.r2dbc.spi.Statement;
3030
import io.r2dbc.spi.Type;
31-
import oracle.r2dbc.impl.OracleR2dbcExceptions.ThrowingRunnable;
3231
import oracle.r2dbc.impl.OracleR2dbcExceptions.ThrowingSupplier;
3332
import org.reactivestreams.Publisher;
3433
import reactor.core.publisher.Flux;
@@ -50,7 +49,6 @@
5049
import java.util.Queue;
5150
import java.util.concurrent.atomic.AtomicBoolean;
5251
import java.util.concurrent.atomic.AtomicInteger;
53-
import java.util.concurrent.atomic.AtomicReference;
5452
import java.util.function.BiFunction;
5553
import java.util.function.Function;
5654
import java.util.stream.IntStream;
@@ -61,10 +59,10 @@
6159
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
6260
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireOpenConnection;
6361
import static oracle.r2dbc.impl.OracleR2dbcExceptions.runJdbc;
64-
import static oracle.r2dbc.impl.SqlTypeMap.toJdbcType;
6562
import static oracle.r2dbc.impl.OracleResultImpl.createCallResult;
6663
import static oracle.r2dbc.impl.OracleResultImpl.createGeneratedValuesResult;
6764
import static oracle.r2dbc.impl.OracleResultImpl.createUpdateCountResult;
65+
import static oracle.r2dbc.impl.SqlTypeMap.toJdbcType;
6866

6967
/**
7068
* <p>
@@ -719,7 +717,7 @@ private Publisher<OracleResultImpl> executeSql() {
719717
(preparedStatement, discardQueue) ->
720718
setBindValues(preparedStatement, currentBindValues, discardQueue),
721719
preparedStatement ->
722-
publishSqlResult(preparedStatement, currentFetchSize));
720+
publishSqlResult(preparedStatement, currentFetchSize, true));
723721
}
724722

725723
/**
@@ -795,24 +793,31 @@ private void requireAllParametersSet() {
795793
* returned {@code Publisher} emits 0 {@code Result}s if the {@code
796794
* preparedStatement} returns no update count, row data, or implicit results.
797795
* </p>
796+
* @param isCursorClosable {@code true} if the cursor can be closed if no
797+
* result is a {@code ResultSet}
798798
* @return A {@code Publisher} that emits the {@code Result}s of executing a
799799
* {@code preparedStatement}.
800800
*/
801801
private Publisher<OracleResultImpl> publishSqlResult(
802-
PreparedStatement preparedStatement, int fetchSize) {
802+
PreparedStatement preparedStatement, int fetchSize,
803+
boolean isCursorClosable) {
803804

804-
runJdbc(preparedStatement::closeOnCompletion);
805805
return Mono.from(publishSqlExecution(preparedStatement, fetchSize))
806806
.flatMapMany(isResultSet -> {
807807

808-
// Check if any Result is a ResultSet to see if closeOnCompletion
809-
// will close the cursor
810-
boolean isAnyResultSet = isResultSet;
811-
812-
// Create all Results before emitting any. The JDBC API should not be
813-
// called after emitting the first one. Once a Result is emitted,
814-
// user code may initiate another database call, and this would have
815-
// a JDBC API block the thread.
808+
// Retain Publishers that complete when an implicit ResultSet is
809+
// consumed.
810+
List<Publisher<Void>> implicitResultConsumptions = new ArrayList<>(0);
811+
812+
// Collect all Results into a List before any are emitted to user
813+
// code. Ideally, no JDBC API calls should occur after the first
814+
// Result is emitted; Once a Result has been emitted, user code may
815+
// initiate a new Statement execution, and the JDBC connection
816+
// becomes locked. If a JDBC call occurs while the connection is
817+
// locked, it will block the calling thread. This can potentially
818+
// cause a deadlock where all threads are blocked until the JDBC
819+
// connection is unlocked, and the JDBC connection can not become
820+
// unlocked until a thread is available.
816821
List<OracleResultImpl> results = new ArrayList<>(1);
817822
OracleResultImpl firstResult =
818823
getSqlResult(adapter, preparedStatement, isResultSet);
@@ -821,26 +826,55 @@ private Publisher<OracleResultImpl> publishSqlResult(
821826
results.add(firstResult);
822827

823828
do {
829+
// Move the statement to the next result, if any
824830
boolean isNextResultSet = fromJdbc(() ->
825831
preparedStatement.getMoreResults(
826832
PreparedStatement.KEEP_CURRENT_RESULT));
827-
isAnyResultSet |= isNextResultSet;
833+
834+
// Get the next result, if any
828835
OracleResultImpl nextResult =
829836
getSqlResult(adapter, preparedStatement, isNextResultSet);
830837

831-
if (nextResult != null)
832-
results.add(nextResult);
833-
else
838+
// Break out of this loop if there is no next result
839+
if (nextResult == null)
834840
break;
835841

842+
// If the result is an implicit ResultSet, then retain it's
843+
// consumption publisher
844+
if (isNextResultSet)
845+
implicitResultConsumptions.add(nextResult.onConsumed());
846+
847+
// Add the next result to the list of all results
848+
results.add(nextResult);
836849
} while (true);
837850

838-
// Close the cursor if no Results are a ResultSet. Otherwise, let
839-
// PreparedStatement.closeOnCompletion() close the cursor.
840-
if (!isAnyResultSet)
841-
runJdbc(preparedStatement::close);
851+
Publisher<OracleResultImpl> resultPublisher =
852+
Flux.fromIterable(results);
853+
854+
if (!isCursorClosable) {
855+
// Don't attempt to close the cursor if the caller provided
856+
// isCursorClosable as false
857+
return resultPublisher;
858+
}
859+
else if (implicitResultConsumptions.isEmpty()) {
860+
// If no result is a ResultSet, then the cursor can be closed now.
861+
// Otherwise, PreparedStatement.closeOnCompletion() will close the
862+
// cursor after the ResultSet emits the last row
863+
if (!isResultSet)
864+
runJdbc(preparedStatement::close);
865+
else
866+
runJdbc(preparedStatement::closeOnCompletion);
842867

843-
return Flux.fromIterable(results);
868+
return resultPublisher;
869+
}
870+
else {
871+
// If at least one Result is an implicit ResultSet, then
872+
// PreparedStatement.closeOnCompletion()
873+
return Flux.from(resultPublisher)
874+
.concatWith(Flux.merge(implicitResultConsumptions)
875+
.doFinally(signalType -> runJdbc(preparedStatement::close))
876+
.cast(OracleResultImpl.class));
877+
}
844878
});
845879
}
846880

@@ -912,9 +946,19 @@ private static OracleResultImpl getSqlResult(
912946
private Publisher<OracleResultImpl> publishCallResult(
913947
CallableStatement callableStatement, Object[] bindValues,
914948
int fetchSize) {
915-
return Flux.from(publishSqlResult(callableStatement, fetchSize))
916-
.concatWith(Mono.just(createCallResult(
917-
createOutParameterRow(callableStatement, bindValues))));
949+
950+
// Create a Result of OutParameters that are read from the
951+
// CallableStatement.
952+
OracleResultImpl callResult =
953+
createCallResult(createOutParameterRow(callableStatement, bindValues));
954+
955+
return Flux.concat(
956+
publishSqlResult(callableStatement, fetchSize, false),
957+
Mono.just(callResult)
958+
// Close the CallableStatement after the Result is consumed.
959+
.concatWith(Mono.from(callResult.onConsumed())
960+
.doOnTerminate(() -> runJdbc(callableStatement::close))
961+
.cast(OracleResultImpl.class)));
918962
}
919963

920964
/**
@@ -1035,6 +1079,7 @@ private Publisher<OracleResultImpl> executeGeneratingValues() {
10351079
" has executed a query that returns row data");
10361080
}
10371081
else {
1082+
runJdbc(preparedStatement::closeOnCompletion);
10381083
return createGeneratedValuesResult(
10391084
fromJdbc(preparedStatement::getUpdateCount),
10401085
fromJdbc(preparedStatement::getGeneratedKeys),
@@ -1083,6 +1128,7 @@ Publisher<OracleResultImpl> executeBatch() {
10831128

10841129
addImplicit();
10851130
Queue<Object[]> currentBatch = batch;
1131+
int batchSize = batch.size();
10861132
batch = new LinkedList<>();
10871133

10881134
// Index incremented with each update count
@@ -1096,7 +1142,7 @@ Publisher<OracleResultImpl> executeBatch() {
10961142
return Flux.from(adapter.publishBatchUpdate(preparedStatement))
10971143
// All update counts are collected into a single long[]
10981144
.collect(
1099-
() -> new long[currentBatch.size()],
1145+
() -> new long[batchSize],
11001146
(updateCounts, updateCount) ->
11011147
updateCounts[index.getAndIncrement()] = updateCount)
11021148
.map(updateCounts -> {
@@ -1106,15 +1152,25 @@ Publisher<OracleResultImpl> executeBatch() {
11061152
OracleResultImpl.createBatchUpdateResult(updateCounts));
11071153

11081154
// Close the cursor before emitting the Result
1109-
runJdbc(preparedStatement::closeOnCompletion);
1110-
1155+
runJdbc(preparedStatement::close);
11111156
return result;
11121157
})
1113-
.onErrorResume(error ->
1114-
error.getCause() instanceof BatchUpdateException
1115-
? Mono.just(OracleResultImpl.createBatchUpdateErrorResult(
1116-
(BatchUpdateException) error.getCause()))
1117-
: Mono.error(error));
1158+
.onErrorResume(error -> {
1159+
final Mono<OracleResultImpl> resultPublisher;
1160+
1161+
if (error.getCause() instanceof BatchUpdateException) {
1162+
resultPublisher = Mono.just(
1163+
OracleResultImpl.createBatchUpdateErrorResult(
1164+
(BatchUpdateException) error.getCause()));
1165+
}
1166+
else {
1167+
resultPublisher = Mono.error(error);
1168+
}
1169+
1170+
// Close the cursor before emitting the Result
1171+
runJdbc(preparedStatement::close);
1172+
return resultPublisher;
1173+
});
11181174
});
11191175
}
11201176

@@ -1435,19 +1491,27 @@ private static void registerOutParameters(
14351491
Function<T, Publisher<OracleResultImpl>> resultFunction) {
14361492

14371493
T preparedStatement = statementSupplier.get();
1494+
AtomicBoolean isResultEmitted = new AtomicBoolean(false);
14381495
return Flux.usingWhen(
14391496
Mono.just(new LinkedList<Publisher<Void>>()),
14401497
discardQueue ->
14411498
Flux.from(bindFunction.apply(preparedStatement, discardQueue))
1442-
.thenMany(resultFunction.apply(preparedStatement))
1443-
.onErrorResume(R2dbcException.class, r2dbcException ->
1444-
Mono.just(OracleResultImpl.createErrorResult(r2dbcException)))
1445-
.defaultIfEmpty(createUpdateCountResult(-1L)),
1499+
.thenMany(resultFunction.apply(preparedStatement)),
14461500
discardQueue ->
14471501
Flux.fromIterable(discardQueue)
14481502
.concatMapDelayError(Function.identity()))
1449-
.doOnCancel(() -> runJdbc(preparedStatement::close))
1450-
.doOnError(error -> runJdbc(preparedStatement::close));
1503+
.doOnNext(result -> isResultEmitted.set(true))
1504+
.onErrorResume(R2dbcException.class, r2dbcException ->
1505+
Mono.just(OracleResultImpl.createErrorResult(r2dbcException)))
1506+
.defaultIfEmpty(createUpdateCountResult(-1L))
1507+
.doFinally(signalType -> {
1508+
// Close the cursor if the publisher is cancelled or emits an error
1509+
// before a Result is emitted. Otherwise, the resultFunction should
1510+
// arrange for the cursor to be closed as it may need to remain open
1511+
// until the Result is consumed
1512+
if (! isResultEmitted.get())
1513+
runJdbc(preparedStatement::close);
1514+
});
14511515
}
14521516

14531517
/**
@@ -1464,6 +1528,7 @@ private static void registerOutParameters(
14641528
private static OracleResultImpl getWarnings(
14651529
PreparedStatement preparedStatement, OracleResultImpl result) {
14661530
SQLWarning warning = fromJdbc(preparedStatement::getWarnings);
1531+
runJdbc(preparedStatement::clearWarnings);
14671532
return warning == null
14681533
? result
14691534
: OracleResultImpl.createWarningResult(warning, result);

src/test/java/oracle/r2dbc/impl/OracleReableImplTest.java renamed to src/test/java/oracle/r2dbc/impl/OracleReadableImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
* {@link OracleReadableImpl} implements behavior that is specified in it's class
4242
* and method level javadocs.
4343
*/
44-
public class OracleReableImplTest {
44+
public class OracleReadableImplTest {
4545

4646
/**
4747
* Verifies the implementation of

src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,8 +1758,8 @@ public void testNoOutImplicitResult() {
17581758
IntStream.rangeClosed(0, 100)
17591759
.forEach(i -> insert.bind(0, i).add());
17601760
awaitOne(101, Flux.from(insert.execute())
1761-
.reduce(0, (updateCount, result) ->
1762-
updateCount + awaitOne(result.getRowsUpdated())));
1761+
.flatMap(Result::getRowsUpdated)
1762+
.reduce(0, (total, updateCount) -> total + updateCount));
17631763

17641764
// Create a procedure that returns a cursor
17651765
awaitExecution(connection.createStatement(
@@ -1868,8 +1868,8 @@ public void testOutAndImplicitResult() {
18681868
IntStream.rangeClosed(0, 100)
18691869
.forEach(i -> insert.bind(0, i).add());
18701870
awaitOne(101, Flux.from(insert.execute())
1871-
.reduce(0, (updateCount, result) ->
1872-
updateCount + awaitOne(result.getRowsUpdated())));
1871+
.flatMap(Result::getRowsUpdated)
1872+
.reduce(0, (total, updateCount) -> total + updateCount));
18731873

18741874
// Create a procedure that returns a cursor
18751875
awaitExecution(connection.createStatement(

0 commit comments

Comments
 (0)