Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import {
/** @public */
export type Stream = Socket | TLSSocket;

function applyBackpressureLabels(error: MongoError) {
error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
error.addErrorLabel(MongoErrorLabel.RetryableError);
}

export async function connect(options: ConnectionOptions): Promise<Connection> {
let connection: Connection | null = null;
try {
Expand Down Expand Up @@ -103,6 +108,8 @@ export async function performInitialHandshake(
const authContext = new AuthContext(conn, credentials, options);
conn.authContext = authContext;

// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
const handshakeDoc = await prepareHandshakeDocument(authContext);

// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
Expand Down Expand Up @@ -163,12 +170,15 @@ export async function performInitialHandshake(
try {
await provider.auth(authContext);
} catch (error) {
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.

if (error instanceof MongoError) {
error.addErrorLabel(MongoErrorLabel.HandshakeError);
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
}

throw error;
}
}
Expand All @@ -189,6 +199,9 @@ export async function performInitialHandshake(
if (error instanceof MongoError) {
error.addErrorLabel(MongoErrorLabel.HandshakeError);
}
// If we encounter an error executing the initial handshake, apply backpressure labels.
applyBackpressureLabels(error);

throw error;
}
}
Expand Down Expand Up @@ -424,6 +437,8 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
socket = await connectedSocket;
return socket;
} catch (error) {
// If we encounter a SystemOverloaded error while establishing a socket, apply the backpressure labels to it.
applyBackpressureLabels(error);
socket.destroy();
throw error;
} finally {
Expand Down
4 changes: 3 additions & 1 deletion src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
ResetPool: 'ResetPool',
PoolRequestedRetry: 'PoolRequestedRetry',
InterruptInUseConnections: 'InterruptInUseConnections',
NoWritesPerformed: 'NoWritesPerformed'
NoWritesPerformed: 'NoWritesPerformed',
RetryableError: 'RetryableError',
SystemOverloadedError: 'SystemOverloadedError'
} as const);

/** @public */
Expand Down
9 changes: 7 additions & 2 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
const isNetworkTimeoutBeforeHandshakeError =
error instanceof MongoNetworkError && error.beforeHandshake;
const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
const isAuthOrEstablishmentHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming this is a minor cleanup and clarification - this label is also applied to the MongoDB handshake, not just auth.

const isSystemOverloadError = error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError);

// Perhaps questionable and divergent from the spec, but considering MongoParseErrors like state change errors was legacy behavior.
if (isStateChangeError(error) || error instanceof MongoParseError) {
Expand All @@ -424,8 +425,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
} else if (
isNetworkNonTimeoutError ||
isNetworkTimeoutBeforeHandshakeError ||
isAuthHandshakeError
isAuthOrEstablishmentHandshakeError
) {
// Do NOT clear the pool if we encounter a system overloaded error.
if (isSystemOverloadError) {
return;
}
// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { expect } from 'chai';
import { once } from 'events';

import { type MongoClient } from '../../../src';
import {
type ConnectionCheckOutFailedEvent,
type ConnectionPoolClearedEvent,
type MongoClient
} from '../../../src';
import {
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_READY,
SERVER_HEARTBEAT_FAILED,
SERVER_HEARTBEAT_SUCCEEDED
} from '../../../src/constants';
import { sleep } from '../../tools/utils';

describe('Server Discovery and Monitoring Prose Tests', function () {
context('Monitors sleep at least minHeartbeatFrequencyMS between checks', function () {
Expand Down Expand Up @@ -187,4 +192,74 @@ describe('Server Discovery and Monitoring Prose Tests', function () {
}
});
});

context('Connection Pool Backpressure', function () {
let client: MongoClient;
const checkoutFailedEvents: Array<ConnectionCheckOutFailedEvent> = [];
const poolClearedEvents: Array<ConnectionPoolClearedEvent> = [];

beforeEach(async function () {
client = this.configuration.newClient({}, { maxConnecting: 100 });

client.on('connectionCheckOutFailed', e => checkoutFailedEvents.push(e));
client.on('connectionPoolCleared', e => poolClearedEvents.push(e));

await client.connect();

const admin = client.db('admin').admin();
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRateLimiterEnabled: true
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRatePerSec: 20
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentBurstCapacitySecs: 1
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentMaxQueueDepth: 1
});

await client.db('test').collection('test').insertOne({});
});

afterEach(async function () {
// give the time to recover from the connection storm before cleaning up.
await sleep(1000);

const admin = client.db('admin').admin();
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRateLimiterEnabled: false
});

await client.close();
});

it(
'does not clear the pool when connections are closed due to connection storms',
{
requires: {
mongodb: '>=7.0' // rate limiting added in 7.0
}
},
async function () {
await Promise.allSettled(
Array.from({ length: 100 }).map(() =>
client
.db('test')
.collection('test')
.findOne({ $where: 'function() { sleep(2000); return true; }' })
)
);

expect(poolClearedEvents).to.be.empty;
expect(checkoutFailedEvents.length).to.be.greaterThan(10);
}
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"isMaster",
"hello"
],
"closeConnection": true,
"errorCode": 91,
"appName": "poolCreateMinSizeErrorTest"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ failPoint:
mode: { times: 50 }
data:
failCommands: ["isMaster","hello"]
closeConnection: true
errorCode: 91
appName: "poolCreateMinSizeErrorTest"
poolOptions:
minPoolSize: 1
Expand Down
8 changes: 4 additions & 4 deletions test/spec/load-balancers/sdam-error-handling.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "state change errors are correctly handled",
"schemaVersion": "1.3",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"topologies": [
Expand Down Expand Up @@ -263,7 +263,7 @@
"description": "errors during the initial connection hello are ignored",
"runOnRequirements": [
{
"minServerVersion": "4.9"
"minServerVersion": "4.4.7"
}
],
"operations": [
Expand All @@ -282,7 +282,7 @@
"isMaster",
"hello"
],
"closeConnection": true,
"errorCode": 11600,
"appName": "lbSDAMErrorTestClient"
}
}
Expand All @@ -297,7 +297,7 @@
}
},
"expectError": {
"isClientError": true
"isError": true
}
}
],
Expand Down
11 changes: 5 additions & 6 deletions test/spec/load-balancers/sdam-error-handling.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description: state change errors are correctly handled

schemaVersion: '1.3'
schemaVersion: '1.4'

runOnRequirements:
- topologies: [ load-balanced ]
Expand Down Expand Up @@ -141,9 +141,8 @@ tests:
# to the same mongos on which the failpoint is set.
- description: errors during the initial connection hello are ignored
runOnRequirements:
# Server version 4.9+ is needed to set a fail point on the initial
# connection handshake with the appName filter due to SERVER-49336.
- minServerVersion: '4.9'
# Require SERVER-49336 for failCommand + appName on the initial handshake.
- minServerVersion: '4.4.7'
operations:
- name: failPoint
object: testRunner
Expand All @@ -154,14 +153,14 @@ tests:
mode: { times: 1 }
data:
failCommands: [isMaster, hello]
closeConnection: true
errorCode: 11600
appName: *singleClientAppName
- name: insertOne
object: *singleColl
arguments:
document: { x: 1 }
expectError:
isClientError: true
isError: true
expectEvents:
- client: *singleClient
eventType: cmap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,22 @@
"outcome": {
"servers": {
"a:27017": {
"type": "Unknown",
"topologyVersion": null,
"type": "RSPrimary",
"setName": "rs",
"topologyVersion": {
"processId": {
"$oid": "000000000000000000000001"
},
"counter": {
"$numberLong": "1"
}
},
"pool": {
"generation": 1
"generation": 0
}
}
},
"topologyType": "ReplicaSetNoPrimary",
"topologyType": "ReplicaSetWithPrimary",
"logicalSessionTimeoutMinutes": null,
"setName": "rs"
}
Expand Down
Loading