From 33c093daf77153ea3d0ab8d7925772efd37bdebe Mon Sep 17 00:00:00 2001 From: Marylia Gutierrez Date: Wed, 16 Oct 2024 13:05:04 -0400 Subject: [PATCH] fix(instrumentation-pg): not add duplicate listeners to pg pool (#2484) --- .../src/instrumentation.ts | 83 +++--- .../src/internal-types.ts | 5 + .../src/utils.ts | 2 +- .../test/pg-pool.test.ts | 267 ++++++++++++++++++ 4 files changed, 321 insertions(+), 36 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts index ec2ced25f5..2905e783a8 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts @@ -39,6 +39,7 @@ import { PostgresCallback, PgPoolExtended, PgPoolCallback, + EVENT_LISTENERS_SET, } from './internal-types'; import { PgInstrumentationConfig } from './types'; import * as utils from './utils'; @@ -435,6 +436,52 @@ export class PgInstrumentation extends InstrumentationBase { + this._connectionsCounter = utils.updateCounter( + poolName, + pgPool, + this._connectionsCount, + this._connectionPendingRequests, + this._connectionsCounter + ); + }); + + pgPool.on('acquire', () => { + this._connectionsCounter = utils.updateCounter( + poolName, + pgPool, + this._connectionsCount, + this._connectionPendingRequests, + this._connectionsCounter + ); + }); + + pgPool.on('remove', () => { + this._connectionsCounter = utils.updateCounter( + poolName, + pgPool, + this._connectionsCount, + this._connectionPendingRequests, + this._connectionsCounter + ); + }); + + pgPool.on('release' as any, () => { + this._connectionsCounter = utils.updateCounter( + poolName, + pgPool, + this._connectionsCount, + this._connectionPendingRequests, + this._connectionsCounter + ); + }); + pgPool[EVENT_LISTENERS_SET] = true; + } + private _getPoolConnectPatch() { const plugin = this; return (originalConnect: typeof pgPoolTypes.prototype.connect) => { @@ -449,41 +496,7 @@ export class PgInstrumentation extends InstrumentationBase { - plugin._connectionsCounter = utils.updateCounter( - this, - plugin._connectionsCount, - plugin._connectionPendingRequests, - plugin._connectionsCounter - ); - }); - - this.on('acquire', () => { - plugin._connectionsCounter = utils.updateCounter( - this, - plugin._connectionsCount, - plugin._connectionPendingRequests, - plugin._connectionsCounter - ); - }); - - this.on('remove', () => { - plugin._connectionsCounter = utils.updateCounter( - this, - plugin._connectionsCount, - plugin._connectionPendingRequests, - plugin._connectionsCounter - ); - }); - - this.on('release' as any, () => { - plugin._connectionsCounter = utils.updateCounter( - this, - plugin._connectionsCount, - plugin._connectionPendingRequests, - plugin._connectionsCounter - ); - }); + plugin._setPoolConnectEventListeners(this); if (callback) { const parentSpan = trace.getSpan(context.active()); diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/internal-types.ts b/plugins/node/opentelemetry-instrumentation-pg/src/internal-types.ts index 1270814499..d01432b65c 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/internal-types.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/internal-types.ts @@ -55,8 +55,13 @@ export interface PgPoolOptionsParams { maxClient: number; // maximum size of the pool } +export const EVENT_LISTENERS_SET = Symbol( + 'opentelemetry.instrumentation.pg.eventListenersSet' +); + export interface PgPoolExtended extends pgPoolTypes { options: PgPoolOptionsParams; + [EVENT_LISTENERS_SET]?: boolean; // flag to identify if the event listeners for instrumentation have been set } export type PgClientConnect = (callback?: Function) => Promise | void; diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts index f34fb174e0..7d2eb9c115 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts @@ -282,12 +282,12 @@ export interface poolConnectionsCounter { } export function updateCounter( + poolName: string, pool: PgPoolExtended, connectionCount: UpDownCounter, connectionPendingRequests: UpDownCounter, latestCounter: poolConnectionsCounter ): poolConnectionsCounter { - const poolName = getPoolName(pool.options); const all = pool.totalCount; const pending = pool.waitingCount; const idle = pool.idleCount; diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts index 7f2c0db6fc..b8d65a42b5 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts @@ -572,5 +572,272 @@ describe('pg-pool', () => { }); }); }); + + it('should not add duplicate event listeners to PgPool events', done => { + const poolAux: pgPool = new pgPool(CONFIG); + let completed = 0; + poolAux.connect((err, client, release) => { + if (err) { + throw new Error(err.message); + } + if (!release) { + throw new Error('Did not receive release function'); + } + if (!client) { + throw new Error('No client received'); + } + assert.ok(client); + release(); + + assert.equal( + poolAux.listenerCount('connect'), + 1, + `${poolAux.listenerCount('connect')} event listener(s) for 'connect'` + ); + assert.equal( + poolAux.listenerCount('acquire'), + 1, + `${poolAux.listenerCount('acquire')} event listener(s) for 'acquire'` + ); + assert.equal( + poolAux.listenerCount('remove'), + 1, + `${poolAux.listenerCount('remove')} event listener(s) for 'remove'` + ); + assert.equal( + poolAux.listenerCount('release'), + 1, + `${poolAux.listenerCount('release')} event listener(s) for 'release'` + ); + + completed++; + if (completed >= 2) { + done(); + } + }); + + poolAux.connect((err, client, release) => { + if (err) { + throw new Error(err.message); + } + if (!release) { + throw new Error('Did not receive release function'); + } + if (!client) { + throw new Error('No client received'); + } + assert.ok(client); + release(); + + assert.equal( + poolAux.listenerCount('connect'), + 1, + `${poolAux.listenerCount('connect')} event listener(s) for 'connect'` + ); + assert.equal( + poolAux.listenerCount('acquire'), + 1, + `${poolAux.listenerCount('acquire')} event listener(s) for 'acquire'` + ); + assert.equal( + poolAux.listenerCount('remove'), + 1, + `${poolAux.listenerCount('remove')} event listener(s) for 'remove'` + ); + assert.equal( + poolAux.listenerCount('release'), + 1, + `${poolAux.listenerCount('release')} event listener(s) for 'release'` + ); + + completed++; + if (completed >= 2) { + done(); + } + }); + }); + + it('adding a custom event listener should still work with the default event listeners to PgPool events', done => { + const poolAux: pgPool = new pgPool(CONFIG); + let testValue = 0; + poolAux.on('connect', () => { + testValue = 1; + }); + + poolAux.connect((err, client, release) => { + if (err) { + throw new Error(err.message); + } + if (!release) { + throw new Error('Did not receive release function'); + } + if (!client) { + throw new Error('No client received'); + } + assert.ok(client); + + client.query('SELECT NOW()', async (err, ret) => { + release(); + if (err) { + throw new Error(err.message); + } + assert.ok(ret); + assert.equal( + poolAux.listenerCount('connect'), + 2, + `${poolAux.listenerCount( + 'connect' + )} event listener(s) for 'connect'` + ); + assert.equal( + poolAux.listenerCount('acquire'), + 1, + `${poolAux.listenerCount( + 'acquire' + )} event listener(s) for 'acquire'` + ); + assert.equal( + poolAux.listenerCount('remove'), + 1, + `${poolAux.listenerCount('remove')} event listener(s) for 'remove'` + ); + assert.equal( + poolAux.listenerCount('release'), + 1, + `${poolAux.listenerCount( + 'release' + )} event listener(s) for 'release'` + ); + assert.equal(testValue, 1); + + const { resourceMetrics, errors } = await metricReader.collect(); + assert.deepEqual( + errors, + [], + 'expected no errors from the callback during metric collection' + ); + + const metrics = resourceMetrics.scopeMetrics[0].metrics; + assert.strictEqual( + metrics[1].descriptor.name, + METRIC_DB_CLIENT_CONNECTION_COUNT + ); + assert.strictEqual( + metrics[1].dataPoints[0].attributes[ + ATTR_DB_CLIENT_CONNECTION_STATE + ], + 'used' + ); + assert.strictEqual( + metrics[1].dataPoints[0].value, + 1, + 'expected to have 1 used connection' + ); + done(); + }); + }); + }); + + it('when creating multiple pools, all of them should be instrumented', done => { + const pool1: pgPool = new pgPool(CONFIG); + const pool2: pgPool = new pgPool(CONFIG); + + let completed = 0; + pool1.connect((err, client, release) => { + if (err) { + throw new Error(err.message); + } + if (!release) { + throw new Error('Did not receive release function'); + } + if (!client) { + throw new Error('No client received'); + } + assert.ok(client); + release(); + + assert.equal( + pool1.listenerCount('connect'), + 1, + `${pool1.listenerCount( + 'connect' + )} event listener(s) for 'connect' on pool1` + ); + assert.equal( + pool1.listenerCount('acquire'), + 1, + `${pool1.listenerCount( + 'acquire' + )} event listener(s) for 'acquire' on pool1` + ); + assert.equal( + pool1.listenerCount('remove'), + 1, + `${pool1.listenerCount( + 'remove' + )} event listener(s) for 'remove' on pool1` + ); + assert.equal( + pool1.listenerCount('release'), + 1, + `${pool1.listenerCount( + 'release' + )} event listener(s) for 'release' on pool1` + ); + + completed++; + if (completed >= 2) { + done(); + } + }); + + pool2.connect((err, client, release) => { + if (err) { + throw new Error(err.message); + } + if (!release) { + throw new Error('Did not receive release function'); + } + if (!client) { + throw new Error('No client received'); + } + assert.ok(client); + release(); + + assert.equal( + pool2.listenerCount('connect'), + 1, + `${pool2.listenerCount( + 'connect' + )} event listener(s) for 'connect' on pool2` + ); + assert.equal( + pool2.listenerCount('acquire'), + 1, + `${pool2.listenerCount( + 'acquire' + )} event listener(s) for 'acquire' on pool2` + ); + assert.equal( + pool2.listenerCount('remove'), + 1, + `${pool2.listenerCount( + 'remove' + )} event listener(s) for 'remove' on pool2` + ); + assert.equal( + pool2.listenerCount('release'), + 1, + `${pool2.listenerCount( + 'release' + )} event listener(s) for 'release' on pool2` + ); + + completed++; + if (completed >= 2) { + done(); + } + }); + }); }); });