3838import org .reactivestreams .Publisher ;
3939import org .reactivestreams .Subscriber ;
4040import org .reactivestreams .Subscription ;
41- import reactor .core .publisher .DirectProcessor ;
4241import reactor .core .publisher .Flux ;
4342import reactor .core .publisher .Mono ;
4443
5958import java .util .concurrent .Executor ;
6059import java .util .concurrent .Flow ;
6160import java .util .concurrent .atomic .AtomicBoolean ;
62- import java .util .concurrent .atomic .AtomicLong ;
63- import java .util .concurrent .locks .ReentrantLock ;
6461import java .util .function .Function ;
6562import java .util .function .Supplier ;
6663
7572import static oracle .r2dbc .impl .OracleR2dbcExceptions .runJdbc ;
7673import static oracle .r2dbc .impl .OracleR2dbcExceptions .toR2dbcException ;
7774import static org .reactivestreams .FlowAdapters .toFlowPublisher ;
78- import static org .reactivestreams .FlowAdapters .toFlowSubscriber ;
7975import static org .reactivestreams .FlowAdapters .toPublisher ;
76+ import static org .reactivestreams .FlowAdapters .toSubscriber ;
8077
8178/**
8279 * <p>
104101 * Connection without blocking a thread. Oracle JDBC implements thread
105102 * safety by blocking threads, and this can cause deadlocks in common
106103 * R2DBC programming scenarios. See the JavaDoc of
107- * {@link UsingConnectionSubscriber } for more details.
104+ * {@link AsyncLock } for more details.
108105 * </li>
109106 * </ul><p>
110107 * A instance of this class is obtained by invoking {@link #getInstance()}. A
@@ -917,28 +914,19 @@ public Publisher<String> publishClobRead(Clob clob)
917914 * contents are always copied into a new byte array. In a later release,
918915 * avoiding the copy using {@link ByteBuffer#array()} can be worth
919916 * considering.
920- *
921- * @implNote The 21c {@code OracleBlob} subscriber violates Rule 2.7 of the
922- * Reactive Streams Specification, which prohibits concurrent calls to
923- * {@link Subscription#request(long)}. This can cause undefined behavior by
924- * the {@code contentPublisher}. To work around this bug, this method
925- * proxies the {@link Subscription} between the {@code contentPublisher}
926- * and the {@code OracleBlob} subscriber. The proxy ensures that
927- * {@code request} signals are delivered serially.
928917 */
929918 @ Override
930919 public Publisher <Void > publishBlobWrite (
931920 Publisher <ByteBuffer > contentPublisher , Blob blob ) {
932921 OracleBlob oracleBlob = castAsType (blob , OracleBlob .class );
933922
934- // TODO: Move subscriberOracleCall into adaptFlowPublisher, so that it
923+ // TODO: Move subscriberOracle Call into adaptFlowPublisher, so that it
935924 // avoids lock contention
936- // This processor emits a terminal signal when all blob writing database
937- // calls have completed
938- DirectProcessor <Long > writeOutcomeProcessor = DirectProcessor . create ();
925+ // This subscriber receives a terminal signal after JDBC completes the
926+ // LOB write.
927+ CompletionSubscriber <Long > outcomeSubscriber = new CompletionSubscriber <> ();
939928 Flow .Subscriber <byte []> blobSubscriber = fromJdbc (() ->
940- oracleBlob .subscriberOracle (1L ,
941- toFlowSubscriber (writeOutcomeProcessor )));
929+ oracleBlob .subscriberOracle (1L , outcomeSubscriber ));
942930
943931 // TODO: Acquire async lock before invoking onNext, release when
944932 // writeOutcomeProcessor gets onNext with sum equal to sum of buffer
@@ -960,9 +948,10 @@ public Publisher<Void> publishBlobWrite(
960948 slice .get (byteArray );
961949 return byteArray ;
962950 })
963- .subscribe (new SerializedLobSubscriber <>(blobSubscriber ));
951+ .subscribe (toSubscriber (blobSubscriber ));
952+
964953
965- return toFlowPublisher (writeOutcomeProcessor . then ());
954+ return toFlowPublisher (outcomeSubscriber . publish ());
966955 });
967956 }
968957
@@ -974,33 +963,24 @@ public Publisher<Void> publishBlobWrite(
974963 * {@link OracleClob#subscriberOracle(long, Flow.Subscriber)} adapted to
975964 * conform with the R2DBC standards.
976965 * </p>
977- *
978- * @implNote The 21c {@code OracleClob} subscriber violates Rule 2.7 of the
979- * Reactive Streams Specification, which prohibits concurrent calls to
980- * {@link Subscription#request(long)}. This can cause undefined behavior by
981- * the {@code contentPublisher}. To work around this bug, this method
982- * proxies the {@link Subscription} between the {@code contentPublisher}
983- * and the {@code OracleClob} subscriber. The proxy ensures that
984- * {@code request} signals are delivered serially.
985966 */
986967 @ Override
987968 public Publisher <Void > publishClobWrite (
988969 Publisher <? extends CharSequence > contentPublisher , Clob clob ) {
989970 OracleClob oracleClob = castAsType (clob , OracleClob .class );
990971
991- // This processor emits a terminal signal when all clob writing database
992- // calls have completed
993- DirectProcessor <Long > writeOutcomeProcessor = DirectProcessor . create ();
972+ // This subscriber receives a terminal signal after JDBC completes the
973+ // LOB write.
974+ CompletionSubscriber <Long > outcomeSubscriber = new CompletionSubscriber <> ();
994975 Flow .Subscriber <String > clobSubscriber = fromJdbc (() ->
995- oracleClob .subscriberOracle (1L ,
996- toFlowSubscriber (writeOutcomeProcessor )));
976+ oracleClob .subscriberOracle (1L , outcomeSubscriber ));
997977
998978 return adaptFlowPublisher (() -> {
999979 Flux .from (contentPublisher )
1000980 .map (CharSequence ::toString )
1001- .subscribe (new SerializedLobSubscriber <> (clobSubscriber ));
981+ .subscribe (toSubscriber (clobSubscriber ));
1002982
1003- return toFlowPublisher (writeOutcomeProcessor . then ());
983+ return toFlowPublisher (outcomeSubscriber . publish ());
1004984 });
1005985 }
1006986
@@ -1276,134 +1256,59 @@ else if (isTypeConversionError(sqlException.getErrorCode()))
12761256 }
12771257
12781258 /**
1279- * <p>
1280- * A {@code Subscriber} that serializes {@code Subscription} method calls
1281- * made by {@link OracleBlob} or {@link OracleClob} subscribers. The purpose
1282- * of this class is to work around Oracle JDBC Bug #32097526, in which the
1283- * Large Object (LOB) subscribers violate Rule 2.7 of the Reactive Streams
1284- * 1.0.3 Specification by invoking subscription methods concurrently. This
1285- * violation can lead to unspecified behavior from the upstream LOB content
1286- * {@code Publisher}.
1287- * </p><p>
1288- * This class serves as an intermediary between a LOB content publisher
1289- * upstream, and the LOB subscriber downstream. It presents itself as a
1290- * subscription to the LOB subscriber so that it can regulate it's
1291- * subscription method calls. Each subscription call is regulated by
1292- * acquiring a mutually exclusive lock before the call is forwarded to the
1293- * content publisher's subscription.
1294- * </p>
1295- *
1296- * @implNote This class is an {@code org.reactivestreams.Subscriber} and a
1297- * {@code java.util.concurrent.Flow.Subscription}. These APIs were chosen to
1298- * interface with R2DBC Blob/Clob publishers upstream, and with Reactive
1299- * Extensions downstream.
1300- * @param <T> The type of item subscribed to
1259+ * A subscriber that relays {@code onComplete} or {@code onError} signals
1260+ * from an upstream publisher to downstream subscribers. This subscriber
1261+ * ignores {@code onNext} signals from an upstream publisher. This subscriber
1262+ * signals unbounded demand to an upstream publisher.
1263+ * @param <T> Type of values emitted from an upstream publisher.
13011264 */
1302- private static class SerializedLobSubscriber <T >
1303- implements org . reactivestreams . Subscriber <T >, Flow . Subscription {
1265+ private static final class CompletionSubscriber <T >
1266+ implements Flow . Subscriber <T > {
13041267
1305- /** The downstream OracleBlob/OracleClob subscriber */
1306- final Flow .Subscriber <T > lobSubscriber ;
1307-
1308- /** Guards access to the upstream content publisher's subscription */
1309- final ReentrantLock signalLock = new ReentrantLock ();
1310-
1311- /** The upstream content publisher's subscription */
1312- Subscription contentSubscription ;
1268+ /** Future completed by {@code onSubscribe} */
1269+ private final CompletableFuture <Flow .Subscription > subscriptionFuture =
1270+ new CompletableFuture <>();
13131271
13141272 /**
1315- * Constructs a new subscriber that regulates subscription calls from a
1316- * {@code lobSubscriber}. The {@code onSubscribe} method of the {@code
1317- * lobSubscriber} is invoked when the {@code onSubscribe} method of the
1318- * constructed subscriber is invoked.
1273+ * Future completed normally by {@code onComplete}, or exceptionally by
1274+ * {@code onError}
13191275 */
1320- SerializedLobSubscriber (Flow .Subscriber <T > lobSubscriber ) {
1321- this .lobSubscriber = lobSubscriber ;
1322- }
1276+ private final CompletableFuture <Void > resultFuture =
1277+ new CompletableFuture <>();
13231278
1324- /**
1325- * {@inheritDoc}
1326- * <p>
1327- * Retains the {@code subscription} and presents itself as a subscription
1328- * to the LOB subscriber. Subscription calls from the LOB subscriber are
1329- * then serially forwarded to the {@code subscription}.
1330- * </p>
1331- */
13321279 @ Override
1333- public void onSubscribe (Subscription subscription ) {
1334- contentSubscription = subscription ;
1335- lobSubscriber . onSubscribe ( this );
1280+ public void onSubscribe (Flow . Subscription subscription ) {
1281+ subscriptionFuture . complete ( Objects . requireNonNull ( subscription )) ;
1282+ subscription . request ( Long . MAX_VALUE );
13361283 }
13371284
1338- /**
1339- * {@inheritDoc}
1340- * <p>
1341- * Regulates a request call from the {@code lobSubscriber} by first
1342- * blocking until any active {@code request} or {@code cancel} call has
1343- * completed, and then forwarding the request to the content publisher.
1344- * </p>
1345- */
13461285 @ Override
1347- public void request (long n ) {
1348- signalLock .lock ();
1349- try {
1350- contentSubscription .request (n );
1351- }
1352- finally {
1353- signalLock .unlock ();
1354- }
1355- }
1356-
1357- /**
1358- * {@inheritDoc}
1359- * <p>
1360- * Regulates a cancel call from the {@code lobSubscriber} by first
1361- * blocking until any active {@code request} or {@code cancel} call has
1362- * completed, and then forwarding the cancel to the content publisher.
1363- * </p>
1364- */
1365- @ Override
1366- public void cancel () {
1367- signalLock .lock ();
1368- try {
1369- contentSubscription .cancel ();
1370- }
1371- finally {
1372- signalLock .unlock ();
1373- }
1286+ public void onNext (T item ) {
13741287 }
13751288
1376- /**
1377- * {@inheritDoc}
1378- * <p>
1379- * Forwards the signal to the LOB subscriber without any regulation.
1380- * </p>
1381- */
13821289 @ Override
1383- public void onNext ( T item ) {
1384- lobSubscriber . onNext ( item );
1290+ public void onError ( Throwable throwable ) {
1291+ resultFuture . completeExceptionally ( Objects . requireNonNull ( throwable ) );
13851292 }
13861293
1387- /**
1388- * {@inheritDoc}
1389- * <p>
1390- * Forwards the signal to the LOB subscriber without any regulation.
1391- * </p>
1392- */
13931294 @ Override
1394- public void onError ( Throwable throwable ) {
1395- lobSubscriber . onError ( throwable );
1295+ public void onComplete ( ) {
1296+ resultFuture . complete ( null );
13961297 }
13971298
13981299 /**
1399- * {@inheritDoc}
1400- * <p>
1401- * Forwards the signal to the LOB subscriber without any regulation.
1402- * </p>
1300+ * Returns a publisher that emits the same {@code onComplete} or
1301+ * {@code onError} signal emitted to this subscriber. Cancelling a
1302+ * subscription to the returned publisher cancels the subscription of this
1303+ * subscriber.
1304+ * @return A publisher that emits the terminal signal emitted to this
1305+ * subscriber.
14031306 */
1404- @ Override
1405- public void onComplete () {
1406- lobSubscriber .onComplete ();
1307+ Publisher <Void > publish () {
1308+ return Mono .fromCompletionStage (resultFuture )
1309+ .doOnCancel (() ->
1310+ subscriptionFuture .thenAccept (Flow .Subscription ::cancel ));
14071311 }
14081312 }
1313+
14091314}
0 commit comments