7373
7474import static com .mongodb .assertions .Assertions .assertFalse ;
7575import static com .mongodb .assertions .Assertions .assertNotNull ;
76+ import static com .mongodb .assertions .Assertions .assertNull ;
7677import static com .mongodb .assertions .Assertions .assertTrue ;
7778import static com .mongodb .assertions .Assertions .fail ;
7879import static com .mongodb .assertions .Assertions .isTrue ;
@@ -198,7 +199,7 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
198199 LOGGER .trace (format ("Pooled connection %s to server %s is not yet open" ,
199200 getId (connection ), serverId ));
200201 }
201- openConcurrencyLimiter .openAsyncOrGetAvailable (connection , timeout , eventSendingCallback );
202+ openConcurrencyLimiter .openAsyncWithConcurrencyLimit (connection , timeout , eventSendingCallback );
202203 }
203204 }
204205 }));
@@ -358,7 +359,7 @@ public synchronized void run() {
358359 LOGGER .debug (format ("Ensuring minimum pooled connections to %s" , serverId .getAddress ()));
359360 }
360361 pool .ensureMinSize (settings .getMinSize (), newConnection ->
361- openConcurrencyLimiter .openImmediately (new PooledConnection (newConnection )));
362+ openConcurrencyLimiter .openImmediatelyAndTryHandOverOrRelease (new PooledConnection (newConnection )));
362363 }
363364 } catch (MongoInterruptedException | MongoTimeoutException e ) {
364365 //complete the maintenance task
@@ -802,47 +803,56 @@ private final class OpenConcurrencyLimiter {
802803 }
803804
804805 PooledConnection openOrGetAvailable (final PooledConnection connection , final Timeout timeout ) throws MongoTimeoutException {
805- return openOrGetAvailable (connection , true , timeout );
806+ PooledConnection result = openWithConcurrencyLimit (connection , OpenWithConcurrencyLimitMode .TRY_GET_AVAILABLE , timeout );
807+ return assertNotNull (result );
806808 }
807809
808- void openImmediately (final PooledConnection connection ) throws MongoTimeoutException {
809- PooledConnection result = openOrGetAvailable (connection , false , Timeout .immediate ());
810- assertTrue (result == connection );
810+ void openImmediatelyAndTryHandOverOrRelease (final PooledConnection connection ) throws MongoTimeoutException {
811+ assertNull (openWithConcurrencyLimit (connection , OpenWithConcurrencyLimitMode .TRY_HAND_OVER_OR_RELEASE , Timeout .immediate ()));
811812 }
812813
813814 /**
814815 * This method can be thought of as operating in two phases.
815816 * In the first phase it tries to synchronously acquire a permit to open the {@code connection}
816- * or get a different {@linkplain PooledConnection#opened() opened} connection if {@code tryGetAvailable } is {@code true} and
817- * one becomes available while waiting for a permit.
817+ * or get a different {@linkplain PooledConnection#opened() opened} connection if {@code mode } is
818+ * {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE} and one becomes available while waiting for a permit.
818819 * The first phase has one of the following outcomes:
819820 * <ol>
820821 * <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown,
821822 * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
822823 * <li>An opened connection different from the specified one is returned,
823- * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
824+ * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
825+ * This outcome is possible only if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}.</li>
824826 * <li>A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported
825827 * and an attempt to open the specified {@code connection} is made. This is the second phase in which
826828 * the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}.
827829 * The attempt to open the {@code connection} has one of the following outcomes
828- * combined with releasing the acquired permit:</li>
830+ * combined with releasing the acquired permit:
829831 * <ol>
830832 * <li>An {@link Exception} is thrown
831833 * and the {@code connection} is {@linkplain PooledConnection#closeAndHandleOpenFailure() closed}.</li>
832- * <li>The specified {@code connection}, which is now opened, is returned.</li>
834+ * <li>Else if the specified {@code connection} is opened successfully and
835+ * {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_HAND_OVER_OR_RELEASE},
836+ * then {@link #tryHandOverOrRelease(UsageTrackingInternalConnection)} is called and {@code null} is returned.</li>
837+ * <li>Else the specified {@code connection}, which is now opened, is returned.</li>
833838 * </ol>
839+ * </li>
834840 * </ol>
835841 *
836842 * @param timeout Applies only to the first phase.
837843 * @return An {@linkplain PooledConnection#opened() opened} connection which is
838- * either the specified {@code connection} or a different one.
844+ * either the specified {@code connection},
845+ * or potentially a different one if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE},
846+ * or {@code null} if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_HAND_OVER_OR_RELEASE}.
839847 * @throws MongoTimeoutException If the first phase timed out.
840848 */
841- private PooledConnection openOrGetAvailable (
842- final PooledConnection connection , final boolean tryGetAvailable , final Timeout timeout ) throws MongoTimeoutException {
849+ @ Nullable
850+ private PooledConnection openWithConcurrencyLimit (final PooledConnection connection , final OpenWithConcurrencyLimitMode mode ,
851+ final Timeout timeout ) throws MongoTimeoutException {
843852 PooledConnection availableConnection ;
844853 try {//phase one
845- availableConnection = acquirePermitOrGetAvailableOpenedConnection (tryGetAvailable , timeout );
854+ availableConnection = acquirePermitOrGetAvailableOpenedConnection (
855+ mode == OpenWithConcurrencyLimitMode .TRY_GET_AVAILABLE , timeout );
846856 } catch (RuntimeException e ) {
847857 connection .closeSilently ();
848858 throw e ;
@@ -853,25 +863,32 @@ private PooledConnection openOrGetAvailable(
853863 } else {//acquired a permit, phase two
854864 try {
855865 connection .open ();
866+ if (mode == OpenWithConcurrencyLimitMode .TRY_HAND_OVER_OR_RELEASE ) {
867+ tryHandOverOrRelease (connection .wrapped );
868+ return null ;
869+ } else {
870+ return connection ;
871+ }
856872 } finally {
857873 releasePermit ();
858874 }
859- return connection ;
860875 }
861876 }
862877
863878 /**
864- * This method is similar to {@link #openOrGetAvailable(PooledConnection, boolean, Timeout)} with the following differences:
879+ * This method is similar to {@link #openWithConcurrencyLimit(PooledConnection, OpenWithConcurrencyLimitMode, Timeout)}
880+ * with the following differences:
865881 * <ul>
866- * <li>It does not have the {@code tryGetAvailable} parameter and acts as if this parameter were {@code true}.</li>
882+ * <li>It does not have the {@code mode} parameter and acts as if this parameter were
883+ * {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}.</li>
867884 * <li>While the first phase is still synchronous, the {@code connection} is
868885 * {@linkplain PooledConnection#openAsync(SingleResultCallback) opened asynchronously} in the second phase.</li>
869886 * <li>Instead of returning a result or throwing an exception via Java {@code return}/{@code throw} statements,
870887 * it calls {@code callback.}{@link SingleResultCallback#onResult(Object, Throwable) onResult(result, failure)}
871888 * and passes either a {@link PooledConnection} or an {@link Exception}.</li>
872889 * </ul>
873890 */
874- void openAsyncOrGetAvailable (
891+ void openAsyncWithConcurrencyLimit (
875892 final PooledConnection connection , final Timeout timeout , final SingleResultCallback <InternalConnection > callback ) {
876893 PooledConnection availableConnection ;
877894 try {//phase one
@@ -1044,6 +1061,14 @@ private long awaitNanos(final Condition condition, final long timeoutNanos) thro
10441061 }
10451062 }
10461063
1064+ /**
1065+ * @see OpenConcurrencyLimiter#openWithConcurrencyLimit(PooledConnection, OpenWithConcurrencyLimitMode, Timeout)
1066+ */
1067+ private enum OpenWithConcurrencyLimitMode {
1068+ TRY_GET_AVAILABLE ,
1069+ TRY_HAND_OVER_OR_RELEASE
1070+ }
1071+
10471072 @ NotThreadSafe
10481073 private static final class MutableReference <T > {
10491074 @ Nullable
0 commit comments