diff --git a/src/connection_string.ts b/src/connection_string.ts index 06315a96869..d5e1b81af89 100644 --- a/src/connection_string.ts +++ b/src/connection_string.ts @@ -639,6 +639,10 @@ interface OptionDescriptor { } export const OPTIONS = { + adaptiveRetries: { + default: false, + type: 'boolean' + }, appName: { type: 'string' }, diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 87d969fee93..e8a5f6158d2 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC retryReads?: boolean; /** Enable retryable writes. */ retryWrites?: boolean; + /** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */ + adaptiveRetries?: boolean; /** Allow a driver to force a Single topology type with a connection string containing one host */ directConnection?: boolean; /** Instruct the driver it is connecting to a load balancer fronting a mongos like service */ @@ -1041,6 +1043,7 @@ export interface MongoOptions extends Required< Pick< MongoClientOptions, + | 'adaptiveRetries' | 'autoEncryption' | 'connectTimeoutMS' | 'directConnection' diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index d4abb13de08..dc85c37f775 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -266,11 +266,13 @@ async function executeOperationWithRetries< try { try { const result = await server.command(operation, timeoutContext); - topology.tokenBucket.deposit( - attempt > 0 - ? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry - : RETRY_TOKEN_RETURN_RATE // otherwise - ); + if (topology.s.options.adaptiveRetries) { + topology.tokenBucket.deposit( + attempt > 0 + ? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry + : RETRY_TOKEN_RETURN_RATE // otherwise + ); + } return operation.handleOk(result); } catch (error) { return operation.handleError(error); @@ -279,7 +281,11 @@ async function executeOperationWithRetries< // Should never happen but if it does - propagate the error. if (!(operationError instanceof MongoError)) throw operationError; - if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) { + if ( + topology.s.options.adaptiveRetries && + attempt > 0 && + !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) + ) { // if a retry attempt fails with a non-overload error, deposit 1 token. topology.tokenBucket.deposit(RETRY_COST); } @@ -318,10 +324,6 @@ async function executeOperationWithRetries< } if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) { - if (!topology.tokenBucket.consume(RETRY_COST)) { - throw error; - } - const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt); // if the backoff would exhaust the CSOT timeout, short-circuit. @@ -329,6 +331,10 @@ async function executeOperationWithRetries< throw error; } + if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) { + throw error; + } + await setTimeout(backoffMS); } diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index fb86423929d..8ed4899f9b6 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions { hosts: HostAddress[]; retryWrites: boolean; retryReads: boolean; + adaptiveRetries: boolean; /** How long to block for server selection before throwing an error */ serverSelectionTimeoutMS: number; /** The name of the replica set to connect to */ diff --git a/test/integration/client-backpressure/client-backpressure.prose.test.ts b/test/integration/client-backpressure/client-backpressure.prose.test.ts index ee263b86837..3665fd55c81 100644 --- a/test/integration/client-backpressure/client-backpressure.prose.test.ts +++ b/test/integration/client-backpressure/client-backpressure.prose.test.ts @@ -4,10 +4,12 @@ import * as sinon from 'sinon'; import { type Collection, INITIAL_TOKEN_BUCKET_SIZE, + MAX_RETRIES, type MongoClient, MongoServerError } from '../../mongodb'; import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils'; +import { filterForCommands } from '../shared'; describe('Client Backpressure (Prose)', function () { let client: MongoClient; @@ -64,18 +66,141 @@ describe('Client Backpressure (Prose)', function () { } ); - it('Test 2: Token Bucket capacity is Enforced', async () => { - // 1-2. Assert that the client's retry token bucket is at full capacity and that the capacity - // is DEFAULT_RETRY_TOKEN_CAPACITY. + it('Test 2: Token Bucket capacity is Enforced', async function () { + // 1. Let client be a MongoClient with adaptiveRetries=True. + const client = this.configuration.newClient({ + adaptiveRetries: true + }); + await client.connect(); + + // 2. Assert that the client's retry token bucket is at full capacity and that the capacity is DEFAULT_RETRY_TOKEN_CAPACITY. const tokenBucket = client.topology.tokenBucket; expect(tokenBucket).to.have.property('budget', INITIAL_TOKEN_BUCKET_SIZE); expect(tokenBucket).to.have.property('capacity', INITIAL_TOKEN_BUCKET_SIZE); - // 3. Execute a successful ping command. + // 3. Using client, execute a successful ping command. await client.db('admin').command({ ping: 1 }); - // 4. Assert that the successful command did not increase the number of tokens in the bucket - // above DEFAULT_RETRY_TOKEN_CAPACITY. + // 4. Assert that the successful command did not increase the number of tokens in the bucket above DEFAULT_RETRY_TOKEN_CAPACITY. expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE); + + await client.close(); }); + + it( + 'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times', + { + requires: { + mongodb: '>=4.4' + } + }, + async function () { + // 1. Let `client` be a `MongoClient` with command event monitoring enabled. + const client = this.configuration.newClient({ + monitorCommands: true + }); + await client.connect(); + + // 2. Let `coll` be a collection. + const collection = client.db('foo').collection('bar'); + const commandsStarted = []; + client.on('commandStarted', filterForCommands(['find'], commandsStarted)); + + /* + * 3. Configure the following failpoint: + { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['find'], + errorCode: 462, // IngressRequestRateLimitExceeded + errorLabels: ['SystemOverloadedError', 'RetryableError'] + } + } + * */ + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['find'], + errorCode: 462, + errorLabels: ['RetryableError', 'SystemOverloadedError'] + } + }); + + // 4. Perform a find operation with `coll` that fails. + const error = await collection.findOne({}).catch(e => e); + + // 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels. + expect(error).to.be.instanceof(MongoServerError); + expect(error.hasErrorLabel('RetryableError')).to.be.true; + expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true; + + // 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6). + expect(commandsStarted).to.have.length(MAX_RETRIES + 1); + + await client.close(); + } + ); + + it( + 'Test 4: Adaptive Retries are Limited by Token Bucket Tokens', + { + requires: { + mongodb: '>=4.4' + } + }, + async function () { + // 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled. + const client = this.configuration.newClient({ + adaptiveRetries: true, + monitorCommands: true + }); + await client.connect(); + + // 2. Set `client`'s retry token bucket to have 2 tokens. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + client.topology!.tokenBucket['budget'] = 2; + + // 3. Let `coll` be a collection. + const collection = client.db('foo').collection('bar'); + const commandsStarted = []; + client.on('commandStarted', filterForCommands(['find'], commandsStarted)); + + /* + * 4. Configure the following failpoint: + { + configureFailPoint: 'failCommand', + mode: {times: 3}, + data: { + failCommands: ['find'], + errorCode: 462, // IngressRequestRateLimitExceeded + errorLabels: ['SystemOverloadedError', 'RetryableError'] + } + } + * */ + await configureFailPoint(this.configuration, { + configureFailPoint: 'failCommand', + mode: { times: 3 }, + data: { + failCommands: ['find'], + errorCode: 462, + errorLabels: ['RetryableError', 'SystemOverloadedError'] + } + }); + + // 5. Perform a find operation with `coll` that fails. + const error = await collection.findOne({}).catch(e => e); + + // 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels. + expect(error).to.be.instanceof(MongoServerError); + expect(error.hasErrorLabel('RetryableError')).to.be.true; + expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true; + + // 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries. + expect(commandsStarted).to.have.length(3); + + await client.close(); + } + ); }); diff --git a/test/spec/uri-options/client-backpressure-options.json b/test/spec/uri-options/client-backpressure-options.json new file mode 100644 index 00000000000..73eed592a47 --- /dev/null +++ b/test/spec/uri-options/client-backpressure-options.json @@ -0,0 +1,35 @@ +{ + "tests": [ + { + "description": "adaptiveRetries=true is parsed correctly", + "uri": "mongodb://example.com/?adaptiveRetries=true", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "adaptiveRetries": true + } + }, + { + "description": "adaptiveRetries=false is parsed correctly", + "uri": "mongodb://example.com/?adaptiveRetries=false", + "valid": true, + "warning": false, + "hosts": null, + "auth": null, + "options": { + "adaptiveRetries": false + } + }, + { + "description": "adaptiveRetries with invalid value causes a warning", + "uri": "mongodb://example.com/?adaptiveRetries=invalid", + "valid": true, + "warning": true, + "hosts": null, + "auth": null, + "options": null + } + ] +} \ No newline at end of file diff --git a/test/spec/uri-options/client-backpressure-options.yml b/test/spec/uri-options/client-backpressure-options.yml new file mode 100644 index 00000000000..0bfd4453565 --- /dev/null +++ b/test/spec/uri-options/client-backpressure-options.yml @@ -0,0 +1,27 @@ +tests: + - + description: "adaptiveRetries=true is parsed correctly" + uri: "mongodb://example.com/?adaptiveRetries=true" + valid: true + warning: false + hosts: ~ + auth: ~ + options: + adaptiveRetries: true + - + description: "adaptiveRetries=false is parsed correctly" + uri: "mongodb://example.com/?adaptiveRetries=false" + valid: true + warning: false + hosts: ~ + auth: ~ + options: + adaptiveRetries: false + - + description: "adaptiveRetries with invalid value causes a warning" + uri: "mongodb://example.com/?adaptiveRetries=invalid" + valid: true + warning: true + hosts: ~ + auth: ~ + options: ~ \ No newline at end of file diff --git a/test/tools/uri_spec_runner.ts b/test/tools/uri_spec_runner.ts index bb6f44459a1..3250944c673 100644 --- a/test/tools/uri_spec_runner.ts +++ b/test/tools/uri_spec_runner.ts @@ -358,6 +358,7 @@ export function executeUriValidationTest( case 'serverSelectionTimeoutMS': case 'serverMonitoringMode': case 'socketTimeoutMS': + case 'adaptiveRetries': case 'retryWrites': case 'directConnection': case 'loadBalanced': diff --git a/test/unit/connection_string.test.ts b/test/unit/connection_string.test.ts index 3c5d32bccb2..60f08dc6aa1 100644 --- a/test/unit/connection_string.test.ts +++ b/test/unit/connection_string.test.ts @@ -889,4 +889,23 @@ describe('Connection String', function () { } }); }); + + context('when adaptiveRetries is set', function () { + it('defaults to false', function () { + const options = parseOptions('mongodb://localhost:27017'); + expect(options.adaptiveRetries).to.equal(false); + }); + + it('can be enabled via connection string', function () { + const options = parseOptions('mongodb://localhost:27017?adaptiveRetries=true'); + expect(options.adaptiveRetries).to.equal(true); + }); + + it('can be enabled via client options', function () { + const options = parseOptions('mongodb://localhost:27017', { + adaptiveRetries: true + }); + expect(options.adaptiveRetries).to.equal(true); + }); + }); });