Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jsumners-nr committed Jun 6, 2024
1 parent b5640bd commit e8030f9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
22 changes: 14 additions & 8 deletions test/versioned/kafkajs/kafka.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ tap.test('send passes along DT headers', (t) => {
agent.config.primary_application_id = 'app_1'
agent.config.trusted_account_key = 42
let produceTx = null
let consumeTx = null
const consumeTxs = []
let txCount = 0

agent.on('transactionFinished', (tx) => {
Expand All @@ -130,11 +130,11 @@ tap.test('send passes along DT headers', (t) => {
if (tx.name === expectedName) {
produceTx = tx
} else {
consumeTx = tx
consumeTxs.push(tx)
}

if (txCount === 2) {
utils.verifyDistributedTrace({ t, consumeTx, produceTx })
if (txCount === 3) {
utils.verifyDistributedTrace({ t, consumeTxs, produceTx })
t.end()
}
})
Expand All @@ -144,10 +144,13 @@ tap.test('send passes along DT headers', (t) => {
await consumer.subscribe({ topic, fromBeginning: true })

const promise = new Promise((resolve) => {
let msgCount = 0
consumer.run({
eachMessage: async ({ message: actualMessage }) => {
t.equal(actualMessage.value.toString(), 'one')
resolve()
eachMessage: async () => {
++msgCount
if (msgCount === 2) {
resolve()
}
}
})
})
Expand All @@ -156,7 +159,10 @@ tap.test('send passes along DT headers', (t) => {
await producer.send({
acks: 1,
topic,
messages: [{ key: 'key', value: 'one' }]
messages: [
{ key: 'key', value: 'one' },
{ key: 'key2', value: 'two' }
]
})

await promise
Expand Down
20 changes: 10 additions & 10 deletions test/versioned/kafkajs/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ utils.verifyConsumeTransaction = ({ t, tx, topic, clientId }) => {
* Asserts the properties on both the produce and consume transactions
* @param {object} params function params
* @param {object} params.t test instance
* @param {object} params.consumeTx consumer transaction
* @param {object} params.consumeTxs consumer transactions
* @param {object} params.produceTx produce transaction
*/
utils.verifyDistributedTrace = ({ t, consumeTx, produceTx }) => {
utils.verifyDistributedTrace = ({ t, consumeTxs, produceTx }) => {
t.ok(produceTx.isDistributedTrace, 'should mark producer as distributed')
t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed')

t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties')

t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id')
t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id')
const produceSegment = produceTx.trace.root.children[3]
t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId')
t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type')
consumeTxs.forEach((consumeTx) => {
t.ok(consumeTx.isDistributedTrace, 'should mark consumer as distributed')
t.equal(consumeTx.incomingCatId, null, 'should not set old CAT properties')
t.equal(produceTx.id, consumeTx.parentId, 'should have proper parent id')
t.equal(produceTx.traceId, consumeTx.traceId, 'should have proper trace id')
t.equal(produceSegment.id, consumeTx.parentSpanId, 'should have proper parentSpanId')
t.equal(consumeTx.parentTransportType, 'Kafka', 'should have correct transport type')
})
}

0 comments on commit e8030f9

Please sign in to comment.