4343import java .util .function .Supplier ;
4444import java .util .stream .Collectors ;
4545
46+ import static io .r2dbc .spi .IsolationLevel .READ_COMMITTED ;
47+ import static io .r2dbc .spi .IsolationLevel .SERIALIZABLE ;
4648import static io .r2dbc .spi .TransactionDefinition .*;
4749import static java .util .Collections .emptyMap ;
4850import static oracle .r2dbc .test .DatabaseConfig .sharedConnection ;
@@ -84,8 +86,9 @@ public void testBeginTransaction() {
8486 Mono .from (sharedConnection ()).block (connectTimeout ());
8587 try {
8688 verifyReadCommittedIsolation (sessionA , sessionA ::beginTransaction );
87-
88- // TODO: Verify serializable
89+ awaitNone (
90+ sessionA .setTransactionIsolationLevel (IsolationLevel .SERIALIZABLE ));
91+ verifySerializableIsolation (sessionA , sessionA ::beginTransaction );
8992 }
9093 finally {
9194 awaitNone (sessionA .close ());
@@ -113,20 +116,21 @@ public void testBeginTransactionDefined() {
113116 () -> sessionA .beginTransaction (IsolationLevel .REPEATABLE_READ ));
114117 assertThrows (IllegalArgumentException .class ,
115118 () -> sessionA .beginTransaction (transactionDefinition (Map .of (
116- ISOLATION_LEVEL , IsolationLevel . READ_COMMITTED ,
119+ ISOLATION_LEVEL , READ_COMMITTED ,
117120 READ_ONLY , true ))));
118121 assertThrows (IllegalArgumentException .class ,
119122 () -> sessionA .beginTransaction (transactionDefinition (Map .of (
120- ISOLATION_LEVEL , IsolationLevel . READ_COMMITTED ,
123+ ISOLATION_LEVEL , READ_COMMITTED ,
121124 READ_ONLY , false ))));
122125 assertThrows (IllegalArgumentException .class ,
123126 () -> sessionA .beginTransaction (transactionDefinition (Map .of (
124127 LOCK_WAIT_TIMEOUT , Duration .ofSeconds (10 )))));
125128
126129 verifyReadCommittedIsolation (sessionA , () ->
127- sessionA .beginTransaction (IsolationLevel . READ_COMMITTED ));
130+ sessionA .beginTransaction (READ_COMMITTED ));
128131
129- // TODO: Verify serializable
132+ verifySerializableIsolation (sessionA , () ->
133+ sessionA .beginTransaction (IsolationLevel .SERIALIZABLE ));
130134 }
131135 finally {
132136 awaitNone (sessionA .close ());
@@ -306,6 +310,12 @@ public void testBeginTransactionNameIsolationLevel() {
306310 "testBeginTransactionNameIsolationLevel : " + System .nanoTime ();
307311 verifyReadCommittedIsolation (connection , () ->
308312 connection .beginTransaction (transactionDefinition (Map .of (NAME , name ))));
313+ verifyReadCommittedIsolation (connection , () ->
314+ connection .beginTransaction (transactionDefinition (
315+ Map .of (NAME , name , ISOLATION_LEVEL , READ_COMMITTED ))));
316+ verifySerializableIsolation (connection , () ->
317+ connection .beginTransaction (transactionDefinition (
318+ Map .of (NAME , name , ISOLATION_LEVEL , SERIALIZABLE ))));
309319 }
310320 finally {
311321 tryAwaitNone (connection .close ());
@@ -395,6 +405,100 @@ private static void verifyReadCommittedIsolation(
395405 finally {
396406 tryAwaitExecution (sessionA .createStatement (
397407 "DROP TABLE verifyReadCommittedIsolation" ));
408+ tryAwaitNone (sessionA .setAutoCommit (true ));
409+ }
410+ }
411+
412+ /**
413+ * Verifies that a {@code publisherSupplier} outputs a {@code Publisher} that
414+ * begins a transaction with the SERIALIZABLE isolation level for
415+ * {@code sessionA}
416+ * @param sessionA Database session
417+ * @param publisherSupplier Outputs a {@code Publisher} that emits
418+ * {@code onComplete} when a SERIALIZABLE transaction begins for
419+ * {@code sessionA}
420+ */
421+ private static void verifySerializableIsolation (
422+ Connection sessionA , Supplier <Publisher <Void >> publisherSupplier ) {
423+
424+ // Create a publisher and expect it to set auto-commit false only after the
425+ // first subscriber subscribes
426+ Publisher <Void > beginTransactionPublisher = publisherSupplier .get ();
427+ assertTrue (sessionA .isAutoCommit (),
428+ "Unexpected return value from isAutoCommit() before" +
429+ " beginTransaction()" );
430+
431+ try {
432+ // Insert into this table after beginning a transaction
433+ awaitExecution (sessionA .createStatement (
434+ "CREATE TABLE verifySerializableIsolation (value VARCHAR(10))" ));
435+
436+ awaitNone (beginTransactionPublisher );
437+ assertFalse (
438+ sessionA .isAutoCommit (),
439+ "Unexpected return value from isAutoCommit() after" +
440+ " beginTransaction()" );
441+
442+ // Expect the publisher to NOT repeatedly set auto-commit to false
443+ // for each subscriber
444+ awaitNone (sessionA .setAutoCommit (true ));
445+ awaitNone (beginTransactionPublisher );
446+ assertTrue (
447+ sessionA .isAutoCommit (),
448+ "Unexpected return value from isAutoCommit() after multiple " +
449+ "subscriptions to a beginTransaction() publisher" );
450+
451+ // Now begin a transaction and verify that a table INSERT is not visible
452+ // until the transaction is committed.
453+ awaitNone (publisherSupplier .get ());
454+ assertFalse (
455+ sessionA .isAutoCommit (),
456+ "Unexpected return value from isAutoCommit() after" +
457+ " beginTransaction()" );
458+ awaitUpdate (1 , sessionA .createStatement (
459+ "INSERT INTO verifySerializableIsolation VALUES ('A')" ));
460+
461+ // sessionB doesn't see the INSERT made in sessionA's open transaction
462+ Connection sessionB =
463+ Mono .from (newConnection ()).block (connectTimeout ());
464+ try {
465+ Statement selectInSessionB = sessionB .createStatement (
466+ "SELECT value FROM verifySerializableIsolation" );
467+ awaitQuery (
468+ Collections .emptyList (), row -> 0 , selectInSessionB );
469+
470+ // sessionA COMMITs and sessionB can now see the INSERT
471+ awaitNone (sessionA .commitTransaction ());
472+ awaitQuery (List .of ("A" ), row -> row .get ("value" ), selectInSessionB );
473+
474+ // Begin a new SERIALIZABLE transaction with sessionA, then update the
475+ // row in sessionB, then verify that sessionA does not see the update
476+ // after sessionB commits, and only sees the update after its
477+ // SERIALIZABLE transaction ends
478+ awaitNone (publisherSupplier .get ());
479+ awaitNone (sessionB .beginTransaction ());
480+ awaitUpdate (1 , sessionB .createStatement (
481+ "UPDATE verifySerializableIsolation SET value = 'B'" ));
482+ awaitQuery (List .of ("A" ), row -> row .get ("value" ),
483+ sessionA .createStatement (
484+ "SELECT value FROM verifySerializableIsolation" ));
485+ awaitNone (sessionB .commitTransaction ());
486+ awaitQuery (List .of ("A" ), row -> row .get ("value" ),
487+ sessionA .createStatement (
488+ "SELECT value FROM verifySerializableIsolation" ));
489+ awaitNone (sessionA .commitTransaction ());
490+ awaitQuery (List .of ("B" ), row -> row .get ("value" ),
491+ sessionA .createStatement (
492+ "SELECT value FROM verifySerializableIsolation" ));
493+ }
494+ finally {
495+ awaitNone (sessionB .close ());
496+ }
497+ }
498+ finally {
499+ tryAwaitExecution (sessionA .createStatement (
500+ "DROP TABLE verifySerializableIsolation" ));
501+ tryAwaitNone (sessionA .setAutoCommit (true ));
398502 }
399503 }
400504
@@ -483,7 +587,7 @@ public void testClose() {
483587 assertThrows (IllegalStateException .class ,
484588 () -> connection .beginTransaction ());
485589 assertThrows (IllegalStateException .class ,
486- () -> connection .beginTransaction (IsolationLevel . READ_COMMITTED ));
590+ () -> connection .beginTransaction (READ_COMMITTED ));
487591 assertThrows (IllegalStateException .class ,
488592 () -> connection .commitTransaction ());
489593 assertThrows (IllegalStateException .class ,
@@ -508,7 +612,7 @@ public void testClose() {
508612 () -> connection .setAutoCommit (true ));
509613 assertThrows (IllegalStateException .class ,
510614 () ->
511- connection .setTransactionIsolationLevel (IsolationLevel . READ_COMMITTED ));
615+ connection .setTransactionIsolationLevel (READ_COMMITTED ));
512616
513617 // Expect multiple subscribers to see same the signal from the close()
514618 // publisher
@@ -909,7 +1013,7 @@ public void testGetTransactionIsolationLevel() {
9091013 try {
9101014 // Expect the initial isolation level to be READ_COMMITTED
9111015 assertEquals (
912- IsolationLevel . READ_COMMITTED ,
1016+ READ_COMMITTED ,
9131017 connection .getTransactionIsolationLevel (),
9141018 "Unexpected return value of getTransactionIsolationLevel() for a" +
9151019 " newly created connection" );
@@ -944,15 +1048,15 @@ public void testSetTransactionIsolationLevelUnsupported() {
9441048 // this level after setting unsupported levels. Expect setting any
9451049 // level other than READ COMMITTED to result in onError
9461050 awaitNone (
947- sessionA .setTransactionIsolationLevel (IsolationLevel . READ_COMMITTED ));
1051+ sessionA .setTransactionIsolationLevel (READ_COMMITTED ));
9481052 assertThrows (IllegalArgumentException .class , () ->
9491053 sessionA .setTransactionIsolationLevel (
9501054 IsolationLevel .READ_UNCOMMITTED ));
9511055 assertThrows (IllegalArgumentException .class , () ->
9521056 sessionA .setTransactionIsolationLevel (
9531057 IsolationLevel .REPEATABLE_READ ));
9541058 assertEquals (
955- IsolationLevel . READ_COMMITTED ,
1059+ READ_COMMITTED ,
9561060 sessionA .getTransactionIsolationLevel (),
9571061 "Unexpected return value of getTransactionIsolationLevel() after " +
9581062 "setting an unsupported isolation level" );
@@ -1000,7 +1104,7 @@ public void testSetTransactionIsolationLevelReadCommitted() {
10001104 Connection sessionA =
10011105 Mono .from (sharedConnection ()).block (connectTimeout ());
10021106 awaitNone (
1003- sessionA .setTransactionIsolationLevel (IsolationLevel . READ_COMMITTED ));
1107+ sessionA .setTransactionIsolationLevel (READ_COMMITTED ));
10041108
10051109 try {
10061110 // Verify isolation levels by reading inserts made into this table. The
@@ -1017,7 +1121,7 @@ public void testSetTransactionIsolationLevelReadCommitted() {
10171121 Connection sessionB =
10181122 Mono .from (newConnection ()).block (connectTimeout ());
10191123 assertEquals (
1020- IsolationLevel . READ_COMMITTED ,
1124+ READ_COMMITTED ,
10211125 sessionB .getTransactionIsolationLevel (),
10221126 "Unexpected return value of getTransactionIsolationLevel() for a"
10231127 + " newly created connection" );
@@ -1112,7 +1216,7 @@ public void testSetTransactionIsolationLevelSerializable() {
11121216 Connection sessionB =
11131217 Mono .from (newConnection ()).block (connectTimeout ());
11141218 assertEquals (
1115- IsolationLevel . READ_COMMITTED ,
1219+ READ_COMMITTED ,
11161220 sessionB .getTransactionIsolationLevel (),
11171221 "Unexpected return value of getTransactionIsolationLevel() for a"
11181222 + " newly created connection" );
0 commit comments