Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamodb streams and lambda filter processing #222

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions lambda-dynamodb-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Using filters to process all events with DynamoDB and Lambda

This example demonstrates how to create a Lambda trigger to process a stream from a DynamoDB table. This example will demonstrate the following workflow:

- A user writes an item to a DynamoDB table. Each item in the table represents a bark.
- A new stream record reflects that a new item has been added to the DynamoDB table.
- The new stream record triggers a Lambda function.
- If the stream record indicates that a new item was added to the DynamoDB table, the Lambda function reads the data from the stream record and publishes a message to a topic in SNS.
- The message is received by subscribers to the SNS topic.

## Prerequisites

* LocalStack
* Docker
* `awslocal` CLI
* `jq`

## Start LocalStack

Start LocalStack using the following command:

```bash
localsatck start
```

## Create a DynamoDB table with a stream enabled

Create a DynamoDB table (`BarkTable`) to store all of the barks from Woofer users. The primary key is composed of `Username` (partition key) and `Timestamp` (sort key). `BarkTable` has a stream enabled.

```bash
awslocal dynamodb create-table \
--table-name BarkTable \
--attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
--key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
```

Fetch the ARN of the stream from the DynamoDB table:

```bash
export latest_stream_arn=$(awslocal dynamodb describe-table --table-name BarkTable --query 'Table.LatestStreamArn' --output text)
```

## Create a Lambda execution role

Create a `WooferLambdaRole` IAM role using the policy defined in the `trust-relationship.json` file. Run the following command to create the role:

```bash
awslocal iam create-role --role-name WooferLambdaRole \
--path "/service-role/" \
--assume-role-policy-document file://trust-relationship.json
```

Enter the following command to attach the policy defined in the `role-policy.json` document to `WooferLambdaRole`:

```bash
awslocal iam put-role-policy --role-name WooferLambdaRole \
--policy-name WooferLambdaRolePolicy \
--policy-document file://role-policy.json
```

## Create a SNS topic

Enter the following command to create a new SNS topic:

```bash
awslocal sns create-topic --name wooferTopic
```

Enter the following command to subscribe an email address to `wooferTopic`:

```bash
awslocal sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:wooferTopic \
--protocol email \
--notification-endpoint [email protected]
```

You can customize the notification endpoint to your own preference.

## Create and test a Lambda function

We have defined a Lambda function `publishNewBark` to process stream records from `BarkTable`.

Create a zip file to contain `publishNewBark.js`:

```bash
zip publishNewBark.zip publishNewBark.js
```

Create the Lambda function:

```bash
awslocal lambda create-function \
--region us-east-1 \
--function-name publishNewBark \
--zip-file fileb://publishNewBark.zip \
--role arn:aws:iam::000000000000:role/service-role/WooferLambdaRole \
--handler publishNewBark.handler \
--timeout 15 \
--runtime nodejs16.x
```

Enter the following command to test the `publishNewBark` function:

```bash
awslocal lambda invoke \
--function-name publishNewBark \
--payload file://payload.json \
--cli-binary-format raw-in-base64-out output.txt
```

If the test was successful, you will see the following output:

```bash
{
"StatusCode": 200,
"ExecutedVersion": "$LATEST"
}
```

In addition, the output.txt file will contain the following text.

```txt
"Successfully processed 1 records."
```

## Create and test a trigger

Enter the following command to create the trigger:

```bash
awslocal lambda create-event-source-mapping \
--region us-east-1 \
--function-name publishNewBark \
--event-source arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2023-07-09T12:00:13.312 \
--batch-size 1 \
--starting-position TRIM_HORIZON
```

Replace the `event-source` value with the value of `latest_stream_arn` that you saved earlier.

Run the following command to test the trigger:

```bash
awslocal dynamodb put-item \
--table-name BarkTable \
--item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
```

You should receive a new message on your notification endpoint within a few minutes.

The Lambda function processes only new items that you add to `BarkTable`. If you update or delete an item in the table, the function does nothing.
37 changes: 37 additions & 0 deletions lambda-dynamodb-streams/payload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"Records": [
{
"eventID": "7de3041dd709b024af6f29e4fa13d34c",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"ApproximateCreationDateTime": 1479499740,
"Keys": {
"Timestamp": {
"S": "2016-11-18:12:09:36"
},
"Username": {
"S": "John Doe"
}
},
"NewImage": {
"Timestamp": {
"S": "2016-11-18:12:09:36"
},
"Message": {
"S": "This is a bark from the Woofer social network"
},
"Username": {
"S": "John Doe"
}
},
"SequenceNumber": "13021600000000001596893679",
"SizeBytes": 112,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2023-07-09T12:00:13.312"
}
]
}
29 changes: 29 additions & 0 deletions lambda-dynamodb-streams/publishNewBark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';
var AWS = require("aws-sdk");
var sns = new AWS.SNS();

exports.handler = (event, context, callback) => {

event.Records.forEach((record) => {
console.log('Stream record: ', JSON.stringify(record, null, 2));

if (record.eventName == 'INSERT') {
var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
var params = {
Subject: 'A new bark from ' + who,
Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
TopicArn: 'arn:aws:sns:us-east-1:000000000000:wooferTopic'
};
sns.publish(params, function(err, data) {
if (err) {
console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
} else {
console.log("Results from sending message: ", JSON.stringify(data, null, 2));
}
});
}
});
callback(null, `Successfully processed ${event.Records.length} records.`);
};
33 changes: 33 additions & 0 deletions lambda-dynamodb-streams/role-policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:us-east-1:000000000000:*"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:ListStreams"
],
"Resource": "arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/*"
},
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": [
"*"
]
}
]
}
50 changes: 50 additions & 0 deletions lambda-dynamodb-streams/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/bash

awslocal dynamodb create-table \
--table-name BarkTable \
--attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
--key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

latest_stream_arn=$(awslocal dynamodb describe-table --table-name BarkTable --query 'Table.LatestStreamArn' --output text)

awslocal iam create-role --role-name WooferLambdaRole \
--path "/service-role/" \
--assume-role-policy-document file://trust-relationship.json

awslocal iam put-role-policy --role-name WooferLambdaRole \
--policy-name WooferLambdaRolePolicy \
--policy-document file://role-policy.json

awslocal sns create-topic --name wooferTopic

awslocal sns subscribe \
--topic-arn arn:aws:sns:us-east-1:000000000000:wooferTopic \
--protocol email \
--notification-endpoint [email protected]

awslocal lambda create-function \
--region us-east-1 \
--function-name publishNewBark \
--zip-file fileb://publishNewBark.zip \
--role arn:aws:iam::000000000000:role/service-role/WooferLambdaRole \
--handler publishNewBark.handler \
--timeout 15 \
--runtime nodejs16.x

awslocal lambda invoke \
--function-name publishNewBark \
--payload file://payload.json \
--cli-binary-format raw-in-base64-out output.txt

awslocal lambda create-event-source-mapping \
--region us-east-1 \
--function-name publishNewBark \
--event-source arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2023-07-09T12:00:13.312 \
--batch-size 1 \
--starting-position TRIM_HORIZON

awslocal dynamodb put-item \
--table-name BarkTable \
--item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
12 changes: 12 additions & 0 deletions lambda-dynamodb-streams/trust-relationship.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}