Skip to content

Commit

Permalink
wip: show how we could add headers per message
Browse files Browse the repository at this point in the history
  • Loading branch information
bizob2828 committed Jun 3, 2024
1 parent e9029d3 commit 65fa31e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
13 changes: 12 additions & 1 deletion lib/instrumentation/kafkajs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,21 @@ module.exports = function instrumentProducer({ shim, kafkajs }) {
firstMessage.headers = firstMessage.headers ?? {}
}

debugger

Check failure on line 32 in lib/instrumentation/kafkajs/producer.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Unexpected 'debugger' statement
return new MessageSpec({
destinationName: data.topic,
destinationType: shim.TOPIC,
headers: firstMessage.headers
messageHeaders: () => {
debugger

Check failure on line 37 in lib/instrumentation/kafkajs/producer.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Unexpected 'debugger' statement
return data.messages.map((message) => {
if (message.headers) {
return message.headers
} else {

Check failure on line 41 in lib/instrumentation/kafkajs/producer.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Unnecessary 'else' after 'return'
message.headers = {}
return message.headers
}
})
}
})
})

Expand Down
8 changes: 8 additions & 0 deletions lib/shim/message-shim/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ function recordProduce(nodule, properties, recordNamer) {
if (msgDesc.headers) {
shim.insertCATRequestHeaders(msgDesc.headers, true)
}

if (msgDesc.messageHeaders) {
const headers = msgDesc.messageHeaders()
for (const msgHeader of headers) {
debugger

Check failure on line 253 in lib/shim/message-shim/index.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Unexpected 'debugger' statement
shim.insertCATRequestHeaders(msgHeader, true)
}
}
}
msgDesc.recorder = genericRecorder
return msgDesc
Expand Down
8 changes: 8 additions & 0 deletions lib/shim/specs/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ class MessageSpec extends OperationSpec {
* @type {Object<string, string>|null}
*/
headers

Check failure on line 48 in lib/shim/specs/message.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Delete `··`
/**
* Function to extract headers to insert DT/CAT headers

Check failure on line 50 in lib/shim/specs/message.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Delete `·`
*
* @type {Function}
*/
messageHeaders

/**
* A function to handle the result of the instrumented message broker
Expand Down Expand Up @@ -81,6 +88,7 @@ class MessageSpec extends OperationSpec {
this.destinationName = params.destinationName ?? null
this.destinationType = params.destinationType ?? null
this.headers = params.headers ?? null
this.messageHeaders = params.messageHeaders ?? null
this.messageHandler = params.messageHandler ?? null
this.queue = params.queue ?? null
this.routingKey = params.routingKey ?? null
Expand Down
15 changes: 12 additions & 3 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ const utils = require('./utils')
const broker = `${params.kafka_host}:${params.kafka_port}`

tap.beforeEach(async (t) => {
t.context.agent = helper.instrumentMockedAgent()
t.context.agent = helper.instrumentMockedAgent({
feature_flag: {
kafkajs_instrumentation: true
}
})

const { Kafka, logLevel } = require('kafkajs')
t.context.Kafka = Kafka
Expand Down Expand Up @@ -123,12 +127,13 @@ tap.test('send passes along DT headers', (t) => {
agent.on('transactionFinished', (tx) => {
t.equal(tx.isDistributedTrace, true)

const headers = {}
/*const headers = {}

Check failure on line 130 in test/versioned/kafkajs/kafka.tap.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Expected space or tab after '/*' in comment
tx.traceContext.addTraceContextHeaders(headers)
t.match(headers, {
traceparent: /00-4bf92f3577b34da6a3ce929d0e0e4736-[a-z0-9]{16}-00/,
tracestate: /42@nr=0-0-account_1-app_1-[a-z0-9]{16}-[a-z0-9]{16}-0-0.123000-1717426365982/
})
*/

t.end()
})
Expand All @@ -137,7 +142,7 @@ tap.test('send passes along DT headers', (t) => {
// This acceptTraceContextPayload is how we are simulating that the agent
// received a distributed trace context that has resulted in the Kafka
// payload production.
tx.acceptTraceContextPayload(traceparent, tracestate)
//tx.acceptTraceContextPayload(traceparent, tracestate)

Check failure on line 145 in test/versioned/kafkajs/kafka.tap.js

View workflow job for this annotation

GitHub Actions / lint (lts/*)

Expected space or tab after '//' in comment

await consumer.subscribe({ topic, fromBeginning: true })

Expand All @@ -146,6 +151,10 @@ tap.test('send passes along DT headers', (t) => {
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(messages.includes(actualMessage.value.toString()), true)
const expectedTraceState = actualMessage.headers.tracestate.toString()
const expectedTraceParent = actualMessage.headers.traceparent.toString()
t.equal(expectedTraceState, tracestate)
t.equal(expectedTraceParent, traceparent)
msgCount += 1
if (msgCount === 3) {
resolve()
Expand Down

0 comments on commit 65fa31e

Please sign in to comment.