Skip to content

Commit

Permalink
Sfn map parallism POC
Browse files Browse the repository at this point in the history
  • Loading branch information
rivernews committed Sep 17, 2022
1 parent e29fdc8 commit d2b025e
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 121 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
**/builds/**

lambda_golang/landing
lambda_golang/story
lambda_golang/stories
lambda_golang/landing_metadata
lambda_golang/story
venv

# Binaries for programs and plugins
Expand Down
1 change: 1 addition & 0 deletions cloud_environments/terraform.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ if (
cd $GOLANG_SRC_DIR && \
go build ./cmd/landing && \
go build ./cmd/landing_metadata && \
go build ./cmd/stories && \
go build ./cmd/story && \
cd $PYTHON_SRC_DIR && python -m compileall layer src
); then
Expand Down
18 changes: 0 additions & 18 deletions cloud_module/api.tf
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,11 @@ module "slack_command_lambda" {
}
}

# allow lambda to invoke step function
attach_policy_json = true
policy_json = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"states:StartExecution"
],
"Resource": ["${module.batch_stories_sfn.state_machine_arn}"]
}
]
}
EOF

environment_variables = {
SLACK_SIGNING_SECRET = var.slack_signing_secret
SLACK_POST_WEBHOOK_URL = var.slack_post_webhook_url

PIPELINE_QUEUE_NAME = module.pipeline_queue.this_sqs_queue_name
BATCH_STORIES_SFN_ARN = module.batch_stories_sfn.state_machine_arn

LOGLEVEL = "DEBUG"
ENV = local.environment
Expand Down
40 changes: 26 additions & 14 deletions cloud_module/sfn_def/batch_stories_def.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
{
"Comment": "Batch processing stories for a landing page",
"StartAt": "BatchStoriesFetchParse",
"States": {
"BatchStoriesFetchParse":{
"Comment": "Fetch and parse all stories",
"Type":"Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters":{
"FunctionName": "${LANDING_PARSE_METADATA_LAMBDA_ARN}",
"Payload": {
"landingS3Key.$": "$.landingS3Key"
"Comment": "Batch processing stories for a landing page",
"StartAt": "Fetch-All",
"States": {
"Fetch-All": {
"Type": "Map",
"InputPath": "$",
"ItemsPath": "$.stories",
"MaxConcurrency": 1,
"ResultPath": "$.stories",
"Comment": "`Parameters` replaces `InputPath`, see https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html ",
"Parameters": {
"story.$": "$$.Map.Item.Value",
"newsSiteAlias.$": "$.newsSiteAlias",
"landingPageTimeStamp.$": "$.landingPageTimeStamp"
},
"Iterator": {
"StartAt": "Fetch",
"States": {
"Fetch": {
"Type": "Task",
"Resource": "${FETCH_STORY_LAMBDA_ARN}",
"End": true
}
},
"End":true
}
}
},
"End": true
}
}
}
60 changes: 58 additions & 2 deletions cloud_module/stories_sfn.tf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module batch_stories_sfn {
name = "${local.project_name}-batch-stories-sfn"

definition = templatefile("${path.module}/sfn_def/batch_stories_def.json", {
LANDING_PARSE_METADATA_LAMBDA_ARN = module.landing_parse_metadata_lambda.lambda_function_arn
FETCH_STORY_LAMBDA_ARN = module.fetch_story_lambda.lambda_function_arn
})

# allow step function to invoke other service
Expand All @@ -14,7 +14,7 @@ module batch_stories_sfn {
service_integrations = {
lambda = {
lambda = [
module.landing_parse_metadata_lambda.lambda_function_arn
module.fetch_story_lambda.lambda_function_arn
]
}
}
Expand Down Expand Up @@ -91,3 +91,59 @@ module landing_parse_metadata_lambda {
Project = local.project_name
}
}

module fetch_story_lambda {
source = "terraform-aws-modules/lambda/aws"
create_function = true
function_name = "${local.project_name}-fetch-story"
description = "Fetch and archive a story page"
handler = "story"
runtime = "go1.x"

source_path = [{
path = "${path.module}/../lambda_golang/"
commands = ["${local.go_build_flags} go build ./cmd/story", ":zip"]
patterns = ["story"]
}]

timeout = 900
cloudwatch_logs_retention_in_days = 7

publish = true

attach_policy_statements = true
policy_statements = {
s3_archive_bucket = {
effect = "Allow",
actions = [
"s3:PutObject",
"s3:GetObject"
],
resources = [
"${data.aws_s3_bucket.archive.arn}/*",
]
}
# enable getting 404 instead of 403 in case of not found
# https://stackoverflow.com/a/19808954/9814131
s3_archive_bucket_check_404 = {
effect = "Allow",
actions = [
"s3:ListBucket",
],
resources = [
"${data.aws_s3_bucket.archive.arn}",
]
}
}

environment_variables = {
SLACK_WEBHOOK_URL = var.slack_post_webhook_url
LOG_LEVEL = "DEBUG"
DEBUG = "true"
S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id
}

tags = {
Project = local.project_name
}
}
28 changes: 23 additions & 5 deletions cloud_module/stories_sqs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ module "stories_queue_consumer_lambda" {
source = "terraform-aws-modules/lambda/aws"

create_function = true
function_name = "${local.project_name}-stories-queue-consumer-lambda"
description = "Consumes ${local.project_name} stories queue"
handler = "story"
function_name = "${local.project_name}-fetch-stories"
description = "Fetch ${local.project_name} stories; triggered by metadata.json creation"
handler = "stories"
runtime = "go1.x"
source_path = [{
path = "${path.module}/../lambda_golang/"
commands = ["${local.go_build_flags} go build ./cmd/story", ":zip"]
patterns = ["story"]
commands = ["${local.go_build_flags} go build ./cmd/stories", ":zip"]
patterns = ["stories"]
}]
publish = true

Expand All @@ -45,6 +45,23 @@ module "stories_queue_consumer_lambda" {

reserved_concurrent_executions = -1

# allow lambda to invoke step function
attach_policy_json = true
policy_json = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"states:StartExecution"
],
"Resource": ["${module.batch_stories_sfn.state_machine_arn}"]
}
]
}
EOF

# event source mapping for long polling
event_source_mapping = {
sqs = {
Expand Down Expand Up @@ -92,6 +109,7 @@ module "stories_queue_consumer_lambda" {
ENV = local.environment

S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id
SFN_ARN = module.batch_stories_sfn.state_machine_arn
}

tags = {
Expand Down
3 changes: 2 additions & 1 deletion lambda/src/slack_command_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def lambda_handler(request: APIGatewayRequest, context):
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html#sending-messages
queue = sqs.get_queue_by_name(QueueName=PIPELINE_QUEUE_NAME)
res = queue.send_message(MessageBody=str(request.body), MessageGroupId=f'{PIPELINE_QUEUE_NAME}')

'''
elif command.startswith(SlackCommandMantra.FETCH_LANDING_STORIES):
text = request.body.get('text', '')
landing_html_key = text[0] if text and isinstance(text, list) else text
Expand All @@ -53,6 +53,7 @@ def lambda_handler(request: APIGatewayRequest, context):
})
)
loop.run_until_complete(SlackService.send(f'received landing page key: `{landing_html_key}`'))
'''

loop.run_until_complete(SlackService.send(f'You sent a slack command. Processed response: {res}'))

Expand Down
117 changes: 117 additions & 0 deletions lambda_golang/cmd/stories/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package main

import (
"fmt"
"strings"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sfn"

"context"
"math/rand"
"time"

"github.com/rivernews/GoTools"
"github.com/rivernews/media-literacy/pkg/cloud"
"github.com/rivernews/media-literacy/pkg/newssite"
)

func main() {
rand.Seed(time.Now().UnixNano())
lambda.Start(HandleRequest)
}

type LambdaResponse struct {
OK bool `json:"OK:"`
Message string `json:"message:"`
}

var sfnClient *sfn.Client

// SQS event
// refer to https://github.com/aws/aws-lambda-go/blob/v1.26.0/events/README_SQS.md
func HandleRequest(ctx context.Context, S3Event events.S3Event) (LambdaResponse, error) {
GoTools.Logger("INFO", "Fetch stories lambda launched ... triggered by metadata.json creation.")

for _, record := range S3Event.Records {

GoTools.Logger("INFO", fmt.Sprintf("S3 event ``` %s ```\n ", newssite.AsJson(record)))

metadataS3KeyTokens := strings.Split(record.S3.Object.URLDecodedKey, "/")
newsSiteAlias := metadataS3KeyTokens[0]
landingPageTimeStamp := metadataS3KeyTokens[len(metadataS3KeyTokens)-2]

metadataJSONString := cloud.Pull(record.S3.Object.URLDecodedKey)
var metadata newssite.LandingPageMetadata
newssite.FromJson([]byte(metadataJSONString), &metadata)

GoTools.Logger("INFO", fmt.Sprintf("Test first story: %d:%d", len(metadata.Stories), len(metadata.UntitledStories)))

// TODO: move below into sfn map lambda
story := metadata.Stories[0]
storyHtmlBodyText := newssite.Fetch(story.URL)
cloud.Archive(cloud.ArchiveArgs{
BodyText: storyHtmlBodyText,
Key: fmt.Sprintf("%s/stories/%s-%s/story.html", newsSiteAlias, landingPageTimeStamp, story.Name),
})

// fire step function, input = {stories: [{}, {}, {}...], newsSiteAlias:"", landingPageTimeStamp:""}
sfnInput := newssite.AsJson(&newssite.StepFunctionInput{
Stories: metadata.Stories,
NewsSiteAlias: newsSiteAlias,
LandingPageTimeStamp: landingPageTimeStamp,
})
executionName := strings.ReplaceAll(fmt.Sprintf("%s--%s", landingPageTimeStamp, time.Now().Format(time.RFC3339)), ":", "")
sfnArn := GoTools.GetEnvVarHelper("SFN_ARN")
GoTools.Logger("INFO", fmt.Sprintf("execName: `%s`\nSfnArn: `%s`\nInput:``` %s ```\n", executionName, sfnArn, sfnInput))

sfnOutput := cloud.SfnStartExecution(
ctx,
&sfn.StartExecutionInput{
Input: aws.String(sfnInput),
Name: aws.String(executionName),
StateMachineArn: aws.String(sfnArn),
},
)

GoTools.Logger("INFO", fmt.Sprintf("Sfn output ``` %s ```\n", newssite.AsJson(sfnOutput)))

/*
storyChunk := message.Body
GoTools.Logger("INFO", fmt.Sprintf("Story consumer! story chunk: %s", storyChunk))
// TODO: fetch and archive for the chunk of storyURLs
storyURLs := strings.Split(storyChunk, " ")
for _, storyURL := range storyURLs {
// TODO: randomized interval
time.Sleep(time.Duration(rand.Intn(5)+5) * time.Second)
storyS3Path := fmt.Sprintf("story/%s", storyURL)
if !cloud.IsDuplicated(storyS3Path) {
GoTools.Logger("INFO", fmt.Sprintf("Archiving story %s", storyURL))
} else {
GoTools.Logger("DEBUG", fmt.Sprintf("Skip archiving story %s", storyURL))
}
// _, resMessage, err := GoTools.Fetch(GoTools.FetchOption{
// Method: "GET",
// URL: "https://ipv4bot.whatismyipaddress.com",
// })
// if err != nil {
// GoTools.Logger("ERROR", err.Error())
// }
// GoTools.Logger("INFO", fmt.Sprintf("%s-%s ip: %s", storyChunkId, storyURL, resMessage))
}
*/
}

return LambdaResponse{
OK: true,
Message: "Story consumer fetch parse ok",
}, nil
}
Loading

0 comments on commit d2b025e

Please sign in to comment.