-
Notifications
You must be signed in to change notification settings - Fork 404
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Added instrumentation for
kafkajs.Kafka.consumer
(#2244)
- Loading branch information
Showing
14 changed files
with
416 additions
and
56 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,5 +4,4 @@ | |
*/ | ||
|
||
'use strict' | ||
|
||
module.exports = require('./kafkajs/index') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Copyright 2024 New Relic Corporation. All rights reserved. | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
'use strict' | ||
const { kafkaCtx } = require('../../symbols') | ||
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs') | ||
const { DESTINATIONS } = require('../../config/attribute-filter') | ||
const CONSUMER_METHODS = [ | ||
'connect', | ||
'disconnect', | ||
'subscribe', | ||
'stop', | ||
'commitOffsets', | ||
'seek', | ||
'pause', | ||
'resume' | ||
] | ||
const SEGMENT_PREFIX = 'kafkajs.Kafka.consumer#' | ||
|
||
module.exports = function instrumentConsumer({ shim, kafkajs }) { | ||
shim.wrap(kafkajs.Kafka.prototype, 'consumer', function wrapConsumer(shim, orig) { | ||
return function wrappedConsumer() { | ||
const args = shim.argsToArray.apply(shim, arguments) | ||
const consumer = orig.apply(this, args) | ||
consumer.on(consumer.events.REQUEST, function listener(data) { | ||
// storing broker for when we add `host`, `port` to messaging spans | ||
consumer[kafkaCtx] = { | ||
clientId: data?.payload?.clientId, | ||
broker: data?.payload.broker | ||
} | ||
}) | ||
shim.record(consumer, CONSUMER_METHODS, function wrapper(shim, fn, name) { | ||
return new RecorderSpec({ | ||
name: `${SEGMENT_PREFIX}${name}`, | ||
promise: true | ||
}) | ||
}) | ||
shim.recordSubscribedConsume( | ||
consumer, | ||
'run', | ||
new MessageSubscribeSpec({ | ||
name: `${SEGMENT_PREFIX}#run`, | ||
destinationType: shim.TOPIC, | ||
promise: true, | ||
consumer: shim.FIRST, | ||
functions: ['eachMessage'], | ||
messageHandler: handler.bind(null, consumer) | ||
}) | ||
) | ||
return consumer | ||
} | ||
}) | ||
} | ||
|
||
/** | ||
* Message handler that extracts the topic and headers from message being consumed. | ||
* | ||
* This also sets some metrics for byte length of message, and number of messages. | ||
* Lastly, adds tx attributes for byteCount and clientId | ||
* | ||
* @param {object} consumer the instance of kafka consumer | ||
* @param {MessageShim} shim instance of shim | ||
* @param {Array} args arguments passed to the `eachMessage` function of the `consumer.run` | ||
* @returns {MessageSpec} spec for message handling | ||
*/ | ||
function handler(consumer, shim, args) { | ||
const [data] = args | ||
const { topic } = data | ||
const segment = shim.getActiveSegment() | ||
|
||
if (segment?.transaction) { | ||
const tx = segment.transaction | ||
const byteLength = data?.message.value?.byteLength | ||
const metricPrefix = `Message/Kafka/Topic/Named/${topic}/Received` | ||
// This will always be 1 | ||
tx.metrics.getOrCreateMetric(`${metricPrefix}/Messages`).recordValue(1) | ||
if (byteLength) { | ||
tx.metrics.measureBytes(`${metricPrefix}/Bytes`, byteLength) | ||
tx.trace.attributes.addAttribute( | ||
DESTINATIONS.TRANS_SCOPE, | ||
'kafka.consume.byteCount', | ||
byteLength | ||
) | ||
} | ||
if (consumer?.[kafkaCtx]) { | ||
tx.trace.attributes.addAttribute( | ||
DESTINATIONS.TRANS_EVENT, | ||
'kafka.consume.client_id', | ||
consumer[kafkaCtx].clientId | ||
) | ||
} | ||
} | ||
|
||
return new MessageSpec({ | ||
destinationType: `Topic/Consume`, | ||
destinationName: data?.topic, | ||
headers: data?.message?.headers | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.