Skip to content

Commit 87dbc78

Browse files
Publish a SQLWarning chain as Result Message Segments
1 parent cd06ec9 commit 87dbc78

File tree

3 files changed

+165
-4
lines changed

3 files changed

+165
-4
lines changed

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,15 @@
3434

3535
import java.sql.BatchUpdateException;
3636
import java.sql.ResultSet;
37+
import java.sql.SQLWarning;
38+
import java.util.Objects;
3739
import java.util.concurrent.CompletableFuture;
3840
import java.util.concurrent.atomic.AtomicBoolean;
3941
import java.util.function.BiFunction;
4042
import java.util.function.Function;
4143
import java.util.function.Predicate;
4244
import java.util.stream.LongStream;
45+
import java.util.stream.Stream;
4346

4447
import static oracle.r2dbc.impl.OracleR2dbcExceptions.fromJdbc;
4548
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
@@ -376,6 +379,20 @@ static OracleResultImpl createErrorResult(R2dbcException r2dbcException) {
376379
return new ErrorResult(r2dbcException);
377380
}
378381

382+
/**
383+
* Creates a {@code Result} that publishes a {@code warning} as a
384+
* {@link Message} segment, followed by any {@code Segment}s of a
385+
* {@code result}.
386+
* @param warning Warning to publish
387+
* @param result Result to publisher
388+
* @return A {@code Result} for a {@code Statement} execution that
389+
* completed with a warning.
390+
*/
391+
static OracleResultImpl createWarningResult(
392+
SQLWarning warning, OracleResultImpl result) {
393+
return new WarningResult(warning, result);
394+
}
395+
379396
/**
380397
* {@link OracleResultImpl} subclass that publishes a single update count. An
381398
* instance of this class constructed with negative valued update count
@@ -392,16 +409,37 @@ private UpdateCountResult(long updateCount) {
392409
@Override
393410
<T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
394411
return updateCount >= 0
395-
? Mono.just(mappingFunction.apply(new UpdateCountImpl(updateCount)))
412+
? Mono.just(new UpdateCountImpl(updateCount))
413+
.map(mappingFunction)
396414
: Mono.empty();
397415
}
398416
}
399417

400418
/**
419+
* <p>
401420
* {@link OracleResultImpl} subclass that publishes JDBC {@link ResultSet} as
402421
* {@link RowSegment}s. {@link RowMetadata} of published {@code Rows} is
403422
* derived from the {@link java.sql.ResultSetMetaData} of the
404423
* {@link ResultSet}.
424+
* </p><p>
425+
* This {@code Result} is <i>not</i> implemented to publish
426+
* {@link SQLWarning} chains returned by {@link ResultSet#getWarnings()} as
427+
* {@link Message} segments. This implementation is correct for the 21.1
428+
* Oracle JDBC Driver which is known to implement {@code getWarnings()} by
429+
* returning {@code null} for forward-only insensitive {@code ResultSets}
430+
* when no invocation of {@link java.sql.Statement#setMaxRows(int)}
431+
* or {@link java.sql.Statement#setLargeMaxRows(long)} has occurred.
432+
* </p><p>
433+
* It is a known limitation of the 21.1 Oracle JDBC Driver that
434+
* {@link ResultSet#getWarnings()} can not be invoked after row publishing
435+
* has been initiated; The {@code ResultSet} is logically closed once row
436+
* publishing has been initiated, and so {@code getWarnings} would throw a
437+
* {@link java.sql.SQLException} to indicate a closed {@code ResultSet}. If
438+
* a later release of Oracle JDBC removes this limitation, then this
439+
* {@code Result} should be implemented to invoke {@code getWarnings} to
440+
* ensure correctness if a later release of Oracle JDBC also returns non-null
441+
* values from that method.
442+
* </p>
405443
*/
406444
private static final class ResultSetResult extends OracleResultImpl {
407445

@@ -482,8 +520,8 @@ private BatchUpdateResult(long[] updateCounts) {
482520
@Override
483521
<T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
484522
return Flux.fromStream(LongStream.of(updateCounts)
485-
.mapToObj(UpdateCountImpl::new)
486-
.map(mappingFunction));
523+
.mapToObj(UpdateCountImpl::new))
524+
.map(mappingFunction);
487525
}
488526
}
489527

@@ -532,6 +570,35 @@ <T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
532570
}
533571
}
534572

573+
/**
574+
* {@link OracleResultImpl} subclass that publishes a {@link SQLWarning}
575+
* chain as {@link Message} segments, followed by the segments of another
576+
* {@link OracleResultImpl}.
577+
*/
578+
private static final class WarningResult extends OracleResultImpl {
579+
580+
private final SQLWarning warning;
581+
private final OracleResultImpl result;
582+
583+
private WarningResult(SQLWarning warning, OracleResultImpl result) {
584+
this.warning = warning;
585+
this.result = result;
586+
}
587+
588+
@Override
589+
<T> Publisher<T> publishSegments(Function<Segment, T> mappingFunction) {
590+
return Flux.fromStream(Stream.iterate(
591+
warning, Objects::nonNull, SQLWarning::getNextWarning)
592+
.map(OracleR2dbcExceptions::toR2dbcException)
593+
.map(MessageImpl::new))
594+
.map(mappingFunction)
595+
// Invoke publishSegments(Class, Function) rather than
596+
// publishSegments(Function) to update the state of the result; Namely,
597+
// the state that has the onConsumed Publisher emit a terminal signal.
598+
.concatWith(result.publishSegments(Segment.class, mappingFunction));
599+
}
600+
}
601+
535602

536603
/**
537604
* Common interface for instances of {@link Segment} with a {@link Readable}

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.sql.Connection;
4141
import java.sql.PreparedStatement;
4242
import java.sql.SQLType;
43+
import java.sql.SQLWarning;
4344
import java.time.Duration;
4445
import java.util.Arrays;
4546
import java.util.LinkedList;
@@ -819,7 +820,21 @@ private Publisher<Boolean> publishSqlExecution(
819820
PreparedStatement preparedStatement, int fetchSize) {
820821
runJdbc(() -> preparedStatement.setFetchSize(fetchSize));
821822
setQueryTimeout(preparedStatement);
822-
return adapter.publishSQLExecution(preparedStatement);
823+
return Mono.from(adapter.publishSQLExecution(preparedStatement))
824+
// Work around a bug in 21.1 Oracle JDBC that has the
825+
// OraclePreparedStatement.executeAsyncOracle Publisher emit onError
826+
// with a SQLException when the database returns a warning. In 21.3 it's
827+
// fixed so that the Publisher emits onComplete and the SQLWarning is
828+
// obtained from the usual call to PreparedStatement.getWarnings()
829+
// TODO: Remove this when 21.1 Oracle JDBC is no longer supported
830+
.onErrorResume(error ->
831+
// ORA-17110 is the error code for warnings. If the R2dbcException has
832+
// this code, then ignore it and return the normal boolean value
833+
// indicating if the result is a ResultSet or not
834+
error instanceof R2dbcException
835+
&& ((R2dbcException) error).getErrorCode() == 17110
836+
? Mono.just(null != fromJdbc(preparedStatement::getResultSet))
837+
: Mono.error(error));
823838
}
824839

825840
private void setQueryTimeout(PreparedStatement preparedStatement) {
@@ -1431,6 +1446,7 @@ private static void registerOutParameters(
14311446
.onErrorResume(R2dbcException.class, r2dbcException ->
14321447
Mono.just(OracleResultImpl.createErrorResult(r2dbcException)))
14331448
.defaultIfEmpty(createUpdateCountResult(-1L))
1449+
.map(result -> getWarnings(preparedStatement, result))
14341450
.doOnNext(lastResultRef::set),
14351451
discardQueue ->
14361452
Flux.fromIterable(discardQueue)
@@ -1443,6 +1459,25 @@ private static void registerOutParameters(
14431459
.doOnCancel((ThrowingRunnable)preparedStatement::close));
14441460
}
14451461

1462+
/**
1463+
* Returns a {@code Result} that publishes any {@link SQLWarning}s of a
1464+
* {@code preparedStatement} as {@link io.r2dbc.spi.Result.Message}
1465+
* segments followed by any {@code Segments} of a {@code result}. This method
1466+
* returns the provided {@code result} if the {@code preparedStatement} has
1467+
* no warnings.
1468+
* @param preparedStatement Statement that may have warnings
1469+
* @param result Result of executing the {@code preparedStatement}
1470+
* @return A {@code Result} having any warning messages of the
1471+
* {@code preparedStatement} along with its execution {@code result}.
1472+
*/
1473+
private static OracleResultImpl getWarnings(
1474+
PreparedStatement preparedStatement, OracleResultImpl result) {
1475+
SQLWarning warning = fromJdbc(preparedStatement::getWarnings);
1476+
return warning == null
1477+
? result
1478+
: OracleResultImpl.createWarningResult(warning, result);
1479+
}
1480+
14461481
/**
14471482
* Returns an exception indicating that a parameter has not been set.
14481483
* @return Unset parameter exception

src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.r2dbc.spi.R2dbcException;
2828
import io.r2dbc.spi.R2dbcType;
2929
import io.r2dbc.spi.Result;
30+
import io.r2dbc.spi.Result.Message;
31+
import io.r2dbc.spi.Result.UpdateCount;
3032
import io.r2dbc.spi.Statement;
3133
import io.r2dbc.spi.Type;
3234
import org.junit.jupiter.api.Test;
@@ -36,6 +38,7 @@
3638

3739
import java.math.BigDecimal;
3840
import java.sql.RowId;
41+
import java.sql.SQLWarning;
3942
import java.util.Collections;
4043
import java.util.concurrent.atomic.AtomicInteger;
4144
import java.util.stream.Collectors;
@@ -57,7 +60,10 @@
5760
import static oracle.r2dbc.util.Awaits.tryAwaitExecution;
5861
import static oracle.r2dbc.util.Awaits.tryAwaitNone;
5962
import static org.junit.jupiter.api.Assertions.assertEquals;
63+
import static org.junit.jupiter.api.Assertions.assertNull;
6064
import static org.junit.jupiter.api.Assertions.assertThrows;
65+
import static org.junit.jupiter.api.Assertions.assertTrue;
66+
import static org.junit.jupiter.api.Assertions.fail;
6167

6268
/**
6369
* Verifies that
@@ -1959,6 +1965,59 @@ public void testOutAndImplicitResult() {
19591965
}
19601966
}
19611967

1968+
/**
1969+
* Verifies that {@link OracleStatementImpl#execute()} emits a {@link Result}
1970+
* with a {@link Message} segment when the execution results in a
1971+
* warning.
1972+
*/
1973+
@Test
1974+
public void testWarningMessage() {
1975+
Connection connection =
1976+
Mono.from(sharedConnection()).block(connectTimeout());
1977+
try {
1978+
1979+
// Create a procedure using invalid syntax and expect the Result to
1980+
// have a Message with an R2dbcException having a SQLWarning as it's
1981+
// initial cause. Expect the Result to have an update count of zero as
1982+
// well, indicating that the statement completed after the warning.
1983+
AtomicInteger segmentCount = new AtomicInteger(0);
1984+
R2dbcException r2dbcException =
1985+
awaitOne(Flux.from(connection.createStatement(
1986+
"CREATE OR REPLACE PROCEDURE testWarningMessage" +
1987+
" IS BEGIN;")
1988+
.execute())
1989+
.concatMap(result ->
1990+
result.flatMap(segment -> {
1991+
int index = segmentCount.getAndIncrement();
1992+
if (index == 0) {
1993+
assertTrue(segment instanceof Message,
1994+
"Unexpected Segment: " + segment);
1995+
return Mono.just(((Message)segment).exception());
1996+
}
1997+
else if (index == 1) {
1998+
assertTrue(segment instanceof UpdateCount,
1999+
"Unexpected Segment: " + segment);
2000+
assertEquals(0, ((UpdateCount)segment).value());
2001+
return Mono.empty();
2002+
}
2003+
else {
2004+
fail("Unexpected Segment: " + segment);
2005+
return Mono.error(new AssertionError("Should not reach here"));
2006+
}
2007+
})));
2008+
2009+
// Expect ORA-17110 for an execution that completed with a warning
2010+
assertEquals(17110, r2dbcException.getErrorCode());
2011+
Throwable cause = r2dbcException.getCause();
2012+
assertTrue(cause instanceof SQLWarning, "Unexpected cause: " + cause);
2013+
assertEquals(17110, ((SQLWarning)cause).getErrorCode());
2014+
assertNull(cause.getCause());
2015+
}
2016+
finally {
2017+
tryAwaitNone(connection.close());
2018+
}
2019+
}
2020+
19622021
// TODO: Repalce with Parameters.inOut when that's available
19632022
private static final class InOutParameter
19642023
implements Parameter, Parameter.In, Parameter.Out {

0 commit comments

Comments
 (0)