Skip to content

Commit

Permalink
feat(event-source): add SQS as Lambda event source (#451)
Browse files Browse the repository at this point in the history
  • Loading branch information
brettstack authored Jun 1, 2018
1 parent 3cfb56d commit a5a3845
Show file tree
Hide file tree
Showing 29 changed files with 596 additions and 208 deletions.
1 change: 1 addition & 0 deletions docs/cloudformation_compatibility.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Kinesis
Property Name Intrinsic(s) Supported Reasons
======================== ================================== ========================
Stream All
Queue All
StartingPosition All
BatchSize All
======================== ================================== ========================
Expand Down
27 changes: 27 additions & 0 deletions docs/internals/generated_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,33 @@ AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

SQS
^^^^^^^

Example:

.. code:: yaml
MyFunction:
Type: AWS::Serverless::Function
Properties:
...
Events:
MyTrigger:
Type: SQS
Properties:
Queue: arn:aws:sqs:us-east-1:123456789012:my-queue
...
Additional generated resources:

================================== ================================
CloudFormation Resource Type Logical ID
================================== ================================
AWS::Lambda::Permissions MyFunction\ **MyTrigger**\ Permission
AWS::Lambda::EventSourceMapping MyFunction\ **MyTrigger**
================================== ================================

DynamoDb
^^^^^^^^

Expand Down
1 change: 1 addition & 0 deletions examples/2016-10-31/sqs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
transformed-cfn-template.yaml
19 changes: 19 additions & 0 deletions examples/2016-10-31/sqs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# SQS Event Source Example

Example SAM template for processing messages on an SQS queue.

## Running the example

```bash
# Replace YOUR_S3_ARTIFACTS_BUCKET
YOUR_S3_ARTIFACTS_BUCKET='YOUR_S3_ARTIFACTS_BUCKET'; \
aws cloudformation package --template-file template.yaml --output-template-file cfn-transformed-template.yaml --s3-bucket $YOUR_S3_ARTIFACTS_BUCKET
aws cloudformation deploy --template-file ./cfn-transformed-template.yaml --stack-name example-logs-processor --capabilities CAPABILITY_IAM
```

After your CloudFormation Stack has completed creation, push a message to the SQS queue. To see it in action, modify and run the command below:

```bash
YOUR_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue; \
aws sqs send-message --queue-url $YOUR_SQS_QUEUE_URL --message-body '{ "myMessage": "Hello SAM!" }'
```
9 changes: 9 additions & 0 deletions examples/2016-10-31/sqs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
async function handler (event, context) {
// TODO: Handle message...

console.log(event)

return {}
}

module.exports.handler = handler
19 changes: 19 additions & 0 deletions examples/2016-10-31/sqs/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Example of processing messages on an SQS queue with Lambda
Resources:
MySQSQueueFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./index.js
Handler: index.handler
Runtime: nodejs8.10
Events:
MySQSEvent:
Type: SQS
Properties:
Queue: !Ref MyQueue

MyQueue:
Type: AWS::SQS::Queue
Properties:
34 changes: 26 additions & 8 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@

from samtranslator.model.lambda_ import LambdaEventSourceMapping
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.model.exceptions import InvalidEventException


class PullEventSource(ResourceMacro):
"""Base class for pull event sources for SAM Functions.
The pull events are the streams--Kinesis and DynamoDB Streams. Both of these correspond to an EventSourceMapping in
Lambda, and require that the execution role be given to Kinesis or DynamoDB Streams, respectively.
The pull events are Kinesis Streams, DynamoDB Streams, and SQS Queues. All of these correspond to an EventSourceMapping in
Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB Streams, or SQS Queues, respectively.
:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
"""
resource_type = None
property_types = {
'Stream': PropertyType(True, is_str()),
'BatchSize': PropertyType(False, is_type(int)),
'StartingPosition': PropertyType(True, is_str())
'Stream': PropertyType(False, is_str()),
'Queue': PropertyType(False, is_str()),
'BatchSize': PropertyType(False, is_type(int)),
'StartingPosition': PropertyType(False, is_str())
}

def get_policy_arn(self):
Expand All @@ -32,23 +34,31 @@ def to_cloudformation(self, **kwargs):
:rtype: list
"""
function = kwargs.get('function')

if not function:
raise TypeError("Missing required keyword argument: function")

resources = []

lambda_eventsourcemapping = LambdaEventSourceMapping(self.logical_id)
resources.append(lambda_eventsourcemapping)

try:
# Name will not be available for Alias resources
function_name_or_arn = function.get_runtime_attr("name")
except NotImplementedError:
function_name_or_arn = function.get_runtime_attr("arn")

if not self.Stream and not self.Queue:
raise InvalidEventException(
self.relative_id, "No Queue (for SQS) or Stream (for Kinesis or DynamoDB) provided.")

if self.Stream and not self.StartingPosition:
raise InvalidEventException(
self.relative_id, "StartingPosition is required for Kinesis and DynamoDB.")

lambda_eventsourcemapping.FunctionName = function_name_or_arn
lambda_eventsourcemapping.EventSourceArn = self.Stream
lambda_eventsourcemapping.EventSourceArn = self.Stream or self.Queue
lambda_eventsourcemapping.StartingPosition = self.StartingPosition
lambda_eventsourcemapping.BatchSize = self.BatchSize

Expand Down Expand Up @@ -82,3 +92,11 @@ class DynamoDB(PullEventSource):

def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn('service-role/AWSLambdaDynamoDBExecutionRole')


class SQS(PullEventSource):
"""SQS Queue event source."""
resource_type = 'SQS'

def get_policy_arn(self):
return ArnGenerator.generate_aws_managed_policy_arn('service-role/AWSLambdaSQSExecutionRole')
2 changes: 1 addition & 1 deletion samtranslator/model/lambda_.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class LambdaEventSourceMapping(Resource):
'Enabled': PropertyType(False, is_type(bool)),
'EventSourceArn': PropertyType(True, is_str()),
'FunctionName': PropertyType(True, is_str()),
'StartingPosition': PropertyType(True, is_str())
'StartingPosition': PropertyType(False, is_str())
}

runtime_attrs = {
Expand Down
18 changes: 18 additions & 0 deletions samtranslator/validator/sam_schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@
{
"$ref": "#/definitions/AWS::Serverless::Function.KinesisEvent"
},
{
"$ref": "#/definitions/AWS::Serverless::Function.SQSEvent"
},
{
"$ref": "#/definitions/AWS::Serverless::Function.DynamoDBEvent"
},
Expand Down Expand Up @@ -481,6 +484,21 @@
],
"type": "object"
},
"AWS::Serverless::Function.SQSEvent": {
"additionalProperties": false,
"properties": {
"BatchSize": {
"type": "number"
},
"Queue": {
"type": "string"
}
},
"required": [
"Queue"
],
"type": "object"
},
"AWS::Serverless::Function.S3Event": {
"additionalProperties": false,
"properties": {
Expand Down
17 changes: 17 additions & 0 deletions tests/translator/input/error_missing_queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# File: sam.yml
# Version: 0.9

AWSTemplateFormatVersion: '2010-09-09'
Parameters: {}
Resources:
SQSFunction:
Type: 'AWS::Serverless::Function'
Properties:
CodeUri: s3://sam-demo-bucket/queues.zip
Handler: queue.sqs_handler
Runtime: python2.7
Events:
MySqsQueue:
Type: SQS
Properties:
BatchSize: 10
13 changes: 13 additions & 0 deletions tests/translator/input/error_missing_startingposition.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Resources:
KinesisFunction:
Type: 'AWS::Serverless::Function'
Properties:
CodeUri: s3://sam-demo-bucket/streams.zip
Handler: stream.kinesis_handler
Runtime: python2.7
Events:
MyKinesisStream:
Type: Kinesis
Properties:
Stream: arn:aws:kinesis:us-west-2:012345678901:stream/my-stream
BatchSize: 100
15 changes: 15 additions & 0 deletions tests/translator/input/error_missing_stream.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
AWSTemplateFormatVersion: '2010-09-09'
Parameters: {}
Resources:
DynamoDBFunction:
Type: 'AWS::Serverless::Function'
Properties:
CodeUri: s3://sam-demo-bucket/streams.zip
Handler: stream.ddb_handler
Runtime: python2.7
Events:
MyDDBStream:
Type: DynamoDB
Properties:
BatchSize: 200
StartingPosition: LATEST
13 changes: 13 additions & 0 deletions tests/translator/input/sqs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Resources:
SQSFunction:
Type: 'AWS::Serverless::Function'
Properties:
CodeUri: s3://sam-demo-bucket/queues.zip
Handler: queue.sqs_handler
Runtime: python2.7
Events:
MySqsQueue:
Type: SQS
Properties:
Queue: arn:aws:sqs:us-west-2:012345678901:my-queue
BatchSize: 10
6 changes: 6 additions & 0 deletions tests/translator/output/aws-cn/error_missing_queue.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"errors": [{
"errorMessage": "Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis or DynamoDB) provided."
}],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [SQSFunction] is invalid. Event with id [MySqsQueue] is invalid. No Queue (for SQS) or Stream (for Kinesis or DynamoDB) provided."
}
6 changes: 6 additions & 0 deletions tests/translator/output/aws-cn/error_missing_stream.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"errors": [{
"errorMessage": "Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis or DynamoDB) provided."
}],
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Resource with id [DynamoDBFunction] is invalid. Event with id [MyDDBStream] is invalid. No Queue (for SQS) or Stream (for Kinesis or DynamoDB) provided."
}
58 changes: 58 additions & 0 deletions tests/translator/output/aws-cn/sqs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
"Resources": {
"SQSFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSExecutionRole"
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}]
}
}
},
"SQSFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"S3Bucket": "sam-demo-bucket",
"S3Key": "queues.zip"
},
"Handler": "queue.sqs_handler",
"Role": {
"Fn::GetAtt": [
"SQSFunctionRole",
"Arn"
]
},
"Runtime": "python2.7",
"Tags": [{
"Value": "SAM",
"Key": "lambda:createdBy"
}]
}
},
"SQSFunctionMySqsQueue": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"BatchSize": 10,
"EventSourceArn": "arn:aws:sqs:us-west-2:012345678901:my-queue",
"FunctionName": {
"Ref": "SQSFunction"
}
}
}
}
}
Loading

0 comments on commit a5a3845

Please sign in to comment.