diff --git a/etc/sdam_viz.js b/etc/sdam_viz.js index 9bfa8418dd..59ebbdf0e2 100644 --- a/etc/sdam_viz.js +++ b/etc/sdam_viz.js @@ -5,7 +5,7 @@ // npx ts-node etc/sdam_viz.js -h const { MongoClient } = require('../src'); -const { now, calculateDurationInMs, arrayStrictEqual, errorStrictEqual } = require('../src/utils'); +const { calculateDurationInMs, arrayStrictEqual, errorStrictEqual, processTimeMS } = require('../src/utils'); const util = require('util'); const chalk = require('chalk'); @@ -207,7 +207,7 @@ async function scheduleWriteWorkload(client) { const currentWriteWorkload = writeWorkloadCounter++; try { - const start = now(); + const start = processTimeMS(); await client.db('test').collection('test').insertOne({ a: 42 }); averageWriteMS = 0.2 * calculateDurationInMs(start) + 0.8 * averageWriteMS; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index ed7e8f6a0d..9652e3a5e4 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -47,8 +47,8 @@ import { maxWireVersion, type MongoDBNamespace, noop, - now, once, + processTimeMS, squashError, uuidV4 } from '../utils'; @@ -241,7 +241,7 @@ export class Connection extends TypedEventEmitter { this.description = new StreamDescription(this.address, options); this.generation = options.generation; - this.lastUseTime = now(); + this.lastUseTime = processTimeMS(); this.messageStream = this.socket .on('error', this.onSocketError.bind(this)) @@ -299,7 +299,7 @@ export class Connection extends TypedEventEmitter { } public markAvailable(): void { - this.lastUseTime = now(); + this.lastUseTime = processTimeMS(); } private onSocketError(cause: Error) { @@ -510,7 +510,7 @@ export class Connection extends TypedEventEmitter { const message = this.prepareCommand(ns.db, command, options); let started = 0; if (this.shouldEmitAndLogCommand) { - started = now(); + started = processTimeMS(); this.emitAndLogCommand( this.monitorCommands, Connection.COMMAND_STARTED, diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 411295e658..3f16c71930 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -36,7 +36,7 @@ import { List, makeCounter, noop, - now, + processTimeMS, promiseWithResolvers } from '../utils'; import { connect } from './connect'; @@ -319,7 +319,7 @@ export class ConnectionPool extends TypedEventEmitter { * explicitly destroyed by the new owner. */ async checkOut(options: { timeoutContext: TimeoutContext } & Abortable): Promise { - const checkoutTime = now(); + const checkoutTime = processTimeMS(); this.emitAndLog( ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new ConnectionCheckOutStartedEvent(this) @@ -616,7 +616,7 @@ export class ConnectionPool extends TypedEventEmitter { this.pending++; // This is our version of a "virtual" no-I/O connection as the spec requires - const connectionCreatedTime = now(); + const connectionCreatedTime = processTimeMS(); this.emitAndLog( ConnectionPool.CONNECTION_CREATED, new ConnectionCreatedEvent(this, { id: connectOptions.id }) diff --git a/src/cmap/connection_pool_events.ts b/src/cmap/connection_pool_events.ts index 5b8c4093cf..fe07536a6d 100644 --- a/src/cmap/connection_pool_events.ts +++ b/src/cmap/connection_pool_events.ts @@ -13,7 +13,7 @@ import { CONNECTION_READY } from '../constants'; import type { MongoError } from '../error'; -import { now } from '../utils'; +import { processTimeMS } from '../utils'; import type { Connection } from './connection'; import type { ConnectionPool, ConnectionPoolOptions } from './connection_pool'; @@ -145,7 +145,7 @@ export class ConnectionReadyEvent extends ConnectionPoolMonitoringEvent { /** @internal */ constructor(pool: ConnectionPool, connection: Connection, connectionCreatedEventTime: number) { super(pool); - this.durationMS = now() - connectionCreatedEventTime; + this.durationMS = processTimeMS() - connectionCreatedEventTime; this.connectionId = connection.id; } } @@ -224,7 +224,7 @@ export class ConnectionCheckOutFailedEvent extends ConnectionPoolMonitoringEvent error?: MongoError ) { super(pool); - this.durationMS = now() - checkoutTime; + this.durationMS = processTimeMS() - checkoutTime; this.reason = reason; this.error = error; } @@ -252,7 +252,7 @@ export class ConnectionCheckedOutEvent extends ConnectionPoolMonitoringEvent { /** @internal */ constructor(pool: ConnectionPool, connection: Connection, checkoutTime: number) { super(pool); - this.durationMS = now() - checkoutTime; + this.durationMS = processTimeMS() - checkoutTime; this.connectionId = connection.id; } } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 326bdeeecc..f0e574ff7e 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -14,8 +14,8 @@ import { type EventEmitterWithState, makeStateMachine, noop, - now, - ns + ns, + processTimeMS } from '../utils'; import { ServerType, STATE_CLOSED, STATE_CLOSING } from './common'; import { @@ -326,7 +326,7 @@ function checkServer(monitor: Monitor, callback: Callback) { ); // We have not actually sent an outgoing handshake, but when we get the next response we // want the duration to reflect the time since we last heard from the server - start = now(); + start = processTimeMS(); } else { monitor.rttPinger?.close(); monitor.rttPinger = undefined; @@ -360,7 +360,7 @@ function checkServer(monitor: Monitor, callback: Callback) { } // Record new start time before sending handshake - start = now(); + start = processTimeMS(); if (isAwaitable) { awaited = true; @@ -383,7 +383,7 @@ function checkServer(monitor: Monitor, callback: Callback) { const socket = await makeSocket(monitor.connectOptions); const connection = makeConnection(monitor.connectOptions, socket); // The start time is after socket creation but before the handshake - start = now(); + start = processTimeMS(); try { await performInitialHandshake(connection, monitor.connectOptions); return connection; @@ -532,7 +532,7 @@ export class RTTPinger { } private measureRoundTripTime() { - const start = now(); + const start = processTimeMS(); if (this.closed) { return; @@ -607,7 +607,7 @@ export class MonitorInterval { } wake() { - const currentTime = now(); + const currentTime = processTimeMS(); const timeSinceLastCall = currentTime - this.lastExecutionEnded; // TODO(NODE-4674): Add error handling and logging to the monitor @@ -651,7 +651,7 @@ export class MonitorInterval { } toJSON() { - const currentTime = now(); + const currentTime = processTimeMS(); const timeSinceLastCall = currentTime - this.lastExecutionEnded; return { timerId: this.timerId != null ? 'set' : 'cleared', @@ -684,7 +684,7 @@ export class MonitorInterval { this.isExecutionInProgress = true; this.fn(() => { - this.lastExecutionEnded = now(); + this.lastExecutionEnded = processTimeMS(); this.isExecutionInProgress = false; this._reschedule(this.heartbeatFrequencyMS); }); diff --git a/src/sdam/server_description.ts b/src/sdam/server_description.ts index 903baefcd8..d7f16fe287 100644 --- a/src/sdam/server_description.ts +++ b/src/sdam/server_description.ts @@ -1,6 +1,12 @@ import { type Document, Long, type ObjectId } from '../bson'; import { type MongoError, MongoRuntimeError } from '../error'; -import { arrayStrictEqual, compareObjectId, errorStrictEqual, HostAddress, now } from '../utils'; +import { + arrayStrictEqual, + compareObjectId, + errorStrictEqual, + HostAddress, + processTimeMS +} from '../utils'; import { type ClusterTime, ServerType } from './common'; const WRITABLE_SERVER_TYPES = new Set([ @@ -110,7 +116,7 @@ export class ServerDescription { this.maxWireVersion = hello?.maxWireVersion ?? 0; this.roundTripTime = options?.roundTripTime ?? -1; this.minRoundTripTime = options?.minRoundTripTime ?? 0; - this.lastUpdateTime = now(); + this.lastUpdateTime = processTimeMS(); this.lastWriteDate = hello?.lastWrite?.lastWriteDate ?? 0; // NOTE: This actually builds the stack string instead of holding onto the getter and all its // associated references. This is done to prevent a memory leak. diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 05cd8eb2ab..eba356b020 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -45,7 +45,7 @@ import { List, makeStateMachine, noop, - now, + processTimeMS, promiseWithResolvers, shuffle } from '../utils'; @@ -602,7 +602,7 @@ export class Topology extends TypedEventEmitter { resolve, reject, cancelled: false, - startTime: now(), + startTime: processTimeMS(), operationName: options.operationName, waitingLogged: false, previousServer: options.previousServer @@ -1001,7 +1001,8 @@ function processWaitQueue(topology: Topology) { waitQueueMember.serverSelector, topology.description, topology.s.serverSelectionTimeoutMS !== 0 - ? topology.s.serverSelectionTimeoutMS - (now() - waitQueueMember.startTime) + ? topology.s.serverSelectionTimeoutMS - + (processTimeMS() - waitQueueMember.startTime) : -1, waitQueueMember.operationName ) diff --git a/src/sessions.ts b/src/sessions.ts index 0788ff1538..6c39a6f0b4 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -42,7 +42,7 @@ import { List, MongoDBNamespace, noop, - now, + processTimeMS, squashError, uuidV4 } from './utils'; @@ -726,7 +726,9 @@ export class ClientSession }) : null; - const startTime = this.timeoutContext?.csotEnabled() ? this.timeoutContext.start : now(); + const startTime = this.timeoutContext?.csotEnabled() + ? this.timeoutContext.start + : processTimeMS(); let committed = false; let result: any; @@ -768,7 +770,7 @@ export class ClientSession if ( fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) + (this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT) ) { continue; } @@ -796,14 +798,14 @@ export class ClientSession if ( !isMaxTimeMSExpiredError(commitError) && commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) && - (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) + (this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT) ) { continue; } if ( commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) + (this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT) ) { break; } @@ -943,7 +945,7 @@ export class ServerSession { return; } this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) }; - this.lastUse = now(); + this.lastUse = processTimeMS(); this.txnNumber = 0; this.isDirty = false; } @@ -1078,7 +1080,7 @@ export function applySession( } // mark the last use of this session, and apply the `lsid` - serverSession.lastUse = now(); + serverSession.lastUse = processTimeMS(); command.lsid = serverSession.id; const inTxnOrTxnCommand = session.inTransaction() || isTransactionCommand(command); diff --git a/src/utils.ts b/src/utils.ts index 49aa15ccea..c0f0864dbf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -434,10 +434,13 @@ export function makeStateMachine(stateTable: StateTable): StateTransitionFunctio }; } -/** @internal */ -export function now(): number { - const hrtime = process.hrtime(); - return Math.floor(hrtime[0] * 1000 + hrtime[1] / 1000000); +/** + * This function returns the number of milliseconds since an arbitrary point in time. + * This function should only be used to measure time intervals. + * @internal + * */ +export function processTimeMS(): number { + return Math.floor(performance.now()); } /** @internal */ @@ -446,7 +449,7 @@ export function calculateDurationInMs(started: number | undefined): number { return -1; } - const elapsed = now() - started; + const elapsed = processTimeMS() - started; return elapsed < 0 ? 0 : elapsed; } diff --git a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts index e8736ac3b6..6a56563091 100644 --- a/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts +++ b/test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts @@ -19,7 +19,7 @@ import { MongoServerSelectionError, ObjectId } from '../../../src'; -import { now, squashError } from '../../../src/utils'; +import { processTimeMS, squashError } from '../../../src/utils'; import { clearFailPoint, configureFailPoint, @@ -916,7 +916,7 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a server selection timeout error after no more than 15ms. */ client = new MongoClient('mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20'); - const start = now(); + const start = processTimeMS(); const maybeError = await client .db('test') @@ -926,7 +926,7 @@ describe('CSOT spec prose tests', function () { () => null, e => e ); - const end = now(); + const end = processTimeMS(); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); expect(end - start).to.be.lte(15); @@ -940,7 +940,7 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a server selection timeout error after no more than 15ms. */ client = new MongoClient('mongodb://invalid/?timeoutMS=10&serverSelectionTimeoutMS=20'); - const start = now(); + const start = processTimeMS(); const maybeError = await client .db('test') @@ -950,7 +950,7 @@ describe('CSOT spec prose tests', function () { () => null, e => e ); - const end = now(); + const end = processTimeMS(); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); expect(end - start).to.be.lte(15); @@ -964,7 +964,7 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a server selection timeout error after no more than 15ms. */ client = new MongoClient('mongodb://invalid/?timeoutMS=20&serverSelectionTimeoutMS=10'); - const start = now(); + const start = processTimeMS(); const maybeError = await client .db('test') .admin() @@ -973,7 +973,7 @@ describe('CSOT spec prose tests', function () { () => null, e => e ); - const end = now(); + const end = processTimeMS(); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); expect(end - start).to.be.lte(15); @@ -987,7 +987,7 @@ describe('CSOT spec prose tests', function () { * - Expect this to fail with a server selection timeout error after no more than 15ms. */ client = new MongoClient('mongodb://invalid/?timeoutMS=0&serverSelectionTimeoutMS=10'); - const start = now(); + const start = processTimeMS(); const maybeError = await client .db('test') .admin() @@ -996,7 +996,7 @@ describe('CSOT spec prose tests', function () { () => null, e => e ); - const end = now(); + const end = processTimeMS(); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); expect(end - start).to.be.lte(15); @@ -1040,7 +1040,7 @@ describe('CSOT spec prose tests', function () { serverSelectionTimeoutMS: 20, timeoutMS: 10 }); - const start = now(); + const start = processTimeMS(); const maybeError = await client .db('db') .collection('coll') @@ -1049,7 +1049,7 @@ describe('CSOT spec prose tests', function () { () => null, e => e ); - const end = now(); + const end = processTimeMS(); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); expect(end - start).to.be.lte(15); }).skipReason = @@ -1092,7 +1092,7 @@ describe('CSOT spec prose tests', function () { serverSelectionTimeoutMS: 10, timeoutMS: 20 }); - const start = now(); + const start = processTimeMS(); const maybeError = await client .db('db') .collection('coll') @@ -1101,7 +1101,7 @@ describe('CSOT spec prose tests', function () { () => null, e => e ); - const end = now(); + const end = processTimeMS(); expect(maybeError).to.be.instanceof(MongoOperationTimeoutError); expect(end - start).to.be.lte(15); }).skipReason = diff --git a/test/integration/crud/client_bulk_write.test.ts b/test/integration/crud/client_bulk_write.test.ts index 247d4683b8..99e815242c 100644 --- a/test/integration/crud/client_bulk_write.test.ts +++ b/test/integration/crud/client_bulk_write.test.ts @@ -9,7 +9,7 @@ import { MongoOperationTimeoutError } from '../../../src'; import { TimeoutContext } from '../../../src/timeout'; -import { now } from '../../../src/utils'; +import { processTimeMS } from '../../../src/utils'; import { clearFailPoint, configureFailPoint, @@ -102,7 +102,7 @@ describe('Client Bulk Write', function () { }); it('timeoutMS is used as the timeout for the bulk write', metadata, async function () { - const start = now(); + const start = processTimeMS(); const timeoutError = await client .bulkWrite([ { @@ -112,7 +112,7 @@ describe('Client Bulk Write', function () { } ]) .catch(e => e); - const end = now(); + const end = processTimeMS(); expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); @@ -132,7 +132,7 @@ describe('Client Bulk Write', function () { }); it('timeoutMS is used as the timeout for the bulk write', metadata, async function () { - const start = now(); + const start = processTimeMS(); const timeoutError = await client .bulkWrite( [ @@ -145,7 +145,7 @@ describe('Client Bulk Write', function () { { timeoutMS: 300 } ) .catch(e => e); - const end = now(); + const end = processTimeMS(); expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); @@ -165,7 +165,7 @@ describe('Client Bulk Write', function () { }); it('bulk write options take precedence over the client options', metadata, async function () { - const start = now(); + const start = processTimeMS(); const timeoutError = await client .bulkWrite( [ @@ -178,7 +178,7 @@ describe('Client Bulk Write', function () { { timeoutMS: 300 } ) .catch(e => e); - const end = now(); + const end = processTimeMS(); expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); @@ -217,7 +217,7 @@ describe('Client Bulk Write', function () { }); it('a single batch bulk write does not take longer than timeoutMS', async function () { - const start = now(); + const start = processTimeMS(); let end; const timeoutError = client .bulkWrite( @@ -232,7 +232,7 @@ describe('Client Bulk Write', function () { ) .catch(e => e) .then(e => { - end = now(); + end = processTimeMS(); return e; }); @@ -252,7 +252,7 @@ describe('Client Bulk Write', function () { }, async function () { const models = await makeMultiBatchWrite(this.configuration); - const start = now(); + const start = processTimeMS(); let end; const timeoutError = client .bulkWrite(models, { @@ -262,7 +262,7 @@ describe('Client Bulk Write', function () { }) .catch(e => e) .then(r => { - end = now(); + end = processTimeMS(); return r; }); @@ -300,7 +300,7 @@ describe('Client Bulk Write', function () { }); it('the operation times out', metadata, async function () { - const start = now(); + const start = processTimeMS(); const timeoutError = await client .bulkWrite( [ @@ -313,7 +313,7 @@ describe('Client Bulk Write', function () { { timeoutMS: 300 } ) .catch(e => e); - const end = now(); + const end = processTimeMS(); expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(300 - 100, 300 + 100); }); @@ -346,7 +346,7 @@ describe('Client Bulk Write', function () { async function () { const timeoutMS = 1500; const models = await makeMultiResponseBatchModelArray(this.configuration); - const start = now(); + const start = processTimeMS(); const timeoutError = await client .bulkWrite(models, { verboseResults: true, @@ -354,7 +354,7 @@ describe('Client Bulk Write', function () { }) .catch(e => e); - const end = now(); + const end = processTimeMS(); expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); // DRIVERS-3005 - killCursors causes cursor cleanup to extend past timeoutMS. @@ -435,14 +435,14 @@ describe('Client Bulk Write', function () { }, async function () { const models = await makeMultiBatchWrite(this.configuration); - const start = now(); + const start = processTimeMS(); const timeoutError = await client .bulkWrite(models, { timeoutMS: 2000 }) .catch(e => e); - const end = now(); + const end = processTimeMS(); expect(timeoutError).to.be.instanceOf(MongoOperationTimeoutError); expect(end - start).to.be.within(2000 - 100, 2000 + 100); expect(commands.length, 'Test must execute two batches.').to.equal(2); diff --git a/test/tools/utils.ts b/test/tools/utils.ts index eed0fde5f1..d092fe2eca 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -21,7 +21,7 @@ import { } from '../../src'; import { OP_MSG } from '../../src/cmap/wire_protocol/constants'; import { Topology } from '../../src/sdam/topology'; -import { now } from '../../src/utils'; +import { processTimeMS } from '../../src/utils'; import { type TestConfiguration } from './runner/config'; export function ensureCalledWith(stub: any, args: any[]) { @@ -483,9 +483,9 @@ export async function measureDuration(f: () => Promise): Promise<{ duration: number; result: T | Error; }> { - const start = now(); + const start = processTimeMS(); const result = await f().catch(e => e); - const end = now(); + const end = processTimeMS(); return { duration: end - start, result diff --git a/test/unit/sessions.test.ts b/test/unit/sessions.test.ts index 1c6c31ab23..d03006bb26 100644 --- a/test/unit/sessions.test.ts +++ b/test/unit/sessions.test.ts @@ -5,7 +5,7 @@ import * as sinon from 'sinon'; import { MongoRuntimeError } from '../../src/error'; import { MongoClient } from '../../src/mongo_client'; import { applySession, ClientSession, ServerSession, ServerSessionPool } from '../../src/sessions'; -import { isHello, now } from '../../src/utils'; +import { isHello, processTimeMS } from '../../src/utils'; import { genClusterTime } from '../tools/common'; import * as mock from '../tools/mongodb-mock/index'; @@ -513,7 +513,7 @@ describe('Sessions - unit', function () { it('should remove sessions which have timed out on acquire, and return a fresh session', function () { const oldSession = new ServerSession(); - oldSession.lastUse = now() - 30 * 60 * 1000; // add 30min + oldSession.lastUse = processTimeMS() - 30 * 60 * 1000; // add 30min const pool = new ServerSessionPool(client); pool.sessions.push(oldSession); @@ -527,7 +527,7 @@ describe('Sessions - unit', function () { describe('release()', () => { const makeOldSession = () => { const oldSession = new ServerSession(); - oldSession.lastUse = now() - 30 * 60 * 1000; // add 30min + oldSession.lastUse = processTimeMS() - 30 * 60 * 1000; // add 30min return oldSession; }; @@ -603,7 +603,7 @@ describe('Sessions - unit', function () { it('should not reintroduce a soon-to-expire session to the pool on release', function () { const session = new ServerSession(); - session.lastUse = now() - 9.5 * 60 * 1000; // add 9.5min + session.lastUse = processTimeMS() - 9.5 * 60 * 1000; // add 9.5min const pool = new ServerSessionPool(client);