Skip to content

Commit

Permalink
Merge pull request #29 from rivernews/fetch-stories
Browse files Browse the repository at this point in the history
Fetch stories (S3 event driven)
  • Loading branch information
rivernews authored Sep 18, 2022
2 parents a62c33d + ba830cd commit 9ad201f
Show file tree
Hide file tree
Showing 19 changed files with 396 additions and 156 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
**/builds/**

lambda_golang/landing
lambda_golang/story
lambda_golang/stories
lambda_golang/landing_metadata
lambda_golang/story
venv

# Binaries for programs and plugins
Expand Down
1 change: 1 addition & 0 deletions cloud_environments/terraform.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ if (
cd $GOLANG_SRC_DIR && \
go build ./cmd/landing && \
go build ./cmd/landing_metadata && \
go build ./cmd/stories && \
go build ./cmd/story && \
cd $PYTHON_SRC_DIR && python -m compileall layer src
); then
Expand Down
18 changes: 0 additions & 18 deletions cloud_module/api.tf
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,11 @@ module "slack_command_lambda" {
}
}

# allow lambda to invoke step function
attach_policy_json = true
policy_json = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"states:StartExecution"
],
"Resource": ["${module.batch_stories_sfn.state_machine_arn}"]
}
]
}
EOF

environment_variables = {
SLACK_SIGNING_SECRET = var.slack_signing_secret
SLACK_POST_WEBHOOK_URL = var.slack_post_webhook_url

PIPELINE_QUEUE_NAME = module.pipeline_queue.this_sqs_queue_name
BATCH_STORIES_SFN_ARN = module.batch_stories_sfn.state_machine_arn

LOGLEVEL = "DEBUG"
ENV = local.environment
Expand Down
2 changes: 1 addition & 1 deletion cloud_module/landing_s3_trigger.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ resource "aws_s3_bucket_notification" "bucket_notification" {
lambda_function_arn = module.landing_parse_metadata_lambda.lambda_function_arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "${local.newssite_economy_alias}/"
filter_suffix = ".html"
filter_suffix = "landing.html"
}

lambda_function {
Expand Down
48 changes: 34 additions & 14 deletions cloud_module/sfn_def/batch_stories_def.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
{
"Comment": "Batch processing stories for a landing page",
"StartAt": "BatchStoriesFetchParse",
"States": {
"BatchStoriesFetchParse":{
"Comment": "Fetch and parse all stories",
"Type":"Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters":{
"FunctionName": "${LANDING_PARSE_METADATA_LAMBDA_ARN}",
"Payload": {
"landingS3Key.$": "$.landingS3Key"
"Comment": "Batch processing stories for a landing page",
"StartAt": "Fetch-All",
"States": {
"Fetch-All": {
"Type": "Map",
"InputPath": "$",
"ItemsPath": "$.stories",
"MaxConcurrency": 28,
"ResultPath": "$.stories",
"Comment": "`Parameters` replaces `InputPath`, see https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html ",
"Parameters": {
"story.$": "$$.Map.Item.Value",
"newsSiteAlias.$": "$.newsSiteAlias",
"landingPageTimeStamp.$": "$.landingPageTimeStamp"
},
"Iterator": {
"StartAt": "Fetch",
"States": {
"Fetch": {
"Type": "Task",
"Resource": "${FETCH_STORY_LAMBDA_ARN}",
"Retry" : [
{
"ErrorEquals": [ "States.Timeout", "Lambda.Unknown" ],
"IntervalSeconds": 3,
"MaxAttempts": 99,
"BackoffRate": 1.5
}
],
"End": true
}
},
"End":true
}
}
},
"End": true
}
}
}
60 changes: 58 additions & 2 deletions cloud_module/stories_sfn.tf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module batch_stories_sfn {
name = "${local.project_name}-batch-stories-sfn"

definition = templatefile("${path.module}/sfn_def/batch_stories_def.json", {
LANDING_PARSE_METADATA_LAMBDA_ARN = module.landing_parse_metadata_lambda.lambda_function_arn
FETCH_STORY_LAMBDA_ARN = module.fetch_story_lambda.lambda_function_arn
})

# allow step function to invoke other service
Expand All @@ -14,7 +14,7 @@ module batch_stories_sfn {
service_integrations = {
lambda = {
lambda = [
module.landing_parse_metadata_lambda.lambda_function_arn
module.fetch_story_lambda.lambda_function_arn
]
}
}
Expand Down Expand Up @@ -91,3 +91,59 @@ module landing_parse_metadata_lambda {
Project = local.project_name
}
}

module fetch_story_lambda {
source = "terraform-aws-modules/lambda/aws"
create_function = true
function_name = "${local.project_name}-fetch-story"
description = "Fetch and archive a story page"
handler = "story"
runtime = "go1.x"

source_path = [{
path = "${path.module}/../lambda_golang/"
commands = ["${local.go_build_flags} go build ./cmd/story", ":zip"]
patterns = ["story"]
}]

timeout = 900
cloudwatch_logs_retention_in_days = 7

publish = true

attach_policy_statements = true
policy_statements = {
s3_archive_bucket = {
effect = "Allow",
actions = [
"s3:PutObject",
"s3:GetObject"
],
resources = [
"${data.aws_s3_bucket.archive.arn}/*",
]
}
# enable getting 404 instead of 403 in case of not found
# https://stackoverflow.com/a/19808954/9814131
s3_archive_bucket_check_404 = {
effect = "Allow",
actions = [
"s3:ListBucket",
],
resources = [
"${data.aws_s3_bucket.archive.arn}",
]
}
}

environment_variables = {
SLACK_WEBHOOK_URL = var.slack_post_webhook_url
LOG_LEVEL = "DEBUG"
DEBUG = "true"
S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id
}

tags = {
Project = local.project_name
}
}
28 changes: 23 additions & 5 deletions cloud_module/stories_sqs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ module "stories_queue_consumer_lambda" {
source = "terraform-aws-modules/lambda/aws"

create_function = true
function_name = "${local.project_name}-stories-queue-consumer-lambda"
description = "Consumes ${local.project_name} stories queue"
handler = "story"
function_name = "${local.project_name}-fetch-stories"
description = "Fetch ${local.project_name} stories; triggered by metadata.json creation"
handler = "stories"
runtime = "go1.x"
source_path = [{
path = "${path.module}/../lambda_golang/"
commands = ["${local.go_build_flags} go build ./cmd/story", ":zip"]
patterns = ["story"]
commands = ["${local.go_build_flags} go build ./cmd/stories", ":zip"]
patterns = ["stories"]
}]
publish = true

Expand All @@ -45,6 +45,23 @@ module "stories_queue_consumer_lambda" {

reserved_concurrent_executions = -1

# allow lambda to invoke step function
attach_policy_json = true
policy_json = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"states:StartExecution"
],
"Resource": ["${module.batch_stories_sfn.state_machine_arn}"]
}
]
}
EOF

# event source mapping for long polling
event_source_mapping = {
sqs = {
Expand Down Expand Up @@ -92,6 +109,7 @@ module "stories_queue_consumer_lambda" {
ENV = local.environment

S3_ARCHIVE_BUCKET = data.aws_s3_bucket.archive.id
SFN_ARN = module.batch_stories_sfn.state_machine_arn
}

tags = {
Expand Down
3 changes: 2 additions & 1 deletion lambda/src/slack_command_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def lambda_handler(request: APIGatewayRequest, context):
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html#sending-messages
queue = sqs.get_queue_by_name(QueueName=PIPELINE_QUEUE_NAME)
res = queue.send_message(MessageBody=str(request.body), MessageGroupId=f'{PIPELINE_QUEUE_NAME}')

'''
elif command.startswith(SlackCommandMantra.FETCH_LANDING_STORIES):
text = request.body.get('text', '')
landing_html_key = text[0] if text and isinstance(text, list) else text
Expand All @@ -53,6 +53,7 @@ def lambda_handler(request: APIGatewayRequest, context):
})
)
loop.run_until_complete(SlackService.send(f'received landing page key: `{landing_html_key}`'))
'''

loop.run_until_complete(SlackService.send(f'You sent a slack command. Processed response: {res}'))

Expand Down
29 changes: 3 additions & 26 deletions lambda_golang/cmd/landing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@ package main
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"

"golang.org/x/net/html/charset"

"github.com/aws/aws-lambda-go/lambda"

"github.com/rivernews/GoTools"
Expand All @@ -33,35 +29,16 @@ type LambdaResponse struct {

func HandleRequest(ctx context.Context, name LambdaEvent) (LambdaResponse, error) {
newsSite := newssite.GetNewsSite("NEWSSITE_ECONOMY")
resp, err := http.Get(newsSite.LandingURL)
if err != nil {
// handle error
GoTools.Logger("ERROR", err.Error())
}
defer resp.Body.Close()

contentType := resp.Header.Get("Content-Type") // Optional, better guessing
GoTools.Logger("INFO", "ContentType is ", contentType)
utf8reader, err := charset.NewReader(resp.Body, contentType)
if err != nil {
GoTools.Logger("ERROR", err.Error())
}

body, err := io.ReadAll(utf8reader)
if err != nil {
// handle error
GoTools.Logger("ERROR", err.Error())
}
bodyText := string(body)

bodyText := newssite.Fetch(newsSite.LandingURL)
GoTools.Logger("INFO", "In golang runtime now!\n\n```\n "+bodyText[:500]+"\n ...```\n End of message")

// scraper
result := newssite.GetStoriesFromEconomy(bodyText)

// print out all story titles
var slackMessage strings.Builder
for i, topic := range result.Topics {
for i, topic := range result.Stories {
slackMessage.WriteString(topic.Name)
slackMessage.WriteString(" ")
slackMessage.WriteString(topic.Description)
Expand All @@ -76,7 +53,7 @@ func HandleRequest(ctx context.Context, name LambdaEvent) (LambdaResponse, error
}
GoTools.SendSlackMessage(slackMessage.String())

successMessage := fmt.Sprintf("Scraper finished - %d links found", len(result.Topics))
successMessage := fmt.Sprintf("Scraper finished - %d links found", len(result.Stories))
GoTools.Logger("INFO", successMessage)

// S3 archive
Expand Down
8 changes: 1 addition & 7 deletions lambda_golang/cmd/landing_metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ type LambdaResponse struct {
Message string `json:"message:"`
}

type LandingPageMetadata struct {
Stories []newssite.Topic `json:"stories"`
UntitledStories []newssite.Topic `json:"untitledstories"`
}

func HandleRequest(ctx context.Context, s3Event events.S3Event) (LambdaResponse, error) {
GoTools.Logger("INFO", "Landing page metadata.json generator launched")

Expand All @@ -53,8 +48,7 @@ func HandleRequest(ctx context.Context, s3Event events.S3Event) (LambdaResponse,
metadataS3Key := fmt.Sprintf("%s/metadata.json", strings.Join(metadataS3DirKeyTokens, "/"))

result := newssite.GetStoriesFromEconomy(landingPageHtmlText)

metadataJSONString := newssite.AsJson(LandingPageMetadata{Stories: result.Topics, UntitledStories: result.UntitledTopics})
metadataJSONString := GoTools.AsJson(result)

cloud.Archive(cloud.ArchiveArgs{
BodyText: metadataJSONString,
Expand Down
Loading

0 comments on commit 9ad201f

Please sign in to comment.