-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(event-source): add SQS as Lambda event source #451
Changes from 7 commits
bcb74c0
95aafd7
a5fc1b1
01451e1
062194b
9bbee00
7f186ce
bb325a1
c100723
a26e50c
e6d4910
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
transformed-cfn-template.yaml |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# SQS Event Source Example | ||
|
||
Example SAM template for processing messages on an SQS queue. | ||
|
||
## Running the example | ||
|
||
```bash | ||
# Replace YOUR_S3_ARTIFACTS_BUCKET | ||
YOUR_S3_ARTIFACTS_BUCKET='YOUR_S3_ARTIFACTS_BUCKET'; \ | ||
aws cloudformation package --template-file template.yaml --output-template-file cfn-transformed-template.yaml --s3-bucket $YOUR_S3_ARTIFACTS_BUCKET | ||
aws cloudformation deploy --template-file ./cfn-transformed-template.yaml --stack-name example-logs-processor --capabilities CAPABILITY_IAM | ||
``` | ||
|
||
After your CloudFormation Stack has completed creation, push a message to the SQS queue. To see it in action, modify and run the command below: | ||
|
||
```bash | ||
YOUR_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue; \ | ||
aws sqs send-message --queue-url $YOUR_SQS_QUEUE_URL --message-body '{ "myMessage": "Hello SAM!" }' | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
async function handler (event, context) { | ||
// TODO: Handle message... | ||
|
||
console.log(event) | ||
|
||
return {} | ||
} | ||
|
||
module.exports.handler = handler |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
AWSTemplateFormatVersion: '2010-09-09' | ||
Transform: AWS::Serverless-2016-10-31 | ||
Description: Example of processing messages on an SQS queue with Lambda | ||
Resources: | ||
MySQSQueueFunction: | ||
Type: AWS::Serverless::Function | ||
Properties: | ||
CodeUri: ./index.js | ||
Handler: index.handler | ||
Runtime: nodejs8.10 | ||
Events: | ||
MySQSEvent: | ||
Type: SQS | ||
Properties: | ||
Queue: !Ref MyQueue | ||
|
||
MyQueue: | ||
Type: AWS::SQS::Queue | ||
Properties: |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,16 +8,17 @@ | |
class PullEventSource(ResourceMacro): | ||
"""Base class for pull event sources for SAM Functions. | ||
|
||
The pull events are the streams--Kinesis and DynamoDB Streams. Both of these correspond to an EventSourceMapping in | ||
Lambda, and require that the execution role be given to Kinesis or DynamoDB Streams, respectively. | ||
The pull events are Kinesis Streams, DynamoDB Streams, and SQS Queues. All of these correspond to an EventSourceMapping in | ||
Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB Streams, or SQS Streams, respectively. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose we shouldn't be calling SQS as streams. SQS in itself should do. Thoughts ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch; that was a miss. I refer to it as SQS Queues in the first sentence. |
||
|
||
:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source | ||
""" | ||
resource_type = None | ||
property_types = { | ||
'Stream': PropertyType(True, is_str()), | ||
'BatchSize': PropertyType(False, is_type(int)), | ||
'StartingPosition': PropertyType(True, is_str()) | ||
'Stream': PropertyType(False, is_str()), | ||
'Queue': PropertyType(False, is_str()), | ||
'BatchSize': PropertyType(False, is_type(int)), | ||
'StartingPosition': PropertyType(False, is_str()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose all properties are changed to not required (False) so as to support event source mapping for both Streams (DynamoDB and Kinesis) and SQS. But aren't we actually missing on validation ? I can create an Event Source mapping for DDB streams without the StartingPosition specified, it will pass this check but will end up in an error from CFN. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. We do need to validate them separately now that these properties are not required |
||
} | ||
|
||
def get_policy_arn(self): | ||
|
@@ -32,23 +33,23 @@ def to_cloudformation(self, **kwargs): | |
:rtype: list | ||
""" | ||
function = kwargs.get('function') | ||
|
||
if not function: | ||
raise TypeError("Missing required keyword argument: function") | ||
|
||
resources = [] | ||
|
||
lambda_eventsourcemapping = LambdaEventSourceMapping(self.logical_id) | ||
resources.append(lambda_eventsourcemapping) | ||
|
||
try: | ||
# Name will not be available for Alias resources | ||
function_name_or_arn = function.get_runtime_attr("name") | ||
except NotImplementedError: | ||
function_name_or_arn = function.get_runtime_attr("arn") | ||
|
||
lambda_eventsourcemapping.FunctionName = function_name_or_arn | ||
lambda_eventsourcemapping.EventSourceArn = self.Stream | ||
lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue | ||
lambda_eventsourcemapping.StartingPosition = self.StartingPosition | ||
lambda_eventsourcemapping.BatchSize = self.BatchSize | ||
|
||
|
@@ -82,3 +83,11 @@ class DynamoDB(PullEventSource): | |
|
||
def get_policy_arn(self): | ||
return ArnGenerator.generate_aws_managed_policy_arn('service-role/AWSLambdaDynamoDBExecutionRole') | ||
|
||
|
||
class SQS(PullEventSource): | ||
"""SQS Queue event source.""" | ||
resource_type = 'SQS' | ||
|
||
def get_policy_arn(self): | ||
return ArnGenerator.generate_aws_managed_policy_arn('service-role/AWSLambdaSQSExecutionRole') |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,7 +61,7 @@ class LambdaEventSourceMapping(Resource): | |
'Enabled': PropertyType(False, is_type(bool)), | ||
'EventSourceArn': PropertyType(True, is_str()), | ||
'FunctionName': PropertyType(True, is_str()), | ||
'StartingPosition': PropertyType(True, is_str()) | ||
'StartingPosition': PropertyType(False, is_str()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as the previous comment. Aren't we letting go the validation of StartingPosition for streams because of SQS which doesn't require it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. JSON Schema validation will pick this up once we turn it on (it's currently not doing anything with failed JSON Schemas as we're not yet ready). Would prefer to not add additional validation right now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if someone forgot to set the Queue property? SAM Translator will crash instead of giving them a validation error. After turning on JSON Schema, we do need a deeper scrub of code. This extra validation won't increase the surface area of the scrub significantly IMO |
||
} | ||
|
||
runtime_attrs = { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,3 +25,15 @@ Resources: | |
Stream: arn:aws:dynamodb:us-west-2:012345678901:table/TestTable/stream/2015-05-11T21:21:33.291 | ||
BatchSize: 200 | ||
StartingPosition: LATEST | ||
SQSFunction: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider moving SQS test to a separate file. This file is already getting overloaded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes we should consider moving this to a separate file. And this file is explicitly named Streams and hence SQS doesn't belong here. |
||
Type: 'AWS::Serverless::Function' | ||
Properties: | ||
CodeUri: s3://sam-demo-bucket/queues.zip | ||
Handler: queue.sqs_handler | ||
Runtime: python2.7 | ||
Events: | ||
MySqsQueue: | ||
Type: SQS | ||
Properties: | ||
Queue: arn:aws:sqs:us-west-2:012345678901:my-queue | ||
BatchSize: 200 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Batch size for SQS is only between 1 and 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we call this property
QueueArn
instead? I wonder if users might get confused and specify a queue name hereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with Queue to be consistent with Stream. Though I agree, QueueArn is more descriptive. If we want in the future we can add QueueArn (and StreamArn) leave Queue as an alias (and possibly deprecate?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough. Let's stick with Queue to keep the language consistent.