[filebeat][gcs] - Added retry config & refactored the default config options #41699

@@ -10,9 +10,7 @@
++++

Use the `google cloud storage input` to read content from files stored in buckets which reside on your Google Cloud.
-The input can be configured to work with and without polling, though currently, if polling is disabled it will only
-perform a one time passthrough, list the file contents and end the process. Polling is generally recommented for most cases
-even though it can get expensive with dealing with a very large number of files.
+The input can be configured to work with and without polling, though if polling is disabled, it will only perform a one time passthrough, list the file contents and end the process.

*To mitigate errors and ensure a stable processing environment, this input employs the following features :*
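
As an aside on the polling behavior described in the hunk above, here is a minimal sketch (the project id, credentials path and bucket names are placeholders, not part of this change); with the new defaults, omitting `poll` keeps polling enabled, while `poll: false` performs a single listing pass and then stops:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: /path/to/creds.json
  buckets:
  - name: one-shot-bucket
    poll: false         # list and read the current contents once, then stop
  - name: polled-bucket
    poll: true          # keep polling for new objects
    poll_interval: 5m   # wait 5 minutes between poll iterations
----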

@@ -66,12 +64,11 @@ many buckets as we deem fit. We are also able to configure the attributes `max_w
then be applied to all buckets which do not specify any of these attributes explicitly.

NOTE: If the attributes `max_workers`, `poll`, `poll_interval` and `bucket_timeout` are specified at the root level, these can still be overridden at the bucket level with
-different values, thus offering extensive flexibility and customization. Examples <<bucket-overrides,below>> show this behaviour.
+different values, thus offering extensive flexibility and customization. Examples <<bucket-overrides,below>> show this behavior.

On receiving this config the google cloud storage input will connect to the service and retrieve a `Storage Client` using the given `bucket_name` and
`auth.credentials_file`, then it will spawn two main go-routines, one for each bucket. After this each of these routines (threads) will initialize a scheduler
-which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool,
-one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file).
+which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file).

Comment on lines +67 to +71

Do we need to change these in this PR?

have replied with an explanation and option below

NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and
process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value.
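
To make the worker-pool description above concrete, a sketch under assumed placeholder names: a root-level `max_workers: 3` with two buckets would result in two in-memory worker pools, one per bucket, each with three workers:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: /path/to/creds.json
  max_workers: 3       # applied to every bucket that does not override it
  poll: true
  poll_interval: 5m
  buckets:
  - name: bucket-one   # worker pool #1, 3 workers
  - name: bucket-two   # worker pool #2, 3 workers
----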

@@ -167,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately.
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>
+14. <<attrib-retry-gcs,retry>>

[id="attrib-project-id"]

@@ -213,7 +211,7 @@ This is a specific subfield of a bucket. It specifies the bucket name.

This attribute defines the maximum amount of time after which a bucket operation will give and stop if no response is recieved (example: reading a file / listing a file).
It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish.
-If no value is specified for this, by default its initialized to `50 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time.
+If no value is specified for this, by default its initialized to `120 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time.

What is being changed here?

initialized to

[id="attrib-max_workers-gcs"]
[float]

@@ -228,17 +226,16 @@ NOTE: The value of `max_workers` is tied to the `batch_size` currently to ensure
[float]
==== `poll`

-This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is `false`, so it will not keep polling if not explicitly
-specified. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always
-take priority and override the root level values if both are specified.
+This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is set to `true`. This attribute can be specified both at the
+root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified.
[id="attrib-poll_interval-gcs"] | ||||||
[float] | ||||||
==== `poll_interval` | ||||||
|
||||||
This attribute defines the maximum amount of time after which the internal scheduler will make the polling call for the next set of objects/files. It can be | ||||||
defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. | ||||||
Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `300 seconds`. | ||||||
Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `5 minutes`. | ||||||
This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority | ||||||
and override the root level values if both are specified. The `poll_interval` should be set to a value that is equal to the `bucket_timeout` value. This would ensure that another schedule operation is not started before the current buckets have all been processed. If the `poll_interval` is set to a value that is less than the `bucket_timeout`, then the input will start another schedule operation before the current one has finished, which can cause a bottleneck over time. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization. | ||||||
|
||||||
|
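
As a sketch of the guidance above (values are illustrative only), setting `poll_interval` equal to `bucket_timeout` means a new schedule operation only starts after the calls from the previous iteration have either completed or timed out:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: /path/to/creds.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 10m   # wait 10 minutes between iterations
    bucket_timeout: 10m  # bucket API calls time out within the same window
----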

@@ -401,6 +398,43 @@ filebeat.inputs:

The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files are very high. It is recommended to use this attribute only when the number of files are limited or ample resources are available. This option scales vertically and not horizontally.
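
For reference, a minimal sketch (bucket name and epoch value are placeholders) of how the `timestamp_epoch` attribute discussed above is typically configured; objects whose timestamps fall before the configured Unix epoch value are assumed to be filtered out after the listing step:

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: /path/to/creds.json
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 15s
    timestamp_epoch: 1630444800   # ignore objects older than this Unix timestamp
----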
[id="attrib-retry-gcs"] | ||||||
[float] | ||||||
==== `retry` | ||||||
|
||||||
This attribute can be used to configure a list of sub attributes that directly control how the input should behave when a download for a file/object fails or gets interrupted. The list of sub attributes are as follows :- | ||||||
|
||||||

1. `max_attempts`: This attribute defines the maximum number of retries that should be attempted for a failed download. The default value for this is `3`.
2. `initial_backoff_duration`: This attribute defines the initial backoff time in seconds. The default value for this is `1s`.
3. `max_backoff_duration`: This attribute defines the maximum backoff time in seconds. The default value for this is `30s`.
4. `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2`.

By configuring these attributes, the user is given the flexibility to control how the input should behave when a download fails or gets interrupted. This attribute can only be
specified at the root level of the configuration and not at the bucket level. It applies uniformly to all the buckets.

An example configuration is given below :-

[source, yml]
----
filebeat.inputs:
- type: gcs
  project_id: my_project_id
  auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
  retry:
    max_attempts: 3
    initial_backoff_duration: 2s
    max_backoff_duration: 60s
    backoff_multiplier: 2
  buckets:
  - name: obs-bucket
    max_workers: 3
    poll: true
    poll_interval: 11m
    bucket_timeout: 10m
----

While configuring the `retry` attribute, the user should take into consideration the `bucket_timeout` value. The `retry` attribute should be configured in such a way that the retries are completed within the `bucket_timeout` window. If the `retry` attribute is configured in such a way that the retries are not completed successfully within the `bucket_timeout` window, then the input will suffer a `context timeout` for that specific object/file which it was retrying. This can cause gaps in ingested data to pile up over time.
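
As a rough, illustrative calculation of that interplay (ignoring the time spent on the download attempts themselves): with the example values above, `initial_backoff_duration: 2s` and `backoff_multiplier: 2` make the waits between attempts grow as 2s, 4s, 8s, and so on, capped at `max_backoff_duration: 60s`. Even if `max_attempts: 3` is read as three retries on top of the initial attempt, the cumulative backoff is only about 2s + 4s + 8s = 14s, which fits comfortably inside the 10m `bucket_timeout` above. A much larger `max_attempts` or `max_backoff_duration` could push the total retry time past that window and trigger the context timeout described here.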
|
||||||
[id="bucket-overrides"] | ||||||
*The sample configs below will explain the bucket level overriding of attributes a bit further :-* | ||||||
|
||||||
|

@@ -28,16 +28,16 @@ type config struct {
	// Auth - Defines the authentication mechanism to be used for accessing the gcs bucket.
	Auth authConfig `config:"auth"`
	// MaxWorkers - Defines the maximum number of go routines that will be spawned.
-	MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"`
+	MaxWorkers int `config:"max_workers,omitempty" validate:"max=5000"`
	// Poll - Defines if polling should be performed on the input bucket source.
-	Poll *bool `config:"poll,omitempty"`
+	Poll bool `config:"poll,omitempty"`
	// PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket.
-	PollInterval *time.Duration `config:"poll_interval,omitempty"`
+	PollInterval time.Duration `config:"poll_interval,omitempty"`
	// ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to
	// false, since it can get expensive dealing with highly nested json data.
-	ParseJSON *bool `config:"parse_json,omitempty"`
+	ParseJSON bool `config:"parse_json,omitempty"`
	// BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out.
-	BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"`
+	BucketTimeOut time.Duration `config:"bucket_timeout,omitempty"`

Comment on lines +31 to +40

Why were these changed?

explanation below

	// Buckets - Defines a list of buckets that will be polled for objects.
	Buckets []bucket `config:"buckets" validate:"required"`
	// FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket.

@@ -50,6 +50,8 @@ type config struct {
	ExpandEventListFromField string `config:"expand_event_list_from_field"`
	// This field is only used for system test purposes, to override the HTTP endpoint.
	AlternativeHost string `config:"alternative_host,omitempty"`
+	// Retry - Defines the retry configuration for the input.
+	Retry retryConfig `config:"retry"`
}

// bucket contains the config for each specific object storage bucket in the root account

@@ -90,6 +92,17 @@ type jsonCredentialsConfig struct {
	AccountKey string `config:"account_key"`
}

type retryConfig struct {
	// MaxAttempts is the maximum number of retry attempts, defaults to 3.
	MaxAttempts int `config:"max_attempts" validate:"min=0"`

Why

	// InitialBackOffDuration is the initial value of the retry period, defaults to 1 second.
	InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"`
	// MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds.
	MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"`
	// BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2.
	BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=2"`

Does go-ucfg allow float constraints? Is a value greater than one but less than two a valid configuration in principle? If it is, should we not allow that?

CEL also uses
As for if we should allow or not, I mean if the sdk allows it then why not extend the same feature set to the enduser ?

That's not what I'm asking. I'm asking if min can be a float constraint.

}
|
||
func (c authConfig) Validate() error { | ||
// credentials_file | ||
if c.CredentialsFile != nil { | ||
|

The change s/passthrough/…/ significantly changes the semantics, but I don't think the original meaning is correct; passthrough is a noun that doesn't describe an action. Though I'm unclear why this is being changed since it does not appear to be related to the logic change here.