From 4f48fc3bbd5afce4954019620b112f600b2dfb53 Mon Sep 17 00:00:00 2001 From: Bob Evans Date: Fri, 24 May 2024 12:55:01 -0400 Subject: [PATCH] refactor: Updated `shim.recordConsume` to use `shim.record` and added ability to invoke an after hook with callback args (#2207) --- lib/instrumentation/amqplib/amqplib.js | 18 +-- lib/instrumentation/aws-sdk/v3/bedrock.js | 3 +- lib/instrumentation/core/inspector.js | 2 +- lib/instrumentation/langchain/runnable.js | 6 +- lib/instrumentation/langchain/tools.js | 3 +- lib/instrumentation/langchain/vectorstore.js | 3 +- lib/instrumentation/memcached.js | 2 +- lib/instrumentation/openai.js | 6 +- lib/instrumentation/redis.js | 6 +- lib/instrumentation/superagent.js | 2 +- lib/shim/message-shim/consume.js | 128 ++----------------- lib/shim/message-shim/index.js | 13 +- lib/shim/message-shim/subscribe-consume.js | 9 +- lib/shim/shim.js | 96 +++++++++----- lib/shim/specs/recorder.js | 17 ++- lib/shim/webframework-shim/middleware.js | 12 +- test/unit/shim/message-shim.test.js | 75 +---------- test/unit/shim/shim.test.js | 107 +++++++++------- test/versioned/amqplib/amqp-utils.js | 7 +- test/versioned/amqplib/callback.tap.js | 53 +++++++- test/versioned/amqplib/promises.tap.js | 56 +++++++- 21 files changed, 292 insertions(+), 332 deletions(-) diff --git a/lib/instrumentation/amqplib/amqplib.js b/lib/instrumentation/amqplib/amqplib.js index 6958eb17bf..f709ddf430 100644 --- a/lib/instrumentation/amqplib/amqplib.js +++ b/lib/instrumentation/amqplib/amqplib.js @@ -266,18 +266,20 @@ function wrapModel(shim, Model, promiseMode) { destinationName: shim.FIRST, callback: setCallback(shim, promiseMode), promise: promiseMode, - messageHandler: function handleConsumedMessage(shim, fn, name, message) { + after: function handleConsumedMessage({ shim, result, args, segment }) { + if (!shim.agent.config.message_tracer.segment_parameters.enabled) { + shim.logger.trace('Not capturing segment parameters') + return + } + // the message is the param when using the promised based model - message = promiseMode ? message : message[1] + const message = promiseMode ? result : args?.[1] if (!message) { shim.logger.trace('No results from consume.') return null } const parameters = getParametersFromMessage(message) - - const headers = message?.properties?.headers - - return { parameters, headers } + shim.copySegmentParameters(segment, parameters) } }) ) @@ -312,12 +314,10 @@ function wrapModel(shim, Model, promiseMode) { * Extracts the appropriate messageHandler parameters for the consume method. * * @param {Shim} shim instance of shim - * @param {object} _consumer not used - * @param {string} _name not used * @param {Array} args arguments passed to the consume method * @returns {object} message params */ -function describeMessage(shim, _consumer, _name, args) { +function describeMessage(shim, args) { const [message] = args if (!message?.properties) { diff --git a/lib/instrumentation/aws-sdk/v3/bedrock.js b/lib/instrumentation/aws-sdk/v3/bedrock.js index f20b2a6f1c..9fb575aad2 100644 --- a/lib/instrumentation/aws-sdk/v3/bedrock.js +++ b/lib/instrumentation/aws-sdk/v3/bedrock.js @@ -197,8 +197,7 @@ function getBedrockSpec({ commandName }, shim, _original, _name, args) { return new RecorderSpec({ promise: true, name: `Llm/${modelType}/Bedrock/${commandName}`, - // eslint-disable-next-line max-params - after: (shim, _fn, _fnName, err, response, segment) => { + after: ({ shim, error: err, result: response, segment }) => { const passThroughParams = { shim, err, diff --git a/lib/instrumentation/core/inspector.js b/lib/instrumentation/core/inspector.js index 9cb82871c8..7dba5e9919 100644 --- a/lib/instrumentation/core/inspector.js +++ b/lib/instrumentation/core/inspector.js @@ -16,7 +16,7 @@ function initialize(agent, inspector, name, shim) { shim.wrap(sessionProto, 'post', function wrapPost(shim, fn) { return function wrappedPost() { const args = shim.argsToArray.apply(shim, arguments) - shim.bindCallbackSegment(args, shim.LAST) + shim.bindCallbackSegment(null, args, shim.LAST) return fn.apply(this, args) } }) diff --git a/lib/instrumentation/langchain/runnable.js b/lib/instrumentation/langchain/runnable.js index 9ac0e2452c..2bfbd90ee6 100644 --- a/lib/instrumentation/langchain/runnable.js +++ b/lib/instrumentation/langchain/runnable.js @@ -61,8 +61,7 @@ function instrumentInvokeChain({ langchain, shim }) { return new RecorderSpec({ name: `${LANGCHAIN.CHAIN}/${fnName}`, promise: true, - // eslint-disable-next-line max-params - after(_shim, _fn, _name, err, output, segment) { + after({ error: err, result: output, segment }) { recordChatCompletionEvents({ segment, messages: [output], @@ -97,8 +96,7 @@ function instrumentStream({ langchain, shim }) { return new RecorderSpec({ name: `${LANGCHAIN.CHAIN}/${fnName}`, promise: true, - // eslint-disable-next-line max-params - after(_shim, _fn, _name, err, output, segment) { + after({ error: err, result: output, segment }) { // Input error occurred which means a stream was not created. // Skip instrumenting streaming and create Llm Events from // the data we have diff --git a/lib/instrumentation/langchain/tools.js b/lib/instrumentation/langchain/tools.js index 3abb65df7a..9844f6b3cf 100644 --- a/lib/instrumentation/langchain/tools.js +++ b/lib/instrumentation/langchain/tools.js @@ -31,8 +31,7 @@ module.exports = function initialize(shim, tools) { return new RecorderSpec({ name: `${LANGCHAIN.TOOL}/${name}`, promise: true, - // eslint-disable-next-line max-params - after(_shim, _fn, _name, err, output, segment) { + after({ error: err, result: output, segment }) { const metadata = mergeMetadata(instanceMeta, paramsMeta) const tags = mergeTags(instanceTags, paramsTags) segment.end() diff --git a/lib/instrumentation/langchain/vectorstore.js b/lib/instrumentation/langchain/vectorstore.js index d2ebba5f45..d602b9a169 100644 --- a/lib/instrumentation/langchain/vectorstore.js +++ b/lib/instrumentation/langchain/vectorstore.js @@ -89,8 +89,7 @@ module.exports = function initialize(shim, vectorstores) { return new RecorderSpec({ name: `${LANGCHAIN.VECTORSTORE}/${fnName}`, promise: true, - // eslint-disable-next-line max-params - after(_shim, _fn, _name, err, output, segment) { + after({ error: err, result: output, segment }) { if (!output) { // If we get an error, it is possible that `output = null`. // In that case, we define it to be an empty array. diff --git a/lib/instrumentation/memcached.js b/lib/instrumentation/memcached.js index eeac35f9ff..f98bf158d2 100644 --- a/lib/instrumentation/memcached.js +++ b/lib/instrumentation/memcached.js @@ -76,7 +76,7 @@ module.exports = function initialize(agent, memcached, moduleName, shim) { return new OperationSpec({ name: metacall.type || 'Unknown', callback: function wrapCallback(shim, fn, fnName, opSegment) { - shim.bindCallbackSegment(metacall, 'callback', opSegment) + shim.bindCallbackSegment(null, metacall, 'callback', opSegment) }, parameters }) diff --git a/lib/instrumentation/openai.js b/lib/instrumentation/openai.js index 303851ec25..a67196ccaf 100644 --- a/lib/instrumentation/openai.js +++ b/lib/instrumentation/openai.js @@ -262,8 +262,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) { return new RecorderSpec({ name: OPENAI.COMPLETION, promise: true, - // eslint-disable-next-line max-params - after(_shim, _fn, _name, err, response, segment) { + after({ error: err, result: response, segment }) { if (request.stream) { instrumentStream({ agent, shim, request, response, segment }) } else { @@ -294,8 +293,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) { return new RecorderSpec({ name: OPENAI.EMBEDDING, promise: true, - // eslint-disable-next-line max-params - after(_shim, _fn, _name, err, response, segment) { + after({ error: err, result: response, segment }) { addLlmMeta({ agent, segment }) if (!response) { diff --git a/lib/instrumentation/redis.js b/lib/instrumentation/redis.js index eb0c34f3e0..9e93b93ba5 100644 --- a/lib/instrumentation/redis.js +++ b/lib/instrumentation/redis.js @@ -49,7 +49,7 @@ function registerInternalSendCommand(shim, proto) { parameters, callback: function bindCallback(shim, _f, _n, segment) { if (shim.isFunction(commandObject.callback)) { - shim.bindCallbackSegment(commandObject, 'callback', segment) + shim.bindCallbackSegment(null, commandObject, 'callback', segment) } else { const self = this commandObject.callback = shim.bindSegment( @@ -87,9 +87,9 @@ function registerSendCommand(shim, proto) { callback: function bindCallback(shim, _f, _n, segment) { const last = args[args.length - 1] if (shim.isFunction(last)) { - shim.bindCallbackSegment(args, shim.LAST, segment) + shim.bindCallbackSegment(null, args, shim.LAST, segment) } else if (shim.isArray(last) && shim.isFunction(last[last.length - 1])) { - shim.bindCallbackSegment(last, shim.LAST, segment) + shim.bindCallbackSegment(null, last, shim.LAST, segment) } } }) diff --git a/lib/instrumentation/superagent.js b/lib/instrumentation/superagent.js index e4f4d47227..185925ee55 100644 --- a/lib/instrumentation/superagent.js +++ b/lib/instrumentation/superagent.js @@ -50,7 +50,7 @@ function wrapCallback(shim, callback) { return function wrappedCallback() { const segment = shim.getSegment(this) if (segment && segment.transaction.isActive()) { - shim.bindCallbackSegment(this, '_callback', segment) + shim.bindCallbackSegment(null, this, '_callback', segment) } return callback.apply(this, arguments) } diff --git a/lib/shim/message-shim/consume.js b/lib/shim/message-shim/consume.js index 62dc39121a..91ffa44cb9 100644 --- a/lib/shim/message-shim/consume.js +++ b/lib/shim/message-shim/consume.js @@ -4,7 +4,6 @@ */ 'use strict' -const TraceSegment = require('../../transaction/trace/segment') const genericRecorder = require('../../metrics/recorders/generic') const { _nameMessageSegment } = require('./common') const specs = require('../specs') @@ -25,7 +24,7 @@ module.exports = createRecorder function updateSpecFromArgs({ shim, fn, fnName, args, spec }) { let msgDesc = null if (shim.isFunction(spec)) { - msgDesc = spec.call(this, shim, fn, fnName, args) + msgDesc = spec(shim, fn, fnName, args) } else { msgDesc = spec const destIdx = shim.normalizeIndex(args.length, spec.destinationName) @@ -37,81 +36,6 @@ function updateSpecFromArgs({ shim, fn, fnName, args, spec }) { return msgDesc } -/** - * Binds the consumer callback to the active segment. - * - * @private - * @param {object} params to function - * @param {MessageShim} params.shim instance of shim - * @param {Array} params.args arguments passed to original consume function - * @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function - * @param {TraceSegment} params.segment active segment to bind callback - * @param {boolean} params.getParams flag to copy message parameters to segment - * @param {Function} params.resHandler function to handle response from callback to obtain the message parameters - */ -function bindCallback({ shim, args, msgDesc, segment, getParams, resHandler }) { - const cbIdx = shim.normalizeIndex(args.length, msgDesc.callback) - if (cbIdx !== null) { - shim.bindCallbackSegment(args, cbIdx, segment) - - // If we have a callback and a results handler, then wrap the callback so - // we can call the results handler and get the message properties. - if (resHandler) { - shim.wrap(args, cbIdx, function wrapCb(shim, cb, cbName) { - if (shim.isFunction(cb)) { - return function cbWrapper() { - const cbArgs = shim.argsToArray.apply(shim, arguments) - const msgProps = resHandler.call(this, shim, cb, cbName, cbArgs) - if (getParams && msgProps && msgProps.parameters) { - shim.copySegmentParameters(segment, msgProps.parameters) - } - - return cb.apply(this, arguments) - } - } - }) - } - } -} - -/** - * Binds the consumer function to the async context and checks return to possibly - * bind the promise - * - * @private - * @param {object} params to function - * @param {MessageShim} params.shim instance of shim - * @param {Function} params.fn consumer function - * @param {string} params.fnName name of function - * @param {Array} params.args arguments passed to original consume function - * @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function - * @param {TraceSegment} params.segment active segment to bind callback - * @param {boolean} params.getParams flag to copy message parameters to segment - * @param {Function} params.resHandler function to handle response from callback to obtain the message parameters - * @returns {Promise|*} response from consume function - */ -function bindConsumer({ shim, fn, fnName, args, msgDesc, segment, getParams, resHandler }) { - // Call the method in the context of our segment. - let ret = shim.applySegment(fn, segment, true, this, args) - - if (ret && msgDesc.promise && shim.isPromise(ret)) { - ret = shim.bindPromise(ret, segment) - - // Intercept the promise to handle the result. - if (resHandler) { - ret = ret.then(function interceptValue(res) { - const msgProps = resHandler.call(this, shim, fn, fnName, res) - if (getParams && msgProps && msgProps.parameters) { - shim.copySegmentParameters(segment, msgProps.parameters) - } - return res - }) - } - } - - return ret -} - /** * * @private @@ -119,48 +43,14 @@ function bindConsumer({ shim, fn, fnName, args, msgDesc, segment, getParams, res * @param {MessageShim} params.shim instance of shim * @param {Function} params.fn function that is being wrapped * @param {string} params.fnName name of function + * @param params.args * @param {specs.MessageSpec} params.spec spec for the wrapped consume function - * @returns {Function} recorder for consume function + * @returns {specs.MessageSpec} updated spec with logic to name segment and apply the genericRecorder */ -function createRecorder({ shim, fn, fnName, spec }) { - return function consumeRecorder() { - const parent = shim.getSegment() - if (!parent || !parent.transaction.isActive()) { - shim.logger.trace('Not recording consume, no active transaction.') - return fn.apply(this, arguments) - } - - // Process the message args. - const args = shim.argsToArray.apply(shim, arguments) - const msgDesc = updateSpecFromArgs.call(this, { shim, fn, fnName, args, spec }) - - // Make the segment if we can. - if (!msgDesc) { - shim.logger.trace('Not recording consume, no message descriptor.') - return fn.apply(this, args) - } - - const name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME) - - // Adds details needed by createSegment when used with a spec - msgDesc.name = name - msgDesc.recorder = genericRecorder - msgDesc.parent = parent - - const segment = shim.createSegment(msgDesc) - const getParams = shim.agent.config.message_tracer.segment_parameters.enabled - const resHandler = shim.isFunction(msgDesc.messageHandler) ? msgDesc.messageHandler : null - - bindCallback({ shim, args, msgDesc, segment, getParams, resHandler }) - return bindConsumer.call(this, { - shim, - fn, - fnName, - args, - msgDesc, - segment, - getParams, - resHandler - }) - } +function createRecorder({ spec, shim, fn, fnName, args }) { + const msgDesc = updateSpecFromArgs({ shim, fn, fnName, args, spec }) + // Adds details needed by createSegment when used with a spec + msgDesc.name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME) + msgDesc.recorder = genericRecorder + return msgDesc } diff --git a/lib/shim/message-shim/index.js b/lib/shim/message-shim/index.js index 6313f60940..b902aa6d27 100644 --- a/lib/shim/message-shim/index.js +++ b/lib/shim/message-shim/index.js @@ -287,17 +287,8 @@ function recordConsume(nodule, properties, spec) { properties = null } - // This is using wrap instead of record because the spec allows for a messageHandler - // which is being used to handle the result of the callback or promise of the - // original wrapped consume function. - // TODO: https://github.com/newrelic/node-newrelic/issues/981 - return this.wrap(nodule, properties, function wrapConsume(shim, fn, fnName) { - if (!shim.isFunction(fn)) { - shim.logger.debug('Not wrapping %s (%s) as consume', fn, fnName) - return fn - } - - return createRecorder({ shim, fn, fnName, spec }) + return this.record(nodule, properties, function wrapConsume(shim, fn, fnName, args) { + return createRecorder({ spec, shim, fn, fnName, args }) }) } diff --git a/lib/shim/message-shim/subscribe-consume.js b/lib/shim/message-shim/subscribe-consume.js index 8a65e22bc7..228c601f02 100644 --- a/lib/shim/message-shim/subscribe-consume.js +++ b/lib/shim/message-shim/subscribe-consume.js @@ -84,12 +84,12 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) { spec.queue = queue } - return function wrapConsumer(shim, consumer, cName) { + return function wrapConsumer(shim, consumer) { if (!shim.isFunction(consumer)) { return consumer } - const consumerWrapper = createConsumerWrapper({ shim, consumer, cName, spec }) + const consumerWrapper = createConsumerWrapper({ shim, consumer, spec }) return shim.bindCreateTransaction( consumerWrapper, new specs.TransactionSpec({ @@ -108,10 +108,9 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) { * @param {MessageShim} params.shim instance of shim * @param {specs.MessageSubscribeSpec} params.spec spec for function * @param {Function} params.consumer function for consuming message - * @param {string} params.cName name of consumer function * @returns {Function} handler for the transaction being created */ -function createConsumerWrapper({ shim, spec, consumer, cName }) { +function createConsumerWrapper({ shim, spec, consumer }) { return function createConsumeTrans() { // If there is no transaction or we're in a pre-existing transaction, // then don't do anything. Note that the latter should never happen. @@ -123,7 +122,7 @@ function createConsumerWrapper({ shim, spec, consumer, cName }) { return consumer.apply(this, args) } - const msgDesc = spec.messageHandler.call(this, shim, consumer, cName, args) + const msgDesc = spec.messageHandler.call(this, shim, args) // If message could not be handled, immediately kill this transaction. if (!msgDesc) { diff --git a/lib/shim/shim.js b/lib/shim/shim.js index 10d6d65a77..54a7fca45a 100644 --- a/lib/shim/shim.js +++ b/lib/shim/shim.js @@ -799,12 +799,13 @@ function _applyRecorderSegment({ segment, ctx, args, segDesc, shim, fn, name }) return ret.then( function onThen(val) { segment.touch() - segDesc.after(shim, fn, name, null, val, segment) + // passing in error as some instrumentation checks if it's not equal to `null` + segDesc.after({ shim, fn, name, error, result: val, segment }) return val }, function onCatch(err) { segment.touch() - segDesc.after(shim, fn, name, err, null, segment) + segDesc.after({ shim, fn, name, error: err, segment }) throw err // NOTE: This is not an error from our instrumentation. } ) @@ -815,7 +816,7 @@ function _applyRecorderSegment({ segment, ctx, args, segDesc, shim, fn, name }) throw err // Just rethrowing this error, not our error! } finally { if (segDesc.after && (error || !promised)) { - segDesc.after(shim, fn, name, error, ret, segment) + segDesc.after({ shim, fn, name, error, result: ret, segment }) } } } @@ -977,10 +978,11 @@ function bindSegment(nodule, property, segment, full) { * Replaces the callback in an arguments array with one that has been bound to * the given segment. * - * - `bindCallbackSegment(args, cbIdx [, segment])` - * - `bindCallbackSegment(obj, property [, segment])` + * - `bindCallbackSegment(spec, args, cbIdx [, segment])` + * - `bindCallbackSegment(spec, obj, property [, segment])` * * @memberof Shim.prototype + * @param {Spec} spec spec to original wrapped function, used to call after method with arguments passed to callback * @param {Array | object} args * The arguments array to pull the cb from. * @param {number|string} cbIdx @@ -990,7 +992,7 @@ function bindSegment(nodule, property, segment, full) { * currently active segment. * @see Shim#bindSegment */ -function bindCallbackSegment(args, cbIdx, parentSegment) { +function bindCallbackSegment(spec, args, cbIdx, parentSegment) { if (!args) { return } @@ -1009,35 +1011,59 @@ function bindCallbackSegment(args, cbIdx, parentSegment) { cbIdx = normalizedCBIdx } - // Pull out the callback and make sure it is a function. + // Make sure cb is function before wrapping + if (this.isFunction(args[cbIdx])) { + wrapCallback({ shim: this, args, cbIdx, parentSegment, spec }) + } +} + +/** + * Wraps the callback and creates a segment for the callback function. + * It will also call an after hook with the arguments passed to callback + * + * @private + * @param {Object} params to function + * @param {Shim} params.shim instance of shim + * @param {Array | object} params.args + * The arguments array to pull the cb from. + * @param {number|string} params.cbIdx + * The index of the callback. + * @param {TraceSegment} [params.parentSegment] + * The segment to use as the callback segment's parent. Defaults to the + * currently active segment. + * @param {Spec} params.spec spec to original wrapped function, used to call after method with arguments passed to callback + * + */ +function wrapCallback({ shim, args, cbIdx, parentSegment, spec }) { const cb = args[cbIdx] - if (this.isFunction(cb)) { - const shim = this - const realParent = parentSegment || shim.getSegment() - args[cbIdx] = shim.wrap(cb, null, function callbackWrapper(shim, fn, name) { - return function wrappedCallback() { - if (realParent) { - realParent.opaque = false - } - const segment = _rawCreateSegment( - shim, - new specs.SegmentSpec({ - name: 'Callback: ' + name, - parent: realParent - }) - ) - - if (segment) { - segment.async = false - } + const realParent = parentSegment || shim.getSegment() + args[cbIdx] = shim.wrap(cb, null, function callbackWrapper(shim, fn, name) { + return function wrappedCallback() { + if (realParent) { + realParent.opaque = false + } + const segment = _rawCreateSegment( + shim, + new specs.SegmentSpec({ + name: 'Callback: ' + name, + parent: realParent + }) + ) - // CB may end the transaction so update the parent's time preemptively. - realParent && realParent.touch() - return shim.applySegment(cb, segment, true, this, arguments) + if (segment) { + segment.async = false } - }) - shim.storeSegment(args[cbIdx], realParent) - } + + if (spec?.after) { + spec.after({ shim, fn, name, args: arguments, segment: realParent }) + } + + // CB may end the transaction so update the parent's time preemptively. + realParent && realParent.touch() + return shim.applySegment(cb, segment, true, this, arguments) + } + }) + shim.storeSegment(args[cbIdx], realParent) } /** @@ -1792,7 +1818,7 @@ function _bindAllCallbacks(shim, fn, name, args, spec) { _bindCallback({ context: this, callback: spec.spec.callback, - binder: shim.bindCallbackSegment, + binder: shim.bindCallbackSegment.bind(shim, spec.spec), shim, fn, args, @@ -1806,7 +1832,7 @@ function _bindAllCallbacks(shim, fn, name, args, spec) { _bindCallback({ context: this, callback: spec.spec.rowCallback, - binder: shim.bindRowCallbackSegment || shim.bindCallbackSegment, + binder: shim?.bindRowCallbackSegment || shim?.bindCallbackSegment?.bind(shim, spec.spec), shim, fn, args, @@ -1954,7 +1980,7 @@ function wrapStreamListeners({ stream, shim, segment, specEvent }) { return function wrappedOn(onEvent) { if (onEvent !== specEvent && (onEvent === 'end' || onEvent === 'error')) { const args = argsToArray.apply(shim, arguments) - shim.bindCallbackSegment(args, shim.LAST, segment) + shim.bindCallbackSegment(specEvent, args, shim.LAST, segment) return fn.apply(this, args) } return fn.apply(this, arguments) diff --git a/lib/shim/specs/recorder.js b/lib/shim/specs/recorder.js index 6086c07854..e0d3ac6caa 100644 --- a/lib/shim/specs/recorder.js +++ b/lib/shim/specs/recorder.js @@ -24,14 +24,17 @@ const WrapSpec = require('./wrap') * The instrumented function must have been invoked synchronously. * * @typedef {Function} SpecAfterFunction - * @param {object} shim The shim used to instrument the external library. - * @param {Function} fn The function/method from the external library being + * @param {Object} params params to function + * @param {object} params.shim The shim used to instrument the external library. + * @param {Function} params.fn The function/method from the external library being * instrumented. - * @param {string} name The name of the current function. - * @param {Error|null} error If the instrumented function threw an error, this - * will be that error. - * @param {*} value The result returned by the instrumented function. - * @param {TraceSegment} segment The segment used while instrumenting the + * @param {string} params.name The name of the current function. + * @param {Error|null} [params.error] If the instrumented function threw an error, this + * will be that error. In the case of a callback this will be omitted. + * @param {Array} [params.args] arguments passed to a callback function getting instrumented. + * @param {*} [params.result] The result returned by the instrumented function. In the case of a wrapped + * callback, the data needed is in `params.args`. + * @param {TraceSegment} params.segment The segment used while instrumenting the * function. */ diff --git a/lib/shim/webframework-shim/middleware.js b/lib/shim/webframework-shim/middleware.js index 1144ae757b..d76f4cab86 100644 --- a/lib/shim/webframework-shim/middleware.js +++ b/lib/shim/webframework-shim/middleware.js @@ -176,10 +176,10 @@ function middlewareWithCallbackRecorder({ spec, typeDetails, metricName, isError parent: txInfo.segmentStack[txInfo.segmentStack.length - 1], recorder, parameters: params, - after: function afterExec(shim, _fn, _name, err) { - const errIsError = isError(shim, err) + after: function afterExec({ shim, error }) { + const errIsError = isError(shim, error) if (errIsError) { - assignError(txInfo, err) + assignError(txInfo, error) } else if (!nextWrapper && !isErrorWare && spec.appendPath) { txInfo.transaction.nameState.popPath(route) } @@ -243,12 +243,12 @@ function middlewareWithPromiseRecorder({ spec, typeDetails, metricName, isErrorW callback: nextWrapper, recorder, parameters: params, - after: function afterExec(shim, _fn, _name, err, result) { + after: function afterExec({ shim, error, result }) { if (shim._responsePredicate(args, result)) { txInfo.transaction.nameState.freeze() } - if (isError(shim, err)) { - assignError(txInfo, err) + if (isError(shim, error)) { + assignError(txInfo, error) } else { txInfo.errorHandled = true diff --git a/test/unit/shim/message-shim.test.js b/test/unit/shim/message-shim.test.js index 7776dcf192..780c6608da 100644 --- a/test/unit/shim/message-shim.test.js +++ b/test/unit/shim/message-shim.test.js @@ -476,53 +476,6 @@ tap.test('MessageShim', function (t) { }) }) - t.test('should add parameters to segment', function (t) { - function wrapMe(q, cb) { - cb() - return shim.getSegment() - } - - const wrapped = shim.recordConsume(wrapMe, { - destinationName: shim.FIRST, - callback: shim.LAST, - messageHandler: function () { - return { parameters: { a: 'a', b: 'b' } } - } - }) - - helper.runInTransaction(agent, function () { - const segment = wrapped('foo', function () {}) - const attributes = segment.getAttributes() - t.equal(attributes.a, 'a') - t.equal(attributes.b, 'b') - t.end() - }) - }) - - t.test('should not add parameters when disabled', function (t) { - agent.config.message_tracer.segment_parameters.enabled = false - function wrapMe(q, cb) { - cb() - return shim.getSegment() - } - - const wrapped = shim.recordConsume(wrapMe, { - destinationName: shim.FIRST, - callback: shim.LAST, - messageHandler: function () { - return { parameters: { a: 'a', b: 'b' } } - } - }) - - helper.runInTransaction(agent, function () { - const segment = wrapped('foo', function () {}) - const attributes = segment.getAttributes() - t.notOk(attributes.a) - t.notOk(attributes.b) - t.end() - }) - }) - t.test('should be able to get destinationName from arguments', function (t) { shim.recordConsume(wrappable, 'getActiveSegment', { destinationName: shim.FIRST, @@ -557,9 +510,8 @@ tap.test('MessageShim', function (t) { const wrapped = shim.recordConsume(wrapMe, { destinationName: shim.FIRST, promise: true, - messageHandler: function (shim, fn, name, message) { - t.equal(message, msg) - return { parameters: { a: 'a', b: 'b' } } + after: function ({ result }) { + t.equal(result, msg) } }) @@ -568,9 +520,6 @@ tap.test('MessageShim', function (t) { const duration = segment.getDurationInMillis() t.ok(duration > DELAY - 1, 'segment duration should be at least 100 ms') t.equal(message, msg) - const attributes = segment.getAttributes() - t.equal(attributes.a, 'a') - t.equal(attributes.b, 'b') }) }) }) @@ -621,26 +570,6 @@ tap.test('MessageShim', function (t) { }) }) - t.test('should invoke the spec in the context of the wrapped function', function (t) { - const original = wrappable.bar - let executed = false - shim.recordConsume(wrappable, 'bar', function (_, fn, name, args) { - executed = true - t.equal(fn, original) - t.equal(name, 'bar') - t.equal(this, wrappable) - t.same(args, ['a', 'b', 'c']) - - return { destinationName: 'foobar' } - }) - - helper.runInTransaction(agent, function () { - wrappable.bar('a', 'b', 'c') - t.ok(executed) - t.end() - }) - }) - t.test('should create a child segment when `opaque` is false', function (t) { shim.recordConsume(wrappable, 'withNested', function () { return { destinationName: 'foobar', opaque: false } diff --git a/test/unit/shim/shim.test.js b/test/unit/shim/shim.test.js index 3adbc2ce90..d5e215d564 100644 --- a/test/unit/shim/shim.test.js +++ b/test/unit/shim/shim.test.js @@ -979,14 +979,14 @@ tap.test('Shim', function (t) { return { name: 'test segment', callback: shim.LAST, - after(...args) { - t.equal(args.length, 6, 'should have 6 args to after hook') - const [, fn, fnName, err, val, segment] = args + after(args) { + t.equal(Object.keys(args).length, 6, 'should have 6 args to after hook') + const { fn, name, error, result, segment } = args t.equal(segment.name, 'test segment') - t.not(err) + t.not(error) t.same(fn, testAfter) - t.equal(fnName, testAfter.name) - t.equal(val, 'result') + t.equal(name, testAfter.name) + t.equal(result, 'result') } } }) @@ -1009,14 +1009,14 @@ tap.test('Shim', function (t) { return { name: 'test segment', callback: shim.LAST, - after(...args) { - t.equal(args.length, 6, 'should have 6 args to after hook') - const [, fn, fnName, expectedErr, val, segment] = args + after(args) { + t.equal(Object.keys(args).length, 6, 'should have 6 args to after hook') + const { fn, name, error, result, segment } = args t.equal(segment.name, 'test segment') - t.same(expectedErr, err) - t.equal(val, undefined) + t.same(error, err) + t.equal(result, undefined) t.same(fn, testAfter) - t.equal(fnName, testAfter.name) + t.equal(name, testAfter.name) } } }) @@ -1354,20 +1354,20 @@ tap.test('Shim', function (t) { }) t.test('should call after hook when promise resolves', (t) => { - const name = 'test segment' - const result = { returned: true } + const segmentName = 'test segment' + const expectedResult = { returned: true } const wrapped = shim.record(toWrap, function () { return { - name, + name: segmentName, promise: true, - after(...args) { - t.equal(args.length, 6, 'should have 6 args to after hook') - const [, fn, fnName, err, val, segment] = args + after(args) { + t.equal(Object.keys(args).length, 6, 'should have 6 args to after hook') + const { fn, name, error, result, segment } = args t.same(fn, toWrap) - t.equal(fnName, toWrap.name) - t.not(err) - t.same(val, result) - t.equal(segment.name, name) + t.equal(name, toWrap.name) + t.not(error) + t.same(result, expectedResult) + t.equal(segment.name, segmentName) t.end() } } @@ -1379,25 +1379,24 @@ tap.test('Shim', function (t) { }) setTimeout(function () { - promise.resolve(result) + promise.resolve(expectedResult) }, 5) }) t.test('should call after hook when promise reject', (t) => { - const name = 'test segment' - const result = { returned: true } + const segmentName = 'test segment' + const expectedResult = { returned: true } const wrapped = shim.record(toWrap, function () { return { - name, + name: segmentName, promise: true, - after(...args) { - t.equal(args.length, 6, 'should have 6 args to after hook') - const [, fn, fnName, err, val, segment] = args + after(args) { + t.equal(Object.keys(args).length, 5, 'should have 6 args to after hook') + const { fn, name, error, segment } = args t.same(fn, toWrap) - t.equal(fnName, toWrap.name) - t.same(err, result) - t.not(val) - t.equal(segment.name, name) + t.equal(name, toWrap.name) + t.same(error, expectedResult) + t.equal(segment.name, segmentName) t.end() } } @@ -1409,7 +1408,7 @@ tap.test('Shim', function (t) { }) setTimeout(function () { - promise.reject(result) + promise.reject(expectedResult) }, 5) }) }) @@ -2066,7 +2065,7 @@ tap.test('Shim', function (t) { t.test('should wrap the callback in place', function (t) { const args = ['a', cb, 'b'] - shim.bindCallbackSegment(args, shim.SECOND) + shim.bindCallbackSegment({}, args, shim.SECOND) const [, wrapped] = args t.ok(wrapped instanceof Function) @@ -2079,21 +2078,21 @@ tap.test('Shim', function (t) { t.test('should work with an array and numeric index', function (t) { const args = ['a', cb, 'b'] - shim.bindCallbackSegment(args, 1) + shim.bindCallbackSegment({}, args, 1) t.ok(shim.isWrapped(args[1])) t.end() }) t.test('should work with an object and a string index', function (t) { const opts = { a: 'a', cb: cb, b: 'b' } - shim.bindCallbackSegment(opts, 'cb') + shim.bindCallbackSegment({}, opts, 'cb') t.ok(shim.isWrapped(opts, 'cb')) t.end() }) t.test('should not error if `args` is `null`', function (t) { t.doesNotThrow(function () { - shim.bindCallbackSegment(null, 1) + shim.bindCallbackSegment({}, null, 1) }) t.end() }) @@ -2101,7 +2100,7 @@ tap.test('Shim', function (t) { t.test('should not error if the callback does not exist', function (t) { t.doesNotThrow(function () { const args = ['a'] - shim.bindCallbackSegment(args, 1) + shim.bindCallbackSegment({}, args, 1) }) t.end() }) @@ -2110,7 +2109,7 @@ tap.test('Shim', function (t) { let args t.doesNotThrow(function () { args = ['a'] - shim.bindCallbackSegment(args, 0) + shim.bindCallbackSegment({}, args, 0) }) t.notOk(shim.isWrapped(args[0])) @@ -2120,7 +2119,7 @@ tap.test('Shim', function (t) { t.test('should execute the callback', function (t) { const args = ['a', 'b', cb] - shim.bindCallbackSegment(args, shim.LAST) + shim.bindCallbackSegment({}, args, shim.LAST) t.notOk(cbCalled) args[2]() @@ -2133,7 +2132,7 @@ tap.test('Shim', function (t) { const args = [wrappable.getActiveSegment] const segment = wrappable.getActiveSegment() const parent = shim.createSegment('test segment') - shim.bindCallbackSegment(args, shim.LAST, parent) + shim.bindCallbackSegment({}, args, shim.LAST, parent) const cbSegment = args[0]() t.not(cbSegment, segment) @@ -2148,7 +2147,7 @@ tap.test('Shim', function (t) { const args = [wrappable.getActiveSegment] const parent = shim.createSegment('test segment') parent.opaque = true - shim.bindCallbackSegment(args, shim.LAST, parent) + shim.bindCallbackSegment({}, args, shim.LAST, parent) const cbSegment = args[0]() t.not(cbSegment, parent) @@ -2162,11 +2161,31 @@ tap.test('Shim', function (t) { helper.runInTransaction(agent, function () { const args = [wrappable.getActiveSegment] const segment = wrappable.getActiveSegment() - shim.bindCallbackSegment(args, shim.LAST) + shim.bindCallbackSegment({}, args, shim.LAST) + const cbSegment = args[0]() + + t.not(cbSegment, segment) + t.compareSegments(segment, [cbSegment]) + t.end() + }) + }) + + t.test('should call the after hook if specified on the spec', function (t) { + let executed = false + const spec = { + after() { + executed = true + } + } + helper.runInTransaction(agent, function () { + const args = [wrappable.getActiveSegment] + const segment = wrappable.getActiveSegment() + shim.bindCallbackSegment(spec, args, shim.LAST) const cbSegment = args[0]() t.not(cbSegment, segment) t.compareSegments(segment, [cbSegment]) + t.ok(executed) t.end() }) }) diff --git a/test/versioned/amqplib/amqp-utils.js b/test/versioned/amqplib/amqp-utils.js index e821dc86bc..0599a08d90 100644 --- a/test/versioned/amqplib/amqp-utils.js +++ b/test/versioned/amqplib/amqp-utils.js @@ -227,7 +227,7 @@ function verifyProduce(t, tx, exchangeName, routingKey) { } } -function verifyGet(t, tx, exchangeName, routingKey, queue) { +function verifyGet({ t, tx, exchangeName, routingKey, queue, assertAttr }) { const isCallback = !!metrics.findSegment(tx.trace.root, 'Callback: ') const produceName = 'MessageBroker/RabbitMQ/Exchange/Produce/Named/' + exchangeName const consumeName = 'MessageBroker/RabbitMQ/Exchange/Consume/Named/' + queue @@ -237,6 +237,11 @@ function verifyGet(t, tx, exchangeName, routingKey, queue) { t.assertSegments(tx.trace.root, [produceName, consumeName]) } t.assertMetrics(tx.metrics, [[{ name: produceName }], [{ name: consumeName }]], false, false) + if (assertAttr) { + const segment = metrics.findSegment(tx.trace.root, consumeName) + const attributes = segment.getAttributes() + t.equal(attributes.routing_key, routingKey, 'should have routing key on get') + } } function verifyPurge(t, tx) { diff --git a/test/versioned/amqplib/callback.tap.js b/test/versioned/amqplib/callback.tap.js index 8b5cc91c7e..0d8d5fa186 100644 --- a/test/versioned/amqplib/callback.tap.js +++ b/test/versioned/amqplib/callback.tap.js @@ -232,7 +232,58 @@ tap.test('amqplib callback instrumentation', function (t) { channel.ack(msg) setImmediate(function () { tx.end() - amqpUtils.verifyGet(t, tx, exchange, 'consume-tx-key', queue) + amqpUtils.verifyGet({ + t, + tx, + exchangeName: exchange, + routingKey: 'consume-tx-key', + queue, + assertAttr: true + }) + t.end() + }) + }) + }) + }) + }) + }) + }) + + t.test('get a message disable parameters', function (t) { + agent.config.message_tracer.segment_parameters.enabled = false + const exchange = amqpUtils.DIRECT_EXCHANGE + let queue = null + + channel.assertExchange(exchange, 'direct', null, function (err) { + t.error(err, 'should not error asserting exchange') + + channel.assertQueue('', { exclusive: true }, function (err, res) { + t.error(err, 'should not error asserting queue') + queue = res.queue + + channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) { + t.error(err, 'should not error binding queue') + + helper.runInTransaction(agent, function (tx) { + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) + channel.get(queue, {}, function (err, msg) { + t.notOk(err, 'should not cause an error') + t.ok(msg, 'should receive a message') + + amqpUtils.verifyTransaction(t, tx, 'get') + const body = msg.content.toString('utf8') + t.equal(body, 'hello', 'should receive expected body') + + channel.ack(msg) + setImmediate(function () { + tx.end() + amqpUtils.verifyGet({ + t, + tx, + exchangeName: exchange, + routingKey: 'consume-tx-key', + queue + }) t.end() }) }) diff --git a/test/versioned/amqplib/promises.tap.js b/test/versioned/amqplib/promises.tap.js index 73ceea75d1..89563a73e6 100644 --- a/test/versioned/amqplib/promises.tap.js +++ b/test/versioned/amqplib/promises.tap.js @@ -235,7 +235,61 @@ tap.test('amqplib promise instrumentation', function (t) { }) .then(function () { tx.end() - amqpUtils.verifyGet(t, tx, exchange, 'consume-tx-key', queue) + amqpUtils.verifyGet({ + t, + tx, + exchangeName: exchange, + routingKey: 'consume-tx-key', + queue, + assertAttr: true + }) + t.end() + }) + }) + }) + .catch(function (err) { + t.fail(err) + t.end() + }) + }) + + t.test('get a message disable parameters', function (t) { + agent.config.message_tracer.segment_parameters.enabled = false + let queue = null + const exchange = amqpUtils.DIRECT_EXCHANGE + + channel + .assertExchange(exchange, 'direct') + .then(function () { + return channel.assertQueue('', { exclusive: true }) + }) + .then(function (res) { + queue = res.queue + return channel.bindQueue(queue, exchange, 'consume-tx-key') + }) + .then(function () { + return helper.runInTransaction(agent, function (tx) { + channel.publish(exchange, 'consume-tx-key', Buffer.from('hello')) + return channel + .get(queue) + .then(function (msg) { + t.ok(msg, 'should receive a message') + + const body = msg.content.toString('utf8') + t.equal(body, 'hello', 'should receive expected body') + + amqpUtils.verifyTransaction(t, tx, 'get') + channel.ack(msg) + }) + .then(function () { + tx.end() + amqpUtils.verifyGet({ + t, + tx, + exchangeName: exchange, + routingKey: 'consume-tx-key', + queue + }) t.end() }) })