Skip to content

Commit

Permalink
aws-s3 - create beat.Client for each SQS worker (#33658)
Browse files Browse the repository at this point in the history
To address mutex connection in the single beat.Client used for publishing S3
events, create a unique beat.Client for each worker goroutine that is processing
an SQS message. The beat.Client is used for all S3 objects contained within the
SQS message (they are processed serially). After all events are ACKed the beat.Client
is closed.
  • Loading branch information
andrewkroh authored and chrisberkhout committed Jun 1, 2023
1 parent a7deba9 commit 15c6de6
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 74 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]


*Auditbeat*


Expand Down
41 changes: 21 additions & 20 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}()
defer cancelInputCtx()

// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
ACKHandler: awscommon.NewEventACKHandler(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
// is not required.
EventNormalization: boolPtr(false),
},
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

if in.config.QueueURL != "" {
regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
if err != nil {
Expand All @@ -132,7 +117,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
in.awsConfig.Region = regionName

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, client)
receiver, err := in.createSQSReceiver(inputContext, pipeline)
if err != nil {
return fmt.Errorf("failed to initialize sqs receiver: %w", err)
}
Expand All @@ -144,6 +129,21 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

if in.config.BucketARN != "" || in.config.NonAWSBucketName != "" {
// Create client for publishing events and receive notification of their ACKs.
client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: inputContext.Cancelation,
ACKHandler: awscommon.NewEventACKHandler(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
// is not required.
EventNormalization: boolPtr(false),
},
})
if err != nil {
return fmt.Errorf("failed to create pipeline client: %w", err)
}
defer client.Close()

// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, ctx, client, persistentStore, states)
if err != nil {
Expand All @@ -159,7 +159,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
return nil
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) {
func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) {
sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
Expand Down Expand Up @@ -197,8 +197,8 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
if err != nil {
return nil, err
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down Expand Up @@ -272,10 +272,11 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, fileSelectors)
s3Poller := newS3Poller(log.Named("s3_poller"),
metrics,
s3API,
client,
s3EventHandlerFactory,
states,
persistentStore,
Expand Down
49 changes: 36 additions & 13 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,37 @@ func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
return c.pagerConstant
}

var _ beat.Pipeline = (*fakePipeline)(nil)

// fakePipeline returns new ackClients.
type fakePipeline struct{}

func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) {
return &ackClient{}, nil
}

func (c *fakePipeline) Connect() (beat.Client, error) {
panic("Connect() is not implemented.")
}

var _ beat.Client = (*ackClient)(nil)

// ackClient is a fake beat.Client that ACKs the published messages.
type ackClient struct{}

func (c *ackClient) Close() error { return nil }

func (c *ackClient) Publish(event beat.Event) {
// Fake the ACK handling.
event.Private.(*awscommon.EventACKTracker).ACK()
}

func (c *ackClient) PublishAll(event []beat.Event) {
for _, e := range event {
c.Publish(e)
}
}

func makeBenchmarkConfig(t testing.TB) config {
cfg := conf.MustNewConfigFrom(`---
queue_url: foo
Expand All @@ -171,21 +202,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
metrics := newInputMetrics(metricRegistry, "test_id")
sqsAPI := newConstantSQS()
s3API := newConstantS3(t)
client := pubtest.NewChanClient(100)
defer close(client.Channel)
pipeline := &fakePipeline{}
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

go func() {
for event := range client.Channel {
// Fake the ACK handling that's not implemented in pubtest.
event.Private.(*awscommon.EventACKTracker).ACK()
}
}()

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

Expand Down Expand Up @@ -313,8 +336,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/aws/smithy-go/middleware"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -28,7 +29,7 @@ import (
// Run 'go generate' to create mocks that are used in tests.
//go:generate go install github.com/golang/mock/[email protected]
//go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient github.com/elastic/beats/v7/libbeat/beat Client
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline

// ------
// SQS interfaces
Expand Down Expand Up @@ -88,7 +89,7 @@ type s3ObjectHandlerFactory interface {
// Create returns a new s3ObjectHandler that can be used to process the
// specified S3 object. If the handler is not configured to process the
// given S3 object (based on key name) then it will return nil.
Create(ctx context.Context, log *logp.Logger, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
Create(ctx context.Context, log *logp.Logger, client beat.Client, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
}

type s3ObjectHandler interface {
Expand Down
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 54 additions & 1 deletion x-pack/filebeat/input/awss3/mock_publisher_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/gofrs/uuid"
"go.uber.org/multierr"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -53,6 +54,7 @@ type s3Poller struct {
s3 s3API
log *logp.Logger
metrics *inputMetrics
client beat.Client
s3ObjectHandler s3ObjectHandlerFactory
states *states
store *statestore.Store
Expand All @@ -63,6 +65,7 @@ type s3Poller struct {
func newS3Poller(log *logp.Logger,
metrics *inputMetrics,
s3 s3API,
client beat.Client,
s3ObjectHandler s3ObjectHandlerFactory,
states *states,
store *statestore.Store,
Expand All @@ -71,7 +74,8 @@ func newS3Poller(log *logp.Logger,
awsRegion string,
provider string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
bucketPollInterval time.Duration,
) *s3Poller {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
}
Expand All @@ -86,6 +90,7 @@ func newS3Poller(log *logp.Logger,
s3: s3,
log: log,
metrics: metrics,
client: client,
s3ObjectHandler: s3ObjectHandler,
states: states,
store: store,
Expand Down Expand Up @@ -214,7 +219,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-

acker := awscommon.NewEventACKTracker(ctx)

s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event)
s3Processor := p.s3ObjectHandler.Create(ctx, p.log, p.client, acker, event)
if s3Processor == nil {
p.log.Debugw("empty s3 processor.", "state", state)
continue
Expand Down
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ type s3ObjectProcessorFactory struct {
log *logp.Logger
metrics *inputMetrics
s3 s3Getter
publisher beat.Client
fileSelectors []fileSelectorConfig
}

func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, publisher beat.Client, sel []fileSelectorConfig) *s3ObjectProcessorFactory {
func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, sel []fileSelectorConfig) *s3ObjectProcessorFactory {
if metrics == nil {
metrics = newInputMetrics(monitoring.NewRegistry(), "")
}
Expand All @@ -59,7 +58,6 @@ func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3G
log: log,
metrics: metrics,
s3: s3,
publisher: publisher,
fileSelectors: sel,
}
}
Expand All @@ -75,7 +73,7 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig {

// Create returns a new s3ObjectProcessor. It returns nil when no file selectors
// match the S3 object key.
func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler {
func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, client beat.Client, ack *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler {
log = log.With(
"bucket_arn", obj.S3.Bucket.Name,
"object_key", obj.S3.Object.Key)
Expand All @@ -90,6 +88,7 @@ func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger,
s3ObjectProcessorFactory: f,
log: log,
ctx: ctx,
publisher: client,
acker: ack,
readerConfig: readerConfig,
s3Obj: obj,
Expand All @@ -102,6 +101,7 @@ type s3ObjectProcessor struct {

log *logp.Logger
ctx context.Context
publisher beat.Client
acker *awscommon.EventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object).
readerConfig *readerConfig // Config about how to process the object.
s3Obj s3EventV2 // S3 object information.
Expand Down
Loading

0 comments on commit 15c6de6

Please sign in to comment.