4242import static io .r2dbc .spi .TransactionDefinition .LOCK_WAIT_TIMEOUT ;
4343import static io .r2dbc .spi .TransactionDefinition .NAME ;
4444import static io .r2dbc .spi .TransactionDefinition .READ_ONLY ;
45- import static java .sql .Connection .TRANSACTION_READ_COMMITTED ;
46- import static java .sql .Connection .TRANSACTION_SERIALIZABLE ;
4745import static oracle .r2dbc .impl .OracleR2dbcExceptions .requireNonNull ;
4846import static oracle .r2dbc .impl .OracleR2dbcExceptions .fromJdbc ;
4947import static oracle .r2dbc .impl .OracleR2dbcExceptions .requireOpenConnection ;
@@ -93,6 +91,38 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
9391 */
9492 private Duration statementTimeout = Duration .ZERO ;
9593
94+ /**
95+ * <p>
96+ * The isolation level of the database session created by this
97+ * {@code Connection}. The value is initialized as READ COMMITTED because
98+ * that is the default isolation level of an Oracle Database session. The
99+ * value of this field may be updated by
100+ * {@link #setTransactionIsolationLevel(IsolationLevel)}.
101+ * </p><p>
102+ * The value of this field will not be correct if user code executes a
103+ * command that changes the isolation level, such as
104+ * {@code ALTER SESSION SET ISOLATION_LEVEL = ...}.
105+ * </p>
106+ */
107+ private IsolationLevel isolationLevel = READ_COMMITTED ;
108+
109+ /**
110+ * <p>
111+ * The definition of the current transaction, or {@code null} if there is
112+ * no current transaction. This field is set to a non-null value by
113+ * invocations of {@link #beginTransaction()} or
114+ * {@link #beginTransaction(TransactionDefinition)}. This field is set
115+ * back to a {@code null} value when the transaction ends with an
116+ * invocation of {@link #commitTransaction()} or
117+ * {@link #rollbackTransaction()}.
118+ * </p><p>
119+ * The value of this field will not be correct if user code begins a
120+ * transaction implicitly by executing DML without first calling one
121+ * of the {@code beginTransaction} methods.
122+ * </p>
123+ */
124+ private TransactionDefinition currentTransaction = null ;
125+
96126 /**
97127 * Constructs a new connection that uses the specified {@code adapter} to
98128 * perform database operations with the specified {@code jdbcConnection}.
@@ -110,8 +140,11 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
110140 * {@inheritDoc}
111141 * <p>
112142 * Implements the R2DBC SPI method by executing a {@code SET TRANSACTION}
113- * command to explicitly begin a transaction on the Oracle Database to which
114- * JDBC is connected.
143+ * command to explicitly begin a transaction on the Oracle Database that
144+ * JDBC is connected to. The transaction started by this method has
145+ * the isolation level set by the last call to
146+ * {@link Connection#setTransactionIsolationLevel(IsolationLevel)}, or
147+ * {@link IsolationLevel#READ_COMMITTED} if no isolation level has been set.
115148 * </p><p>
116149 * Oracle Database supports transactions that begin <i>implicitly</i>
117150 * when executing SQL statements that modify data, or when a executing a
@@ -132,27 +165,6 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
132165 @ Override
133166 public Publisher <Void > beginTransaction () {
134167 requireOpenConnection (jdbcConnection );
135-
136- final IsolationLevel isolationLevel ;
137- int jdbcIsolationLevel =
138- fromJdbc (jdbcConnection ::getTransactionIsolation );
139-
140- // Map JDBC's isolation level to an R2DBC IsolationLevel
141- switch (jdbcIsolationLevel ) {
142- case TRANSACTION_READ_COMMITTED :
143- isolationLevel = READ_COMMITTED ;
144- break ;
145- case TRANSACTION_SERIALIZABLE :
146- isolationLevel = SERIALIZABLE ;
147- break ;
148- default :
149- // In 21c, Oracle only supports READ COMMITTED or SERIALIZABLE. Any
150- // other level is unexpected and has not been verified with test cases.
151- throw new IllegalArgumentException (
152- "Unrecognized JDBC transaction isolation level: "
153- + jdbcIsolationLevel );
154- }
155-
156168 return beginTransaction (isolationLevel );
157169 }
158170
@@ -203,8 +215,9 @@ public Publisher<Void> beginTransaction() {
203215 * behavior of this method.
204216 * </p>
205217 *
206- * @implNote Supporting SERIALIZABLE isolation level requires a way to
207- * disable Oracle JDBC's result set caching feature.
218+ * @param definition {@inheritDoc}. Oracle R2DBC retains a reference to
219+ * this object. After this method returns, mutations to the object may
220+ * effect the behavior of Oracle R2DBC.
208221 *
209222 * @throws IllegalArgumentException If the {@code definition} specifies an
210223 * unsupported isolation level.
@@ -226,7 +239,8 @@ public Publisher<Void> beginTransaction(TransactionDefinition definition) {
226239 .then (Mono .from (createStatement (composeSetTransaction (definition ))
227240 .execute ())
228241 .flatMap (result -> Mono .from (result .getRowsUpdated ()))
229- .then ())
242+ .then ()
243+ .doOnSuccess (nil -> this .currentTransaction = definition ))
230244 .cache ();
231245 }
232246
@@ -310,7 +324,8 @@ private static void validateTransactionDefinition(
310324 }
311325
312326 // TODO: Only supporting READ COMMITTED
313- if (! isolationLevel .equals (READ_COMMITTED )) {
327+ if (! (isolationLevel .equals (READ_COMMITTED )
328+ || isolationLevel .equals (SERIALIZABLE ))) {
314329 throw new IllegalArgumentException (
315330 "Unsupported ISOLATION_LEVEL: " + isolationLevel );
316331 }
@@ -375,7 +390,8 @@ public Publisher<Void> close() {
375390 @ Override
376391 public Publisher <Void > commitTransaction () {
377392 requireOpenConnection (jdbcConnection );
378- return adapter .publishCommit (jdbcConnection );
393+ return Mono .from (adapter .publishCommit (jdbcConnection ))
394+ .doOnSuccess (nil -> currentTransaction = null );
379395 }
380396
381397 /**
@@ -511,7 +527,8 @@ public Publisher<Void> releaseSavepoint(String name) {
511527 @ Override
512528 public Publisher <Void > rollbackTransaction () {
513529 requireOpenConnection (jdbcConnection );
514- return adapter .publishRollback (jdbcConnection );
530+ return Mono .from (adapter .publishRollback (jdbcConnection ))
531+ .doOnSuccess (nil -> currentTransaction = null );
515532 }
516533
517534 /**
@@ -624,30 +641,57 @@ public Publisher<Void> setStatementTimeout(Duration timeout) {
624641 /**
625642 * {@inheritDoc}
626643 * <p>
627- * Implements the R2DBC SPI method by returning the JDBC connection's
628- * transaction isolation level.
644+ * Implements the R2DBC SPI method by returning the isolation level set for
645+ * the database session of this {@code Connection}, if the session is not
646+ * currently in a transaction.
647+ * </p><p>
648+ * If the session is in a transaction, and an isolation level was
649+ * explicitly specified via {@link TransactionDefinition#ISOLATION_LEVEL},
650+ * then the isolation level of that transaction is returned. If the current
651+ * transaction is read-only, then {@link IsolationLevel#SERIALIZABLE} is
652+ * returned as read-only transactions have the same behavior as if the
653+ * SERIALIZABLE isolation level. Otherwise, if no isolation level was
654+ * explicitly set, then the current transaction should have the isolation
655+ * level set for the database session.
629656 * </p>
630- * @implNote Currently, Oracle R2DBC only supports the READ COMMITTED
631- * isolation level.
632657 * @throws IllegalStateException If this {@code Connection} is closed
633658 */
634659 @ Override
635660 public IsolationLevel getTransactionIsolationLevel () {
636661 requireOpenConnection (jdbcConnection );
637- return READ_COMMITTED ;
662+
663+ if (currentTransaction == null ) {
664+ return isolationLevel ;
665+ }
666+ else {
667+ IsolationLevel currentIsolationLevel =
668+ currentTransaction .getAttribute (ISOLATION_LEVEL );
669+
670+ return currentIsolationLevel != null
671+ ? currentIsolationLevel
672+ : Boolean .TRUE == currentTransaction .getAttribute (READ_ONLY )
673+ ? SERIALIZABLE
674+ : isolationLevel ;
675+ }
638676 }
639677
640678 /**
641679 * {@inheritDoc}
642680 * <p>
643681 * Implements the R2DBC SPI method by setting the transaction isolation
644- * level of the JDBC connection.
682+ * level of this connection's database session. This method will by-pass
683+ * the JDBC {@link java.sql.Connection#setTransactionIsolation(int)}
684+ * method in order to execute a non-blocking {@code ALTER SESSION} command.
685+ * After this method is called, invocations of
686+ * {@link java.sql.Connection#getTransactionIsolation()} on the JDBC
687+ * {@code Connection} may no longer return a correct value. The correct
688+ * isolation level is retained by the {@link #isolationLevel} field of this
689+ * {@code Connection}.
645690 * </p><p>
646691 * Oracle Database only supports {@link IsolationLevel#READ_COMMITTED} and
647- * {@link IsolationLevel#SERIALIZABLE} isolation levels. If an unsupported
648- * {@code isolationLevel} is specified to this method, then the returned
649- * publisher emits {@code onError} with an {@link R2dbcException}
650- * indicating that the specified {@code isolationLevel} is not supported.
692+ * {@link IsolationLevel#SERIALIZABLE} isolation levels. This method throws
693+ * an {@code IllegalArgumentException} if an unsupported
694+ * {@code isolationLevel} is specified.
651695 * </p><p>
652696 * Oracle Database does not support changing an isolation level during
653697 * an active transaction. If the isolation level is changed during an
@@ -662,8 +706,6 @@ public IsolationLevel getTransactionIsolationLevel() {
662706 * transaction isolation level for each subscription. Signals emitted to
663707 * the first subscription are propagated to all subsequent subscriptions.
664708 * </p>
665- * @implNote Currently, Oracle R2DBC only supports the READ COMMITTED
666- * isolation level.
667709 * @throws IllegalStateException If this {@code Connection} is closed
668710 */
669711 @ Override
@@ -672,18 +714,29 @@ public Publisher<Void> setTransactionIsolationLevel(
672714 requireNonNull (isolationLevel , "isolationLevel is null" );
673715 requireOpenConnection (jdbcConnection );
674716
675- // TODO: Need to add a connection factory option that disables Oracle
676- // JDBC's Result Set caching function before SERIALIZABLE can be supported.
677- // For now, the isolation level can never be changed from the default READ
678- // COMMITTED.
679- if (isolationLevel .equals (READ_COMMITTED )) {
717+ // Do nothing if the level isn't changed
718+ if (isolationLevel .equals (this .isolationLevel ))
680719 return Mono .empty ();
720+
721+ // Compose a command to set the isolation level of the database session:
722+ // ALTER SESSION SET ISOLATION_LEVEL = {SERIALIZABLE | READ COMMITTED}
723+ String alterSession = "ALTER SESSION SET ISOLATION_LEVEL = " ;
724+ if (isolationLevel .equals (READ_COMMITTED )) {
725+ alterSession += "READ COMMITTED" ;
726+ }
727+ else if (isolationLevel .equals (SERIALIZABLE )) {
728+ alterSession += "SERIALIZABLE" ;
681729 }
682730 else {
683- return Mono .error (OracleR2dbcExceptions .newNonTransientException (
684- "Oracle R2DBC does not support isolation level: " + isolationLevel ,
685- null ));
731+ throw new IllegalArgumentException (
732+ "Oracle Database does not support isolation level: " + isolationLevel );
686733 }
734+
735+ return Mono .from (createStatement (alterSession )
736+ .execute ())
737+ .then ()
738+ .doOnSuccess (nil -> this .isolationLevel = isolationLevel )
739+ .cache ();
687740 }
688741
689742 /**
0 commit comments