[filebeat][azure-blob-storage] - Fixed concurrency & flakey tests issue (elastic#36124)

## Type of change
- Bug

## What does this PR do?
This PR fixes the concurrency issues present in the Azure Blob Storage
input and the flaky test issue.

## Why is it important?
Concurrent operations were failing at scale; this fix addresses that issue.
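
At a high level, the test-side part of the fix makes the fake publisher safe for concurrent use: many workers publish events in parallel, so appends to the shared event slice must be serialized. Below is a minimal sketch of that pattern, using hypothetical names (`collectingPublisher`, `event`); it is a simplified stand-in for the mutex-guarded `publisher` helper added in `input_test.go`, not an exact copy of the beats types.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for beat.Event in this sketch.
type event struct{ msg string }

// collectingPublisher serializes concurrent Publish calls with a mutex,
// mirroring the thread-safe publisher helper added in input_test.go.
type collectingPublisher struct {
	mu     sync.Mutex
	events []event
	// onPublish is invoked (under the lock) after every append, so a test
	// can stop the input once the expected number of events has arrived.
	onPublish func([]event)
}

func (p *collectingPublisher) Publish(e event) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.events = append(p.events, e)
	p.onPublish(p.events)
	return nil
}

func main() {
	const want = 100
	p := &collectingPublisher{onPublish: func(evs []event) {
		if len(evs) == want {
			fmt.Println("all expected events received")
		}
	}}

	// Simulate many workers publishing concurrently, as the input does at scale.
	var wg sync.WaitGroup
	for i := 0; i < want; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = p.Publish(event{msg: fmt.Sprintf("blob-%d", i)})
		}(i)
	}
	wg.Wait()
	fmt.Println("published:", len(p.events))
}
```

In the real test, the callback cancels the input context once the expected number of events has been collected, so the concurrency test shuts down deterministically instead of flaking on a timeout.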

## Checklist

- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
~~- [ ] I have made corresponding changes to the documentation~~
~~- [ ] I have made corresponding changes to the default configuration files~~
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added an entry in `CHANGELOG.next.asciidoc` or
`CHANGELOG-developer.next.asciidoc`.

## Author's Checklist

<!-- Recommended
Add a checklist of things that are required to be reviewed in order to
have the PR approved
-->
- [ ]

## How to test this PR locally

<!-- Recommended
Explain here how this PR will be tested by the reviewer: commands,
dependencies, steps, etc.
-->
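
One way to exercise the changes locally (a suggestion, not part of the original PR description): run the new tests with the race detector from the repository root, e.g. `go test -race -run 'Test_StorageClient|Test_Concurrency' ./x-pack/filebeat/input/azureblobstorage/...`. The concurrency test drives the mock server with 100-3000 workers, so expect it to take a little while.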

## Related issues

- Relates elastic#35983


## Use cases

<!-- Recommended
Explain here the different behaviors that this PR introduces or modifies
in this project, user roles, environment configuration, etc.

If you are familiar with Gherkin test scenarios, we recommend its usage:
https://cucumber.io/docs/gherkin/reference/
-->

## Screenshots

<!-- Optional
Add here screenshots about how the project will be changed after the PR
is applied. They could be related to web pages, terminal, etc, or any
other image you consider important to be shared with the team.
-->

## Logs

<!-- Recommended
Paste here output logs discovered while creating this PR, such as stack
traces or integration logs, or any other output you consider important
to be shared with the team.
-->
ShourieG authored and Scholar-Li committed Feb 5, 2024
1 parent 098b8fe commit 4f22bb8
Showing 7 changed files with 328 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027]
- Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008]
- Fix handling of region name configuration in awss3 input {pull}36034[36034]
- Fixed concurrency and flakey tests issue in azure blob storage input. {issue}35983[35983] {pull}36124[36124]
- Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077]
- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]
- Update mito CEL extension library to v1.5.0. {pull}36146[36146]
16 changes: 10 additions & 6 deletions x-pack/filebeat/input/azureblobstorage/input.go
@@ -12,6 +12,7 @@ import (

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"

"github.com/elastic/beats/v7/libbeat/feature"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
@@ -48,6 +49,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
return nil, nil, err
}

//nolint:prealloc // No need to preallocate the slice here
var sources []cursor.Source
for _, c := range config.Containers {
container := tryOverrideOrDefault(config, c)
@@ -111,20 +113,22 @@ func (input *azurebsInput) Test(src cursor.Source, ctx v2.TestContext) error {
}

func (input *azurebsInput) Run(inputCtx v2.Context, src cursor.Source, cursor cursor.Cursor, publisher cursor.Publisher) error {
currentSource := src.(*Source)

log := inputCtx.Logger.With("account_name", currentSource.AccountName).With("container_name", currentSource.ContainerName)
log.Infof("Running azure blob storage for account: %s", input.config.AccountName)

var cp *Checkpoint
st := newState()
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
return err
}

st.setCheckpoint(cp)
}
return input.run(inputCtx, src, st, publisher)
}

func (input *azurebsInput) run(inputCtx v2.Context, src cursor.Source, st *state, publisher cursor.Publisher) error {
currentSource := src.(*Source)

log := inputCtx.Logger.With("account_name", currentSource.AccountName).With("container_name", currentSource.ContainerName)
log.Infof("Running azure blob storage for account: %s", input.config.AccountName)

ctx, cancel := context.WithCancel(context.Background())
go func() {
183 changes: 140 additions & 43 deletions x-pack/filebeat/input/azureblobstorage/input_test.go
@@ -6,17 +6,22 @@ package azureblobstorage

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
beattest "github.com/elastic/beats/v7/libbeat/publisher/testing"
"github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/mock"
conf "github.com/elastic/elastic-agent-libs/config"
@@ -33,14 +38,12 @@ const (
)

func Test_StorageClient(t *testing.T) {
t.Skip("Flaky test: issue - https://github.com/elastic/beats/issues/34332")
tests := []struct {
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
isError error
unexpectedError error
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
expectedError error
}{
{
name: "SingleContainerWithPoll_NoErr",
@@ -62,7 +65,6 @@ func Test_StorageClient(t *testing.T) {
mock.Beatscontainer_blob_data3_json: true,
mock.Beatscontainer_blob_docs_ata_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "SingleContainerWithoutPoll_NoErr",
@@ -84,7 +86,6 @@ func Test_StorageClient(t *testing.T) {
mock.Beatscontainer_blob_data3_json: true,
mock.Beatscontainer_blob_docs_ata_json: true,
},
unexpectedError: nil,
},
{
name: "TwoContainersWithPoll_NoErr",
@@ -111,7 +112,6 @@ func Test_StorageClient(t *testing.T) {
mock.Beatscontainer_2_blob_ata_json: true,
mock.Beatscontainer_2_blob_data3_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "TwoContainersWithoutPoll_NoErr",
@@ -138,7 +138,6 @@ func Test_StorageClient(t *testing.T) {
mock.Beatscontainer_2_blob_ata_json: true,
mock.Beatscontainer_2_blob_data3_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "SingleContainerPoll_InvalidContainerErr",
@@ -154,10 +153,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
isError: mock.NotFoundErr,
unexpectedError: nil,
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
expectedError: mock.NotFoundErr,
},
{
name: "SingleContainerWithoutPoll_InvalidBucketErr",
@@ -173,10 +171,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
isError: mock.NotFoundErr,
unexpectedError: nil,
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
expectedError: mock.NotFoundErr,
},
{
name: "TwoContainersWithPoll_InvalidBucketErr",
@@ -195,10 +192,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
isError: mock.NotFoundErr,
unexpectedError: nil,
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
expectedError: mock.NotFoundErr,
},
{
name: "SingleBucketWithPoll_InvalidConfigValue",
@@ -214,10 +210,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
expectedError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "TwoBucketWithPoll_InvalidConfigValue",
@@ -236,10 +231,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{},
expectedError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "ReadJSON",
@@ -261,7 +255,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesContainer_log_json[1]: true,
mock.BeatsFilesContainer_log_json[2]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadOctetStreamJSON",
@@ -282,7 +275,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesContainer_multiline_json[0]: true,
mock.BeatsFilesContainer_multiline_json[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadNdJSON",
@@ -303,7 +295,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesContainer_log_ndjson[0]: true,
mock.BeatsFilesContainer_log_ndjson[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadMultilineGzJSON",
@@ -324,7 +315,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesContainer_multiline_json_gz[0]: true,
mock.BeatsFilesContainer_multiline_json_gz[1]: true,
},
unexpectedError: context.Canceled,
},
}
for _, tt := range tests {
@@ -336,7 +326,7 @@ func Test_StorageClient(t *testing.T) {
conf := config{}
err := cfg.Unpack(&conf)
if err != nil {
assert.EqualError(t, err, tt.isError.Error())
assert.EqualError(t, err, tt.expectedError.Error())
return
}
input := newStatelessInput(conf, serv.URL+"/")
@@ -349,6 +339,7 @@ func Test_StorageClient(t *testing.T) {

ctx, cancel := newV2Context()
t.Cleanup(cancel)
ctx.ID += tt.name

var g errgroup.Group
g.Go(func() error {
@@ -364,14 +355,14 @@ func Test_StorageClient(t *testing.T) {
t.Cleanup(func() { timeout.Stop() })

if len(tt.expected) == 0 {
if tt.isError != nil && g.Wait() != nil {
if tt.expectedError != nil && g.Wait() != nil {
//nolint:errorlint // This will never be a wrapped error
if tt.isError == mock.NotFoundErr {
if tt.expectedError == mock.NotFoundErr {
arr := strings.Split(g.Wait().Error(), "\n")
errStr := strings.Join(arr[1:], "\n")
assert.Equal(t, tt.isError.Error(), errStr)
assert.Equal(t, tt.expectedError.Error(), errStr)
} else {
assert.EqualError(t, g.Wait(), tt.isError.Error())
assert.EqualError(t, g.Wait(), tt.expectedError.Error())
}
cancel()
} else {
@@ -395,24 +386,130 @@ func Test_StorageClient(t *testing.T) {
val, err = got.Fields.GetValue("message")
assert.NoError(t, err)
assert.True(t, tt.expected[val.(string)])
assert.Equal(t, tt.isError, err)
assert.Equal(t, tt.expectedError, err)
receivedCount += 1
if receivedCount == len(tt.expected) {
cancel()
break wait
}
}
}
assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
})
}
}

func Test_Concurrency(t *testing.T) {
for _, workers := range []int{100, 1000, 2000, 3000} {
t.Run(fmt.Sprintf("TestConcurrency_%d_Workers", workers), func(t *testing.T) {
const expectedLen = mock.TotalRandomDataSets
serv := httptest.NewServer(mock.AzureConcurrencyServer())
t.Cleanup(serv.Close)

cfg := conf.MustNewConfigFrom(map[string]interface{}{
"account_name": "beatsblobnew",
"auth.shared_credentials.account_key": "7pfLm1betGiRyyABEM/RFrLYlafLZHbLtGhB52LkWVeBxE7la9mIvk6YYAbQKYE/f0GdhiaOZeV8+AStsAdr/Q==",
"max_workers": workers,
"poll": true,
"poll_interval": "10s",
"containers": []map[string]interface{}{
{
"name": mock.ConcurrencyContainer,
},
},
})
conf := config{}
err := cfg.Unpack(&conf)
assert.NoError(t, err)
input := azurebsInput{
config: conf,
serviceURL: serv.URL + "/",
}
name := input.Name()
if name != "azure-blob-storage" {
t.Errorf(`unexpected input name: got:%q want:"azure-blob-storage"`, name)
}

var src cursor.Source
// This test will always have only one container
for _, c := range input.config.Containers {
container := tryOverrideOrDefault(input.config, c)
src = &Source{
AccountName: input.config.AccountName,
ContainerName: c.Name,
MaxWorkers: *container.MaxWorkers,
Poll: *container.Poll,
PollInterval: *container.PollInterval,
}
}
v2Ctx, cancel := newV2Context()
t.Cleanup(cancel)
v2Ctx.ID += t.Name()
client := publisher{
stop: func(e []beat.Event) {
if len(e) >= expectedLen {
cancel()
}
},
}
st := newState()
var g errgroup.Group
g.Go(func() error {
return input.run(v2Ctx, src, st, &client)
})
timeout := time.NewTimer(100 * time.Second)
t.Cleanup(func() { timeout.Stop() })
select {
case <-timeout.C:
t.Errorf("timed out waiting for %d events", expectedLen)
cancel()
case <-v2Ctx.Cancelation.Done():
}
//nolint:errcheck // We can ignore as the error will always be context canceled, which is expected in this case
g.Wait()
if len(client.events) < expectedLen {
t.Errorf("failed to get all events: got:%d want:%d", len(client.events), expectedLen)
}
})
}
}

type publisher struct {
stop func([]beat.Event)
events []beat.Event
mu sync.Mutex
cursors []map[string]interface{}
}

func (p *publisher) Publish(e beat.Event, cursor interface{}) error {
p.mu.Lock()
p.events = append(p.events, e)
if cursor != nil {
var c map[string]interface{}
chkpt, ok := cursor.(*Checkpoint)
if !ok {
return fmt.Errorf("invalid cursor type for testing: %T", cursor)
}
cursorBytes, err := json.Marshal(chkpt)
if err != nil {
return fmt.Errorf("error marshaling cursor data: %w", err)
}
err = json.Unmarshal(cursorBytes, &c)
if err != nil {
return fmt.Errorf("error converting checkpoint struct to cursor map: %w", err)
}

p.cursors = append(p.cursors, c)
}
p.stop(p.events)
p.mu.Unlock()
return nil
}

func newV2Context() (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
return v2.Context{
Logger: logp.NewLogger("azure-blob-storage_test"),
ID: "test_id",
ID: "test_id:",
Cancelation: ctx,
}, cancel
}