|
22 | 22 | package oracle.r2dbc.impl; |
23 | 23 |
|
24 | 24 | import io.r2dbc.spi.Connection; |
| 25 | +import io.r2dbc.spi.ConnectionFactories; |
| 26 | +import io.r2dbc.spi.ConnectionFactoryOptions; |
25 | 27 | import io.r2dbc.spi.Parameter; |
26 | 28 | import io.r2dbc.spi.Parameters; |
27 | 29 | import io.r2dbc.spi.R2dbcException; |
|
32 | 34 | import io.r2dbc.spi.Result.UpdateCount; |
33 | 35 | import io.r2dbc.spi.Statement; |
34 | 36 | import io.r2dbc.spi.Type; |
| 37 | +import oracle.r2dbc.OracleR2dbcOptions; |
| 38 | +import oracle.r2dbc.test.DatabaseConfig; |
35 | 39 | import org.junit.jupiter.api.Test; |
36 | 40 | import org.reactivestreams.Publisher; |
37 | 41 | import reactor.core.publisher.Flux; |
|
43 | 47 | import java.util.Collections; |
44 | 48 | import java.util.List; |
45 | 49 | import java.util.NoSuchElementException; |
| 50 | +import java.util.concurrent.Executor; |
| 51 | +import java.util.concurrent.ExecutorService; |
| 52 | +import java.util.concurrent.Executors; |
46 | 53 | import java.util.concurrent.ForkJoinPool; |
| 54 | +import java.util.concurrent.TimeUnit; |
47 | 55 | import java.util.concurrent.atomic.AtomicInteger; |
48 | 56 | import java.util.concurrent.atomic.AtomicLong; |
49 | 57 | import java.util.stream.Collectors; |
50 | 58 | import java.util.stream.IntStream; |
51 | 59 | import java.util.stream.LongStream; |
52 | 60 | import java.util.stream.Stream; |
53 | 61 |
|
| 62 | +import static java.lang.String.format; |
54 | 63 | import static java.util.Arrays.asList; |
55 | 64 | import static oracle.r2dbc.test.DatabaseConfig.connectTimeout; |
| 65 | +import static oracle.r2dbc.test.DatabaseConfig.host; |
56 | 66 | import static oracle.r2dbc.test.DatabaseConfig.newConnection; |
| 67 | +import static oracle.r2dbc.test.DatabaseConfig.password; |
| 68 | +import static oracle.r2dbc.test.DatabaseConfig.port; |
| 69 | +import static oracle.r2dbc.test.DatabaseConfig.serviceName; |
57 | 70 | import static oracle.r2dbc.test.DatabaseConfig.sharedConnection; |
| 71 | +import static oracle.r2dbc.test.DatabaseConfig.sqlTimeout; |
| 72 | +import static oracle.r2dbc.test.DatabaseConfig.user; |
58 | 73 | import static oracle.r2dbc.util.Awaits.awaitError; |
59 | 74 | import static oracle.r2dbc.util.Awaits.awaitExecution; |
60 | 75 | import static oracle.r2dbc.util.Awaits.awaitMany; |
@@ -2035,119 +2050,98 @@ else if (index == 1) { |
2035 | 2050 | } |
2036 | 2051 |
|
2037 | 2052 | /** |
2038 | | - * Verifies that concurrent statement execution does not cause threads |
2039 | | - * to block. |
| 2053 | + * Verifies that concurrent statement execution on a single |
| 2054 | + * connection does not cause threads to block when there are many threads |
| 2055 | + * available. |
2040 | 2056 | */ |
2041 | 2057 | @Test |
2042 | | - public void testConcurrentExecute() { |
2043 | | - Connection connection = awaitOne(sharedConnection()); |
| 2058 | + public void testConcurrentExecuteManyThreads() throws InterruptedException { |
| 2059 | + ExecutorService executorService = Executors.newFixedThreadPool(4); |
2044 | 2060 | try { |
2045 | | - |
2046 | | - // Create many statements and execute them in parallel. "Many" should |
2047 | | - // be enough to exhaust the common ForkJoinPool if any thread gets blocked |
2048 | | - Publisher<Integer>[] publishers = |
2049 | | - new Publisher[ForkJoinPool.getCommonPoolParallelism() * 4]; |
2050 | | - |
2051 | | - for (int i = 0; i < publishers.length; i++) { |
2052 | | - Flux<Integer> flux = Flux.from(connection.createStatement( |
2053 | | - "SELECT " + i + " FROM sys.dual") |
2054 | | - .execute()) |
2055 | | - .flatMap(result -> |
2056 | | - result.map(row -> row.get(0, Integer.class))) |
2057 | | - .cache(); |
2058 | | - |
2059 | | - flux.subscribe(); |
2060 | | - publishers[i] = flux; |
| 2061 | + Connection connection = awaitOne(connect(executorService)); |
| 2062 | + try { |
| 2063 | + verifyConcurrentExecute(connection); |
| 2064 | + } |
| 2065 | + finally { |
| 2066 | + tryAwaitNone(connection.close()); |
2061 | 2067 | } |
2062 | | - |
2063 | | - awaitMany( |
2064 | | - IntStream.range(0, publishers.length) |
2065 | | - .boxed() |
2066 | | - .collect(Collectors.toList()), |
2067 | | - Flux.concat(publishers)); |
2068 | 2068 | } |
2069 | 2069 | finally { |
2070 | | - tryAwaitNone(connection.close()); |
| 2070 | + executorService.shutdown(); |
| 2071 | + executorService.awaitTermination( |
| 2072 | + sqlTimeout().toSeconds(), TimeUnit.SECONDS); |
2071 | 2073 | } |
2072 | 2074 | } |
2073 | 2075 |
|
2074 | 2076 | /** |
2075 | | - * Verifies that concurrent statement execution and row fetching does not |
2076 | | - * cause threads to block. |
| 2077 | + * Verifies that concurrent statement execution on a single |
| 2078 | + * connection does not cause threads to block when there is just one thread |
| 2079 | + * available. |
2077 | 2080 | */ |
2078 | 2081 | @Test |
2079 | | - public void testConcurrentFetch() { |
2080 | | - Connection connection = awaitOne(sharedConnection()); |
| 2082 | + public void testConcurrentExecuteSingleThread() throws InterruptedException { |
| 2083 | + ExecutorService executorService = Executors.newSingleThreadExecutor(); |
2081 | 2084 | try { |
2082 | | - |
2083 | | - awaitExecution(connection.createStatement( |
2084 | | - "CREATE TABLE testConcurrentFetch (value NUMBER)")); |
2085 | | - |
2086 | | - // Create many statements and execute them in parallel. "Many" should |
2087 | | - // be enough to exhaust the common ForkJoinPool if any thread gets blocked |
2088 | | - Publisher<Long>[] publishers = |
2089 | | - new Publisher[ForkJoinPool.getCommonPoolParallelism() * 4]; |
2090 | | - |
2091 | | - for (int i = 0; i < publishers.length; i++) { |
2092 | | - |
2093 | | - // Each publisher batch inserts a range of 100 values |
2094 | | - Statement statement = connection.createStatement( |
2095 | | - "INSERT INTO testConcurrentFetch VALUES (?)"); |
2096 | | - int start = i * 100; |
2097 | | - statement.bind(0, start); |
2098 | | - IntStream.range(start + 1, start + 100) |
2099 | | - .forEach(value -> { |
2100 | | - statement.add().bind(0, value); |
2101 | | - }); |
2102 | | - |
2103 | | - Mono<Long> mono = Flux.from(statement.execute()) |
2104 | | - .flatMap(Result::getRowsUpdated) |
2105 | | - .collect(Collectors.summingLong(Long::longValue)) |
2106 | | - .cache(); |
2107 | | - |
2108 | | - // Execute in parallel, and retain the result for verification later |
2109 | | - mono.subscribe(); |
2110 | | - publishers[i] = mono; |
| 2085 | + Connection connection = awaitOne(connect(executorService)); |
| 2086 | + try { |
| 2087 | + verifyConcurrentExecute(connection); |
2111 | 2088 | } |
2112 | | - |
2113 | | - // Expect each publisher to emit an update count of 100 |
2114 | | - awaitMany( |
2115 | | - LongStream.range(0, publishers.length) |
2116 | | - .map(i -> 100) |
2117 | | - .boxed() |
2118 | | - .collect(Collectors.toList()), |
2119 | | - Flux.merge(publishers)); |
2120 | | - |
2121 | | - // Create publishers that fetch rows in parallel |
2122 | | - Publisher<List<Integer>>[] fetchPublishers = |
2123 | | - new Publisher[publishers.length]; |
2124 | | - |
2125 | | - for (int i = 0; i < publishers.length; i++) { |
2126 | | - Mono<List<Integer>> mono = Flux.from(connection.createStatement( |
2127 | | - "SELECT value FROM testConcurrentFetch ORDER BY value") |
2128 | | - .execute()) |
2129 | | - .flatMap(result -> |
2130 | | - result.map(row -> row.get(0, Integer.class))) |
2131 | | - .collect(Collectors.toList()) |
2132 | | - .cache(); |
2133 | | - |
2134 | | - // Execute in parallel, and retain the result for verification later |
2135 | | - mono.subscribe(); |
2136 | | - fetchPublishers[i] = mono; |
| 2089 | + finally { |
| 2090 | + tryAwaitNone(connection.close()); |
2137 | 2091 | } |
| 2092 | + } |
| 2093 | + finally { |
| 2094 | + executorService.shutdown(); |
| 2095 | + executorService.awaitTermination( |
| 2096 | + sqlTimeout().toSeconds(), TimeUnit.SECONDS); |
| 2097 | + } |
| 2098 | + } |
2138 | 2099 |
|
2139 | | - // Expect each fetch publisher to get the same result |
2140 | | - List<Integer> expected = IntStream.range(0, publishers.length * 100) |
2141 | | - .boxed() |
2142 | | - .collect(Collectors.toList()); |
| 2100 | + /** |
| 2101 | + * Verifies that concurrent statement execution and row fetching on a single |
| 2102 | + * connection does not cause threads to block when there is just one thread |
| 2103 | + * available. |
| 2104 | + */ |
| 2105 | + @Test |
| 2106 | + public void testConcurrentFetchSingleThread() throws InterruptedException { |
| 2107 | + ExecutorService executorService = Executors.newSingleThreadExecutor(); |
| 2108 | + try { |
| 2109 | + Connection connection = awaitOne(connect(executorService)); |
| 2110 | + try { |
| 2111 | + verifyConcurrentFetch(connection); |
| 2112 | + } |
| 2113 | + finally { |
| 2114 | + tryAwaitNone(connection.close()); |
| 2115 | + } |
| 2116 | + } |
| 2117 | + finally { |
| 2118 | + executorService.shutdown(); |
| 2119 | + executorService.awaitTermination( |
| 2120 | + sqlTimeout().toSeconds(), TimeUnit.SECONDS); |
| 2121 | + } |
| 2122 | + } |
2143 | 2123 |
|
2144 | | - for (Publisher<List<Integer>> publisher : fetchPublishers) |
2145 | | - awaitOne(expected, publisher); |
| 2124 | + /** |
| 2125 | + * Verifies that concurrent statement execution and row fetching on a single |
| 2126 | + * connection does not cause threads to block when there are many threads |
| 2127 | + * available. |
| 2128 | + */ |
| 2129 | + @Test |
| 2130 | + public void testConcurrentFetchManyThreads() throws InterruptedException { |
| 2131 | + ExecutorService executorService = Executors.newFixedThreadPool(4); |
| 2132 | + try { |
| 2133 | + Connection connection = awaitOne(connect(executorService)); |
| 2134 | + try { |
| 2135 | + verifyConcurrentFetch(connection); |
| 2136 | + } |
| 2137 | + finally { |
| 2138 | + tryAwaitNone(connection.close()); |
| 2139 | + } |
2146 | 2140 | } |
2147 | 2141 | finally { |
2148 | | - tryAwaitExecution(connection.createStatement( |
2149 | | - "DROP TABLE testConcurrentFetch")); |
2150 | | - tryAwaitNone(connection.close()); |
| 2142 | + executorService.shutdown(); |
| 2143 | + executorService.awaitTermination( |
| 2144 | + sqlTimeout().toSeconds(), TimeUnit.SECONDS); |
2151 | 2145 | } |
2152 | 2146 | } |
2153 | 2147 |
|
@@ -2277,4 +2271,129 @@ public Object getValue() { |
2277 | 2271 | return value; |
2278 | 2272 | } |
2279 | 2273 | } |
| 2274 | + |
| 2275 | + /** |
| 2276 | + * Connect to the database configured by {@link DatabaseConfig}, with a |
| 2277 | + * the connection configured to use a given {@code executor} for async |
| 2278 | + * callbacks. |
| 2279 | + * @param executor Executor for async callbacks |
| 2280 | + * @return Connection that uses the {@code executor} |
| 2281 | + */ |
| 2282 | + private static Publisher<? extends Connection> connect(Executor executor) { |
| 2283 | + return ConnectionFactories.get( |
| 2284 | + ConnectionFactoryOptions.parse(format( |
| 2285 | + "r2dbc:oracle://%s:%d/%s", host(), port(), serviceName())) |
| 2286 | + .mutate() |
| 2287 | + .option( |
| 2288 | + ConnectionFactoryOptions.USER, user()) |
| 2289 | + .option( |
| 2290 | + ConnectionFactoryOptions.PASSWORD, password()) |
| 2291 | + .option( |
| 2292 | + OracleR2dbcOptions.EXECUTOR, executor) |
| 2293 | + .build()) |
| 2294 | + .create(); |
| 2295 | + } |
| 2296 | + |
| 2297 | + /** |
| 2298 | + * Verifies concurrent statement execution the given {@code connection} |
| 2299 | + * @param connection Connection to verify |
| 2300 | + */ |
| 2301 | + private void verifyConcurrentExecute(Connection connection) { |
| 2302 | + |
| 2303 | + // Create many statements and execute them in parallel. |
| 2304 | + Publisher<Integer>[] publishers = new Publisher[8]; |
| 2305 | + |
| 2306 | + for (int i = 0; i < publishers.length; i++) { |
| 2307 | + Flux<Integer> flux = Flux.from(connection.createStatement( |
| 2308 | + "SELECT " + i + " FROM sys.dual") |
| 2309 | + .execute()) |
| 2310 | + .flatMap(result -> |
| 2311 | + result.map(row -> row.get(0, Integer.class))) |
| 2312 | + .cache(); |
| 2313 | + |
| 2314 | + flux.subscribe(); |
| 2315 | + publishers[i] = flux; |
| 2316 | + } |
| 2317 | + |
| 2318 | + awaitMany( |
| 2319 | + IntStream.range(0, publishers.length) |
| 2320 | + .boxed() |
| 2321 | + .collect(Collectors.toList()), |
| 2322 | + Flux.concat(publishers)); |
| 2323 | + } |
| 2324 | + |
  /**
   * Verifies concurrent statement execution and row fetching with the given
   * {@code connection}. Batch INSERTs are executed in parallel, then several
   * SELECT queries fetch the inserted rows in parallel; each fetch is
   * expected to see all inserted values. A temporary table named
   * testConcurrentFetch is created and dropped by this method.
   * @param connection Connection to verify
   */
  private void verifyConcurrentFetch(Connection connection) {
    try {
      awaitExecution(connection.createStatement(
        "CREATE TABLE testConcurrentFetch (value NUMBER)"));

      // Create many statements and execute them in parallel. Each Mono is
      // cached and subscribed immediately so all executions are initiated
      // before any result is awaited.
      Publisher<Long>[] publishers = new Publisher[8];

      for (int i = 0; i < publishers.length; i++) {

        Statement statement = connection.createStatement(
          "INSERT INTO testConcurrentFetch VALUES (?)");

        // Each publisher batch inserts a range of 10 values
        int start = i * 10;
        statement.bind(0, start);
        IntStream.range(start + 1, start + 10)
          .forEach(value -> {
            statement.add().bind(0, value);
          });

        Mono<Long> mono = Flux.from(statement.execute())
          .flatMap(Result::getRowsUpdated)
          .collect(Collectors.summingLong(Long::longValue))
          .cache();

        // Execute in parallel, and retain the result for verification later
        mono.subscribe();
        publishers[i] = mono;
      }

      // Expect each publisher to emit an update count of 10, one for each
      // value in its batch
      awaitMany(
        Stream.generate(() -> 10L)
          .limit(publishers.length)
          .collect(Collectors.toList()),
        Flux.merge(publishers));

      // Create publishers that fetch rows in parallel
      Publisher<List<Integer>>[] fetchPublishers =
        new Publisher[publishers.length];

      for (int i = 0; i < fetchPublishers.length; i++) {
        Mono<List<Integer>> mono = Flux.from(connection.createStatement(
          "SELECT value FROM testConcurrentFetch ORDER BY value")
          .execute())
          .flatMap(result ->
            result.map(row -> row.get(0, Integer.class)))
          // NOTE(review): values are sorted locally even though the query has
          // ORDER BY — presumably concurrent fetching may emit rows out of
          // order; confirm before removing this sort.
          .sort()
          .collect(Collectors.toList())
          .cache();

        // Execute in parallel, and retain the result for verification later
        mono.subscribe();
        fetchPublishers[i] = mono;
      }

      // Expect each fetch publisher to get the same result: every value
      // inserted by every batch, in ascending order
      List<Integer> expected = IntStream.range(0, publishers.length * 10)
        .boxed()
        .collect(Collectors.toList());

      for (Publisher<List<Integer>> publisher : fetchPublishers)
        awaitOne(expected, publisher);
    }
    finally {
      // Best-effort cleanup of the temporary table
      tryAwaitExecution(connection.createStatement(
        "DROP TABLE testConcurrentFetch"));
    }
  }
2280 | 2399 | } |
0 commit comments