4242import static io .r2dbc .spi .TransactionDefinition .LOCK_WAIT_TIMEOUT ;
4343import static io .r2dbc .spi .TransactionDefinition .NAME ;
4444import static io .r2dbc .spi .TransactionDefinition .READ_ONLY ;
45- import static java .sql .Connection .TRANSACTION_READ_COMMITTED ;
46- import static java .sql .Connection .TRANSACTION_SERIALIZABLE ;
4745import static oracle .r2dbc .impl .OracleR2dbcExceptions .requireNonNull ;
4846import static oracle .r2dbc .impl .OracleR2dbcExceptions .fromJdbc ;
4947import static oracle .r2dbc .impl .OracleR2dbcExceptions .requireOpenConnection ;
@@ -93,6 +91,38 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
9391 */
9492 private Duration statementTimeout = Duration .ZERO ;
9593
94+ /**
95+ * <p>
96+ * The isolation level of the database session created by this
97+ * {@code Connection}. The value is initialized as READ COMMITTED because
98+ * that is the default isolation level of an Oracle Database session. The
99+ * value of this field may be updated by
100+ * {@link #setTransactionIsolationLevel(IsolationLevel)}.
101+ * </p><p>
102+ * The value of this field will not be correct if user code executes a
103+ * command that changes the isolation level, such as
104+ * {@code ALTER SESSION SET ISOLATION_LEVEL = ...}.
105+ * </p>
106+ */
107+ private IsolationLevel isolationLevel = READ_COMMITTED ;
108+
109+ /**
110+ * <p>
111+ * The definition of the current transaction, or {@code null} if there is
112+ * no current transaction. This field is set to a non-null value by
113+ * invocations of {@link #beginTransaction()} or
114+ * {@link #beginTransaction(TransactionDefinition)}. This field is set
115+ * back to a {@code null} value when the transaction ends with an
116+ * invocation of {@link #commitTransaction()} or
117+ * {@link #rollbackTransaction()}.
118+ * </p><p>
119+ * The value of this field will not be correct if user code begins a
120+ * transaction implicitly by executing DML without first calling one
121+ * of the {@code beginTransaction} methods.
122+ * </p>
123+ */
124+ private TransactionDefinition currentTransaction = null ;
125+
96126 /**
97127 * Constructs a new connection that uses the specified {@code adapter} to
98128 * perform database operations with the specified {@code jdbcConnection}.
@@ -110,8 +140,11 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
110140 * {@inheritDoc}
111141 * <p>
112142 * Implements the R2DBC SPI method by executing a {@code SET TRANSACTION}
113- * command to explicitly begin a transaction on the Oracle Database to which
114- * JDBC is connected.
143+ * command to explicitly begin a transaction on the Oracle Database that
144+ * JDBC is connected to. The transaction started by this method has
145+ * the isolation level set by the last call to
146+ * {@link Connection#setTransactionIsolationLevel(IsolationLevel)}, or
147+ * {@link IsolationLevel#READ_COMMITTED} if no isolation level has been set.
115148 * </p><p>
116149 * Oracle Database supports transactions that begin <i>implicitly</i>
117150 * when executing SQL statements that modify data, or when a executing a
@@ -132,27 +165,6 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
132165 @ Override
133166 public Publisher <Void > beginTransaction () {
134167 requireOpenConnection (jdbcConnection );
135-
136- final IsolationLevel isolationLevel ;
137- int jdbcIsolationLevel =
138- fromJdbc (jdbcConnection ::getTransactionIsolation );
139-
140- // Map JDBC's isolation level to an R2DBC IsolationLevel
141- switch (jdbcIsolationLevel ) {
142- case TRANSACTION_READ_COMMITTED :
143- isolationLevel = READ_COMMITTED ;
144- break ;
145- case TRANSACTION_SERIALIZABLE :
146- isolationLevel = SERIALIZABLE ;
147- break ;
148- default :
149- // In 21c, Oracle only supports READ COMMITTED or SERIALIZABLE. Any
150- // other level is unexpected and has not been verified with test cases.
151- throw new IllegalArgumentException (
152- "Unrecognized JDBC transaction isolation level: "
153- + jdbcIsolationLevel );
154- }
155-
156168 return beginTransaction (isolationLevel );
157169 }
158170
@@ -203,8 +215,9 @@ public Publisher<Void> beginTransaction() {
203215 * behavior of this method.
204216 * </p>
205217 *
206- * @implNote Supporting SERIALIZABLE isolation level requires a way to
207- * disable Oracle JDBC's result set caching feature.
218+ * @param definition {@inheritDoc}. Oracle R2DBC retains a reference to
219+ * this object. After this method returns, mutations to the object may
220+ * effect the behavior of Oracle R2DBC.
208221 *
209222 * @throws IllegalArgumentException If the {@code definition} specifies an
210223 * unsupported isolation level.
@@ -226,7 +239,8 @@ public Publisher<Void> beginTransaction(TransactionDefinition definition) {
226239 .then (Mono .from (createStatement (composeSetTransaction (definition ))
227240 .execute ())
228241 .flatMap (result -> Mono .from (result .getRowsUpdated ()))
229- .then ())
242+ .then ()
243+ .doOnSuccess (nil -> this .currentTransaction = definition ))
230244 .cache ();
231245 }
232246
@@ -310,7 +324,8 @@ private static void validateTransactionDefinition(
310324 }
311325
312326 // TODO: Only supporting READ COMMITTED
313- if (! isolationLevel .equals (READ_COMMITTED )) {
327+ if (! (isolationLevel .equals (READ_COMMITTED )
328+ || isolationLevel .equals (SERIALIZABLE ))) {
314329 throw new IllegalArgumentException (
315330 "Unsupported ISOLATION_LEVEL: " + isolationLevel );
316331 }
@@ -375,7 +390,8 @@ public Publisher<Void> close() {
375390 @ Override
376391 public Publisher <Void > commitTransaction () {
377392 requireOpenConnection (jdbcConnection );
378- return adapter .publishCommit (jdbcConnection );
393+ return Mono .from (adapter .publishCommit (jdbcConnection ))
394+ .doOnSuccess (nil -> currentTransaction = null );
379395 }
380396
381397 /**
@@ -511,7 +527,8 @@ public Publisher<Void> releaseSavepoint(String name) {
511527 @ Override
512528 public Publisher <Void > rollbackTransaction () {
513529 requireOpenConnection (jdbcConnection );
514- return adapter .publishRollback (jdbcConnection );
530+ return Mono .from (adapter .publishRollback (jdbcConnection ))
531+ .doOnSuccess (nil -> currentTransaction = null );
515532 }
516533
517534 /**
@@ -624,30 +641,48 @@ public Publisher<Void> setStatementTimeout(Duration timeout) {
624641 /**
625642 * {@inheritDoc}
626643 * <p>
627- * Implements the R2DBC SPI method by returning the JDBC connection's
628- * transaction isolation level.
644+ * Implements the R2DBC SPI method by returning the isolation level set for
645+ * the database session of this {@code Connection}, if the session is not
646+ * currently in a transaction. If the session is in a transaction, then the
647+ * isolation level of that transaction is returned.
629648 * </p>
630- * @implNote Currently, Oracle R2DBC only supports the READ COMMITTED
631- * isolation level.
632649 * @throws IllegalStateException If this {@code Connection} is closed
633650 */
634651 @ Override
635652 public IsolationLevel getTransactionIsolationLevel () {
636653 requireOpenConnection (jdbcConnection );
637- return READ_COMMITTED ;
654+
655+ if (currentTransaction == null ) {
656+ return isolationLevel ;
657+ }
658+ else {
659+ IsolationLevel currentIsolationLevel =
660+ currentTransaction .getAttribute (ISOLATION_LEVEL );
661+
662+ if (currentIsolationLevel == null )
663+ return isolationLevel ;
664+ else
665+ return currentIsolationLevel ;
666+ }
638667 }
639668
640669 /**
641670 * {@inheritDoc}
642671 * <p>
643672 * Implements the R2DBC SPI method by setting the transaction isolation
644- * level of the JDBC connection.
673+ * level of this connection's database session. This method will by-pass
674+ * the JDBC {@link java.sql.Connection#setTransactionIsolation(int)}
675+ * method in order to execute a non-blocking {@code ALTER SESSION} command.
676+ * After this method is called, invocations of
677+ * {@link java.sql.Connection#getTransactionIsolation()} on the JDBC
678+ * {@code Connection} may no longer return a correct value. The correct
679+ * isolation level is retained by the {@link #isolationLevel} field of this
680+ * {@code Connection}.
645681 * </p><p>
646682 * Oracle Database only supports {@link IsolationLevel#READ_COMMITTED} and
647- * {@link IsolationLevel#SERIALIZABLE} isolation levels. If an unsupported
648- * {@code isolationLevel} is specified to this method, then the returned
649- * publisher emits {@code onError} with an {@link R2dbcException}
650- * indicating that the specified {@code isolationLevel} is not supported.
683+ * {@link IsolationLevel#SERIALIZABLE} isolation levels. This method throws
684+ * an {@code IllegalArgumentException} if an unsupported
685+ * {@code isolationLevel} is specified.
651686 * </p><p>
652687 * Oracle Database does not support changing an isolation level during
653688 * an active transaction. If the isolation level is changed during an
@@ -662,8 +697,6 @@ public IsolationLevel getTransactionIsolationLevel() {
662697 * transaction isolation level for each subscription. Signals emitted to
663698 * the first subscription are propagated to all subsequent subscriptions.
664699 * </p>
665- * @implNote Currently, Oracle R2DBC only supports the READ COMMITTED
666- * isolation level.
667700 * @throws IllegalStateException If this {@code Connection} is closed
668701 */
669702 @ Override
@@ -672,18 +705,29 @@ public Publisher<Void> setTransactionIsolationLevel(
672705 requireNonNull (isolationLevel , "isolationLevel is null" );
673706 requireOpenConnection (jdbcConnection );
674707
675- // TODO: Need to add a connection factory option that disables Oracle
676- // JDBC's Result Set caching function before SERIALIZABLE can be supported.
677- // For now, the isolation level can never be changed from the default READ
678- // COMMITTED.
679- if (isolationLevel .equals (READ_COMMITTED )) {
708+ // Do nothing if the level isn't changed
709+ if (isolationLevel .equals (this .isolationLevel ))
680710 return Mono .empty ();
711+
712+ // Compose a command to set the isolation level of the database session:
713+ // ALTER SESSION SET ISOLATION_LEVEL = {SERIALIZABLE | READ COMMITTED}
714+ String alterSession = "ALTER SESSION SET ISOLATION_LEVEL = " ;
715+ if (isolationLevel .equals (READ_COMMITTED )) {
716+ alterSession += "READ COMMITTED" ;
717+ }
718+ else if (isolationLevel .equals (SERIALIZABLE )) {
719+ alterSession += "SERIALIZABLE" ;
681720 }
682721 else {
683- return Mono .error (OracleR2dbcExceptions .newNonTransientException (
684- "Oracle R2DBC does not support isolation level: " + isolationLevel ,
685- null ));
722+ throw new IllegalArgumentException (
723+ "Oracle Database does not support isolation level: " + isolationLevel );
686724 }
725+
726+ return Mono .from (createStatement (alterSession )
727+ .execute ())
728+ .then ()
729+ .doOnSuccess (nil -> this .isolationLevel = isolationLevel )
730+ .cache ();
687731 }
688732
689733 /**
0 commit comments