From 0482fc802c8955c6d4eda1075cd1ba9fa8db64bd Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Wed, 16 Jan 2019 11:02:05 -0500 Subject: [PATCH] Allow to deploy a Kinesis function using the CLI This commit adds support to publish a function that listens to a Kinesis stream using the CLI. Features: - Kinesis can now be defined in the YML. - AWS installer can now define custom policies to be added to the lambda role. - Kinesis supports `TRIM_HORIZON` and `LATEST` as the starting position strategy, `AT_TIMESTAMP` is currently not supported because the cloudformation API doesn't accept a TIMESTAMP when configuring the subscription. - Kinesis allows to configure the batch size. --- x-pack/functionbeat/_meta/beat.reference.yml | 55 +++++- x-pack/functionbeat/_meta/beat.yml | 55 ++++++ .../functionbeat/docs/config-options.asciidoc | 15 ++ .../functionbeat/functionbeat.reference.yml | 55 +++++- x-pack/functionbeat/functionbeat.yml | 55 ++++++ .../functionbeat/provider/aws/cli_manager.go | 39 ++-- .../provider/aws/cloudwatch_logs.go | 5 + x-pack/functionbeat/provider/aws/kinesis.go | 165 +++++++++++++++- .../functionbeat/provider/aws/kinesis_test.go | 179 +++++++++++++++++- x-pack/functionbeat/provider/aws/sqs.go | 5 + 10 files changed, 604 insertions(+), 24 deletions(-) diff --git a/x-pack/functionbeat/_meta/beat.reference.yml b/x-pack/functionbeat/_meta/beat.reference.yml index bf0ef017d65..6a65ec6db8e 100644 --- a/x-pack/functionbeat/_meta/beat.reference.yml +++ b/x-pack/functionbeat/_meta/beat.reference.yml @@ -59,7 +59,7 @@ functionbeat.provider.aws.functions: type: sqs # Description of the method to help identify them when you run multiples functions. - description: "lambda function for sqs events" + description: "lambda function for SQS events" # Concurrency, is the reserved number of instances for that function. # Default is 5. 
@@ -80,8 +80,9 @@ functionbeat.provider.aws.functions: #fields: # env: staging - # List of cloudwatch log group registered to that function. + # List of SQS queues. triggers: + # Arn for the SQS queue. - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents # Define custom processors for this function. @@ -92,3 +93,53 @@ functionbeat.provider.aws.functions: # max_depth: 1 # target: "" # overwrite_keys: false + # + + # Create a function that accepts events from Kinesis streams. + - name: kinesis + enabled: false + type: kinesis + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for Kinesis events" + + # Concurrency, is the reserved number of instances for that function. + # Default is 5. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false + + # List of Kinesis streams. + triggers: + # Arn for the Kinesis stream. + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents + + # batch_size is the number of events read in a batch. + # Default is 100. + #batch_size: 100 + + # Starting position is where to start reading events from the Kinesis stream. + # Default is trim_horizon. 
+ #starting_position: "trim_horizon" diff --git a/x-pack/functionbeat/_meta/beat.yml b/x-pack/functionbeat/_meta/beat.yml index a0fde2438b9..d7bf1f68a27 100644 --- a/x-pack/functionbeat/_meta/beat.yml +++ b/x-pack/functionbeat/_meta/beat.yml @@ -81,6 +81,48 @@ functionbeat.provider.aws.functions: #fields: # env: staging + # List of SQS queues. + triggers: + # Arn for the SQS queue. + - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false + # + + # Create a function that accepts events from Kinesis streams. + - name: kinesis + enabled: false + type: kinesis + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for Kinesis events" + + # Concurrency, is the reserved number of instances for that function. + # Default is 5. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + # Define custom processors for this function. #processors: # - decode_json_fields: @@ -89,3 +131,16 @@ functionbeat.provider.aws.functions: # max_depth: 1 # target: "" # overwrite_keys: false + + # List of Kinesis streams. + triggers: + # Arn for the Kinesis stream. 
+ - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents + + # batch_size is the number of events read in a batch. + # Default is 100. + #batch_size: 100 + + # Starting position is where to start reading events from the Kinesis stream. + # Default is trim_horizon. + #starting_position: "trim_horizon" diff --git a/x-pack/functionbeat/docs/config-options.asciidoc b/x-pack/functionbeat/docs/config-options.asciidoc index dcf38dc5266..84404a2f78b 100644 --- a/x-pack/functionbeat/docs/config-options.asciidoc +++ b/x-pack/functionbeat/docs/config-options.asciidoc @@ -78,6 +78,7 @@ are: `cloudwatch_logs`:: Collects events from CloudWatch logs. `sqs`:: Collects data from Amazon Simple Queue Service (SQS). +`kinesis`:: Collects data from a Kinesis stream. [float] [id="{beatname_lc}-description"] @@ -125,3 +126,17 @@ default is 128 MiB. The dead letter queue to use for messages that can't be processed successfully. Set this option to an ARN that points to an SQS queue. + +[float] +[id="{beatname_lc}-batch-size"] +==== `batch_size` + +The number of events to read from a Kinesis stream, the minimum value is 100 and the maximum is +10000. The default is 100. + +[float] +[id="{beatname_lc}-starting-position"] +==== `starting_position` + +The starting position to read from a Kinesis stream, valid values are `trim_horizon` and `latest`. +The default is trim_horizon. diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index de8bc6ee2fc..d422ec89fa6 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -59,7 +59,7 @@ functionbeat.provider.aws.functions: type: sqs # Description of the method to help identify them when you run multiples functions. - description: "lambda function for sqs events" + description: "lambda function for SQS events" # Concurrency, is the reserved number of instances for that function. # Default is 5. 
@@ -80,8 +80,9 @@ functionbeat.provider.aws.functions: #fields: # env: staging - # List of cloudwatch log group registered to that function. + # List of SQS queues. triggers: + # Arn for the SQS queue. - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents # Define custom processors for this function. @@ -92,6 +93,56 @@ functionbeat.provider.aws.functions: # max_depth: 1 # target: "" # overwrite_keys: false + # + + # Create a function that accepts events from Kinesis streams. + - name: kinesis + enabled: false + type: kinesis + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for Kinesis events" + + # Concurrency, is the reserved number of instances for that function. + # Default is 5. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false + + # List of Kinesis streams. + triggers: + # Arn for the Kinesis stream. + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents + + # batch_size is the number of events read in a batch. + # Default is 100. + #batch_size: 100 + + # Starting position is where to start reading events from the Kinesis stream. + # Default is trim_horizon. 
+ #starting_position: "trim_horizon" #================================ General ====================================== diff --git a/x-pack/functionbeat/functionbeat.yml b/x-pack/functionbeat/functionbeat.yml index d595636e225..f9e04b2d3fa 100644 --- a/x-pack/functionbeat/functionbeat.yml +++ b/x-pack/functionbeat/functionbeat.yml @@ -81,6 +81,48 @@ functionbeat.provider.aws.functions: #fields: # env: staging + # List of SQS queues. + triggers: + # Arn for the SQS queue. + - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + + # Define custom processors for this function. + #processors: + # - decode_json_fields: + # fields: ["message"] + # process_array: false + # max_depth: 1 + # target: "" + # overwrite_keys: false + # + + # Create a function that accepts events from Kinesis streams. + - name: kinesis + enabled: false + type: kinesis + + # Description of the method to help identify them when you run multiples functions. + description: "lambda function for Kinesis events" + + # Concurrency, is the reserved number of instances for that function. + # Default is 5. + # + # Note: There is a hard limit of 1000 functions of any kind per account. + #concurrency: 5 + + # The maximum memory allocated for this function, the configured size must be a factor of 64. + # There is a hard limit of 3008MiB for each function. Default is 128MiB. + #memory_size: 128MiB + + # Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue. + #dead_letter_config.target_arn: + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + # Define custom processors for this function. #processors: # - decode_json_fields: @@ -90,6 +132,19 @@ functionbeat.provider.aws.functions: # target: "" # overwrite_keys: false + # List of Kinesis streams. + triggers: + # Arn for the Kinesis stream. 
+ - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents + + # batch_size is the number of events read in a batch. + # Default is 100. + #batch_size: 100 + + # Starting position is where to start reading events from the Kinesis stream. + # Default is trim_horizon. + #starting_position: "trim_horizon" + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/x-pack/functionbeat/provider/aws/cli_manager.go b/x-pack/functionbeat/provider/aws/cli_manager.go index 8ae4d622868..acac7701547 100644 --- a/x-pack/functionbeat/provider/aws/cli_manager.go +++ b/x-pack/functionbeat/provider/aws/cli_manager.go @@ -40,6 +40,7 @@ type AWSLambdaFunction struct { } type installer interface { + Policies() []cloudformation.AWSIAMRole_Policy Template() *cloudformation.Template LambdaConfig() *lambdaConfig } @@ -84,6 +85,27 @@ func (c *CLIManager) template(function installer, name, codeLoc string) *cloudfo // Documentation: https://docs.aws.amazon.com/AWSCloudFormation/latest/APIReference/Welcome.html // Intrinsic function reference: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference.html + // Default policies to writes logs from the Lambda. + policies := []cloudformation.AWSIAMRole_Policy{ + cloudformation.AWSIAMRole_Policy{ + PolicyName: cloudformation.Join("-", []string{"fnb", "lambda", name}), + PolicyDocument: map[string]interface{}{ + "Statement": []map[string]interface{}{ + map[string]interface{}{ + "Action": []string{"logs:CreateLogStream", "Logs:PutLogEvents"}, + "Effect": "Allow", + "Resource": []string{ + cloudformation.Sub("arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/" + name + ":*"), + }, + }, + }, + }, + }, + } + + // Merge any specific policies from the service. + policies = append(policies, function.Policies()...) + // Create the roles for the lambda. 
template := cloudformation.NewTemplate() // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html @@ -106,22 +128,7 @@ func (c *CLIManager) template(function installer, name, codeLoc string) *cloudfo RoleName: "functionbeat-lambda-" + name, // Allow the lambda to write log to cloudwatch logs. // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-policy.html - Policies: []cloudformation.AWSIAMRole_Policy{ - cloudformation.AWSIAMRole_Policy{ - PolicyName: cloudformation.Join("-", []string{"fnb", "lambda", name}), - PolicyDocument: map[string]interface{}{ - "Statement": []map[string]interface{}{ - map[string]interface{}{ - "Action": []string{"logs:CreateLogStream", "Logs:PutLogEvents"}, - "Effect": "Allow", - "Resource": []string{ - cloudformation.Sub("arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/" + name + ":*"), - }, - }, - }, - }, - }, - }, + Policies: policies, } // Configure the Dead letter, any failed events will be send to the configured amazon resource name. diff --git a/x-pack/functionbeat/provider/aws/cloudwatch_logs.go b/x-pack/functionbeat/provider/aws/cloudwatch_logs.go index ded21d6f7d9..ffd95b1dca7 100644 --- a/x-pack/functionbeat/provider/aws/cloudwatch_logs.go +++ b/x-pack/functionbeat/provider/aws/cloudwatch_logs.go @@ -210,3 +210,8 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template { func (c *CloudwatchLogs) LambdaConfig() *lambdaConfig { return c.config.LambdaConfig } + +// Policies returns a slice of policy to add to the lambda. 
+func (c *CloudwatchLogs) Policies() []cloudformation.AWSIAMRole_Policy { + return []cloudformation.AWSIAMRole_Policy{} +} diff --git a/x-pack/functionbeat/provider/aws/kinesis.go b/x-pack/functionbeat/provider/aws/kinesis.go index 6c88b26ff26..b53d9449dac 100644 --- a/x-pack/functionbeat/provider/aws/kinesis.go +++ b/x-pack/functionbeat/provider/aws/kinesis.go @@ -6,9 +6,14 @@ package aws import ( "context" + "errors" + "fmt" + "sort" + "strings" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" + "github.com/awslabs/goformation/cloudformation" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -17,14 +22,109 @@ import ( "github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer" ) +type startingPosition uint + +const ( + // Looking at the documentation, Kinesis should also support `AT_TIMESTAMP` but looking at the + // request format for cloudformation, I don't see a way to define the timestamp. + // I've looked at other frameworks, and it seems a bug in the cloudformation API. 
+ // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html + trimHorizonPos startingPosition = iota + 1 + latestPos +) + +var ( + mapStartingPosition = map[string]startingPosition{ + "trim_horizon": trimHorizonPos, + "latest": latestPos, + } + + mapStartingPositionReverse = make(map[startingPosition]string, len(mapStartingPosition)) +) + +func init() { + for k, v := range mapStartingPosition { + mapStartingPositionReverse[v] = strings.ToUpper(k) + } +} + +func (s *startingPosition) Unpack(str string) error { + v, ok := mapStartingPosition[str] + if !ok { + validValues := make([]string, len(mapStartingPosition)) + pos := 0 + for k := range mapStartingPosition { + validValues[pos] = k + pos++ + } + return fmt.Errorf("unknown value %s, valid values are: %s", str, strings.Join(validValues, ", ")) + } + *s = v + return nil +} + +func (s *startingPosition) String() string { + v, ok := mapStartingPositionReverse[*s] + if !ok { + panic(fmt.Sprintf("unknown starting position: %d", *s)) + } + return v +} + +// KinesisConfig is the configuration for the Kinesis event type. +type KinesisConfig struct { + Description string `config:"description"` + Name string `config:"name" validate:"nonzero,required"` + Triggers []*KinesisTriggerConfig `config:"triggers"` + LambdaConfig *lambdaConfig `config:",inline"` +} + +// Validate validates the configuration. +func (cfg *KinesisConfig) Validate() error { + if len(cfg.Triggers) == 0 { + return errors.New("you need to specify at least one trigger") + } + return nil +} + +// KinesisTriggerConfig configuration for the current trigger. +type KinesisTriggerConfig struct { + EventSourceArn string + BatchSize int + StartingPosition startingPosition +} + +// Unpack unpacks the trigger and make sure the defaults settings are correctly sets. +func (c *KinesisTriggerConfig) Unpack(cfg *common.Config) error { + // Create an anonymous struct so we don't go into a recursive unpack. 
+ config := struct { + EventSourceArn string `config:"event_source_arn" validate:"required"` + BatchSize int `config:"batch_size" validate:"min=100,max=10000"` + StartingPosition startingPosition `config:"starting_position"` + }{ + BatchSize: 100, + StartingPosition: trimHorizonPos, + } + if err := cfg.Unpack(&config); err != nil { + return err + } + *c = KinesisTriggerConfig(config) + return nil +} + // Kinesis receives events from a kinesis stream and forward them to elasticsearch. type Kinesis struct { - log *logp.Logger + log *logp.Logger + config *KinesisConfig } // NewKinesis creates a new function to receives events from a kinesis stream. -func NewKinesis(provider provider.Provider, config *common.Config) (provider.Function, error) { - return &Kinesis{log: logp.NewLogger("kinesis")}, nil +func NewKinesis(provider provider.Provider, cfg *common.Config) (provider.Function, error) { + config := &KinesisConfig{LambdaConfig: DefaultLambdaConfig} + if err := cfg.Unpack(config); err != nil { + return nil, err + } + return &Kinesis{log: logp.NewLogger("kinesis"), config: config}, nil } // Run starts the lambda function and wait for web triggers. @@ -51,3 +151,62 @@ func (k *Kinesis) createHandler(client core.Client) func(request events.KinesisE func (k *Kinesis) Name() string { return "kinesis" } + +// LambdaConfig returns the configuration to use when creating the lambda. +func (k *Kinesis) LambdaConfig() *lambdaConfig { + return k.config.LambdaConfig +} + +// Template returns the cloudformation template for configuring the service with the specified +// triggers. 
+func (k *Kinesis) Template() *cloudformation.Template { + template := cloudformation.NewTemplate() + prefix := func(suffix string) string { + return normalizeResourceName("fnb" + k.config.Name + k.Name() + suffix) + } + + for _, trigger := range k.config.Triggers { + resourceName := prefix(trigger.EventSourceArn) + template.Resources[resourceName] = &cloudformation.AWSLambdaEventSourceMapping{ + BatchSize: trigger.BatchSize, + EventSourceArn: trigger.EventSourceArn, + FunctionName: cloudformation.GetAtt("fnb"+k.config.Name, "Arn"), + StartingPosition: trigger.StartingPosition.String(), + } + } + + return template +} + +// Policies returns a slice of policy to add to the lambda role. +func (k *Kinesis) Policies() []cloudformation.AWSIAMRole_Policy { + resources := make([]string, len(k.config.Triggers)) + for idx, trigger := range k.config.Triggers { + resources[idx] = trigger.EventSourceArn + } + + // Give us a chance to generate the same document independent of the changes, + // to help with updates. 
+ sort.Strings(resources) + + policies := []cloudformation.AWSIAMRole_Policy{ + cloudformation.AWSIAMRole_Policy{ + PolicyName: cloudformation.Join("-", []string{"fnb", "kinesis", k.config.Name}), + PolicyDocument: map[string]interface{}{ + "Statement": []map[string]interface{}{ + map[string]interface{}{ + "Action": []string{ + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "Kinesis:DescribeStream", + }, + "Effect": "Allow", + "Resource": resources, + }, + }, + }, + }, + } + + return policies +} diff --git a/x-pack/functionbeat/provider/aws/kinesis_test.go b/x-pack/functionbeat/provider/aws/kinesis_test.go index 33c6aba9183..0ae41cf851b 100644 --- a/x-pack/functionbeat/provider/aws/kinesis_test.go +++ b/x-pack/functionbeat/provider/aws/kinesis_test.go @@ -6,9 +6,11 @@ package aws import ( "errors" + "fmt" "testing" "github.com/aws/aws-lambda-go/events" + "github.com/awslabs/goformation/cloudformation" "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" @@ -18,9 +20,14 @@ import ( func TestKinesis(t *testing.T) { cfg := common.MustNewConfigFrom(map[string]interface{}{ "name": "foobar", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc123", + }, + }, }) - t.Run("when publish is succesful", func(t *testing.T) { + t.Run("when publish is successful", func(t *testing.T) { client := &arrayBackedClient{} k, err := NewKinesis(&provider.DefaultProvider{}, cfg) if !assert.NoError(t, err) { @@ -47,6 +54,10 @@ func TestKinesis(t *testing.T) { err = handler(generateKinesisEvent()) assert.Equal(t, e, err) }) + + t.Run("test config validation", testKinesisConfig) + t.Run("test starting position", testStartingPosition) + t.Run("test permissions for event_source_arn", testPolicies) } func generateKinesisEvent() events.KinesisEvent { @@ -68,3 +79,169 @@ func generateKinesisEvent() events.KinesisEvent { }, } } + +func testKinesisConfig(t *testing.T) { + tests := map[string]struct { + valid bool + 
rawConfig map[string]interface{} + expected *KinesisConfig + }{ + "minimal valid configuration": { + valid: true, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "mycustomarn", + }, + }, + }, + }, + "missing event triggers": { + valid: false, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + }, + }, + "empty or missing event source arn": { + valid: false, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "", + }, + }, + }, + }, + "test upper bound batch size limit": { + valid: false, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc123", + "batch_size": 20000, + }, + }, + }, + }, + "test lower bound batch size limit": { + valid: false, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc123", + "batch_size": 10, + }, + }, + }, + }, + "test default values": { + valid: true, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylongdescription", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc123", + }, + }, + }, + expected: &KinesisConfig{ + Name: "mysuperfunctionname", + Description: "mylongdescription", + LambdaConfig: DefaultLambdaConfig, + Triggers: []*KinesisTriggerConfig{ + &KinesisTriggerConfig{ + EventSourceArn: "abc123", + BatchSize: 100, + StartingPosition: trimHorizonPos, + }, + }, + }, + }, + } + + for name, test := range tests { + 
t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.rawConfig) + config := &KinesisConfig{LambdaConfig: DefaultLambdaConfig} + err := cfg.Unpack(config) + if !assert.Equal(t, test.valid, err == nil, fmt.Sprintf("error: %+v", err)) { + return + } + + if test.expected != nil { + assert.Equal(t, test.expected, config) + } + }) + } +} + +func testStartingPosition(t *testing.T) { + // NOTE(ph) when adding support for at_timestamp we also need to make sure the cloudformation + // template send the timestamp. + // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html + t.Run("AT_TIMESTAMP is not supported yet", func(t *testing.T) { + var s startingPosition + err := s.Unpack("at_timestamp") + assert.Error(t, err) + }) +} + +func testPolicies(t *testing.T) { + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "name": "myfunction", + "description": "mydescription", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc456", + }, + map[string]interface{}{ + "event_source_arn": "abc1234", + }, + }, + }) + + k, err := NewKinesis(&provider.DefaultProvider{}, cfg) + if !assert.NoError(t, err) { + return + } + + i, ok := k.(installer) + if !assert.True(t, ok) { + return + } + + policies := i.Policies() + if !assert.Equal(t, 1, len(policies)) { + return + } + + // ensure permissions on specified resources + expected := cloudformation.AWSIAMRole_Policy{ + PolicyName: cloudformation.Join("-", []string{"fnb", "kinesis", "myfunction"}), + PolicyDocument: map[string]interface{}{ + "Statement": []map[string]interface{}{ + map[string]interface{}{ + "Action": []string{ + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "Kinesis:DescribeStream", + }, + "Effect": "Allow", + "Resource": []string{"abc1234", "abc456"}, + }, + }, + }, + } + + assert.Equal(t, expected, policies[0]) +} diff --git a/x-pack/functionbeat/provider/aws/sqs.go 
b/x-pack/functionbeat/provider/aws/sqs.go index 5b377fdd918..49dbd1c0fa3 100644 --- a/x-pack/functionbeat/provider/aws/sqs.go +++ b/x-pack/functionbeat/provider/aws/sqs.go @@ -105,3 +105,8 @@ func (s *SQS) Template() *cloudformation.Template { func (s *SQS) LambdaConfig() *lambdaConfig { return s.config.LambdaConfig } + +// Policies returns a slice of policy to add to the lambda. +func (s *SQS) Policies() []cloudformation.AWSIAMRole_Policy { + return []cloudformation.AWSIAMRole_Policy{} +}