diff --git a/lambda-dynamodb-streams/README.md b/lambda-dynamodb-streams/README.md new file mode 100644 index 0000000..bc542bd --- /dev/null +++ b/lambda-dynamodb-streams/README.md @@ -0,0 +1,154 @@ +# Using filters to process all events with DynamoDB and Lambda + +This example demonstrates how to create a Lambda trigger to process a stream from a DynamoDB table. This example will demonstrate the following workflow: + +- A user writes an item to a DynamoDB table. Each item in the table represents a bark. +- A new stream record reflects that a new item has been added to the DynamoDB table. +- The new stream record triggers a Lambda function. +- If the stream record indicates that a new item was added to the DynamoDB table, the Lambda function reads the data from the stream record and publishes a message to a topic in SNS. +- The message is received by subscribers to the SNS topic. + +## Prerequisites + +* LocalStack +* Docker +* `awslocal` CLI +* `jq` + +## Start LocalStack + +Start LocalStack using the following command: + +```bash +localsatck start +``` + +## Create a DynamoDB table with a stream enabled + +Create a DynamoDB table (`BarkTable`) to store all of the barks from Woofer users. The primary key is composed of `Username` (partition key) and `Timestamp` (sort key). `BarkTable` has a stream enabled. + +```bash +awslocal dynamodb create-table \ + --table-name BarkTable \ + --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ + --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ + --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ + --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES +``` + +Fetch the ARN of the stream from the DynamoDB table: + +```bash +export latest_stream_arn=$(awslocal dynamodb describe-table --table-name BarkTable --query 'Table.LatestStreamArn' --output text) +``` + +## Create a Lambda execution role + +Create a `WooferLambdaRole` IAM role using the policy defined in the `trust-relationship.json` file. Run the following command to create the role: + +```bash +awslocal iam create-role --role-name WooferLambdaRole \ + --path "/service-role/" \ + --assume-role-policy-document file://trust-relationship.json +``` + +Enter the following command to attach the policy defined in the `role-policy.json` document to `WooferLambdaRole`: + +```bash +awslocal iam put-role-policy --role-name WooferLambdaRole \ + --policy-name WooferLambdaRolePolicy \ + --policy-document file://role-policy.json +``` + +## Create a SNS topic + +Enter the following command to create a new SNS topic: + +```bash +awslocal sns create-topic --name wooferTopic +``` + +Enter the following command to subscribe an email address to `wooferTopic`: + +```bash +awslocal sns subscribe \ + --topic-arn arn:aws:sns:us-east-1:000000000000:wooferTopic \ + --protocol email \ + --notification-endpoint user1@yourdomain.com +``` + +You can customize the notification endpoint to your own preference. + +## Create and test a Lambda function + +We have defined a Lambda function `publishNewBark` to process stream records from `BarkTable`. + +Create a zip file to contain `publishNewBark.js`: + +```bash +zip publishNewBark.zip publishNewBark.js +``` + +Create the Lambda function: + +```bash +awslocal lambda create-function \ + --region us-east-1 \ + --function-name publishNewBark \ + --zip-file fileb://publishNewBark.zip \ + --role arn:aws:iam::000000000000:role/service-role/WooferLambdaRole \ + --handler publishNewBark.handler \ + --timeout 15 \ + --runtime nodejs16.x +``` + +Enter the following command to test the `publishNewBark` function: + +```bash +awslocal lambda invoke \ + --function-name publishNewBark \ + --payload file://payload.json \ + --cli-binary-format raw-in-base64-out output.txt +``` + +If the test was successful, you will see the following output: + +```bash +{ + "StatusCode": 200, + "ExecutedVersion": "$LATEST" +} +``` + +In addition, the output.txt file will contain the following text. + +```txt +"Successfully processed 1 records." +``` + +## Create and test a trigger + +Enter the following command to create the trigger: + +```bash +awslocal lambda create-event-source-mapping \ + --region us-east-1 \ + --function-name publishNewBark \ + --event-source arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2023-07-09T12:00:13.312 \ + --batch-size 1 \ + --starting-position TRIM_HORIZON +``` + +Replace the `event-source` value with the value of `latest_stream_arn` that you saved earlier. + +Run the following command to test the trigger: + +```bash +awslocal dynamodb put-item \ + --table-name BarkTable \ + --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"} +``` + +You should receive a new message on your notification endpoint within a few minutes. + +The Lambda function processes only new items that you add to `BarkTable`. If you update or delete an item in the table, the function does nothing. diff --git a/lambda-dynamodb-streams/payload.json b/lambda-dynamodb-streams/payload.json new file mode 100644 index 0000000..2579cc1 --- /dev/null +++ b/lambda-dynamodb-streams/payload.json @@ -0,0 +1,37 @@ +{ + "Records": [ + { + "eventID": "7de3041dd709b024af6f29e4fa13d34c", + "eventName": "INSERT", + "eventVersion": "1.1", + "eventSource": "aws:dynamodb", + "awsRegion": "us-east-1", + "dynamodb": { + "ApproximateCreationDateTime": 1479499740, + "Keys": { + "Timestamp": { + "S": "2016-11-18:12:09:36" + }, + "Username": { + "S": "John Doe" + } + }, + "NewImage": { + "Timestamp": { + "S": "2016-11-18:12:09:36" + }, + "Message": { + "S": "This is a bark from the Woofer social network" + }, + "Username": { + "S": "John Doe" + } + }, + "SequenceNumber": "13021600000000001596893679", + "SizeBytes": 112, + "StreamViewType": "NEW_IMAGE" + }, + "eventSourceARN": "arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2023-07-09T12:00:13.312" + } + ] +} diff --git a/lambda-dynamodb-streams/publishNewBark.js b/lambda-dynamodb-streams/publishNewBark.js new file mode 100644 index 0000000..b44fe63 --- /dev/null +++ b/lambda-dynamodb-streams/publishNewBark.js @@ -0,0 +1,29 @@ +'use strict'; +var AWS = require("aws-sdk"); +var sns = new AWS.SNS(); + +exports.handler = (event, context, callback) => { + + event.Records.forEach((record) => { + console.log('Stream record: ', JSON.stringify(record, null, 2)); + + if (record.eventName == 'INSERT') { + var who = JSON.stringify(record.dynamodb.NewImage.Username.S); + var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S); + var what = JSON.stringify(record.dynamodb.NewImage.Message.S); + var params = { + Subject: 'A new bark from ' + who, + Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what, + TopicArn: 'arn:aws:sns:us-east-1:000000000000:wooferTopic' + }; + sns.publish(params, function(err, data) { + if (err) { + console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2)); + } else { + console.log("Results from sending message: ", JSON.stringify(data, null, 2)); + } + }); + } + }); + callback(null, `Successfully processed ${event.Records.length} records.`); +}; \ No newline at end of file diff --git a/lambda-dynamodb-streams/role-policy.json b/lambda-dynamodb-streams/role-policy.json new file mode 100644 index 0000000..2013555 --- /dev/null +++ b/lambda-dynamodb-streams/role-policy.json @@ -0,0 +1,33 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Resource": "arn:aws:logs:us-east-1:000000000000:*" + }, + { + "Effect": "Allow", + "Action": [ + "dynamodb:DescribeStream", + "dynamodb:GetRecords", + "dynamodb:GetShardIterator", + "dynamodb:ListStreams" + ], + "Resource": "arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/*" + }, + { + "Effect": "Allow", + "Action": [ + "sns:Publish" + ], + "Resource": [ + "*" + ] + } + ] +} diff --git a/lambda-dynamodb-streams/run.sh b/lambda-dynamodb-streams/run.sh new file mode 100644 index 0000000..0acd968 --- /dev/null +++ b/lambda-dynamodb-streams/run.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +awslocal dynamodb create-table \ + --table-name BarkTable \ + --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ + --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ + --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ + --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES + +latest_stream_arn=$(awslocal dynamodb describe-table --table-name BarkTable --query 'Table.LatestStreamArn' --output text) + +awslocal iam create-role --role-name WooferLambdaRole \ + --path "/service-role/" \ + --assume-role-policy-document file://trust-relationship.json + +awslocal iam put-role-policy --role-name WooferLambdaRole \ + --policy-name WooferLambdaRolePolicy \ + --policy-document file://role-policy.json + +awslocal sns create-topic --name wooferTopic + +awslocal sns subscribe \ + --topic-arn arn:aws:sns:us-east-1:000000000000:wooferTopic \ + --protocol email \ + --notification-endpoint user1@yourdomain.com + +awslocal lambda create-function \ + --region us-east-1 \ + --function-name publishNewBark \ + --zip-file fileb://publishNewBark.zip \ + --role arn:aws:iam::000000000000:role/service-role/WooferLambdaRole \ + --handler publishNewBark.handler \ + --timeout 15 \ + --runtime nodejs16.x + +awslocal lambda invoke \ + --function-name publishNewBark \ + --payload file://payload.json \ + --cli-binary-format raw-in-base64-out output.txt + +awslocal lambda create-event-source-mapping \ + --region us-east-1 \ + --function-name publishNewBark \ + --event-source arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2023-07-09T12:00:13.312 \ + --batch-size 1 \ + --starting-position TRIM_HORIZON + +awslocal dynamodb put-item \ + --table-name BarkTable \ + --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"} diff --git a/lambda-dynamodb-streams/trust-relationship.json b/lambda-dynamodb-streams/trust-relationship.json new file mode 100644 index 0000000..064f493 --- /dev/null +++ b/lambda-dynamodb-streams/trust-relationship.json @@ -0,0 +1,12 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] + }