[filebeat][gcs] - Added retry config & refactored the default config options #41699

Status: Closed (wants to merge 3 commits)
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -333,6 +333,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for Journald in the System module. {pull}41555[41555]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Added support for retry config in GCS input. {issue}11580[11580] {pull}41699[41699]

*Auditbeat*

56 changes: 45 additions & 11 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -10,9 +10,7 @@
++++

Use the `google cloud storage input` to read content from files stored in buckets which reside on your Google Cloud.
The input can be configured to work with and without polling, though currently, if polling is disabled it will only
perform a one time passthrough, list the file contents and end the process. Polling is generally recommented for most cases
even though it can get expensive with dealing with a very large number of files.
The input can be configured to work with and without polling, though if polling is disabled, it will only perform a one time passthrough, list the file contents and end the process.
Contributor:

Suggested change:
The input can be configured to work with and without polling, though if polling is disabled, it will only perform a one time passthrough, list the file contents and end the process.
The input can be configured to work with and without polling, though if polling is disabled it will only perform a single collection of data, list the file contents and end the process.

The change s/passthrough/…/ significantly changes the semantics, but I don't think the original meaning is correct; passthrough is a noun that doesn't describe an action.

Though I'm unclear why this is being changed since it does not appear to be related to the logic change here.


*To mitigate errors and ensure a stable processing environment, this input employs the following features :*

@@ -66,12 +64,11 @@ many buckets as we deem fit. We are also able to configure the attributes `max_w
then be applied to all buckets which do not specify any of these attributes explicitly.

NOTE: If the attributes `max_workers`, `poll`, `poll_interval` and `bucket_timeout` are specified at the root level, these can still be overridden at the bucket level with
different values, thus offering extensive flexibility and customization. Examples <<bucket-overrides,below>> show this behaviour.
different values, thus offering extensive flexibility and customization. Examples <<bucket-overrides,below>> show this behavior.

On receiving this config the google cloud storage input will connect to the service and retrieve a `Storage Client` using the given `bucket_name` and
`auth.credentials_file`, then it will spawn two main go-routines, one for each bucket. After this each of these routines (threads) will initialize a scheduler
which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool,
one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file).
which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file).
Comment on lines +67 to +71

Contributor:
Do we need to change these in this PR?

Contributor Author:
have replied with an explanation and option below


NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and
process. This keeps work distribution efficient. The scheduler uses the `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to time out calls to the bucket list API if they exceed the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value.
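
The two paragraphs above describe one goroutine per bucket, each feeding a bounded worker pool sized by `max_workers`. The following Go sketch is illustrative only (it is not the input's actual implementation, and the bucket and file names are placeholders); it shows how a per-bucket pool of 3 workers bounds concurrent file processing.

[source, go]
----
package main

import (
	"fmt"
	"sync"
)

// processBucket drains a bucket's files through a pool of maxWorkers workers.
func processBucket(bucket string, files []string, maxWorkers int) {
	sem := make(chan struct{}, maxWorkers) // bounds concurrent jobs per bucket
	var wg sync.WaitGroup
	for _, f := range files {
		sem <- struct{}{} // acquire a worker slot
		wg.Add(1)
		go func(f string) {
			defer func() { <-sem; wg.Done() }() // release the slot when the job finishes
			fmt.Printf("bucket=%s processing %s\n", bucket, f) // stand-in for read + publish
		}(f)
	}
	wg.Wait()
}

func main() {
	var wg sync.WaitGroup
	for _, b := range []string{"bucket-a", "bucket-b"} { // one goroutine per bucket
		wg.Add(1)
		go func(b string) {
			defer wg.Done()
			processBucket(b, []string{"f1", "f2", "f3", "f4"}, 3) // max_workers: 3
		}(b)
	}
	wg.Wait()
}
----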
@@ -167,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately.
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>
14. <<attrib-retry-gcs,retry>>


[id="attrib-project-id"]
@@ -213,7 +211,7 @@ This is a specific subfield of a bucket. It specifies the bucket name.

This attribute defines the maximum amount of time after which a bucket operation will give up and stop if no response is received (example: reading a file / listing a file).
It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish.
If no value is specified for this, by default its initialized to `50 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time.
If no value is specified for this, by default its initialized to `120 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time.
Contributor:
What is being changed here?

Contributor Author:
initialized to 50 seconds -> initialized to 120 seconds


[id="attrib-max_workers-gcs"]
[float]
@@ -228,17 +226,16 @@ NOTE: The value of `max_workers` is tied to the `batch_size` currently to ensure
[float]
==== `poll`

This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is `false`, so it will not keep polling if not explicitly
specified. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always
take priority and override the root level values if both are specified.
This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is set to `true`. This attribute can be specified both at the
root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.

[id="attrib-poll_interval-gcs"]
[float]
==== `poll_interval`

This attribute defines the maximum amount of time after which the internal scheduler will make the polling call for the next set of objects/files. It can be
defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish.
Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `300 seconds`.
Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `5 minutes`.
This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority
and override the root level values if both are specified. The `poll_interval` should be set to a value that is equal to the `bucket_timeout` value. This would ensure that another schedule operation is not started before the current buckets have all been processed. If the `poll_interval` is set to a value that is less than the `bucket_timeout`, then the input will start another schedule operation before the current one has finished, which can cause a bottleneck over time. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization.

@@ -401,6 +398,43 @@ filebeat.inputs:

The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files is very high. It is recommended to use this attribute only when the number of files is limited or ample resources are available. This option scales vertically and not horizontally.

[id="attrib-retry-gcs"]
[float]
==== `retry`

This attribute can be used to configure a list of sub-attributes that directly control how the input should behave when a download for a file/object fails or gets interrupted. The list of sub-attributes is as follows :-

1. `max_attempts`: This attribute defines the maximum number of retries that should be attempted for a failed download. The default value for this is `3`.
Contributor:

Suggested change:
1. `max_attempts`: This attribute defines the maximum number of retries that should be attempted for a failed download. The default value for this is `3`.
1. `max_retries`: This attribute defines the maximum number of retries that should be attempted for a failed download. The default value for this is `3`.

2. `initial_backoff_duration`: This attribute defines the initial backoff time in seconds. The default value for this is `1s`.
3. `max_backoff_duration`: This attribute defines the maximum backoff time in seconds. The default value for this is `30s`.
4. `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2`.

By configuring these attributes, the user is given the flexibility to control how the input should behave when a download fails or gets interrupted. This attribute can only be
specified at the root level of the configuration and not at the bucket level. It applies uniformly to all the buckets.

An example configuration is given below :-

[source, yml]
----
filebeat.inputs:
- type: gcs
project_id: my_project_id
auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
retry:
max_attempts: 3
initial_backoff_duration: 2s
max_backoff_duration: 60s
backoff_multiplier: 2
buckets:
- name: obs-bucket
max_workers: 3
poll: true
poll_interval: 11m
bucket_timeout: 10m
----

While configuring the `retry` attribute, the user should take into consideration the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, then the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time.
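
To sanity-check a `retry` configuration against the `bucket_timeout` window, it can help to estimate the worst-case time spent waiting between attempts. The Go sketch below is an approximation under stated assumptions: it models gax-style exponential backoff without the jitter the client library actually applies, and it ignores the time taken by the attempts themselves.

[source, go]
----
package main

import (
	"fmt"
	"time"
)

// worstCaseBackoff returns an upper bound on the total time spent waiting
// between retries: maxAttempts total attempts means at most maxAttempts-1 waits.
func worstCaseBackoff(maxAttempts int, initial, max time.Duration, multiplier float64) time.Duration {
	var total time.Duration
	delay := initial
	for i := 0; i < maxAttempts-1; i++ {
		if delay > max {
			delay = max // the backoff is capped at max_backoff_duration
		}
		total += delay
		delay = time.Duration(float64(delay) * multiplier)
	}
	return total
}

func main() {
	// values taken from the example configuration above
	fmt.Println(worstCaseBackoff(3, 2*time.Second, 60*time.Second, 2)) // prints "6s"
}
----

With the example configuration above (3 attempts, 2s initial backoff, multiplier 2, 60s cap) this yields about 6 seconds of backoff, comfortably within the `10m` `bucket_timeout`.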

[id="bucket-overrides"]
*The sample configs below will explain the bucket level overriding of attributes a bit further :-*

4 changes: 1 addition & 3 deletions x-pack/filebeat/input/gcs/client.go
@@ -12,11 +12,9 @@ import (
"cloud.google.com/go/storage"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"

"github.com/elastic/elastic-agent-libs/logp"
)

func fetchStorageClient(ctx context.Context, cfg config, log *logp.Logger) (*storage.Client, error) {
func fetchStorageClient(ctx context.Context, cfg config) (*storage.Client, error) {
if cfg.AlternativeHost != "" {
var h *url.URL
h, err := url.Parse(cfg.AlternativeHost)
23 changes: 18 additions & 5 deletions x-pack/filebeat/input/gcs/config.go
@@ -28,16 +28,16 @@ type config struct {
// Auth - Defines the authentication mechanism to be used for accessing the gcs bucket.
Auth authConfig `config:"auth"`
// MaxWorkers - Defines the maximum number of go routines that will be spawned.
MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
MaxWorkers int `config:"max_workers,omitempty" validate:"max=5000"`
// Poll - Defines if polling should be performed on the input bucket source.
Poll *bool `config:"poll,omitempty"`
Poll bool `config:"poll,omitempty"`
// PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket.
PollInterval *time.Duration `config:"poll_interval,omitempty"`
PollInterval time.Duration `config:"poll_interval,omitempty"`
// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to
// false, since it can get expensive dealing with highly nested json data.
ParseJSON *bool `config:"parse_json,omitempty"`
ParseJSON bool `config:"parse_json,omitempty"`
// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out.
BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
BucketTimeOut time.Duration `config:"bucket_timeout,omitempty"`
Comment on lines +31 to +40
Contributor:
Why were these changed?

Contributor Author:
explanation below

// Buckets - Defines a list of buckets that will be polled for objects.
Buckets []bucket `config:"buckets" validate:"required"`
// FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket.
@@ -50,6 +50,8 @@ type config struct {
ExpandEventListFromField string `config:"expand_event_list_from_field"`
// This field is only used for system test purposes, to override the HTTP endpoint.
AlternativeHost string `config:"alternative_host,omitempty"`
// Retry - Defines the retry configuration for the input.
Retry retryConfig `config:"retry"`
}

// bucket contains the config for each specific object storage bucket in the root account
@@ -90,6 +92,17 @@ type jsonCredentialsConfig struct {
AccountKey string `config:"account_key"`
}

type retryConfig struct {
// MaxAttempts is the maximum number of retry attempts, defaults to 3.
MaxAttempts int `config:"max_attempts" validate:"min=0"`
Contributor:
Why min=0? Is this attempts or retries? If it's retries, I would rename this MaxRetries and then a zero makes sense, otherwise it's saying don't try the first time.

// InitialBackOffDuration is the initial value of the retry period, defaults to 1 second.
InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"`
// MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds.
MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"`
// BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2.
BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=2"`
Contributor:
Does go-ucfg allow float constraints? Is a value greater than one but less than two a valid configuration in principle? If it is, should we not allow that?

Contributor Author (@ShourieG, Nov 29, 2024):
CEL also uses Limit *float64 `config:"limit"` in its resource config, as do a couple of other inputs. There is a float unpacker present: FloatUnpacker.

As for whether we should allow it or not: if the SDK allows it, then why not extend the same feature set to the end user?

Contributor:
That's not what I'm asking. I'm asking if min can be a float constraint.

}

func (c authConfig) Validate() error {
// credentials_file
if c.CredentialsFile != nil {
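
The review thread above asks whether go-ucfg's `min` tag can express a float constraint. As an illustration only (this is not part of the PR), cross-field and float rules could instead live in a `Validate` method on `retryConfig`, following the `authConfig.Validate` pattern already present in this file. The sketch assumes `fmt` is imported.

[source, go]
----
// Illustrative sketch, not part of this PR: go-ucfg calls Validate on unpacked
// structs that implement it, so constraints that tags cannot express can go here.
func (r retryConfig) Validate() error {
	if r.MaxAttempts < 1 {
		return fmt.Errorf("retry.max_attempts must be >= 1, got %d", r.MaxAttempts)
	}
	if r.BackOffMultiplier <= 1 {
		return fmt.Errorf("retry.backoff_multiplier must be greater than 1, got %v", r.BackOffMultiplier)
	}
	if r.MaxBackOffDuration < r.InitialBackOffDuration {
		return fmt.Errorf("retry.max_backoff_duration (%v) must be >= retry.initial_backoff_duration (%v)",
			r.MaxBackOffDuration, r.InitialBackOffDuration)
	}
	return nil
}
----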
57 changes: 30 additions & 27 deletions x-pack/filebeat/input/gcs/input.go
@@ -72,6 +72,7 @@
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
ReaderConfig: bucket.ReaderConfig,
Retry: config.Retry,
})
}

@@ -83,39 +84,19 @@
// are absent, assigns default values
func tryOverrideOrDefault(cfg config, b bucket) bucket {
if b.MaxWorkers == nil {
maxWorkers := 1
if cfg.MaxWorkers != nil {
maxWorkers = *cfg.MaxWorkers
}
b.MaxWorkers = &maxWorkers
b.MaxWorkers = &cfg.MaxWorkers
}
if b.Poll == nil {
var poll bool
if cfg.Poll != nil {
poll = *cfg.Poll
}
b.Poll = &poll
b.Poll = &cfg.Poll
}
if b.PollInterval == nil {
interval := time.Second * 300
if cfg.PollInterval != nil {
interval = *cfg.PollInterval
}
b.PollInterval = &interval
b.PollInterval = &cfg.PollInterval
}
if b.ParseJSON == nil {
parse := false
if cfg.ParseJSON != nil {
parse = *cfg.ParseJSON
}
b.ParseJSON = &parse
b.ParseJSON = &cfg.ParseJSON
}
if b.BucketTimeOut == nil {
timeOut := time.Second * 50
if cfg.BucketTimeOut != nil {
timeOut = *cfg.BucketTimeOut
}
b.BucketTimeOut = &timeOut
b.BucketTimeOut = &cfg.BucketTimeOut
}
if b.TimeStampEpoch == nil {
b.TimeStampEpoch = cfg.TimeStampEpoch
@@ -131,6 +112,23 @@
return b
}

// defaultConfig returns the default configuration for the input
func defaultConfig() config {
return config{
MaxWorkers: 1,
Poll: true,
PollInterval: 5 * time.Minute,
BucketTimeOut: 120 * time.Second,
ParseJSON: false,
Retry: retryConfig{
MaxAttempts: 3,
InitialBackOffDuration: 1 * time.Second,
MaxBackOffDuration: 30 * time.Second,
BackOffMultiplier: 2,
},
}
}
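
For context, this is a hypothetical sketch of how `defaultConfig` would typically be combined with user-supplied settings; the `configure` helper and the `conf` alias for `github.com/elastic/elastic-agent-libs/config` are assumptions for illustration and do not appear in this diff.

[source, go]
----
// Hypothetical wiring, not part of the diff: defaults first, then user overrides.
// Assumes conf is "github.com/elastic/elastic-agent-libs/config".
func configure(raw *conf.C) (config, error) {
	cfg := defaultConfig() // start from the defaults above, including the retry block
	if err := raw.Unpack(&cfg); err != nil { // user-provided values override the defaults
		return config{}, err
	}
	return cfg, nil
}
----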

// isValidUnixTimestamp checks if the timestamp is a valid Unix timestamp
func isValidUnixTimestamp(timestamp int64) bool {
// checks if the timestamp is within the valid range
@@ -148,7 +146,7 @@
func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
cursor cursor.Cursor, publisher cursor.Publisher) error {
st := newState()
currentSource := src.(*Source)

Check failure on line 149 in x-pack/filebeat/input/gcs/input.go, GitHub Actions / lint (linux): Error return value is not checked (errcheck)

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)
@@ -173,15 +171,20 @@
cancel()
}()

client, err := fetchStorageClient(ctx, input.config, log)
client, err := fetchStorageClient(ctx, input.config)
if err != nil {
metrics.errorsTotal.Inc()
return err
}

bucket := client.Bucket(currentSource.BucketName).Retryer(
// Use WithMaxAttempts to change the maximum number of attempts.
storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
// Use WithBackoff to change the timing of the exponential backoff.
storage.WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Initial: currentSource.Retry.InitialBackOffDuration,
Max: currentSource.Retry.MaxBackOffDuration,
Multiplier: currentSource.Retry.BackOffMultiplier,
}),
// RetryAlways will retry the operation even if it is non-idempotent.
// Since we are only reading, the operation is always idempotent
9 changes: 6 additions & 3 deletions x-pack/filebeat/input/gcs/input_stateless.go
@@ -6,7 +6,6 @@

import (
"context"
"time"

"cloud.google.com/go/storage"
gax "github.com/googleapis/gax-go/v2"
@@ -64,10 +63,11 @@
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
ReaderConfig: bucket.ReaderConfig,
Retry: in.config.Retry,
}

st := newState()
currentSource := source.(*Source)

Check failure on line 70 in x-pack/filebeat/input/gcs/input_stateless.go, GitHub Actions / lint (linux): Error return value is not checked (errcheck)
log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
metrics := newInputMetrics(inputCtx.ID+":"+currentSource.BucketName, nil)
defer metrics.Close()
@@ -80,15 +80,18 @@
}()

bkt := client.Bucket(currentSource.BucketName).Retryer(
// Use WithMaxAttempts to change the maximum number of attempts.
storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
// Use WithBackoff to change the timing of the exponential backoff.
storage.WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Initial: currentSource.Retry.InitialBackOffDuration,
Max: currentSource.Retry.MaxBackOffDuration,
Multiplier: currentSource.Retry.BackOffMultiplier,
}),
// RetryAlways will retry the operation even if it is non-idempotent.
// Since we are only reading, the operation is always idempotent
storage.WithPolicy(storage.RetryAlways),
)

scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, metrics, log)
// allows multiple containers to be scheduled concurrently while testing
// the stateless input is triggered only while testing and till now it did not mimic