Skip to content

Commit f4cdca0

Browse files
Emit IllegalStateException for missing parameters of batch
1 parent 546a35b commit f4cdca0

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.r2dbc.spi.OutParameters;
2626
import io.r2dbc.spi.Parameter;
2727
import io.r2dbc.spi.R2dbcException;
28+
import io.r2dbc.spi.R2dbcNonTransientException;
2829
import io.r2dbc.spi.Result;
2930
import io.r2dbc.spi.Statement;
3031
import io.r2dbc.spi.Type;
@@ -1126,8 +1127,19 @@ Publisher<OracleResultImpl> executeBatch() {
11261127
"Batch execution with generated values is not supported");
11271128
}
11281129

1130+
// If parameters are not set, then capture the error and then emit it after
1131+
// the result of executing with all previously added binds
1132+
IllegalStateException missingParameters = null;
1133+
try {
1134+
add();
1135+
}
1136+
catch (IllegalStateException illegalStateException) {
1137+
missingParameters = illegalStateException;
1138+
}
1139+
Mono<OracleResultImpl> missingParametersMono = missingParameters == null
1140+
? Mono.empty()
1141+
: Mono.error(missingParameters);
11291142

1130-
add(); // TODO: Catch and emit IllegalStateException as R2dbcException?
11311143
Queue<Object[]> currentBatch = batch;
11321144
int batchSize = batch.size();
11331145
batch = new LinkedList<>();
@@ -1171,7 +1183,8 @@ Publisher<OracleResultImpl> executeBatch() {
11711183
// Close the cursor before emitting the Result
11721184
runJdbc(preparedStatement::close);
11731185
return resultPublisher;
1174-
});
1186+
})
1187+
.concatWith(missingParametersMono);
11751188
});
11761189
}
11771190

src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.r2dbc.spi.Parameter;
2626
import io.r2dbc.spi.Parameters;
2727
import io.r2dbc.spi.R2dbcException;
28+
import io.r2dbc.spi.R2dbcNonTransientException;
2829
import io.r2dbc.spi.R2dbcType;
2930
import io.r2dbc.spi.Result;
3031
import io.r2dbc.spi.Result.Message;
@@ -35,6 +36,7 @@
3536
import org.reactivestreams.Publisher;
3637
import reactor.core.publisher.Flux;
3738
import reactor.core.publisher.Mono;
39+
import reactor.core.publisher.Signal;
3840

3941
import java.sql.RowId;
4042
import java.sql.SQLWarning;
@@ -806,12 +808,21 @@ public void testAdd() {
806808
connection.createStatement("INSERT INTO table VALUES(:x, :y)")
807809
.bind("x", 0).bind("y", 1).add()
808810
.bind("x", 0).add());
809-
assertThrows(
810-
IllegalStateException.class,
811-
() ->
812-
connection.createStatement("INSERT INTO table VALUES(:x, :y)")
813-
.bind("x", 0).bind("y", 1).add()
814-
.bind("y", 1).execute()); // implicit add()
811+
812+
// Expect the statement to execute with previously added binds, and
813+
// then emit an error if binds are missing in the final set of binds.
814+
List<Signal<Integer>> signals =
815+
awaitOne(Flux.from(connection.createStatement(
816+
"INSERT INTO testAdd VALUES (:x, :y)")
817+
.bind("x", 0).bind("y", 1).add()
818+
.bind("y", 1).execute())
819+
.flatMap(Result::getRowsUpdated)
820+
.materialize()
821+
.collectList());
822+
assertEquals(2, signals.size());
823+
assertEquals(1, signals.get(0).get());
824+
assertTrue(
825+
signals.get(1).getThrowable() instanceof IllegalStateException);
815826

816827
}
817828
finally {

0 commit comments

Comments
 (0)