-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathinput.go
67 lines (53 loc) · 1.86 KB
/
input.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package awss3
import (
"fmt"
"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/feature"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-concert/unison"
)
const inputName = "aws-s3"
func Plugin(store beater.StateStore) v2.Plugin {
return v2.Plugin{
Name: inputName,
Stability: feature.Stable,
Deprecated: false,
Info: "Collect logs from s3",
Manager: &s3InputManager{store: store},
}
}
type s3InputManager struct {
store beater.StateStore
}
func (im *s3InputManager) Init(grp unison.Group) error {
return nil
}
func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)
if err != nil {
return nil, fmt.Errorf("initializing AWS config: %w", err)
}
if config.RegionName != "" {
// The awsConfig now contains the region from the credential profile or default region
// if the region is explicitly set in the config, then it wins
awsConfig.Region = config.RegionName
}
if config.QueueURL != "" {
return newSQSReaderInput(config, awsConfig), nil
}
if config.BucketARN != "" || config.AccessPointARN != "" || config.NonAWSBucketName != "" {
return newS3PollerInput(config, awsConfig, im.store)
}
return nil, fmt.Errorf("configuration has no SQS queue URL and no S3 bucket ARN")
}
// boolPtr returns a pointer to b.
func boolPtr(b bool) *bool { return &b }