4040import java .util .Collections ;
4141import java .util .List ;
4242import java .util .Map ;
43+ import java .util .function .Supplier ;
4344import java .util .stream .Collectors ;
4445
4546import static io .r2dbc .spi .TransactionDefinition .*;
@@ -82,7 +83,7 @@ public void testBeginTransaction() {
8283 Connection sessionA =
8384 Mono .from (sharedConnection ()).block (connectTimeout ());
8485 try {
85- verifyReadCommittedIsolation (sessionA , sessionA . beginTransaction () );
86+ verifyReadCommittedIsolation (sessionA , sessionA :: beginTransaction );
8687
8788 // TODO: Verify serializable
8889 }
@@ -122,7 +123,7 @@ public void testBeginTransactionDefined() {
122123 () -> sessionA .beginTransaction (transactionDefinition (Map .of (
123124 LOCK_WAIT_TIMEOUT , Duration .ofSeconds (10 )))));
124125
125- verifyReadCommittedIsolation (sessionA ,
126+ verifyReadCommittedIsolation (sessionA , () ->
126127 sessionA .beginTransaction (IsolationLevel .READ_COMMITTED ));
127128
128129 // TODO: Verify serializable
@@ -280,7 +281,7 @@ public void testBeginTransactionReadWrite() {
280281 Connection connection =
281282 Mono .from (sharedConnection ()).block (connectTimeout ());
282283 try {
283- verifyReadCommittedIsolation (connection ,
284+ verifyReadCommittedIsolation (connection , () ->
284285 connection .beginTransaction (transactionDefinition (Map .of (
285286 READ_ONLY , false ))));
286287 }
@@ -303,7 +304,7 @@ public void testBeginTransactionNameIsolationLevel() {
303304 // Try to use a unique transaction name
304305 String name =
305306 "testBeginTransactionNameIsolationLevel : " + System .nanoTime ();
306- verifyReadCommittedIsolation (connection ,
307+ verifyReadCommittedIsolation (connection , () ->
307308 connection .beginTransaction (transactionDefinition (Map .of (NAME , name ))));
308309 }
309310 finally {
@@ -315,14 +316,16 @@ public void testBeginTransactionNameIsolationLevel() {
315316 * Verifies that a {@code beginTransactionPublisher} begins a transaction
316317 * with the READ COMMITTED isolation level for {@code sessionA}
317318 * @param sessionA Database session
318- * @param beginTransactionPublisher Publishes {@code onComplete} when a
319- * READ COMMITTED transaction begins for {@code sessionA}
319+ * @param publisherSupplier Outputs a {@code Publisher} that emits
320+ * {@code onComplete} when a READ COMMITTED transaction begins for
321+ * {@code sessionA}
320322 */
321323 private static void verifyReadCommittedIsolation (
322- Connection sessionA , Publisher <Void > beginTransactionPublisher ) {
324+ Connection sessionA , Supplier < Publisher <Void >> publisherSupplier ) {
323325
324- // Expect the publisher to set auto-commit false when the first
325- // subscriber subscribes
326+ // Create a publisher and expect it to set auto-commit false only after the
327+ // first subscriber subscribes
328+ Publisher <Void > beginTransactionPublisher = publisherSupplier .get ();
326329 assertTrue (sessionA .isAutoCommit (),
327330 "Unexpected return value from isAutoCommit() before" +
328331 " beginTransaction()" );
@@ -349,7 +352,7 @@ private static void verifyReadCommittedIsolation(
349352
350353 // Now begin a transaction and verify that a table INSERT is not visible
351354 // until the transaction is committed.
352- awaitNone (sessionA . beginTransaction ());
355+ awaitNone (publisherSupplier . get ());
353356 assertFalse (
354357 sessionA .isAutoCommit (),
355358 "Unexpected return value from isAutoCommit() after" +
0 commit comments