diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 7d97283e6384..8e4ea63bbbe3 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -68,6 +68,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
 - Fix handling of MySQL audit logs with strict JSON parser. {issue}35158[35158] {pull}35160[35160]
 - Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169]
 - Fixing the grok expression outputs of log files {pull}35221[35221]
+- [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval rather than to individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suite. {pull}35605[35605]
 - Fix handling of IPv6 unspecified addresses in TCP input. {issue}35064[35064] {pull}35637[35637]

 *Heartbeat*
diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go
index 9bc897f64fc8..b5e4c23b8b2b 100644
--- a/x-pack/filebeat/input/gcs/input.go
+++ b/x-pack/filebeat/input/gcs/input.go
@@ -9,7 +9,7 @@ import (
 	"time"

 	"cloud.google.com/go/storage"
-	"github.com/googleapis/gax-go/v2"
+	gax "github.com/googleapis/gax-go/v2"

 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
@@ -127,7 +127,6 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
 	log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)

 	log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
-
 	var cp *Checkpoint
 	if !cursor.IsNew() {
 		if err := cursor.Unpack(&cp); err != nil {
diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go
index b6b3b14cda7c..b81ccf79b125 100644
--- a/x-pack/filebeat/input/gcs/input_stateless.go
+++ b/x-pack/filebeat/input/gcs/input_stateless.go
@@ -9,7 +9,7 @@ import (
 	"time"

 	"cloud.google.com/go/storage"
-	"github.com/googleapis/gax-go/v2"
+	gax "github.com/googleapis/gax-go/v2"

 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go
index 1cb658377ab4..bd9028d6bf9a 100644
--- a/x-pack/filebeat/input/gcs/input_test.go
+++ b/x-pack/filebeat/input/gcs/input_test.go
@@ -37,15 +37,13 @@ const (
 )

 func Test_StorageClient(t *testing.T) {
-	t.Skip("Flaky test: issue (could possibly affect this also) - https://github.com/elastic/beats/issues/34332")
 	tests := []struct {
-		name            string
-		baseConfig      map[string]interface{}
-		mockHandler     func() http.Handler
-		expected        map[string]bool
-		checkJSON       bool
-		isError         error
-		unexpectedError error
+		name        string
+		baseConfig  map[string]interface{}
+		mockHandler func() http.Handler
+		expected    map[string]bool
+		checkJSON   bool
+		isError     error
 	}{
 		{
 			name: "SingleBucketWithPoll_NoErr",
@@ -67,7 +65,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.Gcs_test_new_object_data3_json:    true,
 				mock.Gcs_test_new_object_docs_ata_json: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "SingleBucketWithoutPoll_NoErr",
@@ -89,7 +86,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.Gcs_test_new_object_data3_json:    true,
 				mock.Gcs_test_new_object_docs_ata_json: true,
 			},
-			unexpectedError: nil,
 		},
 		{
 			name: "TwoBucketsWithPoll_NoErr",
@@ -116,7 +112,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.Gcs_test_latest_object_ata_json:   true,
 				mock.Gcs_test_latest_object_data3_json: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "TwoBucketsWithoutPoll_NoErr",
@@ -143,7 +138,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.Gcs_test_latest_object_ata_json:   true,
 				mock.Gcs_test_latest_object_data3_json: true,
 			},
-			unexpectedError: nil,
 		},
 		{
 			name: "SingleBucketWithPoll_InvalidBucketErr",
@@ -159,10 +153,9 @@ func Test_StorageClient(t *testing.T) {
 					},
 				},
 			},
-			mockHandler:     mock.GCSServer,
-			expected:        map[string]bool{},
-			isError:         errors.New("storage: bucket doesn't exist"),
-			unexpectedError: nil,
+			mockHandler: mock.GCSServer,
+			expected:    map[string]bool{},
+			isError:     errors.New("storage: bucket doesn't exist"),
 		},
 		{
 			name: "SingleBucketWithoutPoll_InvalidBucketErr",
@@ -178,10 +171,9 @@ func Test_StorageClient(t *testing.T) {
 					},
 				},
 			},
-			mockHandler:     mock.GCSServer,
-			expected:        map[string]bool{},
-			isError:         errors.New("storage: bucket doesn't exist"),
-			unexpectedError: nil,
+			mockHandler: mock.GCSServer,
+			expected:    map[string]bool{},
+			isError:     errors.New("storage: bucket doesn't exist"),
 		},
 		{
 			name: "TwoBucketsWithPoll_InvalidBucketErr",
@@ -200,10 +192,9 @@ func Test_StorageClient(t *testing.T) {
 					},
 				},
 			},
-			mockHandler:     mock.GCSServer,
-			expected:        map[string]bool{},
-			isError:         errors.New("storage: bucket doesn't exist"),
-			unexpectedError: nil,
+			mockHandler: mock.GCSServer,
+			expected:    map[string]bool{},
+			isError:     errors.New("storage: bucket doesn't exist"),
 		},
 		{
 			name: "SingleBucketWithPoll_InvalidConfigValue",
@@ -219,10 +210,9 @@ func Test_StorageClient(t *testing.T) {
 					},
 				},
 			},
-			mockHandler:     mock.GCSServer,
-			expected:        map[string]bool{},
-			isError:         errors.New("requires value <= 5000 accessing 'max_workers'"),
-			unexpectedError: nil,
+			mockHandler: mock.GCSServer,
+			expected:    map[string]bool{},
+			isError:     errors.New("requires value <= 5000 accessing 'max_workers'"),
 		},
 		{
 			name: "TwoBucketWithPoll_InvalidConfigValue",
@@ -241,10 +231,9 @@ func Test_StorageClient(t *testing.T) {
 					},
 				},
 			},
-			mockHandler:     mock.GCSServer,
-			expected:        map[string]bool{},
-			isError:         errors.New("requires value <= 5000 accessing 'max_workers'"),
-			unexpectedError: nil,
+			mockHandler: mock.GCSServer,
+			expected:    map[string]bool{},
+			isError:     errors.New("requires value <= 5000 accessing 'max_workers'"),
 		},
 		{
 			name: "SingleBucketWithPoll_parseJSON",
@@ -267,7 +256,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.Gcs_test_latest_object_ata_json_parsed:   true,
 				mock.Gcs_test_latest_object_data3_json_parsed: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadJSON",
@@ -289,7 +277,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesBucket_log_json[1]: true,
 				mock.BeatsFilesBucket_log_json[2]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadOctetStreamJSON",
@@ -310,7 +297,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesBucket_multiline_json[0]: true,
 				mock.BeatsFilesBucket_multiline_json[1]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadNDJSON",
@@ -331,7 +317,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesBucket_log_ndjson[0]: true,
 				mock.BeatsFilesBucket_log_ndjson[1]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadMultilineGzJSON",
@@ -352,7 +337,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesBucket_multiline_json_gz[0]: true,
 				mock.BeatsFilesBucket_multiline_json_gz[1]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 		{
 			name: "ReadJSONWithRootAsArray",
@@ -375,7 +359,6 @@ func Test_StorageClient(t *testing.T) {
 				mock.BeatsFilesBucket_json_array[2]: true,
 				mock.BeatsFilesBucket_json_array[3]: true,
 			},
-			unexpectedError: context.Canceled,
 		},
 	}
 	for _, tt := range tests {
@@ -462,7 +445,6 @@ func Test_StorageClient(t *testing.T) {
 					}
 				}
 			}
-			assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
 		})
 	}
 }
diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go
index 118e89287acf..edcb7fe976ac 100644
--- a/x-pack/filebeat/input/gcs/job.go
+++ b/x-pack/filebeat/input/gcs/job.go
@@ -15,7 +15,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"sync"
 	"time"
 	"unicode"

@@ -28,8 +27,6 @@ import (
 )

 type job struct {
-	// Mutex lock for concurrent publishes
-	mu sync.Mutex
 	// gcs bucket handle
 	bucket *storage.BucketHandle
 	// gcs object attribute struct
@@ -109,13 +106,13 @@ func (j *job) do(ctx context.Context, id string) {
 			Fields: fields,
 		}
 		event.SetID(objectID(j.hash, 0))
-		j.state.save(j.object.Name, j.object.Updated)
-		// locks while data is being published to avoid concurrent map read/writes
-		j.mu.Lock()
-		if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
+		// locks while data is being saved and published to avoid concurrent map read/writes
+		cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
+		if err := j.publisher.Publish(event, cp); err != nil {
 			j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
 		}
-		j.mu.Unlock()
+		// unlocks after data is saved and published
+		done()
 	}
 }

@@ -216,19 +213,23 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
 		// updates the offset after reading the file
 		// this avoids duplicates for the last read when resuming operation
 		offset = dec.InputOffset()
+		// locks while data is being saved and published to avoid concurrent map read/writes
+		var (
+			done func()
+			cp   *Checkpoint
+		)
 		if !dec.More() {
 			// if this is the last object, then peform a complete state save
-			j.state.save(j.object.Name, j.object.Updated)
+			cp, done = j.state.saveForTx(j.object.Name, j.object.Updated)
 		} else {
 			// partially saves read state using offset
-			j.state.savePartial(j.object.Name, offset+relativeOffset)
+			cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset)
 		}
-		// locks while data is being published to avoid concurrent map read/writes
-		j.mu.Lock()
-		if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
+		if err := j.publisher.Publish(evt, cp); err != nil {
 			j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
 		}
-		j.mu.Unlock()
+		// unlocks after data is saved and published
+		done()
 	}
 	return nil
 }
diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go
index 7feb57f7c1e5..a5da0b9576d7 100644
--- a/x-pack/filebeat/input/gcs/scheduler.go
+++ b/x-pack/filebeat/input/gcs/scheduler.go
@@ -55,16 +55,11 @@ func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *stora

 // Schedule, is responsible for fetching & scheduling jobs using the workerpool model
 func (s *scheduler) schedule() error {
 	if !s.src.Poll {
-		ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
-		defer cancel()
-		return s.scheduleOnce(ctxWithTimeout)
+		return s.scheduleOnce(s.parentCtx)
 	}
 	for {
-		ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
-		defer cancel()
-
-		err := s.scheduleOnce(ctxWithTimeout)
+		err := s.scheduleOnce(s.parentCtx)
 		if err != nil {
 			return err
 		}
@@ -92,9 +87,9 @@ func (l *limiter) release() {
 	l.wg.Done()
 }

-func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
+func (s *scheduler) scheduleOnce(ctx context.Context) error {
 	defer s.limiter.wait()
-	pager := s.fetchObjectPager(ctxWithTimeout, s.src.MaxWorkers)
+	pager := s.fetchObjectPager(ctx, *s.cfg.MaxWorkers)
 	for {
 		var objects []*storage.ObjectAttrs
 		nextPageToken, err := pager.NextPage(&objects)
@@ -107,7 +102,7 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
 		if !s.state.checkpoint().LatestEntryTime.IsZero() {
 			jobs = s.moveToLastSeenJob(jobs)
 			if len(s.state.checkpoint().FailedJobs) > 0 {
-				jobs = s.addFailedJobs(ctxWithTimeout, jobs)
+				jobs = s.addFailedJobs(ctx, jobs)
 			}
 		}

@@ -118,7 +113,7 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
 			s.limiter.acquire()
 			go func() {
 				defer s.limiter.release()
-				job.do(s.parentCtx, id)
+				job.do(ctx, id)
 			}()
 		}

@@ -126,6 +121,7 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
 			break
 		}
 	}
+
 	return nil
 }
diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go
index 6b2a269481f4..afa20e5d52dd 100644
--- a/x-pack/filebeat/input/gcs/state.go
+++ b/x-pack/filebeat/input/gcs/state.go
@@ -47,8 +47,11 @@ func newState() *state {
 	}
 }

-// save, saves/updates the current state for cursor checkpoint
-func (s *state) save(name string, lastModifiedOn time.Time) {
+// saveForTx updates and returns the current state checkpoint, locks the state
+// and returns an unlock function, done. The caller must call done when
+// s and cp are no longer needed in a locked state. done must not be called
+// more than once.
+func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
 	s.mu.Lock()
 	delete(s.cp.LastProcessedOffset, name)
 	delete(s.cp.IsRootArray, name)
@@ -68,20 +71,23 @@
 		// clear entry if this is a failed job
 		delete(s.cp.FailedJobs, name)
 	}
-	s.mu.Unlock()
+	return s.cp, func() { s.mu.Unlock() }
 }

-// setRootArray, sets boolean true for objects that have their roots defined as an array type
-func (s *state) setRootArray(name string) {
+// savePartialForTx partially updates and returns the current state checkpoint, locks the state
+// and returns an unlock function, done. The caller must call done when
+// s and cp are no longer needed in a locked state. done must not be called
+// more than once.
+func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) {
 	s.mu.Lock()
-	s.cp.IsRootArray[name] = true
-	s.mu.Unlock()
+	s.cp.LastProcessedOffset[name] = offset
+	return s.cp, func() { s.mu.Unlock() }
 }

-// savePartial, partially saves/updates the current state for cursor checkpoint
-func (s *state) savePartial(name string, offset int64) {
+// setRootArray, sets boolean true for objects that have their roots defined as an array type
+func (s *state) setRootArray(name string) {
 	s.mu.Lock()
-	s.cp.LastProcessedOffset[name] = offset
+	s.cp.IsRootArray[name] = true
 	s.mu.Unlock()
 }
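
Reviewer note: the heart of the data-race fix is the saveForTx/savePartialForTx pattern above. Instead of job guarding only the publish with its own mutex while the checkpoint maps were written elsewhere, the state now hands the caller its checkpoint together with a done() callback that releases the state lock, so no other worker can mutate the checkpoint between the state update and the publish. Below is a minimal, self-contained sketch of that pattern; the Checkpoint and state types and the worker loop are simplified stand-ins, not the input's actual definitions from state.go and job.go.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Checkpoint is a simplified stand-in for the cursor checkpoint in state.go.
type Checkpoint struct {
	LatestEntryTime     time.Time
	LastProcessedOffset map[string]int64
}

type state struct {
	mu sync.Mutex
	cp *Checkpoint
}

// saveForTx locks the state, applies the update, and returns the checkpoint
// together with a done func that releases the lock. done must be called
// exactly once, after the caller has finished publishing the checkpoint.
func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
	s.mu.Lock()
	delete(s.cp.LastProcessedOffset, name)
	if s.cp.LatestEntryTime.Before(lastModifiedOn) {
		s.cp.LatestEntryTime = lastModifiedOn
	}
	return s.cp, func() { s.mu.Unlock() }
}

func main() {
	s := &state{cp: &Checkpoint{LastProcessedOffset: map[string]int64{"obj-1": 42}}}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // a high worker count, as with a large max_workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			cp, done := s.saveForTx("obj-1", time.Now())
			// publisher.Publish(event, cp) would run here while the lock is
			// held, so no other worker can write the checkpoint maps mid-publish.
			_ = cp
			done()
		}()
	}
	wg.Wait()
	fmt.Println("final checkpoint:", s.cp.LatestEntryTime.Format(time.RFC3339Nano))
}

Returning the lock release as a value keeps the critical section exactly as wide as the save-and-publish window, which is why the separate mu sync.Mutex on job could be deleted.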
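
Reviewer note: with the context.WithTimeout calls removed from schedule(), scheduleOnce now runs on the long-lived parent context, and per the changelog entry bucket_timeout is intended to bound each bucket object read rather than the whole poll pass. A hedged sketch of that scoping under simplified assumptions (readObject and readWithTimeout are illustrative helpers, not the input's real API):

package main

import (
	"context"
	"fmt"
	"time"
)

// readObject stands in for a single GCS object read with a given latency.
func readObject(ctx context.Context, latency time.Duration) error {
	select {
	case <-time.After(latency): // simulated read latency
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// readWithTimeout scopes bucketTimeout to one read, so a slow object times
// out on its own instead of consuming the deadline of the entire poll cycle.
func readWithTimeout(parent context.Context, latency, bucketTimeout time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, bucketTimeout)
	defer cancel()
	return readObject(ctx, latency)
}

func main() {
	parent := context.Background()
	latencies := map[string]time.Duration{
		"obj-1": 50 * time.Millisecond,
		"obj-2": 250 * time.Millisecond, // only this slow read exceeds the timeout
		"obj-3": 50 * time.Millisecond,
	}
	for _, name := range []string{"obj-1", "obj-2", "obj-3"} {
		if err := readWithTimeout(parent, latencies[name], 100*time.Millisecond); err != nil {
			fmt.Println(name, "read failed:", err)
			continue
		}
		fmt.Println(name, "read ok")
	}
}

This scoping also appears to be what let the unexpectedError assertions on g.Wait() be dropped from input_test.go: without a poll-wide deadline, a run no longer terminates with timeout-driven context errors at unpredictable points.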