Skip to content

Commit

Permalink
chore: Implemented DT header injection for message specs
Browse files Browse the repository at this point in the history
  • Loading branch information
jsumners-nr committed Jun 5, 2024
1 parent 8b1fa5d commit 3a420b6
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 11 deletions.
28 changes: 20 additions & 8 deletions lib/instrumentation/kafkajs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {

shim.recordProduce(producer, 'send', function nrSend(shim, fn, n, args) {
const data = args[0]
const firstMessage = getByPath(data, 'messages[0]')

if (firstMessage) {
firstMessage.headers = firstMessage.headers ?? {}
}

return new MessageSpec({
promise: true,
destinationName: data.topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
messageHeaders: (inject) => {
return data.messages.map((msg) => {
if (msg.headers) {
return inject(msg.headers, true)
}
msg.headers = {}
return inject(msg.headers, true)
})
}
})
})

Expand All @@ -49,7 +51,17 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {
promise: true,
destinationName: data.topicMessages[0].topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
messageHeaders: (inject) => {
return data.topicMessages.map((tm) => {
return tm.messages.map((m) => {
if (m.headers) {
return inject(m.headers, true)
}
m.headers = {}
return inject(m.headers, true)
})
})
}
})
})

Expand Down
10 changes: 10 additions & 0 deletions lib/shim/message-shim/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ function recordProduce(nodule, properties, recordNamer) {
if (msgDesc.headers) {
shim.insertCATRequestHeaders(msgDesc.headers, true)
}

if (msgDesc.messageHeaders) {
// Some message broker clients, like kafkajs, allow for sending multiple
// messages in one send. Clients are likely to pick up such messages
// individually. Thus, we need to propagate any distributed trace
// headers to every message in the payload.
msgDesc.messageHeaders((headers, altNames = true) => {
shim.insertCATRequestHeaders(headers, altNames)
})
}
}
msgDesc.recorder = genericRecorder
return msgDesc
Expand Down
23 changes: 23 additions & 0 deletions lib/shim/specs/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,23 @@ const OperationSpec = require('./operation')
* @property {number|string} [destinationName]
* @property {string|null} [destinationType]
* @property {Object<string, string>|null} [headers]
* @property {MessageBrokerHeadersFn|null} [messageHeaders]
* @property {MessageHandlerFunction|null} [messageHandler]
* @property {number|string|null} [queue]
* @property {string|null} [routingKey]
*/

/**
* @typedef {Function} MessageBrokerHeadersFn
* @param {Function} inject A function with the signature
* `function(headers, useAlternateHeaderNames)`. The passed in headers object
* will be updated with distributed trace headers. When the second parameter
* is `true` (the default), alternate style (not HTTP style) header names will
* be used, i.e. names that are safe for non-HTTP transports.
* @returns {object[]} An array of objects, wherein each object will be updated
* with distributed trace headers.
*/

/**
* Spec that describes how to instrument a message broker.
*/
Expand Down Expand Up @@ -46,6 +58,16 @@ class MessageSpec extends OperationSpec {
*/
headers

/**
* Function that returns an iterable set of message header objects. The
* header objects will be modified to include distributed tracing headers so
* that they will be included in the payloads delivered, and read from, the
* message broker.
*
* @type {MessageBrokerHeadersFn}
*/
messageHeaders

/**
* A function to handle the result of the instrumented message broker
* function.
Expand Down Expand Up @@ -81,6 +103,7 @@ class MessageSpec extends OperationSpec {
this.destinationName = params.destinationName ?? null
this.destinationType = params.destinationType ?? null
this.headers = params.headers ?? null
this.messageHeaders = params.messageHeaders ?? null
this.messageHandler = params.messageHandler ?? null
this.queue = params.queue ?? null
this.routingKey = params.routingKey ?? null
Expand Down
55 changes: 55 additions & 0 deletions test/unit/shim/message-shim.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const DESTINATIONS = require('../../../lib/config/attribute-filter').DESTINATION
const hashes = require('../../../lib/util/hashes')
const helper = require('../../lib/agent_helper')
const MessageShim = require('../../../lib/shim/message-shim')
const { MessageSpec } = require('../../../lib/shim/specs')

tap.test('MessageShim', function (t) {
t.autoend()
Expand Down Expand Up @@ -49,6 +50,7 @@ tap.test('MessageShim', function (t) {
getActiveSegment: function () {
return agent.tracer.getSegment()
},
sendMessages: function () {},
withNested: function () {
const segment = agent.tracer.getSegment()
segment.add('ChildSegment')
Expand Down Expand Up @@ -385,6 +387,59 @@ tap.test('MessageShim', function (t) {
})
})

t.test('should insert distributed trace headers in all messages', function (t) {
t.plan(1)
const messages = [{}, { headers: { foo: 'foo' } }, {}]

shim.recordProduce(
wrappable,
'sendMessages',
() =>
new MessageSpec({
messageHeaders(inject) {
for (const msg of messages) {
if (msg.headers) {
inject(msg.headers)
continue
}
msg.headers = {}
inject(msg.headers)
}
}
})
)

agent.on('transactionFinished', () => {
t.match(messages, [
{
headers: {
newrelic: '',
traceparent: /^00-/
}
},
{
headers: {
newrelic: '',
traceparent: /^00-/,
foo: 'foo'
}
},
{
headers: {
newrelic: '',
traceparent: /^00-/
}
}
])
t.end()
})

helper.runInTransaction(agent, (tx) => {
wrappable.sendMessages()
tx.end()
})
})

t.test('should create message broker metrics', function (t) {
let transaction = null

Expand Down
10 changes: 7 additions & 3 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tap.afterEach(async (t) => {
})

tap.test('send records correctly', (t) => {
t.plan(4)
t.plan(6)

const { agent, consumer, producer, topic } = t.context
const message = 'test message'
Expand Down Expand Up @@ -83,7 +83,9 @@ tap.test('send records correctly', (t) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
t.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
resolve()
}
})
Expand Down Expand Up @@ -164,7 +166,7 @@ tap.test('send passes along DT headers', (t) => {
})

tap.test('sendBatch records correctly', (t) => {
t.plan(5)
t.plan(7)

const { agent, consumer, producer, topic } = t.context
const message = 'test message'
Expand Down Expand Up @@ -195,6 +197,8 @@ tap.test('sendBatch records correctly', (t) => {
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), message)
t.match(actualMessage.headers['x-foo'].toString(), 'foo')
t.equal(actualMessage.headers.newrelic.toString(), '')
t.equal(actualMessage.headers.traceparent.toString().startsWith('00-'), true)
resolve()
}
})
Expand Down

0 comments on commit 3a420b6

Please sign in to comment.