Skip to content

Commit 0e3585e

Browse files
Remove Result.consumedFuture
1 parent 3185420 commit 0e3585e

File tree

2 files changed

+7
-76
lines changed

2 files changed

+7
-76
lines changed

src/main/java/oracle/r2dbc/impl/OracleBatchImpl.java

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package oracle.r2dbc.impl;
2323

2424
import java.sql.Connection;
25+
import java.sql.Statement;
2526
import java.time.Duration;
2627
import java.util.LinkedList;
2728
import java.util.Queue;
@@ -114,18 +115,6 @@ public Batch add(String sql) {
114115
* are executed in the order they were added. Calling this method clears all
115116
* statements that have been added to the current batch.
116117
* </p><p>
117-
* A {@code Result} emitted by the returned {@code Publisher} must be
118-
* <a href="OracleStatementImpl.html#fully-consumed-result">
119-
* fully-consumed
120-
* </a>
121-
* before the next {@code Result} is emitted. This ensures that a command in
122-
* the batch can not be executed while the {@code Result} of a previous
123-
* command is consumed concurrently. It is a known limitation of the Oracle
124-
* R2DBC Driver that concurrent operations on a single {@code Connection}
125-
* will result in blocked threads. Deferring {@code Statement} execution
126-
* until full consumption of the previous {@code Statement}'s {@code Result}
127-
* is necessary in order to avoid blocked threads.
128-
* </p><p>
129118
* If the execution of any statement in the sequence results in a failure,
130119
* then the returned publisher emits {@code onError} with an
131120
* {@link R2dbcException} that describes the failure, and all subsequent
@@ -147,42 +136,8 @@ public Publisher<OracleResultImpl> execute() {
147136
requireOpenConnection(jdbcConnection);
148137
Queue<OracleStatementImpl> currentStatements = statements;
149138
statements = new LinkedList<>();
150-
return publishBatch(currentStatements);
151-
}
152-
153-
/**
154-
* <p>
155-
* Publish a batch of {@code Result}s from {@code statements}. Each
156-
* {@code Result} is published serially with the consumption of the
157-
* previous {@code Result}.
158-
* </p><p>
159-
* This method returns an empty {@code Publisher} if {@code statements} is
160-
* empty. Otherwise, this method dequeues the next {@code Statement} and
161-
* executes it for a {@code Result}. After the {@code Result} has been
162-
* fully consumed, this method is invoked recursively to publish the {@code
163-
* Result}s of remaining {@code statements}.
164-
* </p>
165-
* @param statements A batch to executed.
166-
* @return {@code Publisher} of {@code statements} {@code Result}s
167-
*/
168-
private static Publisher<OracleResultImpl> publishBatch(
169-
Queue<OracleStatementImpl> statements) {
170-
171-
OracleStatementImpl next = statements.poll();
172-
173-
if (next != null) {
174-
AtomicReference<OracleResultImpl> lastResult =
175-
new AtomicReference<>(null);
176-
177-
return Flux.from(next.execute())
178-
.doOnNext(lastResult::set)
179-
.concatWith(Flux.defer(() ->
180-
Mono.from(lastResult.get().onConsumed())
181-
.thenMany(publishBatch(statements))));
182-
}
183-
else {
184-
return Mono.empty();
185-
}
139+
return Flux.fromIterable(currentStatements)
140+
.concatMap(OracleStatementImpl::execute);
186141
}
187142

188143
}

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.sql.ResultSet;
3737
import java.sql.SQLWarning;
3838
import java.util.Objects;
39-
import java.util.concurrent.CompletableFuture;
4039
import java.util.concurrent.atomic.AtomicBoolean;
4140
import java.util.concurrent.atomic.AtomicReference;
4241
import java.util.function.BiFunction;
@@ -82,15 +81,6 @@ abstract class OracleResultImpl implements Result {
8281
*/
8382
private boolean isPublished = false;
8483

85-
/**
86-
* Future that is completed when this {@code Result} has been
87-
* <a href="OracleStatementImpl.html#fully-consumed-result">
88-
* fully-consumed
89-
* </a>.
90-
*/
91-
private final CompletableFuture<Void> consumedFuture =
92-
new CompletableFuture<>();
93-
9484
/**
9585
* Reference to a publisher that must be subscribed to after all segments of
9686
* this result have been consumed. The reference is updated to {@code null}
@@ -125,11 +115,10 @@ abstract <T> Publisher<T> publishSegments(
125115
* as well.
126116
* </p><p>
127117
* When the returned publisher terminates with {@code onComplete},
128-
* {@code onError}, or {@code cancel}, the {@link #consumedFuture} is
129-
* completed and the {@link #onConsumed} publisher is subscribed to. The
130-
* {@code onConsumed} reference is update to {@code null} so that
131-
* post-consumption calls to {@link #onConsumed(Publisher)} can detect that
132-
* this result is already consumed.
118+
* {@code onError}, or {@code cancel}, the {@link #onConsumed} publisher is
119+
* subscribed to. The {@code onConsumed} reference is updated to {@code null}
120+
* so that post-consumption calls to {@link #onConsumed(Publisher)} can detect
121+
* that this result is already consumed.
133122
* </p><p>
134123
* The returned {@code Publisher} emits {@code onError} with an
135124
* {@link R2dbcException} if this {@code Result} has a {@link Message} segment
@@ -151,7 +140,6 @@ private <T extends Segment, U> Publisher<U> publishSegments(
151140
setPublished();
152141

153142
Mono<U> whenConsumed = Mono.defer(() -> {
154-
consumedFuture.complete(null);
155143
Publisher<Void> consumedPublisher = onConsumed.getAndSet(null);
156144
return consumedPublisher == null
157145
? Mono.empty()
@@ -279,18 +267,6 @@ public OracleResultImpl filter(Predicate<Segment> filter) {
279267
return new FilteredResult(this, filter);
280268
}
281269

282-
/**
283-
* Returns a {@code Publisher} that emits {@code onComplete} when this
284-
* {@code Result} has been
285-
* <a href="OracleStatementImpl.html#fully-consumed-result">
286-
* fully-consumed
287-
* </a>.
288-
* @return {@code Publisher} of this {@code Result}'s consumption
289-
*/
290-
final Publisher<Void> onConsumed() {
291-
return Mono.fromCompletionStage(consumedFuture);
292-
}
293-
294270
/**
295271
* <p>
296272
* Sets a publisher that is subscribed to when all segments of this result

0 commit comments

Comments
 (0)