Skip to content

Commit 395c281

Browse files
ConnectionFactory.create() supports multiple subscribers
1 parent 2ed864e commit 395c281

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,21 @@
5353
* <li>{@link ConnectionFactoryOptions#HOST}</li>
5454
* </ul>
5555
* <h3 id="supported_options">Supported Options</h3><p>
56-
* This implementation supports the following options for connection creation:
56+
* This implementation supports the following well known options for connection
57+
* creation:
5758
* </p><ul>
5859
* <li>{@link ConnectionFactoryOptions#PORT}</li>
5960
* <li>{@link ConnectionFactoryOptions#DATABASE}</li>
6061
* <li>{@link ConnectionFactoryOptions#USER}</li>
6162
* <li>{@link ConnectionFactoryOptions#PASSWORD}</li>
63+
* <li>{@link ConnectionFactoryOptions#CONNECT_TIMEOUT}</li>
64+
* <li>{@link ConnectionFactoryOptions#SSL}</li>
6265
* </ul>
66+
* <h3 id="extended_options">Supported Options</h3><p>
67+
* This implementation supports extended options having the name of a
68+
* subset of Oracle JDBC connection properties. The list of supported
69+
* connection properties is specified by {@link OracleReactiveJdbcAdapter}.
70+
* </p>
6371
*
6472
* @author harayuanwang, michael-a-mcmahon
6573
* @since 0.1.0
@@ -116,7 +124,6 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
116124
*/
117125
OracleConnectionFactoryImpl(ConnectionFactoryOptions options) {
118126
OracleR2dbcExceptions.requireNonNull(options, "options is null.");
119-
120127
adapter = ReactiveJdbcAdapter.getOracleAdapter();
121128
dataSource = adapter.createDataSource(options);
122129
}
@@ -138,14 +145,15 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
138145
* the returned publisher, so that the database can reclaim the resources
139146
* allocated for that connection.
140147
* </p><p>
141-
* The returned publisher does not support multiple subscribers. After a
142-
* subscriber has subscribed, the returned publisher emits {@code onError}
143-
* with an {@link IllegalStateException} to all subsequent subscribers.
148+
* The returned publisher supports multiple subscribers. One {@code
149+
* Connection} is emitted to each subscriber that subscribes and signals
150+
* demand.
144151
* </p>
145152
*/
146153
@Override
147154
public Publisher<Connection> create() {
148-
return Mono.fromDirect(adapter.publishConnection(dataSource))
155+
return Mono.defer(() ->
156+
Mono.fromDirect(adapter.publishConnection(dataSource)))
149157
.map(conn -> new OracleConnectionImpl(adapter, conn));
150158
}
151159

src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryImplTest.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import java.sql.SQLException;
3535
import java.time.Duration;
36+
import java.util.HashSet;
37+
import java.util.Set;
3638
import java.util.concurrent.atomic.AtomicInteger;
3739

3840
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -115,21 +117,24 @@ public void testCreate() {
115117
.build())
116118
.create();
117119

118-
// Expect publisher to emit 1 connection
119-
AtomicInteger counter = new AtomicInteger(0);
120+
// Expect publisher to emit one connection to each subscriber
121+
Set<Connection> connections = new HashSet<>();
120122
Flux.from(connectionPublisher)
121-
.doOnNext(connection -> counter.incrementAndGet())
123+
.doOnNext(connections::add)
122124
.doOnNext(connection -> Mono.from(connection.close()).subscribe())
123125
.blockLast(DatabaseConfig.connectTimeout());
124-
assertEquals(1, counter.get());
126+
assertEquals(1, connections.size());
127+
Flux.from(connectionPublisher)
128+
.doOnNext(connections::add)
129+
.doOnNext(connection -> Mono.from(connection.close()).subscribe())
130+
.blockLast(DatabaseConfig.connectTimeout());
131+
assertEquals(2, connections.size());
132+
Flux.from(connectionPublisher)
133+
.doOnNext(connections::add)
134+
.doOnNext(connection -> Mono.from(connection.close()).subscribe())
135+
.blockLast(DatabaseConfig.connectTimeout());
136+
assertEquals(3, connections.size());
125137

126-
// Expect publisher to reject multiple subscribers
127-
try {
128-
Mono.from(connectionPublisher)
129-
.block(Duration.ofSeconds(1));
130-
fail("Connection publisher did not reject multiple subscribers");
131-
}
132-
catch (IllegalStateException expected) { }
133138
}
134139

135140
/**

0 commit comments

Comments
 (0)