{@code srvServiceName=string}: The SRV service name. See {@link ClusterSettings#getSrvServiceName()} for details.
- *
{@code srvMaxHosts=number}: The maximum number of hosts from the SRV record to connect to.
+ *
{@code srvMaxHosts=n}: The maximum number of hosts from the SRV record to connect to.
*
*
General configuration:
*
- *
{@code retryWrites=true|false}. If true the driver will retry supported write operations if they fail due to a network error.
- * Defaults to true.
- *
{@code retryReads=true|false}. If true the driver will retry supported read operations if they fail due to a network error.
- * Defaults to true.
+ *
{@code retryWrites=true|false}: Whether attempts to execute write commands should be retried if they fail due to a retryable error.
+ * Defaults to true. See also {@code maxAdaptiveRetries}.
+ *
{@code retryReads=true|false}: Whether attempts to execute read commands should be retried if they fail due to a retryable error.
+ * Defaults to true. See also {@code maxAdaptiveRetries}.
+ *
{@code maxAdaptiveRetries=n}: This is {@linkplain Beta Beta API}.
+ * The maximum number of retry attempts when encountering a retryable overload error.
+ * See {@link MongoClientSettings.Builder#maxAdaptiveRetries(Integer)} for more information.
+ *
{@code enableOverloadRetargeting=true|false}: This is {@linkplain Beta Beta API}.
+ * Whether to enable overload retargeting. Defaults to false.
+ * See {@link MongoClientSettings.Builder#enableOverloadRetargeting(boolean)} for more information.
*
{@code uuidRepresentation=unspecified|standard|javaLegacy|csharpLegacy|pythonLegacy}. See
* {@link MongoClientSettings#getUuidRepresentation()} for documentation of semantics of this parameter. Defaults to "javaLegacy", but
* will change to "unspecified" in the next major release.
@@ -308,6 +315,8 @@ public class ConnectionString {
private WriteConcern writeConcern;
private Boolean retryWrites;
private Boolean retryReads;
+ private Integer maxAdaptiveRetries;
+ private Boolean enableOverloadRetargeting;
private ReadConcern readConcern;
private Integer minConnectionPoolSize;
@@ -558,6 +567,8 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
GENERAL_OPTIONS_KEYS.add("servermonitoringmode");
GENERAL_OPTIONS_KEYS.add("retrywrites");
GENERAL_OPTIONS_KEYS.add("retryreads");
+ GENERAL_OPTIONS_KEYS.add("maxadaptiveretries");
+ GENERAL_OPTIONS_KEYS.add("enableoverloadretargeting");
GENERAL_OPTIONS_KEYS.add("appname");
@@ -706,6 +717,15 @@ private void translateOptions(final Map> optionsMap) {
case "retryreads":
retryReads = parseBoolean(value, "retryreads");
break;
+ case "maxadaptiveretries":
+ maxAdaptiveRetries = parseInteger(value, "maxadaptiveretries");
+ if (maxAdaptiveRetries < 0) {
+ throw new IllegalArgumentException("maxAdaptiveRetries must be >= 0");
+ }
+ break;
+ case "enableoverloadretargeting":
+ enableOverloadRetargeting = parseBoolean(value, "enableoverloadretargeting");
+ break;
case "uuidrepresentation":
uuidRepresentation = createUuidRepresentation(value);
break;
@@ -1455,13 +1475,15 @@ public WriteConcern getWriteConcern() {
}
/**
- *
Gets whether writes should be retried if they fail due to a network error
- *
+ * Gets whether attempts to execute write commands should be retried if they fail due to a retryable error.
+ * See {@link MongoClientSettings.Builder#retryWrites(boolean)} for more information.
+ *
* The name of this method differs from others in this class so as not to conflict with the now removed
* getRetryWrites() method, which returned a primitive {@code boolean} value, and didn't allow callers to differentiate
* between a false value and an unset value.
*
- * @return the retryWrites value, or null if unset
+ * @return the {@code retryWrites} value, or {@code null} if unset
+ * @see #getMaxAdaptiveRetries()
* @since 3.9
* @mongodb.server.release 3.6
*/
@@ -1471,9 +1493,11 @@ public Boolean getRetryWritesValue() {
}
/**
- *
Gets whether reads should be retried if they fail due to a network error
+ * Gets whether attempts to execute read commands should be retried if they fail due to a retryable error.
+ * See {@link MongoClientSettings.Builder#retryReads(boolean)} for more information.
*
- * @return the retryWrites value
+ * @return the {@code retryReads} value, or {@code null} if unset
+ * @see #getMaxAdaptiveRetries()
* @since 3.11
* @mongodb.server.release 3.6
*/
@@ -1482,6 +1506,33 @@ public Boolean getRetryReads() {
return retryReads;
}
+ /**
+ * Gets the maximum number of retry attempts when encountering a retryable overload error.
+ * See {@link MongoClientSettings.Builder#maxAdaptiveRetries(Integer)} for more information.
+ *
+ * @return The {@code maxAdaptiveRetries} value, or {@code null} if unset.
+ * @since 5.7
+ */
+ @Beta(Reason.CLIENT)
+ @Nullable
+ public Integer getMaxAdaptiveRetries() {
+ return maxAdaptiveRetries;
+ }
+
+ /**
+ * Gets whether overload retargeting is enabled.
+ * See {@link MongoClientSettings.Builder#enableOverloadRetargeting(boolean)} for more information.
+ *
+ * @return the enableOverloadRetargeting value, or null if not set
+ * @see MongoClientSettings.Builder#enableOverloadRetargeting(boolean)
+ * @since 5.7
+ */
+ @Beta(Reason.CLIENT)
+ @Nullable
+ public Boolean getEnableOverloadRetargeting() {
+ return enableOverloadRetargeting;
+ }
+
/**
* Gets the minimum connection pool size specified in the connection string.
* @return the minimum connection pool size
@@ -1795,6 +1846,8 @@ public boolean equals(final Object o) {
&& Objects.equals(writeConcern, that.writeConcern)
&& Objects.equals(retryWrites, that.retryWrites)
&& Objects.equals(retryReads, that.retryReads)
+ && Objects.equals(maxAdaptiveRetries, that.maxAdaptiveRetries)
+ && Objects.equals(enableOverloadRetargeting, that.enableOverloadRetargeting)
&& Objects.equals(readConcern, that.readConcern)
&& Objects.equals(minConnectionPoolSize, that.minConnectionPoolSize)
&& Objects.equals(maxConnectionPoolSize, that.maxConnectionPoolSize)
@@ -1826,7 +1879,7 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Objects.hash(credential, isSrvProtocol, hosts, database, collection, directConnection, readPreference,
- writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
+ writeConcern, retryWrites, retryReads, maxAdaptiveRetries, enableOverloadRetargeting, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, timeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
serverMonitoringMode, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost,
diff --git a/driver-core/src/main/com/mongodb/MongoClientSettings.java b/driver-core/src/main/com/mongodb/MongoClientSettings.java
index 41c5f73a1d7..d4a06c07d8c 100644
--- a/driver-core/src/main/com/mongodb/MongoClientSettings.java
+++ b/driver-core/src/main/com/mongodb/MongoClientSettings.java
@@ -17,6 +17,7 @@
package com.mongodb;
import com.mongodb.annotations.Alpha;
+import com.mongodb.annotations.Beta;
import com.mongodb.annotations.Immutable;
import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.annotations.Reason;
@@ -24,6 +25,7 @@
import com.mongodb.client.model.geojson.codecs.GeoJsonCodecProvider;
import com.mongodb.client.model.mql.ExpressionCodecProvider;
import com.mongodb.connection.ClusterSettings;
+import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.ServerSettings;
import com.mongodb.connection.SocketSettings;
@@ -93,6 +95,9 @@ public final class MongoClientSettings {
private final WriteConcern writeConcern;
private final boolean retryWrites;
private final boolean retryReads;
+ @Nullable
+ private final Integer maxAdaptiveRetries;
+ private final boolean enableOverloadRetargeting;
private final ReadConcern readConcern;
private final MongoCredential credential;
private final TransportSettings transportSettings;
@@ -214,6 +219,9 @@ public static final class Builder {
private WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
private boolean retryWrites = true;
private boolean retryReads = true;
+ @Nullable
+ private Integer maxAdaptiveRetries;
+ private boolean enableOverloadRetargeting = false;
private ReadConcern readConcern = ReadConcern.DEFAULT;
private CodecRegistry codecRegistry = MongoClientSettings.getDefaultCodecRegistry();
private TransportSettings transportSettings;
@@ -255,6 +263,8 @@ private Builder(final MongoClientSettings settings) {
writeConcern = settings.getWriteConcern();
retryWrites = settings.getRetryWrites();
retryReads = settings.getRetryReads();
+ maxAdaptiveRetries = settings.getMaxAdaptiveRetries();
+ enableOverloadRetargeting = settings.getEnableOverloadRetargeting();
readConcern = settings.getReadConcern();
credential = settings.getCredential();
uuidRepresentation = settings.getUuidRepresentation();
@@ -314,6 +324,13 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
if (retryReadsValue != null) {
retryReads = retryReadsValue;
}
+ if (connectionString.getMaxAdaptiveRetries() != null) {
+ maxAdaptiveRetries = connectionString.getMaxAdaptiveRetries();
+ }
+ Boolean enableOverloadRetargetingValue = connectionString.getEnableOverloadRetargeting();
+ if (enableOverloadRetargetingValue != null) {
+ enableOverloadRetargeting = enableOverloadRetargetingValue;
+ }
if (connectionString.getUuidRepresentation() != null) {
uuidRepresentation = connectionString.getUuidRepresentation();
}
@@ -428,13 +445,24 @@ public Builder writeConcern(final WriteConcern writeConcern) {
}
/**
- * Sets whether writes should be retried if they fail due to a network error.
+ * Sets whether attempts to execute write commands should be retried if they fail due to a retryable error.
+ *
+ * The errors {@linkplain MongoException#hasErrorLabel(String) having}
+ * the {@value MongoException#RETRYABLE_ERROR_LABEL} label are not the only ones considered retryable here:
+ * unlike applications, which may retry operations, the driver retries commands, which gives it more control
+ * and allows it to safely retry attempts failed due to a broader set of errors
+ * than what applications may {@linkplain MongoException#RETRYABLE_ERROR_LABEL safely retry}.
+ *
+ * For more information on how transactions affect retries,
+ * see the documentation of the {@value MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL},
+ * {@value MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL} error labels.
*
*
Starting with the 3.11.0 release, the default value is true
*
- * @param retryWrites sets if writes should be retried if they fail due to a network error.
+ * @param retryWrites sets if write commands should be retried if they fail due to a retryable error.
* @return this
* @see #getRetryWrites()
+ * @see #maxAdaptiveRetries(Integer)
* @mongodb.server.release 3.6
*/
public Builder retryWrites(final boolean retryWrites) {
@@ -443,11 +471,24 @@ public Builder retryWrites(final boolean retryWrites) {
}
/**
- * Sets whether reads should be retried if they fail due to a network error.
+ * Sets whether attempts to execute read commands should be retried if they fail due to a retryable error.
+ *
+ * The errors {@linkplain MongoException#hasErrorLabel(String) having}
+ * the {@value MongoException#RETRYABLE_ERROR_LABEL} label are not the only ones considered retryable here:
+ * unlike applications, which may retry operations, the driver retries commands, which gives it more control
+ * and allows it to safely retry attempts failed due to a broader set of errors
+ * than what applications may {@linkplain MongoException#RETRYABLE_ERROR_LABEL safely retry}.
+ *
+ * For more information on how transactions affect retries,
+ * see the documentation of the {@value MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL},
+ * {@value MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL} error labels.
+ *
+ * Default is {@code true}.
*
- * @param retryReads sets if reads should be retried if they fail due to a network error.
+ * @param retryReads sets if read commands should be retried if they fail due to a retryable error.
* @return this
* @see #getRetryReads()
+ * @see #maxAdaptiveRetries(Integer)
* @since 3.11
* @mongodb.server.release 3.6
*/
@@ -456,6 +497,101 @@ public Builder retryReads(final boolean retryReads) {
return this;
}
+ /**
+ * Sets the maximum number of retry attempts when executing a command and encountering
+ * an error {@linkplain MongoException#hasErrorLabel(String) having}
+ * the {@value MongoException#SYSTEM_OVERLOADED_ERROR_LABEL} and {@value MongoException#RETRYABLE_ERROR_LABEL} labels.
+ * Such errors are referred to as retryable overload errors.
+ *
+ * Default is {@code null}, implies the value 2 and the above retry behavior. The implied value and behavior may change in
+ * the future in a minor version.
+ * This means, there is no guarantee that not setting a value is equivalent to setting the value 2.
+ * The value 0 results in not retrying the attempts failed due to retryable overload errors.
+ *
+ *
+ *
Interactions with {@link #retryWrites(boolean)}/{@link #retryReads(boolean)}
+ *
+ *
+ *
Command kind
+ *
Interaction
+ *
+ *
+ *
+ *
+ *
write
+ *
+ * The attempts failed due to retryable overload errors are retried only if
+ * {@link #retryWrites(boolean)} is {@code true}.
+ *
+ *
+ *
+ *
read
+ *
+ * The attempts failed due to retryable overload errors are retried only if
+ * {@link #retryReads(boolean)} is {@code true}.
+ *
+ * Executing a write operation, for example, {@code MongoCluster.bulkWrite},
+ * may involve executing not only write commands, but also read commands. In such a situation,
+ * just like in other situations, the behavior related to retries depends on
+ * the known kind of command, not on the kind of operation.
+ *
+ *
+ *
+ *
unknown
+ *
+ * The attempts failed due to retryable overload errors are retried only if
+ * {@link #retryWrites(boolean)} is {@code true} and {@link #retryReads(boolean)} is {@code true}.
+ *
+ * The command kind is unknown when a command is executed via the {@code MongoDatabase.runCommand} operation.
+ *
+ *
+ *
+ *
+ *
+ * @param maxAdaptiveRetries Sets the maximum number of retry attempts when encountering a retryable overload error.
+ *
+ * @return {@code this}.
+ * @see #getMaxAdaptiveRetries()
+ * @mongodb.driver.manual reference/parameters/#mongodb-parameter-param.overloadAwareServerSelectionEnabled
+ * overloadAwareServerSelectionEnabled: the server-side counterpart, which is configured independently
+ * and affects the server behavior as opposed to the client behavior.
+ * @since 5.7
+ */
+ // TODO-BACKPRESSURE Valentin Document commands that we do not retry now, but should retry according to the spec.
+ @Beta(Reason.CLIENT)
+ public Builder maxAdaptiveRetries(@Nullable final Integer maxAdaptiveRetries) {
+ if (maxAdaptiveRetries != null) {
+ isTrueArgument("maxAdaptiveRetries >= 0", maxAdaptiveRetries >= 0);
+ }
+ this.maxAdaptiveRetries = maxAdaptiveRetries;
+ return this;
+ }
+
+ /**
+ * Sets whether to enable overload retargeting.
+ *
+ *
When enabled, the previously selected servers on which attempts failed with an error
+ * {@linkplain MongoException#hasErrorLabel(String) having}
+ * the {@value MongoException#SYSTEM_OVERLOADED_ERROR_LABEL} label may be deprioritized during
+ * server selection on subsequent retry attempts. This applies to reads when
+ * {@linkplain #retryReads(boolean) retryReads} is enabled, and to writes when
+ * {@linkplain #retryWrites(boolean) retryWrites} is enabled.
+ *
+ *
This setting does not take effect for {@linkplain ClusterType#SHARDED sharded clusters}.
+ *
+ *
Defaults to {@code false}.
+ *
+ * @param enableOverloadRetargeting whether to enable overload retargeting.
+ * @return this
+ * @see #getEnableOverloadRetargeting()
+ * @since 5.7
+ */
+ @Beta(Reason.CLIENT)
+ public Builder enableOverloadRetargeting(final boolean enableOverloadRetargeting) {
+ this.enableOverloadRetargeting = enableOverloadRetargeting;
+ return this;
+ }
+
/**
* Sets the read concern.
*
@@ -785,11 +921,14 @@ public WriteConcern getWriteConcern() {
}
/**
- * Returns true if writes should be retried if they fail due to a network error or other retryable error.
+ * Returns whether attempts to execute write commands should be retried if they fail due to a retryable error.
+ * See {@link Builder#retryWrites(boolean)} for more information.
*
*
Starting with the 3.11.0 release, the default value is true
*
* @return the retryWrites value
+ * @see Builder#retryWrites(boolean)
+ * @see #getMaxAdaptiveRetries()
* @mongodb.server.release 3.6
*/
public boolean getRetryWrites() {
@@ -797,9 +936,14 @@ public boolean getRetryWrites() {
}
/**
- * Returns true if reads should be retried if they fail due to a network error or other retryable error. The default value is true.
+ * Returns whether attempts to execute read commands should be retried if they fail due to a retryable error.
+ * See {@link Builder#retryReads(boolean)} for more information.
+ *
+ * Default is {@code true}.
*
* @return the retryReads value
+ * @see Builder#retryReads(boolean)
+ * @see #getMaxAdaptiveRetries()
* @since 3.11
* @mongodb.server.release 3.6
*/
@@ -807,6 +951,34 @@ public boolean getRetryReads() {
return retryReads;
}
+ /**
+ * Returns the maximum number of retry attempts when encountering a retryable overload error.
+ * See {@link Builder#maxAdaptiveRetries(Integer)} for more information.
+ *
+ * @return The maximum number of retry attempts when encountering a retryable overload error.
+ * @see Builder#maxAdaptiveRetries(Integer)
+ * @since 5.7
+ */
+ @Beta(Reason.CLIENT)
+ @Nullable
+ // TODO-BACKPRESSURE Valentin Use the `maxAdaptiveRetries` setting when retrying.
+ public Integer getMaxAdaptiveRetries() {
+ return maxAdaptiveRetries;
+ }
+
+ /**
+ * Returns whether overload retargeting is enabled.
+ * See {@link Builder#enableOverloadRetargeting(boolean)} for more information.
+ *
+ * @return the enableOverloadRetargeting value
+ * @see Builder#enableOverloadRetargeting(boolean)
+ * @since 5.7
+ */
+ @Beta(Reason.CLIENT)
+ public boolean getEnableOverloadRetargeting() {
+ return enableOverloadRetargeting;
+ }
+
/**
* The read concern to use.
*
@@ -819,7 +991,7 @@ public ReadConcern getReadConcern() {
}
/**
- * The codec registry to use, or null if not set.
+ * The codec registry to use.
*
* @return the codec registry
*/
@@ -1080,6 +1252,8 @@ public boolean equals(final Object o) {
MongoClientSettings that = (MongoClientSettings) o;
return retryWrites == that.retryWrites
&& retryReads == that.retryReads
+ && Objects.equals(maxAdaptiveRetries, that.maxAdaptiveRetries)
+ && enableOverloadRetargeting == that.enableOverloadRetargeting
&& heartbeatSocketTimeoutSetExplicitly == that.heartbeatSocketTimeoutSetExplicitly
&& heartbeatConnectTimeoutSetExplicitly == that.heartbeatConnectTimeoutSetExplicitly
&& Objects.equals(readPreference, that.readPreference)
@@ -1109,7 +1283,8 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
- return Objects.hash(readPreference, writeConcern, retryWrites, retryReads, readConcern, credential, transportSettings,
+ return Objects.hash(readPreference, writeConcern, retryWrites, retryReads, maxAdaptiveRetries, enableOverloadRetargeting, readConcern,
+ credential, transportSettings,
commandListeners, codecRegistry, loggerSettings, clusterSettings, socketSettings,
heartbeatSocketSettings, connectionPoolSettings, serverSettings, sslSettings, applicationName, compressorList,
uuidRepresentation, serverApi, autoEncryptionSettings, heartbeatSocketTimeoutSetExplicitly,
@@ -1124,6 +1299,8 @@ public String toString() {
+ ", writeConcern=" + writeConcern
+ ", retryWrites=" + retryWrites
+ ", retryReads=" + retryReads
+ + ", maxAdaptiveRetries=" + maxAdaptiveRetries
+ + ", enableOverloadRetargeting=" + enableOverloadRetargeting
+ ", readConcern=" + readConcern
+ ", credential=" + credential
+ ", transportSettings=" + transportSettings
@@ -1154,6 +1331,8 @@ private MongoClientSettings(final Builder builder) {
writeConcern = builder.writeConcern;
retryWrites = builder.retryWrites;
retryReads = builder.retryReads;
+ maxAdaptiveRetries = builder.maxAdaptiveRetries;
+ enableOverloadRetargeting = builder.enableOverloadRetargeting;
readConcern = builder.readConcern;
credential = builder.credential;
transportSettings = builder.transportSettings;
diff --git a/driver-core/src/main/com/mongodb/MongoException.java b/driver-core/src/main/com/mongodb/MongoException.java
index a668dd344b7..b15023b2374 100644
--- a/driver-core/src/main/com/mongodb/MongoException.java
+++ b/driver-core/src/main/com/mongodb/MongoException.java
@@ -36,20 +36,54 @@ public class MongoException extends RuntimeException {
/**
* An error label indicating that the exception can be treated as a transient transaction error.
+ * See the documentation linked below for more information.
*
* @see #hasErrorLabel(String)
+ * @mongodb.driver.manual core/transactions-in-applications/#std-label-transient-transaction-error TransientTransactionError
* @since 3.8
*/
public static final String TRANSIENT_TRANSACTION_ERROR_LABEL = "TransientTransactionError";
/**
* An error label indicating that the exception can be treated as an unknown transaction commit result.
+ * See the documentation linked below for more information.
*
* @see #hasErrorLabel(String)
+ * @mongodb.driver.manual core/transactions-in-applications/#std-label-unknown-transaction-commit-result UnknownTransactionCommitResult
* @since 3.8
*/
public static final String UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL = "UnknownTransactionCommitResult";
+ /**
+ * Server is overloaded and shedding load.
+ * If an application retries explicitly, it should use exponential backoff because the server has indicated overload.
+ * This label on its own does not mean that the operation can be {@linkplain #RETRYABLE_ERROR_LABEL safely retried}.
+ *
+ * @see #hasErrorLabel(String)
+ * @see MongoClientSettings.Builder#maxAdaptiveRetries(Integer)
+ * @mongodb.atlas.manual overload-errors/ Overload errors
+ * @since 5.7
+ * @mongodb.server.release 8.3
+ */
+ public static final String SYSTEM_OVERLOADED_ERROR_LABEL = "SystemOverloadedError";
+
+ /**
+ * The operation is safe to retry, that is,
+ * retry without rereading the relevant data or considering the semantics of the operation.
+ *
+ * For more information on how transactions affect retries,
+ * see the documentation of the {@value #TRANSIENT_TRANSACTION_ERROR_LABEL}, {@value #UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL}
+ * error labels.
+ *
+ * @see #hasErrorLabel(String)
+ * @see MongoClientSettings.Builder#retryWrites(boolean)
+ * @see MongoClientSettings.Builder#retryReads(boolean)
+ * @mongodb.atlas.manual overload-errors/ Overload errors
+ * @since 5.7
+ * @mongodb.server.release 8.3
+ */
+ public static final String RETRYABLE_ERROR_LABEL = "RetryableError";
+
private static final long serialVersionUID = -4415279469780082174L;
private final int code;
diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java
index 4146d06c22e..2f0fcaa6379 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java
@@ -113,9 +113,9 @@ abstract class BaseCluster implements Cluster {
private volatile ClusterDescription description;
BaseCluster(final ClusterId clusterId,
- final ClusterSettings settings,
- final ClusterableServerFactory serverFactory,
- final ClientMetadata clientMetadata) {
+ final ClusterSettings settings,
+ final ClusterableServerFactory serverFactory,
+ final ClientMetadata clientMetadata) {
this.clusterId = notNull("clusterId", clusterId);
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
@@ -159,7 +159,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
if (serverTuple != null) {
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
logServerSelectionSucceeded(operationContext, clusterId, serverAddress, serverSelector, currentDescription);
- serverDeprioritization.updateCandidate(serverAddress);
+ serverDeprioritization.updateCandidate(serverAddress, currentDescription.getType());
return serverTuple;
}
computedServerSelectionTimeout.onExpired(() ->
@@ -302,7 +302,7 @@ private boolean handleServerSelectionRequest(
if (serverTuple != null) {
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
logServerSelectionSucceeded(operationContext, clusterId, serverAddress, request.originalSelector, description);
- serverDeprioritization.updateCandidate(serverAddress);
+ serverDeprioritization.updateCandidate(serverAddress, description.getType());
request.onResult(serverTuple, null);
return true;
}
@@ -361,8 +361,7 @@ private static ServerSelector getCompleteServerSelector(
final ClusterSettings settings) {
List selectors = Stream.of(
getRaceConditionPreFilteringSelector(serversSnapshot),
- serverSelector,
- serverDeprioritization.getServerSelector(),
+ serverDeprioritization.apply(serverSelector),
settings.getServerSelector(), // may be null
new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS),
AtMostTwoRandomServerSelector.instance(),
diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java
index 574a85669d0..36f6688cb0e 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java
@@ -172,7 +172,8 @@ private InternalConnectionInitializationDescription createInitializationDescript
private BsonDocument createHelloCommand(final Authenticator authenticator, final InternalConnection connection) {
BsonDocument helloCommandDocument = new BsonDocument(getHandshakeCommandName(), new BsonInt32(1))
- .append("helloOk", BsonBoolean.TRUE);
+ .append("helloOk", BsonBoolean.TRUE)
+ .append("backpressure", BsonBoolean.TRUE);
if (clientMetadataDocument != null) {
helloCommandDocument.append("client", clientMetadataDocument);
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
index f23d5e5226b..06c2c9b9358 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java
@@ -17,6 +17,7 @@
import com.mongodb.Function;
import com.mongodb.MongoConnectionPoolClearedException;
+import com.mongodb.MongoException;
import com.mongodb.ReadConcern;
import com.mongodb.RequestContext;
import com.mongodb.ServerAddress;
@@ -27,7 +28,6 @@
import com.mongodb.internal.IgnorableRequestContext;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.TimeoutSettings;
-import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.observability.micrometer.Span;
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.session.SessionContext;
@@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL;
import static java.util.stream.Collectors.toList;
/**
@@ -76,7 +77,19 @@ public OperationContext(final RequestContext requestContext, final SessionContex
null);
}
- public static OperationContext simpleOperationContext(
+ public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
+ final TracingManager tracingManager,
+ @Nullable final ServerApi serverApi,
+ @Nullable final String operationName,
+ final ServerDeprioritization serverDeprioritization) {
+ this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, serverDeprioritization,
+ tracingManager,
+ serverApi,
+ operationName,
+ null);
+ }
+
+ static OperationContext simpleOperationContext(
final TimeoutSettings timeoutSettings, @Nullable final ServerApi serverApi) {
return new OperationContext(
IgnorableRequestContext.INSTANCE,
@@ -113,6 +126,16 @@ public OperationContext withOperationName(final String operationName) {
operationName, tracingSpan);
}
+ /**
+ * TODO-JAVA-6058: This method enables overriding the ServerDeprioritization state.
+ * It is a temporary solution to handle cases where deprioritization state persists across operations.
+ */
+ public OperationContext withNewServerDeprioritization() {
+ return new OperationContext(id, requestContext, sessionContext, timeoutContext,
+ new ServerDeprioritization(serverDeprioritization.enableOverloadRetargeting), tracingManager, serverApi,
+ operationName, tracingSpan);
+ }
+
public long getId() {
return id;
}
@@ -152,8 +175,7 @@ public void setTracingSpan(final Span tracingSpan) {
this.tracingSpan = tracingSpan;
}
- @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
- public OperationContext(final long id,
+ private OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
@@ -174,26 +196,6 @@ public OperationContext(final long id,
this.tracingSpan = tracingSpan;
}
- @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
- public OperationContext(final long id,
- final RequestContext requestContext,
- final SessionContext sessionContext,
- final TimeoutContext timeoutContext,
- final TracingManager tracingManager,
- @Nullable final ServerApi serverApi,
- @Nullable final String operationName) {
- this.id = id;
- this.serverDeprioritization = new ServerDeprioritization();
- this.requestContext = requestContext;
- this.sessionContext = sessionContext;
- this.timeoutContext = timeoutContext;
- this.tracingManager = tracingManager;
- this.serverApi = serverApi;
- this.operationName = operationName;
- this.tracingSpan = null;
- }
-
-
/**
* @return The same {@link ServerDeprioritization} if called on the same {@link OperationContext}.
*/
@@ -217,7 +219,8 @@ public OperationContext withConnectionEstablishmentSessionContext() {
}
public OperationContext withMinRoundTripTime(final ServerDescription serverDescription) {
- return withTimeoutContext(timeoutContext.withMinRoundTripTime(TimeUnit.NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos())));
+ return withTimeoutContext(
+ timeoutContext.withMinRoundTripTime(TimeUnit.NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos())));
}
public OperationContext withOverride(final TimeoutContextOverride timeoutContextOverrideFunction) {
@@ -227,26 +230,38 @@ public OperationContext withOverride(final TimeoutContextOverride timeoutContext
public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
+ @Nullable
+ private ClusterType clusterType;
private final Set deprioritized;
- private final DeprioritizingSelector selector;
+ private final boolean enableOverloadRetargeting;
+
+ public ServerDeprioritization() {
+ this(false);
+ }
- private ServerDeprioritization() {
- candidate = null;
- deprioritized = new HashSet<>();
- selector = new DeprioritizingSelector();
+ public ServerDeprioritization(final boolean enableOverloadRetargeting) {
+ this.enableOverloadRetargeting = enableOverloadRetargeting;
+ this.candidate = null;
+ this.deprioritized = new HashSet<>();
+ this.clusterType = null;
}
/**
- * The returned {@link ServerSelector} tries to {@linkplain ServerSelector#select(ClusterDescription) select}
- * only the {@link ServerDescription}s that do not have deprioritized {@link ServerAddress}es.
- * If no such {@link ServerDescription} can be selected, then it selects {@link ClusterDescription#getServerDescriptions()}.
+ * The returned {@link ServerSelector} wraps the provided selector and attempts
+ * {@linkplain ServerSelector#select(ClusterDescription) server selection} in two passes:
+ *
+ *
{@code retryWrites=true|false}. If true the driver will retry supported write operations if they fail due to a network error.
- * Defaults to false.
- *
{@code retryReads=true|false}. If true the driver will retry supported read operations if they fail due to a network error.
- * Defaults to false.
*
*
*
@@ -214,10 +211,15 @@
*
*
General configuration:
*
- *
{@code retryWrites=true|false}. If true the driver will retry supported write operations if they fail due to a network error.
- * Defaults to true.
- *
{@code retryReads=true|false}. If true the driver will retry supported read operations if they fail due to a network error.
- * Defaults to true.
+ *
{@code retryWrites=true|false}: Whether attempts to execute write commands should be retried if they fail due to a retryable error.
+ * Defaults to true. See also {@code maxAdaptiveRetries}.
+ *
{@code retryReads=true|false}: Whether attempts to execute read commands should be retried if they fail due to a retryable error.
+ * Defaults to true. See also {@code maxAdaptiveRetries}.
+ *
{@code maxAdaptiveRetries=n}: This is {@linkplain Beta Beta API}.
+ * The maximum number of retry attempts when encountering a retryable overload error.
+ * See {@link MongoClientSettings.Builder#maxAdaptiveRetries(Integer)} for more information.
+*
{@code enableOverloadRetargeting=true|false}: Whether to enable overload retargeting. Defaults to false.
+ * See {@link MongoClientSettings.Builder#enableOverloadRetargeting(boolean)} for more information.
*
{@code uuidRepresentation=unspecified|standard|javaLegacy|csharpLegacy|pythonLegacy}. See
* {@link MongoClientOptions#getUuidRepresentation()} for documentation of semantics of this parameter. Defaults to "javaLegacy", but
* will change to "unspecified" in the next major release.
@@ -381,11 +383,19 @@ public MongoClientOptions getOptions() {
if (retryWritesValue != null) {
builder.retryWrites(retryWritesValue);
}
-
Boolean retryReads = proxied.getRetryReads();
if (retryReads != null) {
builder.retryReads(retryReads);
}
+ Integer maxAdaptiveRetries = proxied.getMaxAdaptiveRetries();
+ if (maxAdaptiveRetries != null) {
+ builder.maxAdaptiveRetries(maxAdaptiveRetries);
+ }
+
+ Boolean enableOverloadRetargeting = proxied.getEnableOverloadRetargeting();
+ if (enableOverloadRetargeting != null) {
+ builder.enableOverloadRetargeting(enableOverloadRetargeting);
+ }
Integer maxConnectionPoolSize = proxied.getMaxConnectionPoolSize();
if (maxConnectionPoolSize != null) {
diff --git a/driver-legacy/src/test/functional/com/mongodb/DBTest.java b/driver-legacy/src/test/functional/com/mongodb/DBTest.java
index cf44573a2b4..b483e326081 100644
--- a/driver-legacy/src/test/functional/com/mongodb/DBTest.java
+++ b/driver-legacy/src/test/functional/com/mongodb/DBTest.java
@@ -31,7 +31,6 @@
import java.util.Locale;
import java.util.UUID;
-import static com.mongodb.ClusterFixture.OPERATION_CONTEXT;
import static com.mongodb.ClusterFixture.disableMaxTimeFailPoint;
import static com.mongodb.ClusterFixture.enableMaxTimeFailPoint;
import static com.mongodb.ClusterFixture.getBinding;
@@ -345,7 +344,7 @@ public void shouldApplyUuidRepresentationToCommandEncodingAndDecoding() {
BsonDocument getCollectionInfo(final String collectionName) {
return new ListCollectionsOperation<>(getDefaultDatabaseName(), new BsonDocumentCodec())
- .filter(new BsonDocument("name", new BsonString(collectionName))).execute(getBinding(), OPERATION_CONTEXT).next().get(0);
+ .filter(new BsonDocument("name", new BsonString(collectionName))).execute(getBinding(), ClusterFixture.createOperationContext()).next().get(0);
}
private boolean isCapped(final DBCollection collection) {
diff --git a/driver-legacy/src/test/functional/com/mongodb/LegacyMixedBulkWriteOperationSpecification.groovy b/driver-legacy/src/test/functional/com/mongodb/LegacyMixedBulkWriteOperationSpecification.groovy
index 6a9c511c3bc..2db1da67e22 100644
--- a/driver-legacy/src/test/functional/com/mongodb/LegacyMixedBulkWriteOperationSpecification.groovy
+++ b/driver-legacy/src/test/functional/com/mongodb/LegacyMixedBulkWriteOperationSpecification.groovy
@@ -184,7 +184,7 @@ class LegacyMixedBulkWriteOperationSpecification extends OperationFunctionalSpec
def insert = new InsertRequest(new BsonDocument('_id', new BsonInt32(1)))
def binding = getBinding()
createBulkWriteOperationForInsert(getNamespace(), true, ACKNOWLEDGED, false, asList(insert))
- .execute(binding, ClusterFixture.getOperationContext(binding.getReadPreference()))
+ .execute(binding, ClusterFixture.createOperationContext(binding.getReadPreference()))
def replacement = new UpdateRequest(new BsonDocument('_id', new BsonInt32(1)),
new BsonDocument('_id', new BsonInt32(1)).append('x', new BsonInt32(1)), REPLACE)
diff --git a/driver-legacy/src/test/unit/com/mongodb/MongoClientOptionsSpecification.groovy b/driver-legacy/src/test/unit/com/mongodb/MongoClientOptionsSpecification.groovy
index ae1d332674c..a386cd7f684 100644
--- a/driver-legacy/src/test/unit/com/mongodb/MongoClientOptionsSpecification.groovy
+++ b/driver-legacy/src/test/unit/com/mongodb/MongoClientOptionsSpecification.groovy
@@ -46,6 +46,8 @@ class MongoClientOptionsSpecification extends Specification {
options.getWriteConcern() == WriteConcern.ACKNOWLEDGED
options.getRetryWrites()
options.getRetryReads()
+ options.getMaxAdaptiveRetries() == null
+ !options.getEnableOverloadRetargeting()
options.getCodecRegistry() == MongoClientSettings.defaultCodecRegistry
options.getUuidRepresentation() == UuidRepresentation.UNSPECIFIED
options.getMinConnectionsPerHost() == 0
@@ -84,6 +86,11 @@ class MongoClientOptionsSpecification extends Specification {
given:
def builder = new MongoClientOptions.Builder()
+ when:
+ builder.maxAdaptiveRetries(-1)
+ then:
+ thrown(IllegalArgumentException)
+
when:
builder.dbDecoderFactory(null)
then:
@@ -116,6 +123,8 @@ class MongoClientOptionsSpecification extends Specification {
.readPreference(ReadPreference.secondary())
.retryWrites(true)
.retryReads(false)
+ .maxAdaptiveRetries(42)
+ .enableOverloadRetargeting(true)
.writeConcern(WriteConcern.JOURNALED)
.readConcern(ReadConcern.MAJORITY)
.minConnectionsPerHost(30)
@@ -162,6 +171,8 @@ class MongoClientOptionsSpecification extends Specification {
options.getServerSelector() == serverSelector
options.getRetryWrites()
!options.getRetryReads()
+ options.getMaxAdaptiveRetries() == 42
+ options.getEnableOverloadRetargeting()
options.getServerSelectionTimeout() == 150
options.getTimeout() == 10_000
options.getMaxWaitTime() == 200
@@ -207,6 +218,7 @@ class MongoClientOptionsSpecification extends Specification {
settings.writeConcern == WriteConcern.JOURNALED
settings.retryWrites
!settings.retryReads
+ settings.getMaxAdaptiveRetries() == 42
settings.autoEncryptionSettings == autoEncryptionSettings
settings.codecRegistry == codecRegistry
settings.commandListeners == [commandListener]
@@ -227,6 +239,7 @@ class MongoClientOptionsSpecification extends Specification {
optionsFromSettings.getServerSelector() == serverSelector
optionsFromSettings.getRetryWrites()
!optionsFromSettings.getRetryReads()
+ optionsFromSettings.getMaxAdaptiveRetries() == 42
optionsFromSettings.getServerSelectionTimeout() == 150
optionsFromSettings.getServerSelectionTimeout() == 150
optionsFromSettings.getMaxWaitTime() == 200
@@ -318,6 +331,7 @@ class MongoClientOptionsSpecification extends Specification {
.applicationName('appName')
.readPreference(ReadPreference.secondary())
.retryReads(true)
+ .enableOverloadRetargeting(true)
.uuidRepresentation(UuidRepresentation.STANDARD)
.writeConcern(WriteConcern.JOURNALED)
.minConnectionsPerHost(30)
@@ -619,6 +633,8 @@ class MongoClientOptionsSpecification extends Specification {
.writeConcern(WriteConcern.JOURNALED)
.retryWrites(true)
.retryReads(true)
+ .maxAdaptiveRetries(42)
+ .enableOverloadRetargeting(true)
.uuidRepresentation(UuidRepresentation.STANDARD)
.minConnectionsPerHost(30)
.connectionsPerHost(500)
@@ -663,6 +679,18 @@ class MongoClientOptionsSpecification extends Specification {
MongoClientOptions.builder().connectionsPerHost(0).build().getConnectionsPerHost() == 0
}
+ def 'should allow null, 0 maxAdaptiveRetries'() {
+ when:
+ def options = MongoClientOptions.builder().maxAdaptiveRetries(null).build()
+ then:
+ options.getMaxAdaptiveRetries() == null
+
+ when:
+ options = MongoClientOptions.builder().maxAdaptiveRetries(0).build()
+ then:
+ options.getMaxAdaptiveRetries() == 0
+ }
+
private static class MyDBEncoderFactory implements DBEncoderFactory {
@Override
DBEncoder create() {
diff --git a/driver-legacy/src/test/unit/com/mongodb/MongoClientURISpecification.groovy b/driver-legacy/src/test/unit/com/mongodb/MongoClientURISpecification.groovy
index 241ac958c8a..3de1f77b6da 100644
--- a/driver-legacy/src/test/unit/com/mongodb/MongoClientURISpecification.groovy
+++ b/driver-legacy/src/test/unit/com/mongodb/MongoClientURISpecification.groovy
@@ -131,6 +131,8 @@ class MongoClientURISpecification extends Specification {
+ 'heartbeatFrequencyMS=20000&'
+ 'retryWrites=true&'
+ 'retryReads=true&'
+ + 'maxAdaptiveRetries=42&'
+ + 'enableOverloadRetargeting=true&'
+ 'uuidRepresentation=csharpLegacy&'
+ 'appName=app1&'
+ 'timeoutMS=10000')
@@ -158,6 +160,8 @@ class MongoClientURISpecification extends Specification {
options.getHeartbeatFrequency() == 20000
options.getRetryWrites()
options.getRetryReads()
+ options.getMaxAdaptiveRetries() == 42
+ options.getEnableOverloadRetargeting()
options.getUuidRepresentation() == UuidRepresentation.C_SHARP_LEGACY
options.getApplicationName() == 'app1'
}
@@ -178,6 +182,8 @@ class MongoClientURISpecification extends Specification {
!options.isSslEnabled()
options.getRetryWrites()
options.getRetryReads()
+ options.getMaxAdaptiveRetries() == null
+ !options.getEnableOverloadRetargeting()
options.getUuidRepresentation() == UuidRepresentation.UNSPECIFIED
}
@@ -188,6 +194,8 @@ class MongoClientURISpecification extends Specification {
.readPreference(ReadPreference.secondary())
.retryWrites(true)
.retryReads(true)
+ .maxAdaptiveRetries(42)
+ .enableOverloadRetargeting(true)
.writeConcern(WriteConcern.JOURNALED)
.minConnectionsPerHost(30)
.connectionsPerHost(500)
@@ -220,6 +228,8 @@ class MongoClientURISpecification extends Specification {
options.getWriteConcern() == WriteConcern.JOURNALED
options.getRetryWrites()
options.getRetryReads()
+ options.getMaxAdaptiveRetries() == 42
+ options.getEnableOverloadRetargeting()
options.getTimeout() == 10_000
options.getServerSelectionTimeout() == 150
options.getMaxWaitTime() == 200
@@ -314,24 +324,37 @@ class MongoClientURISpecification extends Specification {
def 'should respect MongoClientOptions builder'() {
given:
- def uri = new MongoClientURI('mongodb://localhost/', MongoClientOptions.builder().connectionsPerHost(200))
+ def uri = new MongoClientURI('mongodb://localhost/', MongoClientOptions.builder()
+ .connectionsPerHost(200)
+ .maxAdaptiveRetries(42)
+ .enableOverloadRetargeting(true))
when:
def options = uri.getOptions()
then:
options.getConnectionsPerHost() == 200
+ options.getMaxAdaptiveRetries() == 42
+ options.getEnableOverloadRetargeting()
}
def 'should override MongoClientOptions builder'() {
given:
- def uri = new MongoClientURI('mongodb://localhost/?maxPoolSize=250', MongoClientOptions.builder().connectionsPerHost(200))
+ def uri = new MongoClientURI('mongodb://localhost/?'
+ + 'maxPoolSize=250'
+ + '&maxAdaptiveRetries=43'
+ + '&enableOverloadRetargeting=false',
+ MongoClientOptions.builder()
+ .connectionsPerHost(200)
+ .maxAdaptiveRetries(42))
when:
def options = uri.getOptions()
then:
options.getConnectionsPerHost() == 250
+ options.getMaxAdaptiveRetries() == 43
+ !options.getEnableOverloadRetargeting()
}
def 'should be equal to another MongoClientURI with the same string values'() {
@@ -371,7 +394,9 @@ class MongoClientURISpecification extends Specification {
+ 'socketTimeoutMS=5500;'
+ 'safe=false;w=1;wtimeout=2500;'
+ 'fsync=true;readPreference=primary;'
- + 'ssl=true') | new MongoClientURI('mongodb://localhost/db.coll?minPoolSize=5;'
+ + 'ssl=true;'
+ + 'maxAdaptiveRetries=42') | new MongoClientURI('mongodb://localhost/db.coll?'
+ + 'minPoolSize=5;'
+ 'maxPoolSize=10;'
+ 'waitQueueTimeoutMS=150;'
+ 'maxIdleTimeMS=200&maxLifeTimeMS=300;'
@@ -379,7 +404,8 @@ class MongoClientURISpecification extends Specification {
+ '&replicaSet=test;connectTimeoutMS=2500;'
+ 'socketTimeoutMS=5500&safe=false&w=1;'
+ 'wtimeout=2500;fsync=true'
- + '&readPreference=primary;ssl=true')
+ + '&readPreference=primary;ssl=true;'
+ + 'maxAdaptiveRetries=42')
}
def 'should be not equal to another MongoClientURI with the different string values'() {
@@ -401,12 +427,14 @@ class MongoClientURISpecification extends Specification {
+ '&readPreferenceTags=dc:ny,rack:1'
+ '&readPreferenceTags=dc:ny'
+ '&readPreferenceTags='
- + '&maxConnecting=1') | new MongoClientURI('mongodb://localhost/'
+ + '&maxConnecting=1'
+ + '&maxAdaptiveRetries=42') | new MongoClientURI('mongodb://localhost/'
+ '?readPreference=secondaryPreferred'
+ '&readPreferenceTags=dc:ny'
+ '&readPreferenceTags=dc:ny, rack:1'
+ '&readPreferenceTags='
- + '&maxConnecting=2')
+ + '&maxConnecting=2'
+ + '&maxAdaptiveRetries=43')
new MongoClientURI('mongodb://ross:123@localhost/?'
+ 'authMechanism=SCRAM-SHA-1') | new MongoClientURI('mongodb://ross:123@localhost/?'
+ 'authMechanism=GSSAPI')
@@ -419,7 +447,8 @@ class MongoClientURISpecification extends Specification {
+ 'minPoolSize=7;maxIdleTimeMS=1000;maxLifeTimeMS=2000;maxConnecting=1;'
+ 'replicaSet=test;'
+ 'connectTimeoutMS=2500;socketTimeoutMS=5500;autoConnectRetry=true;'
- + 'readPreference=secondaryPreferred;safe=false;w=1;wtimeout=2600')
+ + 'readPreference=secondaryPreferred;safe=false;w=1;wtimeout=2600;'
+ + 'maxAdaptiveRetries=42')
MongoClientOptions.Builder builder = MongoClientOptions.builder()
.connectionsPerHost(10)
@@ -433,6 +462,7 @@ class MongoClientURISpecification extends Specification {
.socketTimeout(5500)
.readPreference(secondaryPreferred())
.writeConcern(new WriteConcern(1, 2600))
+ .maxAdaptiveRetries(42)
MongoClientOptions options = builder.build()
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java
index 3d9354e9ae9..c7301807910 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/ClientSession.java
@@ -17,6 +17,7 @@
package com.mongodb.reactivestreams.client;
+import com.mongodb.MongoException;
import com.mongodb.TransactionOptions;
import org.reactivestreams.Publisher;
@@ -65,6 +66,7 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
* Start a transaction in the context of this session with default transaction options. A transaction can not be started if there is
* already an active transaction on this session.
*
+ * @see MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL
* @mongodb.server.release 4.0
*/
void startTransaction();
@@ -75,14 +77,16 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
*
* @param transactionOptions the options to apply to the transaction
*
+ * @see MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL
* @mongodb.server.release 4.0
*/
void startTransaction(TransactionOptions transactionOptions);
/**
- * Commit a transaction in the context of this session. A transaction can only be commmited if one has first been started.
+ * Commit a transaction in the context of this session. A transaction can only be committed if one has first been started.
*
* @return an empty publisher that indicates when the operation has completed
+ * @see MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL
* @mongodb.server.release 4.0
*/
Publisher commitTransaction();
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
index ef18c2c6b1f..35ff27f79ec 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
@@ -216,7 +216,8 @@ private OperationContext getOperationContext(final RequestContext requestContext
createTimeoutContext(session, timeoutSettings),
TracingManager.NO_OP,
mongoClient.getSettings().getServerApi(),
- commandName);
+ commandName,
+ new OperationContext.ServerDeprioritization(mongoClient.getSettings().getEnableOverloadRetargeting()));
}
private ReadPreference getReadPreferenceForBinding(final ReadPreference readPreference, @Nullable final ClientSession session) {
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/BackpressureProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/BackpressureProseTest.java
new file mode 100644
index 00000000000..458738617b5
--- /dev/null
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/BackpressureProseTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.reactivestreams.client;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
+
+/**
+ *
+ * Prose Tests.
+ */
+final class BackpressureProseTest extends com.mongodb.client.BackpressureProseTest {
+ @Override
+ protected MongoClient createClient(final MongoClientSettings mongoClientSettings) {
+ return new SyncMongoClient(mongoClientSettings);
+ }
+}
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java
index 90529171219..80913aab843 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ContextProviderTest.java
@@ -19,6 +19,7 @@
import com.mongodb.ContextProvider;
import com.mongodb.RequestContext;
import com.mongodb.WriteConcern;
+import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandStartedEvent;
@@ -186,6 +187,7 @@ public void contextShouldBeAvailableInCommandEvents() {
}
}
+ @NotThreadSafe
private static final class TestCommandListener implements CommandListener {
private int numCommandStartedEventsWithExpectedContext;
private int numCommandSucceededEventsWithExpectedContext;
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java
index 22b7f7645e1..bb748f00601 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsProseTest.java
@@ -16,63 +16,14 @@
package com.mongodb.reactivestreams.client;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.RetryableWritesProseTest;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.AbstractRetryableReadsProseTest;
+import com.mongodb.client.MongoClient;
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
-import org.bson.Document;
-import org.junit.jupiter.api.Test;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static com.mongodb.client.model.Filters.eq;
-
-/**
- * See
- * Retryable Reads Tests.
- */
-final class RetryableReadsProseTest {
- /**
- * See
- *
- * PoolClearedError Retryability Test.
- */
- @Test
- void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException {
- RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(
- SyncMongoClient::new,
- mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false);
- }
-
- /**
- * See
- *
- * Retryable Reads Are Retried on a Different mongos When One is Available.
- */
- @Test
- void retriesOnDifferentMongosWhenAvailable() {
- RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable(
- SyncMongoClient::new,
- mongoCollection -> {
- try (MongoCursor cursor = mongoCollection.find().iterator()) {
- return cursor.hasNext();
- }
- }, "find", false);
- }
-
- /**
- * See
- *
- * Retryable Reads Are Retried on the Same mongos When No Others are Available.
- */
- @Test
- void retriesOnSameMongosWhenAnotherNotAvailable() {
- RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable(
- SyncMongoClient::new,
- mongoCollection -> {
- try (MongoCursor cursor = mongoCollection.find().iterator()) {
- return cursor.hasNext();
- }
- }, "find", false);
+final class RetryableReadsProseTest extends AbstractRetryableReadsProseTest {
+ @Override
+ protected MongoClient createClient(final MongoClientSettings settings) {
+ return new SyncMongoClient(settings);
}
}
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java
index 51a37ad1e35..fe1bcdfb97c 100644
--- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesProseTest.java
@@ -16,68 +16,89 @@
package com.mongodb.reactivestreams.client;
-import com.mongodb.client.test.CollectionHelper;
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
import org.bson.Document;
-import org.bson.codecs.DocumentCodec;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
- * See
- * Retryable Write Prose Tests.
+ *
+ * Prose Tests.
*/
-public class RetryableWritesProseTest extends DatabaseTestCase {
- private CollectionHelper collectionHelper;
-
- @BeforeEach
- @Override
- public void setUp() {
- super.setUp();
-
- collectionHelper = new CollectionHelper<>(new DocumentCodec(), collection.getNamespace());
- collectionHelper.create();
- }
-
+final class RetryableWritesProseTest {
/**
- * Prose test #2.
+ *
+ * 2. Test that drivers properly retry after encountering PoolClearedErrors.
*/
@Test
- public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException {
+ void poolClearedExceptionMustBeRetryable() throws Exception {
com.mongodb.client.RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(
SyncMongoClient::new,
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}
/**
- * Prose test #3.
+ *
+ * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label.
*/
@Test
- public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException {
+ void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception {
com.mongodb.client.RetryableWritesProseTest.originalErrorMustBePropagatedIfNoWritesPerformed(
SyncMongoClient::new);
}
/**
- * Prose test #4.
+ *
+ * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available.
*/
@Test
- public void retriesOnDifferentMongosWhenAvailable() {
+ void retriesOnDifferentMongosWhenAvailable() throws InterruptedException, TimeoutException {
com.mongodb.client.RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable(
SyncMongoClient::new,
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}
/**
- * Prose test #5.
+ *
+ * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available.
*/
@Test
- public void retriesOnSameMongosWhenAnotherNotAvailable() {
+ void retriesOnSameMongosWhenAnotherNotAvailable() {
com.mongodb.client.RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable(
SyncMongoClient::new,
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}
+
+ /**
+ *
+ * 6. Test error propagation after encountering multiple errors.
+ * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed.
+ */
+ @Test
+ @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055")
+ void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception {
+ com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase1(SyncMongoClient::new);
+ }
+
+ /**
+ *
+ * 6. Test error propagation after encountering multiple errors.
+ * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed.
+ */
+ @Test
+ void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception {
+ com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase2(SyncMongoClient::new);
+ }
+
+ /**
+ *
+ * 6. Test error propagation after encountering multiple errors.
+ * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed.
+ */
+ @Test
+ void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception {
+ com.mongodb.client.RetryableWritesProseTest.errorPropagationAfterEncounteringMultipleErrorsCase3(SyncMongoClient::new);
+ }
}
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/ClientSessionBindingSpecification.groovy b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/ClientSessionBindingSpecification.groovy
index cfe66a8031f..0fcbb5ac31a 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/ClientSessionBindingSpecification.groovy
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/ClientSessionBindingSpecification.groovy
@@ -16,6 +16,7 @@
package com.mongodb.reactivestreams.client.internal
+import com.mongodb.ClusterFixture
import com.mongodb.ReadPreference
import com.mongodb.ServerAddress
import com.mongodb.async.FutureResultCallback
@@ -31,32 +32,34 @@ import com.mongodb.internal.connection.ServerTuple
import com.mongodb.reactivestreams.client.ClientSession
import spock.lang.Specification
-import static com.mongodb.ClusterFixture.OPERATION_CONTEXT
+import static com.mongodb.ClusterFixture.createOperationContext
class ClientSessionBindingSpecification extends Specification {
def 'should return the session context from the connection source'() {
given:
def session = Stub(ClientSession)
+ def operationContext = ClusterFixture.createOperationContext()
def wrappedBinding = Mock(AsyncClusterAwareReadWriteBinding);
wrappedBinding.retain() >> wrappedBinding
def binding = new ClientSessionBinding(session, false, wrappedBinding)
when:
def futureResultCallback = new FutureResultCallback()
- binding.getReadConnectionSource(OPERATION_CONTEXT, futureResultCallback)
+
+ binding.getReadConnectionSource(operationContext, futureResultCallback)
then:
- 1 * wrappedBinding.getReadConnectionSource(OPERATION_CONTEXT, _) >> {
+ 1 * wrappedBinding.getReadConnectionSource(operationContext, _) >> {
it[1].onResult(Stub(AsyncConnectionSource), null)
}
when:
futureResultCallback = new FutureResultCallback()
- binding.getWriteConnectionSource(OPERATION_CONTEXT, futureResultCallback)
+ binding.getWriteConnectionSource(operationContext, futureResultCallback)
then:
- 1 * wrappedBinding.getWriteConnectionSource(OPERATION_CONTEXT, _) >> {
+ 1 * wrappedBinding.getWriteConnectionSource(operationContext, _) >> {
it[1].onResult(Stub(AsyncConnectionSource), null)
}
}
@@ -87,10 +90,10 @@ class ClientSessionBindingSpecification extends Specification {
def wrappedBinding = createStubBinding()
def binding = new ClientSessionBinding(session, true, wrappedBinding)
def futureResultCallback = new FutureResultCallback()
- binding.getReadConnectionSource(OPERATION_CONTEXT, futureResultCallback)
+ binding.getReadConnectionSource(createOperationContext(), futureResultCallback)
def readConnectionSource = futureResultCallback.get()
futureResultCallback = new FutureResultCallback()
- binding.getWriteConnectionSource(OPERATION_CONTEXT, futureResultCallback)
+ binding.getWriteConnectionSource(createOperationContext(), futureResultCallback)
def writeConnectionSource = futureResultCallback.get()
when:
diff --git a/driver-scala/src/main/scala/org/mongodb/scala/ClientSessionImplicits.scala b/driver-scala/src/main/scala/org/mongodb/scala/ClientSessionImplicits.scala
index 9718b01c1a8..b8824dc31f5 100644
--- a/driver-scala/src/main/scala/org/mongodb/scala/ClientSessionImplicits.scala
+++ b/driver-scala/src/main/scala/org/mongodb/scala/ClientSessionImplicits.scala
@@ -33,7 +33,7 @@ trait ClientSessionImplicits {
/**
* Commit a transaction in the context of this session.
*
- * A transaction can only be commmited if one has first been started.
+ * A transaction can only be committed if one has first been started.
*/
def commitTransaction(): SingleObservable[Unit] = clientSession.commitTransaction()
diff --git a/driver-scala/src/main/scala/org/mongodb/scala/package.scala b/driver-scala/src/main/scala/org/mongodb/scala/package.scala
index 1cdc2d0a564..487987a6fe9 100644
--- a/driver-scala/src/main/scala/org/mongodb/scala/package.scala
+++ b/driver-scala/src/main/scala/org/mongodb/scala/package.scala
@@ -215,18 +215,48 @@ package object scala extends ClientSessionImplicits with ObservableImplicits wit
/**
* An error label indicating that the exception can be treated as a transient transaction error.
+ * See the documentation linked below for more information.
*
+ * @see [[https://www.mongodb.com/docs/manual/core/transactions-in-applications/#std-label-transient-transaction-error TransientTransactionError]]
* @since 2.4
*/
val TRANSIENT_TRANSACTION_ERROR_LABEL: String = com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL
/**
* An error label indicating that the exception can be treated as an unknown transaction commit result.
+ * See the documentation linked below for more information.
*
+ * @see [[https://www.mongodb.com/docs/manual/core/transactions-in-applications/#std-label-unknown-transaction-commit-result UnknownTransactionCommitResult]]
* @since 2.4
*/
val UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL: String =
com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL
+
+ /**
+ * Server is overloaded and shedding load.
+ * If an application retries explicitly, it should use exponential backoff because the server has indicated overload.
+ * This label on its own does not mean that the operation can be [[MongoException.RETRYABLE_ERROR_LABEL safely retried]].
+ *
+ * @see [[https://www.mongodb.com/docs/atlas/overload-errors/ Overload errors]]
+ * @since 5.7
+ * @note Requires MongoDB 8.3 or greater
+ */
+ val SYSTEM_OVERLOADED_ERROR_LABEL: String = com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL
+
+ /**
+ * The operation is safe to retry, that is,
+ * retry without rereading the relevant data or considering the semantics of the operation.
+ *
+ * For more information on how transactions affect retries,
+ * see the documentation of the
+ * [[MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL "TransientTransactionError"]],
+ * [[MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL "UnknownTransactionCommitResult"]] error labels.
+ *
+ * @see [[https://www.mongodb.com/docs/atlas/overload-errors/ Overload errors]]
+ * @since 5.7
+ * @note Requires MongoDB 8.3 or greater
+ */
+ val RETRYABLE_ERROR_LABEL: String = com.mongodb.MongoException.RETRYABLE_ERROR_LABEL
}
/**
diff --git a/driver-sync/src/examples/documentation/TransactionExample.java b/driver-sync/src/examples/documentation/TransactionExample.java
index 4f73122ee35..dea86b9ad4b 100644
--- a/driver-sync/src/examples/documentation/TransactionExample.java
+++ b/driver-sync/src/examples/documentation/TransactionExample.java
@@ -77,7 +77,8 @@ private void runTransactionWithRetry(final Runnable transactional) {
System.out.println("Transaction aborted. Caught exception during transaction.");
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
- System.out.println("TransientTransactionError, aborting transaction and retrying ...");
+ System.out.printf("%s, aborting transaction and retrying ...%n",
+ MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL);
} else {
throw e;
}
@@ -94,7 +95,8 @@ private void commitWithRetry(final ClientSession clientSession) {
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
- System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
+ System.out.printf("%s, retrying commit operation ...%n",
+ MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL);
} else {
System.out.println("Exception during commit ...");
throw e;
diff --git a/driver-sync/src/main/com/mongodb/client/ClientSession.java b/driver-sync/src/main/com/mongodb/client/ClientSession.java
index 00ba5eba23c..d35ee570900 100644
--- a/driver-sync/src/main/com/mongodb/client/ClientSession.java
+++ b/driver-sync/src/main/com/mongodb/client/ClientSession.java
@@ -16,6 +16,7 @@
package com.mongodb.client;
+import com.mongodb.MongoException;
import com.mongodb.ServerAddress;
import com.mongodb.TransactionOptions;
import com.mongodb.internal.observability.micrometer.TransactionSpan;
@@ -76,6 +77,7 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
* Start a transaction in the context of this session with default transaction options. A transaction can not be started if there is
* already an active transaction on this session.
*
+ * @see MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL
* @mongodb.server.release 4.0
*/
void startTransaction();
@@ -86,13 +88,15 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
*
* @param transactionOptions the options to apply to the transaction
*
+ * @see MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL
* @mongodb.server.release 4.0
*/
void startTransaction(TransactionOptions transactionOptions);
/**
- * Commit a transaction in the context of this session. A transaction can only be commmited if one has first been started.
+ * Commit a transaction in the context of this session. A transaction can only be committed if one has first been started.
*
+ * @see MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL
* @mongodb.server.release 4.0
*/
void commitTransaction();
@@ -110,6 +114,8 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
* @param the return type of the transaction body
* @param transactionBody the body of the transaction
* @return the return value of the transaction body
+ * @see MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL
+ * @see MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL
* @mongodb.server.release 4.0
* @since 3.11
*/
@@ -122,6 +128,8 @@ public interface ClientSession extends com.mongodb.session.ClientSession {
* @param transactionBody the body of the transaction
* @param options the transaction options
* @return the return value of the transaction body
+ * @see MongoException#TRANSIENT_TRANSACTION_ERROR_LABEL
+ * @see MongoException#UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL
* @mongodb.server.release 4.0
* @since 3.11
*/
diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java
index bbeb7419bc7..9ba2139f18c 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java
@@ -105,7 +105,7 @@ public MongoClientImpl(final Cluster cluster,
(SynchronousContextProvider) settings.getContextProvider(),
autoEncryptionSettings == null ? null : createCrypt(settings, autoEncryptionSettings), this,
operationExecutor, settings.getReadConcern(), settings.getReadPreference(), settings.getRetryReads(),
- settings.getRetryWrites(), settings.getServerApi(),
+ settings.getRetryWrites(), settings.getEnableOverloadRetargeting(), settings.getServerApi(),
new ServerSessionPool(cluster, TimeoutSettings.create(settings), settings.getServerApi()),
TimeoutSettings.create(settings), settings.getUuidRepresentation(),
settings.getWriteConcern(), new TracingManager(settings.getObservabilitySettings()));
diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
index 920feb1f986..b5604a7a846 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java
@@ -104,6 +104,7 @@ final class MongoClusterImpl implements MongoCluster {
private final ReadPreference readPreference;
private final boolean retryReads;
private final boolean retryWrites;
+ private final boolean enableOverloadRetargeting;
@Nullable
private final ServerApi serverApi;
private final ServerSessionPool serverSessionPool;
@@ -117,10 +118,9 @@ final class MongoClusterImpl implements MongoCluster {
@Nullable final AutoEncryptionSettings autoEncryptionSettings, final Cluster cluster, final CodecRegistry codecRegistry,
@Nullable final SynchronousContextProvider contextProvider, @Nullable final Crypt crypt, final Object originator,
@Nullable final OperationExecutor operationExecutor, final ReadConcern readConcern, final ReadPreference readPreference,
- final boolean retryReads, final boolean retryWrites, @Nullable final ServerApi serverApi,
- final ServerSessionPool serverSessionPool, final TimeoutSettings timeoutSettings, final UuidRepresentation uuidRepresentation,
- final WriteConcern writeConcern,
- final TracingManager tracingManager) {
+ final boolean retryReads, final boolean retryWrites, final boolean enableOverloadRetargeting,
+ @Nullable final ServerApi serverApi, final ServerSessionPool serverSessionPool, final TimeoutSettings timeoutSettings,
+ final UuidRepresentation uuidRepresentation, final WriteConcern writeConcern, final TracingManager tracingManager) {
this.autoEncryptionSettings = autoEncryptionSettings;
this.cluster = cluster;
this.codecRegistry = codecRegistry;
@@ -132,6 +132,7 @@ final class MongoClusterImpl implements MongoCluster {
this.readPreference = readPreference;
this.retryReads = retryReads;
this.retryWrites = retryWrites;
+ this.enableOverloadRetargeting = enableOverloadRetargeting;
this.serverApi = serverApi;
this.serverSessionPool = serverSessionPool;
this.timeoutSettings = timeoutSettings;
@@ -180,35 +181,35 @@ public Long getTimeout(final TimeUnit timeUnit) {
@Override
public MongoCluster withCodecRegistry(final CodecRegistry codecRegistry) {
return new MongoClusterImpl(autoEncryptionSettings, cluster, codecRegistry, contextProvider, crypt, originator,
- operationExecutor, readConcern, readPreference, retryReads, retryWrites, serverApi, serverSessionPool, timeoutSettings,
+ operationExecutor, readConcern, readPreference, retryReads, retryWrites, enableOverloadRetargeting, serverApi, serverSessionPool, timeoutSettings,
uuidRepresentation, writeConcern, tracingManager);
}
@Override
public MongoCluster withReadPreference(final ReadPreference readPreference) {
return new MongoClusterImpl(autoEncryptionSettings, cluster, codecRegistry, contextProvider, crypt, originator,
- operationExecutor, readConcern, readPreference, retryReads, retryWrites, serverApi, serverSessionPool, timeoutSettings,
+ operationExecutor, readConcern, readPreference, retryReads, retryWrites, enableOverloadRetargeting, serverApi, serverSessionPool, timeoutSettings,
uuidRepresentation, writeConcern, tracingManager);
}
@Override
public MongoCluster withWriteConcern(final WriteConcern writeConcern) {
return new MongoClusterImpl(autoEncryptionSettings, cluster, codecRegistry, contextProvider, crypt, originator,
- operationExecutor, readConcern, readPreference, retryReads, retryWrites, serverApi, serverSessionPool, timeoutSettings,
+ operationExecutor, readConcern, readPreference, retryReads, retryWrites, enableOverloadRetargeting, serverApi, serverSessionPool, timeoutSettings,
uuidRepresentation, writeConcern, tracingManager);
}
@Override
public MongoCluster withReadConcern(final ReadConcern readConcern) {
return new MongoClusterImpl(autoEncryptionSettings, cluster, codecRegistry, contextProvider, crypt, originator,
- operationExecutor, readConcern, readPreference, retryReads, retryWrites, serverApi, serverSessionPool, timeoutSettings,
+ operationExecutor, readConcern, readPreference, retryReads, retryWrites, enableOverloadRetargeting, serverApi, serverSessionPool, timeoutSettings,
uuidRepresentation, writeConcern, tracingManager);
}
@Override
public MongoCluster withTimeout(final long timeout, final TimeUnit timeUnit) {
return new MongoClusterImpl(autoEncryptionSettings, cluster, codecRegistry, contextProvider, crypt, originator,
- operationExecutor, readConcern, readPreference, retryReads, retryWrites, serverApi, serverSessionPool,
+ operationExecutor, readConcern, readPreference, retryReads, retryWrites, enableOverloadRetargeting, serverApi, serverSessionPool,
timeoutSettings.withTimeout(timeout, timeUnit), uuidRepresentation, writeConcern, tracingManager);
}
@@ -530,7 +531,8 @@ private OperationContext getOperationContext(final ClientSession session, final
createTimeoutContext(session, executorTimeoutSettings),
tracingManager,
serverApi,
- commandName);
+ commandName,
+ new OperationContext.ServerDeprioritization(enableOverloadRetargeting));
}
private RequestContext getRequestContext() {
@@ -591,9 +593,9 @@ ClientSession getClientSession(@Nullable final ClientSession clientSessionFromOp
* Create a tracing span for the given operation, and set it on operation context.
*
* @param actualClientSession the session that the operation is part of
- * @param operationContext the operation context for the operation
- * @param commandName the name of the command
- * @param namespace the namespace of the command
+ * @param operationContext the operation context for the operation
+ * @param commandName the name of the command
+ * @param namespace the namespace of the command
* @return the created span, or null if tracing is not enabled
*/
@Nullable
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
index 7828ecde684..3d2d58dc4c8 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
@@ -48,6 +48,7 @@
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionReadyEvent;
+import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO;
@@ -687,7 +688,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() {
+ " blockConnection: true,"
+ " blockTimeMS: " + 25
+ " errorCode: " + 24
- + " errorLabels: [\"TransientTransactionError\"]"
+ + " errorLabels: [\"" + TRANSIENT_TRANSACTION_ERROR_LABEL + "\"]"
+ " }"
+ "}");
diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsProseTest.java
new file mode 100644
index 00000000000..4c6c536fac1
--- /dev/null
+++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsProseTest.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.client;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+import com.mongodb.client.test.CollectionHelper;
+import com.mongodb.event.CommandFailedEvent;
+import com.mongodb.event.CommandSucceededEvent;
+import com.mongodb.internal.connection.TestClusterListener;
+import com.mongodb.internal.connection.TestCommandListener;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
+import static com.mongodb.ClusterFixture.serverVersionAtLeast;
+import static com.mongodb.MongoException.RETRYABLE_ERROR_LABEL;
+import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL;
+import static com.mongodb.client.Fixture.getDefaultDatabaseName;
+import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
+import static com.mongodb.client.Fixture.getPrimary;
+import static com.mongodb.client.model.Filters.eq;
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ *
+ * Prose Tests.
+ */
+public abstract class AbstractRetryableReadsProseTest {
+
+ private static final String COLLECTION_NAME = "test";
+
+ private final TestCommandListener commandListener =
+ new TestCommandListener(asList("commandFailedEvent", "commandSucceededEvent"), emptyList());
+ private final TestClusterListener clusterListener = new TestClusterListener();
+
+ protected abstract MongoClient createClient(MongoClientSettings settings);
+
+ @AfterEach
+ void afterEach() {
+ CollectionHelper.dropDatabase(getDefaultDatabaseName());
+ commandListener.reset();
+ clusterListener.clearClusterDescriptionChangedEvents();
+ }
+
+ /**
+ *
+ * 1. PoolClearedError Retryability Test.
+ */
+ @Test
+ void poolClearedExceptionMustBeRetryable() throws Exception {
+ RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(this::createClient,
+ mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false);
+ }
+
+ /**
+ *
+ * 2.1 Retryable Reads Are Retried on a Different mongos When One is Available.
+ */
+ @Test
+ void retriesOnDifferentMongosWhenAvailable() throws InterruptedException, TimeoutException {
+ RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable(this::createClient,
+ mongoCollection -> {
+ try (MongoCursor cursor = mongoCollection.find().iterator()) {
+ return cursor.hasNext();
+ }
+ }, "find", false);
+ }
+
+ /**
+ *
+ * 2.2 Retryable Reads Are Retried on the Same mongos When No Others are Available.
+ */
+ @Test
+ void retriesOnSameMongosWhenAnotherNotAvailable() {
+ RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable(this::createClient,
+ mongoCollection -> {
+ try (MongoCursor cursor = mongoCollection.find().iterator()) {
+ return cursor.hasNext();
+ }
+ }, "find", false);
+ }
+
+ /**
+ *
+ * 3.1 Retryable Reads Caused by Overload Errors Are Retried on a Different Replicaset Server When One is Available and enableOverloadRetargeting is enabled.
+ */
+ @Test
+ void overloadErrorRetriedOnDifferentReplicaSetServer() throws InterruptedException, TimeoutException {
+ //given
+ assumeTrue(serverVersionAtLeast(4, 4));
+ assumeTrue(isDiscoverableReplicaSet());
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: { times: 1 },\n"
+ + " data: {\n"
+ + " failCommands: [\"find\"],\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n"
+ + " errorCode: 6\n"
+ + " }\n"
+ + "}\n");
+
+ try (FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary());
+ MongoClient client = createClient(getMongoClientSettingsBuilder()
+ .retryReads(true)
+ .readPreference(ReadPreference.primaryPreferred())
+ .enableOverloadRetargeting(true)
+ .addCommandListener(commandListener)
+ .applyToClusterSettings(builder -> builder.addClusterListener(clusterListener))
+ .build())) {
+
+ waitForClusterDiscovery();
+
+ MongoCollection collection = client.getDatabase(getDefaultDatabaseName())
+ .getCollection(COLLECTION_NAME);
+ commandListener.reset();
+
+ //when
+ collection.find().first();
+
+ //then
+ List commandFailedEvents = commandListener.getCommandFailedEvents();
+ assertEquals(1, commandFailedEvents.size());
+ List commandSucceededEvents = commandListener.getCommandSucceededEvents();
+ assertEquals(1, commandSucceededEvents.size());
+
+ ServerAddress failedServer = commandFailedEvents.get(0).getConnectionDescription().getServerAddress();
+ ServerAddress succeededServer = commandSucceededEvents.get(0).getConnectionDescription().getServerAddress();
+
+ assertNotEquals(failedServer, succeededServer,
+ format("Expected retry on different server but both were %s", failedServer));
+ }
+ }
+
+ /**
+ *
+ * 3.2 Retryable Reads Caused by Non-Overload Errors Are Retried on the Same Replicaset Server.
+ */
+ @Test
+ void nonOverloadErrorRetriedOnSameReplicaSetServer() throws InterruptedException, TimeoutException {
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: { times: 1 },\n"
+ + " data: {\n"
+ + " failCommands: [\"find\"],\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "'],\n"
+ + " errorCode: 6\n"
+ + " }\n"
+ + "}\n");
+ testRetriedOnTheSameServer(configureFailPoint);
+ }
+
+ /**
+ *
+ * 3.3 Retryable Reads Caused by Overload Errors Are Retried on Same Replicaset Server When enableOverloadRetargeting is disabled.
+ */
+ @Test
+ void overloadErrorRetriedOnSameReplicaSetServerWhenRetargetingDisabled() throws InterruptedException, TimeoutException {
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: { times: 1 },\n"
+ + " data: {\n"
+ + " failCommands: [\"find\"],\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n"
+ + " errorCode: 6\n"
+ + " }\n"
+ + "}\n");
+ testRetriedOnTheSameServer(configureFailPoint);
+ }
+
+ private void testRetriedOnTheSameServer(final BsonDocument configureFailPoint) throws InterruptedException, TimeoutException {
+ //given
+ assumeTrue(serverVersionAtLeast(4, 4));
+ assumeTrue(isDiscoverableReplicaSet());
+ TestCommandListener commandListener = new TestCommandListener(asList("commandFailedEvent", "commandSucceededEvent"), emptyList());
+
+ try (FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary());
+ MongoClient client = createClient(getMongoClientSettingsBuilder()
+ .retryReads(true)
+ .readPreference(ReadPreference.primaryPreferred())
+ .addCommandListener(commandListener)
+ .applyToClusterSettings(builder -> builder.addClusterListener(clusterListener))
+ .build())) {
+
+ waitForClusterDiscovery();
+
+ MongoCollection collection = client.getDatabase(getDefaultDatabaseName())
+ .getCollection(COLLECTION_NAME);
+ commandListener.reset();
+
+ //when
+ collection.find().first();
+
+ //then
+ List commandFailedEvents = commandListener.getCommandFailedEvents();
+ assertEquals(1, commandFailedEvents.size());
+ List commandSucceededEvents = commandListener.getCommandSucceededEvents();
+ assertEquals(1, commandSucceededEvents.size());
+
+ ServerAddress failedServer = commandFailedEvents.get(0).getConnectionDescription().getServerAddress();
+ ServerAddress succeededServer = commandSucceededEvents.get(0).getConnectionDescription().getServerAddress();
+
+ assertEquals(failedServer, succeededServer,
+ format("Expected retry on same server but got %s and %s", failedServer, succeededServer));
+ }
+ }
+
+ private void waitForClusterDiscovery() throws InterruptedException, TimeoutException {
+ // We need both primary and secondary to be discovered (not UNKNOWN) before running the deprioritization tests.
+ //
+ // 1. The failpoint is set on the primary. If the primary is not yet discovered,
+ // primaryPreferred may route the find to a secondary, and the failpoint never fires.
+ //
+ // 2. When the primary is deprioritized on retry, primaryPreferred falls back to a secondary.
+ // If the secondaries are still UNKNOWN at that point, the fallback yields no selectable servers,
+ // causing the deprioritized primary to be selected again.
+ clusterListener.waitForAllServersDiscovered(Duration.ofSeconds(10));
+ }
+}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/BackpressureProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/BackpressureProseTest.java
new file mode 100644
index 00000000000..ec76ffa57af
--- /dev/null
+++ b/driver-sync/src/test/functional/com/mongodb/client/BackpressureProseTest.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.client;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoServerException;
+import com.mongodb.internal.connection.TestCommandListener;
+import com.mongodb.internal.time.StartTime;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.Document;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static com.mongodb.ClusterFixture.serverVersionAtLeast;
+import static com.mongodb.MongoException.RETRYABLE_ERROR_LABEL;
+import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL;
+import static com.mongodb.client.Fixture.getDefaultDatabaseName;
+import static com.mongodb.client.Fixture.getMongoClientSettings;
+import static com.mongodb.client.Fixture.getPrimary;
+import static java.lang.String.format;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/**
+ *
+ * Prose Tests.
+ */
+public class BackpressureProseTest {
+ protected MongoClient createClient(final MongoClientSettings mongoClientSettings) {
+ return MongoClients.create(mongoClientSettings);
+ }
+
+ /**
+ *
+ * Test 1: Operation Retry Uses Exponential Backoff.
+ */
+ @Test
+ @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-5956, JAVA-6117, JAVA-6113, JAVA-6119, JAVA-6141 if PR 1899 is merged")
+ void operationRetryUsesExponentialBackoff() throws InterruptedException {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: 'failCommand',\n"
+ + " mode: 'alwaysOn',\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorCode: 2,\n"
+ + " errorLabels: ['" + SYSTEM_OVERLOADED_ERROR_LABEL + "', '" + RETRYABLE_ERROR_LABEL + "']\n"
+ + " }\n"
+ + "}\n");
+ try (MongoClient client = createClient(getMongoClientSettings());
+ FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary())) {
+ MongoCollection collection = dropAndGetCollection("operationRetryUsesExponentialBackoff", client);
+ long noBackoffTimeMillis = measureFailedInsertDuration(collection, false).toMillis();
+ long withBackoffTimeMillis = measureFailedInsertDuration(collection, true).toMillis();
+ long expectedMaxVarianceMillis = 300;
+ long maxTotalBackoffMillis = 300;
+ long actualAbsDiffMillis = Math.abs(withBackoffTimeMillis - (noBackoffTimeMillis + maxTotalBackoffMillis));
+ assertTrue(actualAbsDiffMillis < expectedMaxVarianceMillis,
+ format("Expected actualAbsDiffMillis < %d ms, but was %d ms (|%d ms - (%d ms + %d ms)|)",
+ expectedMaxVarianceMillis, actualAbsDiffMillis, withBackoffTimeMillis, noBackoffTimeMillis, maxTotalBackoffMillis));
+ }
+ }
+
+ private static Duration measureFailedInsertDuration(final MongoCollection collection, final boolean retryBackoff) {
+ // TODO-BACKPRESSURE Valentin uncomment below when https://github.com/mongodb/mongo-java-driver/pull/1899 is merged
+ // ExponentialBackoff.setTestJitterSupplier(() -> retryBackoff ? 1 : 0);
+ try {
+ StartTime startTime = StartTime.now();
+ assertThrows(MongoServerException.class, () -> collection.insertOne(Document.parse("{a: 1}")));
+ return startTime.elapsed();
+ } finally {
+ // TODO-BACKPRESSURE Valentin uncomment below when https://github.com/mongodb/mongo-java-driver/pull/1899 is merged
+ // ExponentialBackoff.clearTestJitterSupplier();
+ }
+ }
+
+ /**
+ *
+ * Test 3: Overload Errors are Retried a Maximum of {@code MAX_RETRIES} times.
+ */
+ @Test
+ @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-5956, JAVA-6117, JAVA-6113, JAVA-6119, JAVA-6141")
+ void overloadErrorsAreRetriedAtMostMaxRetriesTimes() throws InterruptedException {
+ overloadErrorsAreRetriedLimitedNumberOfTimes(null);
+ }
+
+ /**
+ *
+ * Test 4: Overload Errors are Retried a Maximum of {@code maxAdaptiveRetries} times when configured.
+ */
+ @Test
+ @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-5956, JAVA-6117, JAVA-6113, JAVA-6119, JAVA-6141")
+ void overloadErrorsAreRetriedAtMostMaxAdaptiveRetriesTimesWhenConfigured() throws InterruptedException {
+ overloadErrorsAreRetriedLimitedNumberOfTimes(1);
+ }
+
+ private void overloadErrorsAreRetriedLimitedNumberOfTimes(@Nullable final Integer maxAdaptiveRetries)
+ throws InterruptedException {
+ assumeTrue(serverVersionAtLeast(4, 4));
+ TestCommandListener commandListener = new TestCommandListener();
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: 'failCommand',\n"
+ + " mode: 'alwaysOn',\n"
+ + " data: {\n"
+ + " failCommands: ['find'],\n"
+ + " errorCode: 462,\n"
+ + " errorLabels: ['" + SYSTEM_OVERLOADED_ERROR_LABEL + "', '" + RETRYABLE_ERROR_LABEL + "']\n"
+ + " }\n"
+ + "}\n");
+ try (MongoClient client = createClient(MongoClientSettings.builder(getMongoClientSettings())
+ .maxAdaptiveRetries(maxAdaptiveRetries)
+ .addCommandListener(commandListener)
+ .build());
+ FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary())) {
+ MongoCollection collection = dropAndGetCollection("overloadErrorsAreRetriedLimitedNumberOfTimes", client);
+ commandListener.reset();
+ MongoServerException exception = assertThrows(MongoServerException.class, () -> collection.find().first());
+ assertTrue(exception.hasErrorLabel(SYSTEM_OVERLOADED_ERROR_LABEL));
+ assertTrue(exception.hasErrorLabel(RETRYABLE_ERROR_LABEL));
+ // TODO-BACKPRESSURE Valentin replace 2 with `MAX_RETRIES` when implementing JAVA-5956, JAVA-6117, JAVA-6113, JAVA-6119, JAVA-6141
+ int expectedAttempts = (maxAdaptiveRetries == null ? 2 : maxAdaptiveRetries) + 1;
+ assertEquals(expectedAttempts, commandListener.getCommandStartedEvents().size());
+ }
+ }
+
+ private static MongoCollection dropAndGetCollection(final String name, final MongoClient client) {
+ MongoCollection result = client.getDatabase(getDefaultDatabaseName()).getCollection(name);
+ result.drop();
+ return result;
+ }
+}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java
index caf676a8ab7..c0247c9c7a2 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ContextProviderTest.java
@@ -19,6 +19,7 @@
import com.mongodb.ContextProvider;
import com.mongodb.RequestContext;
import com.mongodb.WriteConcern;
+import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandStartedEvent;
@@ -206,6 +207,7 @@ public void contextShouldBeAvailableInCommandEvents() {
}
}
+ @NotThreadSafe
private static final class TestCommandListener implements CommandListener {
private int numCommandStartedEventsWithExpectedContext;
private int numCommandSucceededEventsWithExpectedContext;
diff --git a/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java b/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java
index 6f90b3f5f01..eccc892ce77 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/MongoWriteConcernWithResponseExceptionTest.java
@@ -43,6 +43,8 @@
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
import static com.mongodb.client.Fixture.getDefaultDatabaseName;
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
+import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL;
+import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeTrue;
@@ -69,7 +71,7 @@ public static void doesNotLeak(final Function
.append("data", new BsonDocument()
.append("writeConcernError", new BsonDocument()
.append("code", new BsonInt32(91))
- .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError")
+ .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL)
.map(BsonString::new).collect(Collectors.toList())))
.append("errmsg", new BsonString(""))
)
@@ -81,7 +83,7 @@ public static void doesNotLeak(final Function
.append("data", new BsonDocument()
.append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))
.append("errorCode", new BsonInt32(10107))
- .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError", "NoWritesPerformed")
+ .append("errorLabels", new BsonArray(Stream.of(RETRYABLE_WRITE_ERROR_LABEL, NO_WRITES_PERFORMED_ERROR_LABEL)
.map(BsonString::new).collect(Collectors.toList()))));
doesNotLeak(clientCreator, writeConcernErrorFpDoc, true, noWritesPerformedFpDoc);
doesNotLeak(clientCreator, noWritesPerformedFpDoc, false, writeConcernErrorFpDoc);
diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java
index ccf18aad5b9..5ca0f75d56b 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableReadsProseTest.java
@@ -16,57 +16,11 @@
package com.mongodb.client;
-import org.bson.Document;
-import org.junit.jupiter.api.Test;
+import com.mongodb.MongoClientSettings;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static com.mongodb.client.model.Filters.eq;
-
-/**
- * See
- * Retryable Reads Tests.
- */
-final class RetryableReadsProseTest {
- /**
- * See
- *
- * PoolClearedError Retryability Test.
- */
- @Test
- void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException {
- RetryableWritesProseTest.poolClearedExceptionMustBeRetryable(MongoClients::create,
- mongoCollection -> mongoCollection.find(eq(0)).iterator().hasNext(), "find", false);
- }
-
- /**
- * See
- *
- * Retryable Reads Are Retried on a Different mongos When One is Available.
- */
- @Test
- void retriesOnDifferentMongosWhenAvailable() {
- RetryableWritesProseTest.retriesOnDifferentMongosWhenAvailable(MongoClients::create,
- mongoCollection -> {
- try (MongoCursor cursor = mongoCollection.find().iterator()) {
- return cursor.hasNext();
- }
- }, "find", false);
- }
-
- /**
- * See
- *
- * Retryable Reads Are Retried on the Same mongos When No Others are Available.
- */
- @Test
- void retriesOnSameMongosWhenAnotherNotAvailable() {
- RetryableWritesProseTest.retriesOnSameMongosWhenAnotherNotAvailable(MongoClients::create,
- mongoCollection -> {
- try (MongoCursor cursor = mongoCollection.find().iterator()) {
- return cursor.hasNext();
- }
- }, "find", false);
+final class RetryableReadsProseTest extends AbstractRetryableReadsProseTest {
+ @Override
+ protected MongoClient createClient(final MongoClientSettings settings) {
+ return MongoClients.create(settings);
}
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java
index fae39864bb9..87e8b533351 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/RetryableWritesProseTest.java
@@ -19,44 +19,41 @@
import com.mongodb.ConnectionString;
import com.mongodb.Function;
import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoException;
import com.mongodb.MongoServerException;
import com.mongodb.MongoWriteConcernException;
import com.mongodb.ServerAddress;
-import com.mongodb.assertions.Assertions;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.event.CommandEvent;
import com.mongodb.event.CommandFailedEvent;
-import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionCheckedOutEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
import com.mongodb.internal.connection.ServerAddressHelper;
+import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
-import org.bson.BsonArray;
-import org.bson.BsonBoolean;
+import com.mongodb.internal.event.ConfigureFailPointCommandListener;
+import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
-import org.bson.BsonString;
import org.bson.Document;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static com.mongodb.ClusterFixture.getConnectionString;
import static com.mongodb.ClusterFixture.getMultiMongosConnectionString;
@@ -64,36 +61,36 @@
import static com.mongodb.ClusterFixture.isSharded;
import static com.mongodb.ClusterFixture.isStandalone;
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
+import static com.mongodb.MongoException.RETRYABLE_ERROR_LABEL;
+import static com.mongodb.MongoException.SYSTEM_OVERLOADED_ERROR_LABEL;
import static com.mongodb.client.Fixture.getDefaultDatabaseName;
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
import static com.mongodb.client.Fixture.getMultiMongosMongoClientSettingsBuilder;
+import static com.mongodb.client.Fixture.getPrimary;
+import static com.mongodb.internal.operation.CommandOperationHelper.NO_WRITES_PERFORMED_ERROR_LABEL;
+import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
/**
- * See
- * Retryable Write Prose Tests.
+ *
+ * Prose Tests.
*/
-public class RetryableWritesProseTest extends DatabaseTestCase {
-
- @BeforeEach
- @Override
- public void setUp() {
- super.setUp();
- }
-
+public class RetryableWritesProseTest {
/**
- * Prose test #2.
+ *
+ * 2. Test that drivers properly retry after encountering PoolClearedErrors.
*/
@Test
- public void poolClearedExceptionMustBeRetryable() throws InterruptedException, ExecutionException, TimeoutException {
+ void poolClearedExceptionMustBeRetryable() throws Exception {
poolClearedExceptionMustBeRetryable(MongoClients::create,
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}
@@ -101,8 +98,7 @@ public void poolClearedExceptionMustBeRetryable() throws InterruptedException, E
@SuppressWarnings("try")
public static void poolClearedExceptionMustBeRetryable(
final Function clientCreator,
- final Function, R> operation, final String operationName, final boolean write)
- throws InterruptedException, ExecutionException, TimeoutException {
+ final Function, R> operation, final String commandName, final boolean write) throws Exception {
assumeTrue(serverVersionAtLeast(4, 3) && !(write && isStandalone()));
TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(asList(
"connectionCheckedOutEvent",
@@ -118,7 +114,7 @@ public static void poolClearedExceptionMustBeRetryable(
/* We fake server's state by configuring a fail point. This breaks the mechanism of the
* streaming server monitoring protocol
* (https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.md#streaming-protocol)
- * that allows the server to determine whether or not it needs to send a new state to the client.
+ * that allows the server to determine whether it needs to send a new state to the client.
* As a result, the client has to wait for at least its heartbeat delay until it hears back from a server
* (while it waits for a response, calling `ServerMonitor.connect` has no effect).
* Thus, we want to use small heartbeat delay to reduce delays in the test. */
@@ -127,24 +123,23 @@ public static void poolClearedExceptionMustBeRetryable(
.retryWrites(true)
.addCommandListener(commandListener)
.build();
- BsonDocument configureFailPoint = new BsonDocument()
- .append("configureFailPoint", new BsonString("failCommand"))
- .append("mode", new BsonDocument()
- .append("times", new BsonInt32(1)))
- .append("data", new BsonDocument()
- .append("failCommands", new BsonArray(singletonList(new BsonString(operationName))))
- .append("errorCode", new BsonInt32(91))
- .append("errorLabels", write
- ? new BsonArray(singletonList(new BsonString("RetryableWriteError")))
- : new BsonArray())
- .append("blockConnection", BsonBoolean.valueOf(true))
- .append("blockTimeMS", new BsonInt32(1000)));
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: 'failCommand',\n"
+ + " mode: {'times': 1},\n"
+ + " data: {\n"
+ + " failCommands: ['" + commandName + "'],\n"
+ + " errorCode: 91,\n"
+ + " blockConnection: true,\n"
+ + " blockTimeMS: 1000,\n"
+ + (write
+ ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']\n" : "")
+ + " }\n"
+ + "}\n");
int timeoutSeconds = 5;
try (MongoClient client = clientCreator.apply(clientSettings);
- FailPoint ignored = FailPoint.enable(configureFailPoint, Fixture.getPrimary())) {
- MongoCollection collection = client.getDatabase(getDefaultDatabaseName())
- .getCollection("poolClearedExceptionMustBeRetryable");
- collection.drop();
+ FailPoint ignored = FailPoint.enable(configureFailPoint, getPrimary())) {
+ MongoCollection collection = dropAndGetCollection("poolClearedExceptionMustBeRetryable", client);
ExecutorService ex = Executors.newFixedThreadPool(2);
try {
Future result1 = ex.submit(() -> operation.apply(collection));
@@ -158,83 +153,81 @@ public static void poolClearedExceptionMustBeRetryable(
ex.shutdownNow();
}
assertEquals(3, commandListener.getCommandStartedEvents().size());
- commandListener.getCommandStartedEvents().forEach(event -> assertEquals(operationName, event.getCommandName()));
+ commandListener.getCommandStartedEvents().forEach(event -> assertEquals(commandName, event.getCommandName()));
}
}
/**
- * Prose test #3.
+ *
+ * 3. Test that drivers return the original error after encountering a WriteConcernError with a RetryableWriteError label.
*/
@Test
- public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException {
+ void originalErrorMustBePropagatedIfNoWritesPerformed() throws Exception {
originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create);
}
@SuppressWarnings("try")
public static void originalErrorMustBePropagatedIfNoWritesPerformed(
- final Function clientCreator) throws InterruptedException {
+ final Function clientCreator) throws Exception {
assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet());
- ServerAddress primaryServerAddress = Fixture.getPrimary();
- CompletableFuture futureFailPointFromListener = new CompletableFuture<>();
- CommandListener commandListener = new CommandListener() {
- private final AtomicBoolean configureFailPoint = new AtomicBoolean(true);
-
- @Override
- public void commandSucceeded(final CommandSucceededEvent event) {
- if (event.getCommandName().equals("insert")
- && event.getResponse().getDocument("writeConcernError", new BsonDocument())
- .getInt32("code", new BsonInt32(-1)).intValue() == 91
- && configureFailPoint.compareAndSet(true, false)) {
- Assertions.assertTrue(futureFailPointFromListener.complete(FailPoint.enable(
- new BsonDocument()
- .append("configureFailPoint", new BsonString("failCommand"))
- .append("mode", new BsonDocument()
- .append("times", new BsonInt32(1)))
- .append("data", new BsonDocument()
- .append("failCommands", new BsonArray(singletonList(new BsonString("insert"))))
- .append("errorCode", new BsonInt32(10107))
- .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError", "NoWritesPerformed")
- .map(BsonString::new).collect(Collectors.toList())))),
- primaryServerAddress
- )));
+ ServerAddress primaryServerAddress = getPrimary();
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: { times: 1 },\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " writeConcernError: {"
+ + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "'],\n"
+ + " code: 91,\n"
+ + " errmsg: ''\n"
+ + " }\n"
+ + " }\n"
+ + "}\n");
+ BsonDocument configureFailPointFromListener = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: { times: 1 },\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorCode: 10107,\n"
+ + " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n"
+ + " }\n"
+ + "}\n");
+ Predicate configureFailPointEventMatcher = event -> {
+ if (event instanceof CommandSucceededEvent) {
+ CommandSucceededEvent commandSucceededEvent = (CommandSucceededEvent) event;
+ if (commandSucceededEvent.getCommandName().equals("insert")) {
+ assertEquals(91, commandSucceededEvent.getResponse().getDocument("writeConcernError", new BsonDocument())
+ .getInt32("code", new BsonInt32(-1)).intValue());
+ return true;
}
+ return false;
}
+ return false;
};
- BsonDocument failPointDocument = new BsonDocument()
- .append("configureFailPoint", new BsonString("failCommand"))
- .append("mode", new BsonDocument()
- .append("times", new BsonInt32(1)))
- .append("data", new BsonDocument()
- .append("writeConcernError", new BsonDocument()
- .append("code", new BsonInt32(91))
- .append("errorLabels", new BsonArray(Stream.of("RetryableWriteError")
- .map(BsonString::new).collect(Collectors.toList())))
- .append("errmsg", new BsonString(""))
- )
- .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))));
- try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder()
- .retryWrites(true)
- .addCommandListener(commandListener)
- .applyToServerSettings(builder ->
- // see `poolClearedExceptionMustBeRetryable` for the explanation
- builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS))
- .build());
- FailPoint ignored = FailPoint.enable(failPointDocument, primaryServerAddress)) {
- MongoCollection collection = client.getDatabase(getDefaultDatabaseName())
- .getCollection("originalErrorMustBePropagatedIfNoWritesPerformed");
- collection.drop();
+ try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener(
+ configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher);
+ MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder()
+ .retryWrites(true)
+ .addCommandListener(commandListener)
+ .applyToServerSettings(builder ->
+ // see `poolClearedExceptionMustBeRetryable` for the explanation
+ builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS))
+ .build());
+ FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) {
+ MongoCollection collection = dropAndGetCollection("originalErrorMustBePropagatedIfNoWritesPerformed", client);
MongoWriteConcernException e = assertThrows(MongoWriteConcernException.class, () -> collection.insertOne(new Document()));
assertEquals(91, e.getCode());
- } finally {
- futureFailPointFromListener.thenAccept(FailPoint::close);
}
}
/**
- * Prose test #4.
+ *
+ * 4. Test that in a sharded cluster writes are retried on a different mongos when one is available.
*/
@Test
- public void retriesOnDifferentMongosWhenAvailable() {
+ void retriesOnDifferentMongosWhenAvailable() throws InterruptedException, TimeoutException {
retriesOnDifferentMongosWhenAvailable(MongoClients::create,
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}
@@ -242,7 +235,8 @@ public void retriesOnDifferentMongosWhenAvailable() {
@SuppressWarnings("try")
public static void retriesOnDifferentMongosWhenAvailable(
final Function clientCreator,
- final Function, R> operation, final String operationName, final boolean write) {
+ final Function, R> operation, final String expectedCommandName, final boolean write)
+ throws InterruptedException, TimeoutException {
if (write) {
assumeTrue(serverVersionAtLeast(4, 4));
}
@@ -251,37 +245,44 @@ public static void retriesOnDifferentMongosWhenAvailable(
assumeTrue(connectionString != null);
ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0));
ServerAddress s1Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(1));
- BsonDocument failPointDocument = BsonDocument.parse(
+ BsonDocument configureFailPoint = BsonDocument.parse(
"{\n"
+ " configureFailPoint: \"failCommand\",\n"
+ " mode: { times: 1 },\n"
+ " data: {\n"
- + " failCommands: [\"" + operationName + "\"],\n"
+ + " failCommands: [\"" + expectedCommandName + "\"],\n"
+ + " errorCode: 6,\n"
+ (write
- ? " errorLabels: [\"RetryableWriteError\"]," : "")
- + " errorCode: 6\n"
+ ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']" : "")
+ " }\n"
+ "}\n");
TestCommandListener commandListener = new TestCommandListener(singletonList("commandFailedEvent"), emptyList());
- try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address);
- FailPoint s1FailPoint = FailPoint.enable(failPointDocument, s1Address);
+ TestClusterListener clusterListener = new TestClusterListener();
+ try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address);
+ FailPoint s1FailPoint = FailPoint.enable(configureFailPoint, s1Address);
MongoClient client = clientCreator.apply(getMultiMongosMongoClientSettingsBuilder()
.retryReads(true)
.retryWrites(true)
.addCommandListener(commandListener)
// explicitly specify only s0 and s1, in case `getMultiMongosMongoClientSettingsBuilder` has more
- .applyToClusterSettings(builder -> builder.hosts(asList(s0Address, s1Address)))
+ .applyToClusterSettings(builder -> builder
+ .hosts(asList(s0Address, s1Address))
+ .addClusterListener(clusterListener))
.build())) {
- MongoCollection collection = client.getDatabase(getDefaultDatabaseName())
- .getCollection("retriesOnDifferentMongosWhenAvailable");
- collection.drop();
+ // We need both mongos servers to be discovered (not UNKNOWN) before running the deprioritization test.
+ // When the first mongos is deprioritized on retry, the selector falls back to the second mongos.
+ // If the second mongos is still UNKNOWN at that point, the non-deprioritized pass yields no selectable servers,
+ // causing the deprioritized mongos to be selected again.
+ clusterListener.waitForAllServersDiscovered(Duration.ofSeconds(10));
+
+ MongoCollection collection = dropAndGetCollection("retriesOnDifferentMongosWhenAvailable", client);
commandListener.reset();
assertThrows(MongoServerException.class, () -> operation.apply(collection));
List failedCommandEvents = commandListener.getEvents();
assertEquals(2, failedCommandEvents.size(), failedCommandEvents::toString);
List unexpectedCommandNames = failedCommandEvents.stream()
.map(CommandEvent::getCommandName)
- .filter(commandName -> !commandName.equals(operationName))
+ .filter(commandName -> !commandName.equals(expectedCommandName))
.collect(Collectors.toList());
assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString);
Set failedServerAddresses = failedCommandEvents.stream()
@@ -293,10 +294,11 @@ public static void retriesOnDifferentMongosWhenAvailable(
}
/**
- * Prose test #5.
+ *
+ * 5. Test that in a sharded cluster writes are retried on the same mongos when no others are available.
*/
@Test
- public void retriesOnSameMongosWhenAnotherNotAvailable() {
+ void retriesOnSameMongosWhenAnotherNotAvailable() {
retriesOnSameMongosWhenAnotherNotAvailable(MongoClients::create,
mongoCollection -> mongoCollection.insertOne(new Document()), "insert", true);
}
@@ -304,27 +306,29 @@ public void retriesOnSameMongosWhenAnotherNotAvailable() {
@SuppressWarnings("try")
public static void retriesOnSameMongosWhenAnotherNotAvailable(
final Function clientCreator,
- final Function, R> operation, final String operationName, final boolean write) {
+ final Function, R> operation, final String expectedCommandName, final boolean write) {
if (write) {
assumeTrue(serverVersionAtLeast(4, 4));
}
assumeTrue(isSharded());
ConnectionString connectionString = getConnectionString();
ServerAddress s0Address = ServerAddressHelper.createServerAddress(connectionString.getHosts().get(0));
- BsonDocument failPointDocument = BsonDocument.parse(
+ BsonDocument configureFailPoint = BsonDocument.parse(
"{\n"
+ " configureFailPoint: \"failCommand\",\n"
+ " mode: { times: 1 },\n"
+ " data: {\n"
- + " failCommands: [\"" + operationName + "\"],\n"
+ + " failCommands: [\"" + expectedCommandName + "\"],\n"
+ + " errorCode: 6,\n"
+ + (write
+ ? " errorLabels: ['" + RETRYABLE_WRITE_ERROR_LABEL + "']," : "")
+ (write
- ? " errorLabels: [\"RetryableWriteError\"]," : "")
- + " errorCode: 6\n"
+ ? " closeConnection: true\n" : "")
+ " }\n"
+ "}\n");
TestCommandListener commandListener = new TestCommandListener(
asList("commandFailedEvent", "commandSucceededEvent"), emptyList());
- try (FailPoint s0FailPoint = FailPoint.enable(failPointDocument, s0Address);
+ try (FailPoint s0FailPoint = FailPoint.enable(configureFailPoint, s0Address);
MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder()
.retryReads(true)
.retryWrites(true)
@@ -334,16 +338,14 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable(
.hosts(singletonList(s0Address))
.mode(ClusterConnectionMode.MULTIPLE))
.build())) {
- MongoCollection collection = client.getDatabase(getDefaultDatabaseName())
- .getCollection("retriesOnSameMongosWhenAnotherNotAvailable");
- collection.drop();
+ MongoCollection collection = dropAndGetCollection("retriesOnSameMongosWhenAnotherNotAvailable", client);
commandListener.reset();
operation.apply(collection);
List commandEvents = commandListener.getEvents();
assertEquals(2, commandEvents.size(), commandEvents::toString);
List unexpectedCommandNames = commandEvents.stream()
.map(CommandEvent::getCommandName)
- .filter(commandName -> !commandName.equals(operationName))
+ .filter(commandName -> !commandName.equals(expectedCommandName))
.collect(Collectors.toList());
assertTrue(unexpectedCommandNames.isEmpty(), unexpectedCommandNames::toString);
assertInstanceOf(CommandFailedEvent.class, commandEvents.get(0), commandEvents::toString);
@@ -352,4 +354,175 @@ public static void retriesOnSameMongosWhenAnotherNotAvailable(
assertEquals(s0Address, commandEvents.get(1).getConnectionDescription().getServerAddress(), commandEvents::toString);
}
}
+
+ /**
+ *
+ * 6. Test error propagation after encountering multiple errors.
+ * Case 1: Test that drivers return the correct error when receiving only errors without NoWritesPerformed.
+ */
+ @Test
+ @Disabled("TODO-BACKPRESSURE Valentin Enable when implementing JAVA-6055")
+ void errorPropagationAfterEncounteringMultipleErrorsCase1() throws Exception {
+ errorPropagationAfterEncounteringMultipleErrorsCase1(MongoClients::create);
+ }
+
+ public static void errorPropagationAfterEncounteringMultipleErrorsCase1(final Function clientCreator)
+ throws Exception {
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: 'failCommand',\n"
+ + " mode: {'times': 1},\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n"
+ + " errorCode: 91\n"
+ + " }\n"
+ + "}\n");
+ BsonDocument configureFailPointFromListener = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: 'alwaysOn',\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorCode: 10107,\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "']\n"
+ + " }\n"
+ + "}\n");
+ errorPropagationAfterEncounteringMultipleErrors(
+ clientCreator,
+ configureFailPoint,
+ configureFailPointFromListener,
+ 10107,
+ null);
+ }
+
+ /**
+ *
+ * 6. Test error propagation after encountering multiple errors.
+ * Case 2: Test that drivers return the correct error when receiving only errors with NoWritesPerformed.
+ */
+ @Test
+ void errorPropagationAfterEncounteringMultipleErrorsCase2() throws Exception {
+ errorPropagationAfterEncounteringMultipleErrorsCase2(MongoClients::create);
+ }
+
+ public static void errorPropagationAfterEncounteringMultipleErrorsCase2(final Function clientCreator)
+ throws Exception {
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: 'failCommand',\n"
+ + " mode: {'times': 1},\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL
+ + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "'],\n"
+ + " errorCode: 91\n"
+ + " }\n"
+ + "}\n");
+ BsonDocument configureFailPointFromListener = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: 'alwaysOn',\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorCode: 10107,\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL
+ + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n"
+ + " }\n"
+ + "}\n");
+ errorPropagationAfterEncounteringMultipleErrors(
+ clientCreator,
+ configureFailPoint,
+ configureFailPointFromListener,
+ 91,
+ null);
+ }
+
+ /**
+ *
+ * 6. Test error propagation after encountering multiple errors.
+ * Case 3: Test that drivers return the correct error when receiving some errors with NoWritesPerformed and some without NoWritesPerformed.
+ */
+ @Test
+ void errorPropagationAfterEncounteringMultipleErrorsCase3() throws Exception {
+ errorPropagationAfterEncounteringMultipleErrorsCase3(MongoClients::create);
+ }
+
+ public static void errorPropagationAfterEncounteringMultipleErrorsCase3(final Function clientCreator)
+ throws Exception {
+ BsonDocument configureFailPoint = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: 'failCommand',\n"
+ + " mode: {'times': 1},\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL + "'],\n"
+ + " errorCode: 91\n"
+ + " }\n"
+ + "}\n");
+ BsonDocument configureFailPointFromListener = BsonDocument.parse(
+ "{\n"
+ + " configureFailPoint: \"failCommand\",\n"
+ + " mode: 'alwaysOn',\n"
+ + " data: {\n"
+ + " failCommands: ['insert'],\n"
+ + " errorCode: 91,\n"
+ + " errorLabels: ['" + RETRYABLE_ERROR_LABEL + "', '" + SYSTEM_OVERLOADED_ERROR_LABEL
+ + "', '" + NO_WRITES_PERFORMED_ERROR_LABEL + "']\n"
+ + " }\n"
+ + "}\n");
+ errorPropagationAfterEncounteringMultipleErrors(
+ clientCreator,
+ configureFailPoint,
+ configureFailPointFromListener,
+ 91,
+ NO_WRITES_PERFORMED_ERROR_LABEL);
+ }
+
+ /**
+ * @param unexpectedErrorLabel {@code null} means there is no expectation.
+ */
+ private static void errorPropagationAfterEncounteringMultipleErrors(
+ final Function clientCreator,
+ final BsonDocument configureFailPoint,
+ final BsonDocument configureFailPointFromListener,
+ final int expectedErrorCode,
+ @Nullable final String unexpectedErrorLabel) throws Exception {
+ assumeTrue(serverVersionAtLeast(6, 0));
+ assumeTrue(isDiscoverableReplicaSet());
+ ServerAddress primaryServerAddress = getPrimary();
+ Predicate configureFailPointEventMatcher = event -> {
+ if (event instanceof CommandFailedEvent) {
+ CommandFailedEvent commandFailedEvent = (CommandFailedEvent) event;
+ if (commandFailedEvent.getCommandName().equals("drop")) {
+ // this code may run against MongoDB 6, where dropping a nonexistent collection results in an error
+ return false;
+ }
+ MongoException cause = assertInstanceOf(MongoException.class, commandFailedEvent.getThrowable());
+ assertEquals(91, cause.getCode());
+ return true;
+ }
+ return false;
+ };
+ try (ConfigureFailPointCommandListener commandListener = new ConfigureFailPointCommandListener(
+ configureFailPointFromListener, primaryServerAddress, configureFailPointEventMatcher);
+ MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder()
+ .retryWrites(true)
+ .addCommandListener(commandListener)
+ .build());
+ FailPoint ignored = FailPoint.enable(configureFailPoint, primaryServerAddress)) {
+ MongoCollection collection = dropAndGetCollection("errorPropagationAfterEncounteringMultipleErrors", client);
+ MongoException e = assertThrows(MongoException.class, () -> collection.insertOne(new Document()));
+ assertEquals(expectedErrorCode, e.getCode());
+ if (unexpectedErrorLabel != null) {
+ assertFalse(e.hasErrorLabel(unexpectedErrorLabel));
+ }
+ }
+ }
+
+ private static MongoCollection dropAndGetCollection(final String name, final MongoClient client) {
+ MongoCollection result = client.getDatabase(getDefaultDatabaseName()).getCollection(name);
+ result.drop();
+ return result;
+ }
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java
index 1afbf61565e..a840a83babb 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java
@@ -37,7 +37,9 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
-// See https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/tests/README.md#prose-tests
+/**
+ * Prose Tests.
+ */
public class WithTransactionProseTest extends DatabaseTestCase {
private static final long START_TIME_MS = 1L;
private static final long ERROR_GENERATING_INTERVAL = 121000L;
@@ -52,11 +54,10 @@ public void setUp() {
collection.insertOne(Document.parse("{ _id : 0 }"));
}
- //
- // Test that the callback raises a custom exception or error that does not include either UnknownTransactionCommitResult or
- // TransientTransactionError error labels. The callback will execute using withTransaction and assert that the callback's error
- // bypasses any retry logic within withTransaction and is propagated to the caller of withTransaction.
- //
+ /**
+ *
+ * Callback Raises a Custom Error.
+ */
@Test
public void testCallbackRaisesCustomError() {
final String exceptionMessage = "NotTransientOrUnknownError";
@@ -71,10 +72,10 @@ public void testCallbackRaisesCustomError() {
}
}
- //
- // Test that the callback that returns a custom value (e.g. boolean, string, object). Execute this callback using withTransaction
- // and assert that the callback's return value is propagated to the caller of withTransaction.
- //
+ /**
+ *
+ * Callback Returns a Value.
+ */
@Test
public void testCallbackReturnsValue() {
try (ClientSession session = client.startSession()) {
@@ -87,10 +88,10 @@ public void testCallbackReturnsValue() {
}
}
- //
- // If the callback raises an error with the TransientTransactionError label and the retry timeout has been exceeded, withTransaction
- // should propagate the error to its caller.
- //
+ /**
+ *
+ * Retry Timeout is Enforced, first scenario on the list.
+ */
@Test
public void testRetryTimeoutEnforcedTransientTransactionError() {
final String errorMessage = "transient transaction error";
@@ -110,10 +111,10 @@ public void testRetryTimeoutEnforcedTransientTransactionError() {
}
}
- //
- // If committing raises an error with the UnknownTransactionCommitResult label, the error is not a write concern timeout, and the
- // retry timeout has been exceeded, withTransaction should propagate the error to its caller.
- //
+ /**
+ *
+ * Retry Timeout is Enforced, second scenario on the list.
+ */
@Test
public void testRetryTimeoutEnforcedUnknownTransactionCommit() {
MongoDatabase failPointAdminDb = client.getDatabase("admin");
@@ -137,11 +138,10 @@ public void testRetryTimeoutEnforcedUnknownTransactionCommit() {
}
}
- //
- // If committing raises an error with the TransientTransactionError label and the retry timeout has been exceeded, withTransaction
- // should propagate the error to its caller. This case may occur if the commit was internally retried against a new primary after
- // a failover and the second primary returned a NoSuchTransaction error response.
- //
+ /**
+ *
+ * Retry Timeout is Enforced, third scenario on the list.
+ */
@Test
public void testRetryTimeoutEnforcedTransientTransactionErrorOnCommit() {
MongoDatabase failPointAdminDb = client.getDatabase("admin");
@@ -166,9 +166,9 @@ public void testRetryTimeoutEnforcedTransientTransactionErrorOnCommit() {
}
}
- //
- // Ensure cannot override timeout in transaction
- //
+ /**
+ * Ensure cannot override timeout in transaction.
+ */
@Test
public void testTimeoutMS() {
try (ClientSession session = client.startSession(ClientSessionOptions.builder()
@@ -182,9 +182,9 @@ public void testTimeoutMS() {
}
}
- //
- // Ensure legacy settings don't cause issues in sessions
- //
+ /**
+ * Ensure legacy settings don't cause issues in sessions.
+ */
@Test
public void testTimeoutMSAndLegacySettings() {
try (ClientSession session = client.startSession(ClientSessionOptions.builder()
diff --git a/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java
new file mode 100644
index 00000000000..a31182a51c0
--- /dev/null
+++ b/driver-sync/src/test/functional/com/mongodb/internal/event/ConfigureFailPointCommandListener.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.event;
+
+import com.mongodb.ServerAddress;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.client.FailPoint;
+import com.mongodb.event.CommandEvent;
+import com.mongodb.event.CommandFailedEvent;
+import com.mongodb.event.CommandListener;
+import com.mongodb.event.CommandStartedEvent;
+import com.mongodb.event.CommandSucceededEvent;
+import org.bson.BsonDocument;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.fail;
+
+@ThreadSafe
+public final class ConfigureFailPointCommandListener implements CommandListener, AutoCloseable {
+ private final BsonDocument configureFailPoint;
+ private final ServerAddress serverAddress;
+ private final Predicate eventMatcher;
+ private final Object lock;
+ private final CompletableFuture failPointFuture;
+
+ /**
+ * @param configureFailPoint See {@link FailPoint#enable(BsonDocument, ServerAddress)}.
+ * @param serverAddress See {@link FailPoint#enable(BsonDocument, ServerAddress)}.
+ * @param eventMatcher When an event is matched, an attempt to configure the fail point
+ * specified via {@code configureFailPoint} is made.
+ * The {@code eventMatcher} is guaranteed to be {@linkplain Predicate#test(Object) used} sequentially.
+ * The attempt is made at most once,
+ * and the {@code eventMatcher} {@linkplain Predicate#test(Object) test} that caused the attempt is the last one.
+ */
+ public ConfigureFailPointCommandListener(
+ final BsonDocument configureFailPoint,
+ final ServerAddress serverAddress,
+ final Predicate eventMatcher) {
+ this.configureFailPoint = configureFailPoint;
+ this.serverAddress = serverAddress;
+ this.eventMatcher = eventMatcher;
+ lock = new Object();
+ failPointFuture = new CompletableFuture<>();
+ }
+
+ @Override
+ public void commandStarted(final CommandStartedEvent event) {
+ onEvent(event);
+ }
+
+ @Override
+ public void commandSucceeded(final CommandSucceededEvent event) {
+ onEvent(event);
+ }
+
+ @Override
+ public void commandFailed(final CommandFailedEvent event) {
+ onEvent(event);
+ }
+
+ private void onEvent(final CommandEvent event) {
+ synchronized (lock) {
+ if (!failPointFuture.isDone()) {
+ try {
+ if (eventMatcher.test(event)) {
+ assertTrue(failPointFuture.complete(FailPoint.enable(configureFailPoint, serverAddress)));
+ }
+ } catch (Throwable e) {
+ assertTrue(failPointFuture.completeExceptionally(e));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws InterruptedException, ExecutionException {
+ synchronized (lock) {
+ if (failPointFuture.cancel(true)) {
+ fail("The listener was closed before (in the happens-before order) it attempted to configure the fail point");
+ } else {
+ assertTrue(failPointFuture.isDone());
+ assertNotNull(failPointFuture.get()).close();
+ }
+ }
+ }
+}
diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/ClientSessionBindingSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/ClientSessionBindingSpecification.groovy
index e2e664f324d..f2ecac0c170 100644
--- a/driver-sync/src/test/unit/com/mongodb/client/internal/ClientSessionBindingSpecification.groovy
+++ b/driver-sync/src/test/unit/com/mongodb/client/internal/ClientSessionBindingSpecification.groovy
@@ -16,7 +16,7 @@
package com.mongodb.client.internal
-
+import com.mongodb.ClusterFixture
import com.mongodb.ReadPreference
import com.mongodb.client.ClientSession
import com.mongodb.internal.binding.ClusterBinding
@@ -25,29 +25,28 @@ import com.mongodb.internal.binding.ReadWriteBinding
import com.mongodb.internal.connection.Cluster
import spock.lang.Specification
-import static com.mongodb.ClusterFixture.OPERATION_CONTEXT
-
class ClientSessionBindingSpecification extends Specification {
def 'should call underlying wrapped binding'() {
given:
def session = Stub(ClientSession)
+ def operationContext = ClusterFixture.createOperationContext()
def wrappedBinding = Mock(ClusterBinding);
def binding = new ClientSessionBinding(session, false, wrappedBinding)
when:
- binding.getReadConnectionSource(OPERATION_CONTEXT)
+ binding.getReadConnectionSource(operationContext)
then:
- 1 * wrappedBinding.getReadConnectionSource(OPERATION_CONTEXT) >> {
+ 1 * wrappedBinding.getReadConnectionSource(operationContext) >> {
Stub(ConnectionSource)
}
when:
- binding.getWriteConnectionSource(OPERATION_CONTEXT)
+ binding.getWriteConnectionSource(operationContext)
then:
- 1 * wrappedBinding.getWriteConnectionSource(OPERATION_CONTEXT) >> {
+ 1 * wrappedBinding.getWriteConnectionSource(operationContext) >> {
Stub(ConnectionSource)
}
}
@@ -77,8 +76,9 @@ class ClientSessionBindingSpecification extends Specification {
def session = Mock(ClientSession)
def wrappedBinding = createStubBinding()
def binding = new ClientSessionBinding(session, true, wrappedBinding)
- def readConnectionSource = binding.getReadConnectionSource(OPERATION_CONTEXT)
- def writeConnectionSource = binding.getWriteConnectionSource(OPERATION_CONTEXT)
+ def operationContext = ClusterFixture.createOperationContext()
+ def readConnectionSource = binding.getReadConnectionSource(operationContext)
+ def writeConnectionSource = binding.getWriteConnectionSource(operationContext)
when:
binding.release()
diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy
index 8a38f966754..3ec9a889e29 100644
--- a/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy
+++ b/driver-sync/src/test/unit/com/mongodb/client/internal/CryptConnectionSpecification.groovy
@@ -61,7 +61,7 @@ class CryptConnectionSpecification extends Specification {
def cryptConnection = new CryptConnection(wrappedConnection, crypt)
def codec = new DocumentCodec()
def timeoutContext = Mock(TimeoutContext)
- def operationContext = ClusterFixture.OPERATION_CONTEXT.withTimeoutContext(timeoutContext)
+ def operationContext = ClusterFixture.createOperationContext().withTimeoutContext(timeoutContext)
def operationTimeout = Mock(Timeout)
timeoutContext.getTimeout() >> operationTimeout
@@ -127,7 +127,7 @@ class CryptConnectionSpecification extends Specification {
def encryptedResponse = toRaw(new BsonDocument('ok', new BsonInt32(1)))
def decryptedResponse = encryptedResponse
def timeoutContext = Mock(TimeoutContext)
- def operationContext = ClusterFixture.OPERATION_CONTEXT.withTimeoutContext(timeoutContext)
+ def operationContext = ClusterFixture.createOperationContext().withTimeoutContext(timeoutContext)
def operationTimeout = Mock(Timeout)
timeoutContext.getTimeout() >> operationTimeout
@@ -183,7 +183,7 @@ class CryptConnectionSpecification extends Specification {
def encryptedResponse = toRaw(new BsonDocument('ok', new BsonInt32(1)))
def decryptedResponse = encryptedResponse
def timeoutContext = Mock(TimeoutContext)
- def operationContext = ClusterFixture.OPERATION_CONTEXT.withTimeoutContext(timeoutContext)
+ def operationContext = ClusterFixture.createOperationContext().withTimeoutContext(timeoutContext)
def operationTimeout = Mock(Timeout)
timeoutContext.getTimeout() >> operationTimeout
diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoClusterSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoClusterSpecification.groovy
index c75a4255595..34f46e7b007 100644
--- a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoClusterSpecification.groovy
+++ b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoClusterSpecification.groovy
@@ -259,7 +259,7 @@ class MongoClusterSpecification extends Specification {
MongoClusterImpl createMongoCluster(final MongoClientSettings settings, final OperationExecutor operationExecutor) {
new MongoClusterImpl(null, cluster, settings.codecRegistry, null, null,
originator, operationExecutor, settings.readConcern, settings.readPreference, settings.retryReads, settings.retryWrites,
- null, serverSessionPool, TimeoutSettings.create(settings), settings.uuidRepresentation,
+ settings.enableOverloadRetargeting, null, serverSessionPool, TimeoutSettings.create(settings), settings.uuidRepresentation,
settings.writeConcern, TracingManager.NO_OP)
}
}