Skip to content

Commit

Permalink
update sqs to use new middleware pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
garbados committed Nov 29, 2021
1 parent 7e1130b commit 0e90f59
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 61 deletions.
109 changes: 83 additions & 26 deletions merged/aws-sdk/lib/v3-sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,94 @@ const SEND_COMMANDS = ['SendMessageCommand', 'SendMessageBatchCommand']

const RECEIVE_COMMANDS = ['ReceiveMessageCommand']

function makeWrapClient(commands) {
return function wrapClient(shim, original, name, args) {
const {
constructor,
input: { QueueUrl }
} = args[0]
const type = constructor.name
if (commands.includes(type)) {
return {
callback: shim.LAST,
destinationName: grabLastUrlSegment(QueueUrl),
destinationType: shim.QUEUE,
opaque: true
}
}
module.exports = function instrument(shim, name, resolvedName) {
const fileNameIndex = resolvedName.indexOf('/index')
const relativeFolder = resolvedName.substr(0, fileNameIndex)

// eslint-disable-next-line consistent-return
return
// The path changes depending on the version...
// so we don't want to hard-code the relative
// path from the module root.
const sqsClientExport = shim.require(`${relativeFolder}/SQSClient`)

if (!shim.isFunction(sqsClientExport.SQSClient)) {
shim.logger.debug('Could not find SQSClient, not instrumenting.')
} else {
shim.setLibrary(shim.SQS)
shim.wrapReturn(
sqsClientExport,
'SQSClient',
function wrappedReturn(shim, fn, fnName, instance) {
postClientConstructor.call(instance, shim)
}
)
}
}

module.exports = function instrument(shim, AWS) {
if (!shim.isFunction(AWS.SQS)) {
shim.logger.debug('Could not find SQS, not instrumenting.')
return
/**
* Calls the instances middlewareStack.use to register
* a plugin that adds a middleware to record the time it teakes to publish a message
* see: https://aws.amazon.com/blogs/developer/middleware-stack-modular-aws-sdk-js/
*
* @param {Shim} shim
*/
function postClientConstructor(shim) {
this.middlewareStack.use(getPlugin(shim))
}

/**
* Returns the plugin object that adds middleware
*
* @param {Shim} shim
* @returns {object}
*/
function getPlugin(shim) {
return {
applyToStack: (clientStack) => {
clientStack.add(sqsMiddleware.bind(null, shim), {
name: 'NewRelicSqsMiddleware',
step: 'initialize',
priority: 'high'
})
}
}
}

const wrapClientSend = makeWrapClient(SEND_COMMANDS)
const wrapClientReceive = makeWrapClient(RECEIVE_COMMANDS)
/**
* Middleware hook that records the middleware chain
* when command is `PublishCommand`
*
* @param {Shim} shim
* @param {function} next middleware function
* @param {Object} context
* @returns {function}
*/
function sqsMiddleware(shim, next, context) {
if (SEND_COMMANDS.includes(context.commandName)) {
return shim.recordProduce(next, getSqsSpec)
} else if (RECEIVE_COMMANDS.includes(context.commandName)) {
return shim.recordConsume(next, getSqsSpec)
}
return next
}

shim.setLibrary(shim.SQS)
shim.recordProduce(AWS.SQSClient.prototype, 'send', wrapClientSend)
shim.recordConsume(AWS.SQSClient.prototype, 'send', wrapClientReceive)
/**
* Returns the spec for PublishCommand
*
* @param {Shim} shim
* @param {original} original original middleware function
* @param {Array} args to the middleware function
* @returns {Object}
*/
function getSqsSpec(shim, original, name, args) {
const [
{
input: { QueueUrl }
}
] = args
return {
callback: shim.LAST,
destinationName: grabLastUrlSegment(QueueUrl),
destinationType: shim.QUEUE,
opaque: true
}
}
54 changes: 19 additions & 35 deletions merged/aws-sdk/tests/versioned/aws-sdk-v3/sqs.tap.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ tap.test('SQS API', (t) => {
let ReceiveMessageCommand = null

let queueName = null
// let sendMessageRequestId = null
// let sendMessageBatchRequestId = null
// let receiveMessageRequestId = null

let server = null

t.beforeEach(async () => {
Expand All @@ -41,10 +37,11 @@ tap.test('SQS API', (t) => {
})

helper = utils.TestAgent.makeInstrumented()
common.registerCoreInstrumentation(helper)
helper.registerInstrumentation({
moduleName: '@aws-sdk/client-sqs',
type: 'message',
onRequire: require('../../../lib/v3-sqs')
onResolved: require('../../../lib/v3-sqs')
})

const lib = require('@aws-sdk/client-sqs')
Expand Down Expand Up @@ -77,9 +74,6 @@ tap.test('SQS API', (t) => {
SendMessageBatchCommand = null

queueName = null
// sendMessageRequestId = null
// sendMessageBatchRequestId = null
// receiveMessageRequestId = null
})

t.test('commands with promises', async (t) => {
Expand Down Expand Up @@ -119,27 +113,27 @@ tap.test('SQS API', (t) => {
const expectedSegmentCount = 3

const root = transaction.trace.root
const segments = common.checkAWSAttributes(t, root, common.SQS_PATTERN, [], true)
const segments = common.checkAWSAttributes(t, root, common.SQS_PATTERN)

t.equal(
segments.length,
expectedSegmentCount,
`should have ${expectedSegmentCount} AWS MessageBroker/SQS segments`
)

const externalSegments = common.checkAWSAttributes(t, root, common.EXTERN_PATTERN, [], true)
const externalSegments = common.checkAWSAttributes(t, root, common.EXTERN_PATTERN)
t.equal(externalSegments.length, 0, 'should not have any External segments')

const [sendMessage, sendMessageBatch, receiveMessage] = segments

checkName(t, sendMessage.name, 'Produce', queueName)
// checkAttributes(t, sendMessage, 'sendMessage', sendMessageRequestId)
checkAttributes(t, sendMessage, 'SendMessageCommand')

checkName(t, sendMessageBatch.name, 'Produce', queueName)
// checkAttributes(t, sendMessageBatch, 'sendMessageBatch', sendMessageBatchRequestId)
checkAttributes(t, sendMessageBatch, 'SendMessageBatchCommand')

checkName(t, receiveMessage.name, 'Consume', queueName)
// checkAttributes(t, receiveMessage, 'receiveMessage', receiveMessageRequestId)
checkAttributes(t, receiveMessage, 'ReceiveMessageCommand')

t.end()
}
Expand All @@ -151,28 +145,18 @@ function checkName(t, name, action, queueName) {
t.match(name, specificName, 'should have correct name')
}

// function checkAttributes(t, segment, operation, expectedRequestId) {
// const actualAttributes = segment.attributes.get(common.SEGMENT_DESTINATION)
//
// const expectedAttributes = {
// 'aws.operation': operation,
// 'aws.requestId': expectedRequestId,
// 'aws.service': 'Amazon SQS',
// 'aws.region': AWS_REGION
// }
//
// t.match(
// actualAttributes,
// expectedAttributes,
// `should have expected attributes for ${operation}`
// )
// }

// function getRequestId(t, apiReturnedData) {
// const metadata = apiReturnedData['$metadata']
// t.ok(metadata)
// return metadata.RequestId
// }
function checkAttributes(t, segment, operation) {
const actualAttributes = segment.attributes.get(common.SEGMENT_DESTINATION)

const expectedAttributes = {
'aws.operation': operation,
'aws.requestId': String,
'aws.service': /sqs|SQS/,
'aws.region': AWS_REGION
}

t.match(actualAttributes, expectedAttributes, `should have expected attributes for ${operation}`)
}

function getCreateParams(queueName) {
const params = {
Expand Down

0 comments on commit 0e90f59

Please sign in to comment.