Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ interface OptionDescriptor {
}

export const OPTIONS = {
adaptiveRetries: {
default: false,
type: 'boolean'
},
appName: {
type: 'string'
},
Expand Down
3 changes: 3 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
retryReads?: boolean;
/** Enable retryable writes. */
retryWrites?: boolean;
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
adaptiveRetries?: boolean;
/** Allow a driver to force a Single topology type with a connection string containing one host */
directConnection?: boolean;
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
Expand Down Expand Up @@ -1041,6 +1043,7 @@ export interface MongoOptions
extends Required<
Pick<
MongoClientOptions,
| 'adaptiveRetries'
| 'autoEncryption'
| 'connectTimeoutMS'
| 'directConnection'
Expand Down
26 changes: 16 additions & 10 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,13 @@ async function executeOperationWithRetries<
try {
try {
const result = await server.command(operation, timeoutContext);
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
if (topology.s.options.adaptiveRetries) {
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
}
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
Expand All @@ -279,7 +281,11 @@ async function executeOperationWithRetries<
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (
topology.s.options.adaptiveRetries &&
attempt > 0 &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
// if a retry attempt fails with a non-overload error, deposit 1 token.
topology.tokenBucket.deposit(RETRY_COST);
}
Expand Down Expand Up @@ -318,17 +324,17 @@ async function executeOperationWithRetries<
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (!topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);

// if the backoff would exhaust the CSOT timeout, short-circuit.
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
throw error;
}

if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

await setTimeout(backoffMS);
}

Expand Down
1 change: 1 addition & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
hosts: HostAddress[];
retryWrites: boolean;
retryReads: boolean;
adaptiveRetries: boolean;
/** How long to block for server selection before throwing an error */
serverSelectionTimeoutMS: number;
/** The name of the replica set to connect to */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import * as sinon from 'sinon';
import {
type Collection,
INITIAL_TOKEN_BUCKET_SIZE,
MAX_RETRIES,
type MongoClient,
MongoServerError
} from '../../mongodb';
import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils';
import { filterForCommands } from '../shared';

describe('Client Backpressure (Prose)', function () {
let client: MongoClient;
Expand Down Expand Up @@ -64,18 +66,141 @@ describe('Client Backpressure (Prose)', function () {
}
);

it('Test 2: Token Bucket capacity is Enforced', async () => {
// 1-2. Assert that the client's retry token bucket is at full capacity and that the capacity
// is DEFAULT_RETRY_TOKEN_CAPACITY.
it('Test 2: Token Bucket capacity is Enforced', async function () {
// 1. Let client be a MongoClient with adaptiveRetries=True.
const client = this.configuration.newClient({
adaptiveRetries: true
});
await client.connect();

// 2. Assert that the client's retry token bucket is at full capacity and that the capacity is DEFAULT_RETRY_TOKEN_CAPACITY.
const tokenBucket = client.topology.tokenBucket;
expect(tokenBucket).to.have.property('budget', INITIAL_TOKEN_BUCKET_SIZE);
expect(tokenBucket).to.have.property('capacity', INITIAL_TOKEN_BUCKET_SIZE);

// 3. Execute a successful ping command.
// 3. Using client, execute a successful ping command.
await client.db('admin').command({ ping: 1 });

// 4. Assert that the successful command did not increase the number of tokens in the bucket
// above DEFAULT_RETRY_TOKEN_CAPACITY.
// 4. Assert that the successful command did not increase the number of tokens in the bucket above DEFAULT_RETRY_TOKEN_CAPACITY.
expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE);

await client.close();
});

it(
'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times',
{
requires: {
mongodb: '>=4.4'
}
},
async function () {
// 1. Let `client` be a `MongoClient` with command event monitoring enabled.
const client = this.configuration.newClient({
monitorCommands: true
});
await client.connect();

// 2. Let `coll` be a collection.
const collection = client.db('foo').collection('bar');
const commandsStarted = [];
client.on('commandStarted', filterForCommands(['find'], commandsStarted));

/*
* 3. Configure the following failpoint:
{
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['find'],
errorCode: 462, // IngressRequestRateLimitExceeded
errorLabels: ['SystemOverloadedError', 'RetryableError']
}
}
* */
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['find'],
errorCode: 462,
errorLabels: ['RetryableError', 'SystemOverloadedError']
}
});

// 4. Perform a find operation with `coll` that fails.
const error = await collection.findOne({}).catch(e => e);

// 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
expect(error).to.be.instanceof(MongoServerError);
expect(error.hasErrorLabel('RetryableError')).to.be.true;
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;

// 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6).
expect(commandsStarted).to.have.length(MAX_RETRIES + 1);

await client.close();
}
);

it(
'Test 4: Adaptive Retries are Limited by Token Bucket Tokens',
{
requires: {
mongodb: '>=4.4'
}
},
async function () {
// 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled.
const client = this.configuration.newClient({
adaptiveRetries: true,
monitorCommands: true
});
await client.connect();

// 2. Set `client`'s retry token bucket to have 2 tokens.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
client.topology!.tokenBucket['budget'] = 2;

// 3. Let `coll` be a collection.
const collection = client.db('foo').collection('bar');
const commandsStarted = [];
client.on('commandStarted', filterForCommands(['find'], commandsStarted));

/*
* 4. Configure the following failpoint:
{
configureFailPoint: 'failCommand',
mode: {times: 3},
data: {
failCommands: ['find'],
errorCode: 462, // IngressRequestRateLimitExceeded
errorLabels: ['SystemOverloadedError', 'RetryableError']
}
}
* */
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: { times: 3 },
data: {
failCommands: ['find'],
errorCode: 462,
errorLabels: ['RetryableError', 'SystemOverloadedError']
}
});

// 5. Perform a find operation with `coll` that fails.
const error = await collection.findOne({}).catch(e => e);

// 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
expect(error).to.be.instanceof(MongoServerError);
expect(error.hasErrorLabel('RetryableError')).to.be.true;
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;

// 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
expect(commandsStarted).to.have.length(3);

await client.close();
}
);
});
35 changes: 35 additions & 0 deletions test/spec/uri-options/client-backpressure-options.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"tests": [
{
"description": "adaptiveRetries=true is parsed correctly",
"uri": "mongodb://example.com/?adaptiveRetries=true",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"adaptiveRetries": true
}
},
{
"description": "adaptiveRetries=false is parsed correctly",
"uri": "mongodb://example.com/?adaptiveRetries=false",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"adaptiveRetries": false
}
},
{
"description": "adaptiveRetries with invalid value causes a warning",
"uri": "mongodb://example.com/?adaptiveRetries=invalid",
"valid": true,
"warning": true,
"hosts": null,
"auth": null,
"options": null
}
]
}
27 changes: 27 additions & 0 deletions test/spec/uri-options/client-backpressure-options.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
tests:
-
description: "adaptiveRetries=true is parsed correctly"
uri: "mongodb://example.com/?adaptiveRetries=true"
valid: true
warning: false
hosts: ~
auth: ~
options:
adaptiveRetries: true
-
description: "adaptiveRetries=false is parsed correctly"
uri: "mongodb://example.com/?adaptiveRetries=false"
valid: true
warning: false
hosts: ~
auth: ~
options:
adaptiveRetries: false
-
description: "adaptiveRetries with invalid value causes a warning"
uri: "mongodb://example.com/?adaptiveRetries=invalid"
valid: true
warning: true
hosts: ~
auth: ~
options: ~
1 change: 1 addition & 0 deletions test/tools/uri_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ export function executeUriValidationTest(
case 'serverSelectionTimeoutMS':
case 'serverMonitoringMode':
case 'socketTimeoutMS':
case 'adaptiveRetries':
case 'retryWrites':
case 'directConnection':
case 'loadBalanced':
Expand Down
19 changes: 19 additions & 0 deletions test/unit/connection_string.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -889,4 +889,23 @@ describe('Connection String', function () {
}
});
});

context('when adaptiveRetries is set', function () {
it('defaults to false', function () {
const options = parseOptions('mongodb://localhost:27017');
expect(options.adaptiveRetries).to.equal(false);
});

it('can be enabled via connection string', function () {
const options = parseOptions('mongodb://localhost:27017?adaptiveRetries=true');
expect(options.adaptiveRetries).to.equal(true);
});

it('can be enabled via client options', function () {
const options = parseOptions('mongodb://localhost:27017', {
adaptiveRetries: true
});
expect(options.adaptiveRetries).to.equal(true);
});
});
});