Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding sample for ddb and kinesis streams #219

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/makefile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ jobs:
LOCALSTACK_API_KEY: ${{ secrets.TEST_LOCALSTACK_API_KEY }}
DNS_ADDRESS: 127.0.0.1
DEBUG: 1
timeout-minutes: 50
timeout-minutes: 60
run: make test-ci-all
45 changes: 45 additions & 0 deletions dynamodb-kinesis-stream/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
export AWS_ACCESS_KEY_ID ?= test
export AWS_SECRET_ACCESS_KEY ?= test
export AWS_DEFAULT_REGION=us-east-1


usage: ## Show this help
@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//'

install: ## Install dependencies
@which localstack || pip install localstack
@which awslocal || pip install awscli-local
@which tflocal || pip install terraform-local
@test -e .venv || (virtualenv .venv; . .venv/bin/activate; pip install -r requirements.txt)

run: ## Deploy and run the sample locally
@(test -d .terraform || tflocal init) && tflocal apply --auto-approve
@. .venv/bin/activate; python test_stream_consumer.py &
./ddb-data.sh

start:
localstack start -d

clear: ## remove remnants from older deployments
@rm -f terraform.tfstate terraform.tfstate.backup

clean: clear ## remove all project related files and reverse terraform init
@rm -f -r .terraform .terraform.lock.hcl

stop:
@echo
localstack stop

ready:
@echo Waiting on the LocalStack container...
@localstack wait -t 30 && echo Localstack is ready to use! || (echo Gave up waiting on LocalStack, exiting. && exit 1)

logs:
@localstack logs > logs.txt

test-ci:
make start install ready run; return_code=`echo $$?`;\
make logs; make stop; exit $$return_code;

.PHONY: usage install start run stop ready logs test-ci

43 changes: 43 additions & 0 deletions dynamodb-kinesis-stream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# DynamoDB and Kinesis Stream Integration

Simple demo illustrating the integration between DynamoDB and Kinesis streams.

## Prerequisites

- LocalStack
- Docker
- `make`
- Python >= 3.7
- `tflocal`


## Running

Make sure that LocalStack is started:

```
DEBUG=1 localstack start
```

Deploy the app with Terraform:

```
tflocal init
tflocal apply --auto-approve
```

You can now start the Python script that subscribes to the Kinesis shard, listen, and prints to the changes happening in the DynamoDB table:

```
pip install boto3
python test_stream_consumer.py
```

You can now populate the DynamoDB table with:

```
./ddb-data.sh
```

The Python script will start printing the records the shards receive to the console.

18 changes: 18 additions & 0 deletions dynamodb-kinesis-stream/ddb-data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

artists=("Queen" "Queen" "Queen" "The Beatles" "The Beatles" "The Beatles" "The Rolling Stones" "The Rolling Stones" "The Rolling Stones")
songs=("Bohemian Rapsody" "We Will Rock You" "Radio Gaga" "Come Together" "Let it Be" "Here Comes the Sun" "Sympathy For The Devil" "Angie" "Satisfaction")

for i in "${!artists[@]}"; do
artist="${artists[i]}"
song="${songs[i]}"

awslocal dynamodb put-item \
--table-name MusicTable \
--item '{
"Artist": {"S": "'"$artist"'"},
"Song": {"S": "'"$song"'"}
}' \
--return-consumed-capacity TOTAL
sleep 1
done
35 changes: 35 additions & 0 deletions dynamodb-kinesis-stream/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
resource "aws_dynamodb_table" "demo_table" {
name = "MusicTable"
billing_mode = "PROVISIONED"
read_capacity = 20
write_capacity = 20
hash_key = "Artist"
range_key = "Song"

attribute {
name = "Artist"
type = "S"
}

attribute {
name = "Song"
type = "S"
}

stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
}

resource "aws_kinesis_stream" "demo_stream" {
name = "demo_stream"
shard_count = 1

retention_period = 24

shard_level_metrics = ["IncomingBytes", "OutgoingBytes"]
}

resource "aws_dynamodb_kinesis_streaming_destination" "streaming_destination" {
stream_arn = aws_kinesis_stream.demo_stream.arn
table_name = aws_dynamodb_table.demo_table.name
}
1 change: 1 addition & 0 deletions dynamodb-kinesis-stream/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
boto3==1.26.72
41 changes: 41 additions & 0 deletions dynamodb-kinesis-stream/test_stream_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import boto3
import time


endpoint_url = "http://localhost.localstack.cloud:4566"
stream_name = "demo_stream"


kinesis_client = boto3.client(
"kinesis",
endpoint_url=endpoint_url,
region_name="us-east-1",
aws_access_key_id="test",
aws_secret_access_key="test",
)

response = kinesis_client.describe_stream(
StreamName=stream_name,
)
stream_arn = response["StreamDescription"]["StreamARN"]
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]

consumer_name = "ls_consumer"
response = kinesis_client.register_stream_consumer(
StreamARN=stream_arn, ConsumerName=consumer_name
)

consumer_arn = response["Consumer"]["ConsumerARN"]

response = kinesis_client.subscribe_to_shard(
ConsumerARN=consumer_arn,
ShardId=shard_id,
StartingPosition={"Type": "TRIM_HORIZON"},
)

try:
for record in response["EventStream"]:
print("****************")
print(record)
except Exception as e:
print(f"Error reading stream: {str(e)}")