Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packages/pg-pool/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict'

const noopChannel = { hasSubscribers: false }

let poolConnectChannel = noopChannel
let poolReleaseChannel = noopChannel
let poolRemoveChannel = noopChannel

try {
let dc
if (typeof process.getBuiltInModule === 'function') {
dc = process.getBuiltInModule('diagnostics_channel')
} else {
dc = require('diagnostics_channel')
}
if (typeof dc.tracingChannel === 'function') {
poolConnectChannel = dc.tracingChannel('pg:pool:connect')
}
if (typeof dc.channel === 'function') {
poolReleaseChannel = dc.channel('pg:pool:release')
poolRemoveChannel = dc.channel('pg:pool:remove')
}
} catch (e) {
// diagnostics_channel not available (non-Node environment)
}

// Check explicitly for `false` rather than truthiness because the aggregated
// `hasSubscribers` getter on TracingChannel is `undefined` on Node 18 (which
// backported TracingChannel but not the getter). When `undefined`, we assume
// there may be subscribers and trace unconditionally.
function shouldTrace(channel) {
return channel.hasSubscribers !== false
}

module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace }
34 changes: 34 additions & 0 deletions packages/pg-pool/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel, shouldTrace } = require('./diagnostics')

const NOOP = function () {}

Expand Down Expand Up @@ -178,6 +179,10 @@ class Pool extends EventEmitter {

this._clients = this._clients.filter((c) => c !== client)
const context = this
if (shouldTrace(poolRemoveChannel)) {
poolRemoveChannel.publish({ client: { processID: client.processID } })
}

client.end(() => {
context.emit('remove', client)

Expand All @@ -196,6 +201,31 @@ class Pool extends EventEmitter {
const response = promisify(this.Promise, cb)
const result = response.result

if (shouldTrace(poolConnectChannel)) {
const context = {
pool: {
totalCount: this.totalCount,
idleCount: this.idleCount,
waitingCount: this.waitingCount,
maxSize: this.options.max,
},
}
const origCb = response.callback
const enrichedCb = (err, client, done) => {
if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount }
return origCb(err, client, done)
}
poolConnectChannel.traceCallback(
(tracedCb) => {
response.callback = tracedCb
},
0,
context,
null,
enrichedCb
)
}

// if we don't have to connect a new client, don't do so
if (this._isFull() || this._idle.length) {
// if we have idle clients schedule a pulse immediately
Expand Down Expand Up @@ -388,6 +418,10 @@ class Pool extends EventEmitter {

this.emit('release', err, client)

if (shouldTrace(poolReleaseChannel)) {
poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined })
}

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (client._poolUseCount >= this.options.maxUses) {
Expand Down
197 changes: 197 additions & 0 deletions packages/pg-pool/test/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
'use strict'

const expect = require('expect.js')
const EventEmitter = require('events').EventEmitter
const describe = require('mocha').describe
const it = require('mocha').it
const dc = require('diagnostics_channel')
const Pool = require('../')

// TracingChannel exists on Node 18+ but the aggregated hasSubscribers getter
// and stable unsubscribe behavior require Node 19.9+/20.5+. Skip tracing
// tests on older versions where TracingChannel is missing or has internal bugs.
const hasStableTracingChannel =
typeof dc.tracingChannel === 'function' && typeof dc.tracingChannel('pg:pool:test:probe').hasSubscribers === 'boolean'

function mockClient(methods) {
return function () {
const client = new EventEmitter()
client.end = function (cb) {
if (cb) process.nextTick(cb)
}
client._queryable = true
client._ending = false
client.processID = 12345
Object.assign(client, methods)
return client
}
}

describe('diagnostics channels', function () {
describe('pg:pool:connect', function () {
;(hasStableTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let capturedContext
const channel = dc.tracingChannel('pg:pool:connect')
const subs = {
start: (ctx) => {
capturedContext = ctx
},
end: () => {},
asyncStart: () => {},
asyncEnd: () => {},
error: () => {},
}

channel.subscribe(subs)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end(() => {
expect(capturedContext).to.be.ok()
expect(capturedContext.pool).to.be.ok()
expect(capturedContext.pool.maxSize).to.be(10)
expect(capturedContext.pool.totalCount).to.be.a('number')

channel.unsubscribe(subs)
done()
})
})
})
;(hasStableTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

const channel = dc.tracingChannel('pg:pool:connect')
const subs = {
start: () => {},
end: () => {},
asyncStart: () => {},
asyncEnd: (ctx) => {
expect(ctx.client).to.be.ok()
expect(ctx.client.processID).to.be(12345)

channel.unsubscribe(subs)
done()
},
error: () => {},
}

channel.subscribe(subs)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end()
})
})
})

describe('pg:pool:release', function () {
it('publishes when a client is released', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let releaseMessage
const channel = dc.channel('pg:pool:release')
const onMessage = (msg) => {
releaseMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end(() => {
expect(releaseMessage).to.be.ok()
expect(releaseMessage.client).to.be.ok()
expect(releaseMessage.client.processID).to.be(12345)

channel.unsubscribe(onMessage)
done()
})
})
})

it('includes error when released with error', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let releaseMessage
const channel = dc.channel('pg:pool:release')
const onMessage = (msg) => {
releaseMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
const releaseError = new Error('test error')
release(releaseError)
pool.end(() => {
expect(releaseMessage).to.be.ok()
expect(releaseMessage.error).to.be(releaseError)

channel.unsubscribe(onMessage)
done()
})
})
})
})

describe('pg:pool:remove', function () {
it('publishes when a client is removed', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let removeMessage
const channel = dc.channel('pg:pool:remove')
const onMessage = (msg) => {
removeMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
// release with error to trigger removal
release(new Error('force remove'))
pool.end(() => {
expect(removeMessage).to.be.ok()
expect(removeMessage.client).to.be.ok()
expect(removeMessage.client.processID).to.be(12345)

channel.unsubscribe(onMessage)
done()
})
})
})
})
})
66 changes: 55 additions & 11 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Query = require('./query')
const defaults = require('./defaults')
const Connection = require('./connection')
const crypto = require('./crypto/utils')
const { queryChannel, connectionChannel, shouldTrace } = require('./diagnostics')

const activeQueryDeprecationNotice = nodeUtils.deprecate(
() => {},
Expand Down Expand Up @@ -207,18 +208,30 @@ class Client extends EventEmitter {

connect(callback) {
if (callback) {
this._connect(callback)
if (shouldTrace(connectionChannel)) {
const context = {
connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl },
}
connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback)
} else {
this._connect(callback)
}
return
}

return new this._Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve(this)
const callback = (error) => {
if (error) reject(error)
else resolve(this)
}
if (shouldTrace(connectionChannel)) {
const context = {
connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl },
}
})
connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback)
} else {
this._connect(callback)
}
})
}

Expand Down Expand Up @@ -687,11 +700,42 @@ class Client extends EventEmitter {
return result
}

if (this._queryQueue.length > 0) {
queryQueueLengthDeprecationNotice()
const enqueue = () => {
if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice()
this._queryQueue.push(query)
this._pulseQueryQueue()
}

if (shouldTrace(queryChannel) && query.callback) {
const context = {
query: { text: query.text, name: query.name, rowMode: query._rowMode },
client: {
database: this.database,
host: this.host,
port: this.port,
user: this.user,
processID: this.processID,
ssl: !!this.ssl,
},
}
const origCb = query.callback
const enrichedCb = (err, res) => {
if (res) context.result = { rowCount: res.rowCount, command: res.command }
return origCb(err, res)
}
queryChannel.traceCallback(
(tracedCb) => {
query.callback = tracedCb
enqueue()
},
0,
context,
null,
enrichedCb
)
} else {
enqueue()
}
this._queryQueue.push(query)
this._pulseQueryQueue()
return result
}

Expand Down
Loading
Loading