Skip to content

Commit

Permalink
[DSM] Set checkpoints for DSM even when there is no context if the se…
Browse files Browse the repository at this point in the history
…rvice is instrumented and fix typo (#4851)

* [DSM] Set checkpoints for DSM with SQS & Kinesis for consumers even when the producer did not have DSM enabled

* [DSM] Send checkpoints to DSM if its enabled even if there is no streamName
  • Loading branch information
ericfirth authored Nov 15, 2024
1 parent a8896ee commit 51bea54
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 15 deletions.
13 changes: 7 additions & 6 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ class Kinesis extends BaseAwsSdkPlugin {
response.Records.forEach(record => {
const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString())

if (
parsedAttributes?._datadog && streamName
) {
const payloadSize = getSizeOrZero(record.Data)
const payloadSize = getSizeOrZero(record.Data)
if (parsedAttributes?._datadog) {
this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
this.tracer
.setCheckpoint(['direction:in', `topic:${streamName}`, 'type:kinesis'], span, payloadSize)
}
const tags = streamName
? ['direction:in', `topic:${streamName}`, 'type:kinesis']
: ['direction:in', 'type:kinesis']
this.tracer
.setCheckpoint(tags, span, payloadSize)
})
}

Expand Down
16 changes: 8 additions & 8 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Sqs extends BaseAwsSdkPlugin {
// extract DSM context after as we might not have a parent-child but may have a DSM context

this.responseExtractDSMContext(
request.operation, request.params, response, span || null, { parsedMessageAttributes }
request.operation, request.params, response, span || null, { parsedAttributes: parsedMessageAttributes }
)
})

Expand Down Expand Up @@ -195,16 +195,16 @@ class Sqs extends BaseAwsSdkPlugin {
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
}
}
const payloadSize = getHeadersSize({
Body: message.Body,
MessageAttributes: message.MessageAttributes
})
const queue = params.QueueUrl.split('/').pop()
if (parsedAttributes) {
const payloadSize = getHeadersSize({
Body: message.Body,
MessageAttributes: message.MessageAttributes
})
const queue = params.QueueUrl.split('/').pop()
this.tracer.decodeDataStreamsContext(parsedAttributes)
this.tracer
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:sqs'], span, payloadSize)
}
this.tracer
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:sqs'], span, payloadSize)
})
}

Expand Down
26 changes: 26 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/kinesis.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,32 @@ describe('Kinesis', function () {
})
})

it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was done without DSM enabled', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have only have 1 stats point since we only had 1 put operation
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
}, { timeoutMs: 10000 })
expect(statsPointsReceived).to.equal(1)
expect(agent.dsmStatsExistWithParentHash(agent, '0')).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

agent.reload('aws-sdk', { kinesis: { dsmEnabled: false } }, { dsmEnabled: false })
helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
if (err) return done(err)

agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true })
helpers.getTestData(kinesis, streamNameDSM, data, (err) => {
if (err) return done(err)
})
})
})

it('emits DSM stats to the agent during Kinesis putRecords', done => {
// we need to stub Date.now() to ensure a new stats bucket is created for each call
// otherwise, all stats checkpoints will be combined into a single stats points
Expand Down
40 changes: 40 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/sqs.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const { rawExpectedSchema } = require('./sqs-naming')

const queueName = 'SQS_QUEUE_NAME'
const queueNameDSM = 'SQS_QUEUE_NAME_DSM'
const queueNameDSMConsumerOnly = 'SQS_QUEUE_NAME_DSM_CONSUMER_ONLY'

const getQueueParams = (queueName) => {
return {
Expand All @@ -20,6 +21,7 @@ const getQueueParams = (queueName) => {

const queueOptions = getQueueParams(queueName)
const queueOptionsDsm = getQueueParams(queueNameDSM)
const queueOptionsDsmConsumerOnly = getQueueParams(queueNameDSMConsumerOnly)

describe('Plugin', () => {
describe('aws-sdk (sqs)', function () {
Expand All @@ -30,6 +32,7 @@ describe('Plugin', () => {
let sqs
const QueueUrl = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME'
const QueueUrlDsm = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM'
const QueueUrlDsmConsumerOnly = 'http://127.0.0.1:4566/00000000000000000000/SQS_QUEUE_NAME_DSM_CONSUMER_ONLY'
let tracer

const sqsClientName = moduleName === '@aws-sdk/smithy-client' ? '@aws-sdk/client-sqs' : 'aws-sdk'
Expand Down Expand Up @@ -412,10 +415,25 @@ describe('Plugin', () => {
})
})

before(done => {
AWS = require(`../../../versions/${sqsClientName}@${version}`).get()

sqs = new AWS.SQS({ endpoint: 'http://127.0.0.1:4566', region: 'us-east-1' })
sqs.createQueue(queueOptionsDsmConsumerOnly, (err, res) => {
if (err) return done(err)

done()
})
})

after(done => {
sqs.deleteQueue({ QueueUrl: QueueUrlDsm }, done)
})

after(done => {
sqs.deleteQueue({ QueueUrl: QueueUrlDsmConsumerOnly }, done)
})

after(() => {
return agent.close({ ritmReset: false })
})
Expand Down Expand Up @@ -546,6 +564,28 @@ describe('Plugin', () => {
})
})

it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
})
expect(statsPointsReceived).to.equal(1)
expect(agent.dsmStatsExistWithParentHash(agent, '0')).to.equal(true)
}).then(done, done)

agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false })
sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsmConsumerOnly }, () => {
agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true })
sqs.receiveMessage({ QueueUrl: QueueUrlDsmConsumerOnly, MessageAttributeNames: ['.*'] }, () => {})
})
})

it('Should emit DSM stats to the agent when sending batch messages', done => {
// we need to stub Date.now() to ensure a new stats bucket is created for each call
// otherwise, all stats checkpoints will be combined into a single stats points
Expand Down
21 changes: 20 additions & 1 deletion packages/dd-trace/test/plugins/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ function dsmStatsExist (agent, expectedHash, expectedEdgeTags) {
return hashFound
}

function dsmStatsExistWithParentHash (agent, expectedParentHash) {
const dsmStats = agent.getDsmStats()
let hashFound = false
if (dsmStats.length !== 0) {
for (const statsTimeBucket of dsmStats) {
for (const statsBucket of statsTimeBucket.Stats) {
for (const stats of statsBucket.Stats) {
if (stats.ParentHash.toString() === expectedParentHash) {
hashFound = true
return hashFound
}
}
}
}
}
return hashFound
}

function addEnvironmentVariablesToHeaders (headers) {
// get all environment variables that start with "DD_"
const ddEnvVars = new Map(
Expand Down Expand Up @@ -424,5 +442,6 @@ module.exports = {
tracer,
testedPlugins,
getDsmStats,
dsmStatsExist
dsmStatsExist,
dsmStatsExistWithParentHash
}

0 comments on commit 51bea54

Please sign in to comment.