diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ae12305f52..f7aaed3a725 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 * [FEATURE] Query-frontend, querier: new experimental `/cardinality/active_native_histogram_metrics` API to get active native histogram metric names with statistics about active native histogram buckets. #7982 #7986 #8008 * [FEATURE] Alertmanager: Added `-alertmanager.max-silences-count` and `-alertmanager.max-silence-size-bytes` to set limits on per tenant silences. Disabled by default. #6898 -* [FEATURE] Ingester: add experimental support for the server-side circuit breakers when writing to and reading from ingesters. This can be enabled using `-ingester.circuit-breaker.enabled` option. Further `-ingester.circuit-breaker.*` options for configuring circuit-breaker are available. Added metrics `cortex_ingester_circuit_breaker_results_total`, `cortex_ingester_circuit_breaker_transitions_total` and `cortex_ingester_circuit_breaker_current_state`. #8180 #8285 +* [FEATURE] Ingester: add experimental support for the server-side circuit breakers when writing to and reading from ingesters. This can be enabled using `-ingester.push-circuit-breaker.enabled` and `-ingester.read-circuit-breaker.enabled` options. Further `-ingester.push-circuit-breaker.*` and `-ingester.read-circuit-breaker.*` options for configuring circuit-breaker are available. Added metrics `cortex_ingester_circuit_breaker_results_total`, `cortex_ingester_circuit_breaker_transitions_total` and `cortex_ingester_circuit_breaker_current_state`. #8180 #8285 #8315 * [FEATURE] Distributor, ingester: add new setting `-validation.past-grace-period` to limit how old (based on the wall clock minus OOO window) the ingested samples can be. The default 0 value disables this limit. 
#8262 * [ENHANCEMENT] Distributor: add metrics `cortex_distributor_samples_per_request` and `cortex_distributor_exemplars_per_request` to track samples/exemplars per request. #8265 * [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 15e59fd87b2..7ceb453e86d 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -3143,7 +3143,7 @@ }, { "kind": "block", - "name": "circuit_breaker", + "name": "push_circuit_breaker", "required": false, "desc": "", "blockEntries": [ @@ -3154,7 +3154,7 @@ "desc": "Enable circuit breaking when making requests to ingesters", "fieldValue": null, "fieldDefaultValue": false, - "fieldFlag": "ingester.circuit-breaker.enabled", + "fieldFlag": "ingester.push-circuit-breaker.enabled", "fieldType": "boolean", "fieldCategory": "experimental" }, @@ -3165,7 +3165,7 @@ "desc": "Max percentage of requests that can fail over period before the circuit breaker opens", "fieldValue": null, "fieldDefaultValue": 10, - "fieldFlag": "ingester.circuit-breaker.failure-threshold-percentage", + "fieldFlag": "ingester.push-circuit-breaker.failure-threshold-percentage", "fieldType": "int", "fieldCategory": "experimental" }, @@ -3176,7 +3176,7 @@ "desc": "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures", "fieldValue": null, "fieldDefaultValue": 100, - "fieldFlag": "ingester.circuit-breaker.failure-execution-threshold", + "fieldFlag": "ingester.push-circuit-breaker.failure-execution-threshold", "fieldType": "int", "fieldCategory": "experimental" }, @@ -3187,7 +3187,7 @@ "desc": "Moving window of time that the percentage of failed requests is computed over", "fieldValue": null, "fieldDefaultValue": 60000000000, - "fieldFlag": "ingester.circuit-breaker.thresholding-period", + "fieldFlag": 
"ingester.push-circuit-breaker.thresholding-period", "fieldType": "duration", "fieldCategory": "experimental" }, @@ -3198,7 +3198,7 @@ "desc": "How long the circuit breaker will stay in the open state before allowing some requests", "fieldValue": null, "fieldDefaultValue": 10000000000, - "fieldFlag": "ingester.circuit-breaker.cooldown-period", + "fieldFlag": "ingester.push-circuit-breaker.cooldown-period", "fieldType": "duration", "fieldCategory": "experimental" }, @@ -3209,31 +3209,107 @@ "desc": "How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.", "fieldValue": null, "fieldDefaultValue": 0, - "fieldFlag": "ingester.circuit-breaker.initial-delay", + "fieldFlag": "ingester.push-circuit-breaker.initial-delay", "fieldType": "duration", "fieldCategory": "experimental" }, { "kind": "field", - "name": "push_timeout", + "name": "request_timeout", "required": false, - "desc": "The maximum length of time an ingester's Push request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.", + "desc": "The maximum duration of an ingester's request before it triggers a circuit breaker. 
This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.", "fieldValue": null, "fieldDefaultValue": 2000000000, - "fieldFlag": "ingester.circuit-breaker.push-timeout", + "fieldFlag": "ingester.push-circuit-breaker.request-timeout", + "fieldType": "duration", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null + }, + { + "kind": "block", + "name": "read_circuit_breaker", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "enabled", + "required": false, + "desc": "Enable circuit breaking when making requests to ingesters", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "ingester.read-circuit-breaker.enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "failure_threshold_percentage", + "required": false, + "desc": "Max percentage of requests that can fail over period before the circuit breaker opens", + "fieldValue": null, + "fieldDefaultValue": 10, + "fieldFlag": "ingester.read-circuit-breaker.failure-threshold-percentage", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "failure_execution_threshold", + "required": false, + "desc": "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures", + "fieldValue": null, + "fieldDefaultValue": 100, + "fieldFlag": "ingester.read-circuit-breaker.failure-execution-threshold", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "thresholding_period", + "required": false, + "desc": "Moving window of time that the percentage of failed requests is computed over", + "fieldValue": null, + "fieldDefaultValue": 60000000000, + "fieldFlag": "ingester.read-circuit-breaker.thresholding-period", "fieldType": "duration", - "fieldCategory": "experiment" + "fieldCategory": "experimental" }, { "kind": 
"field", - "name": "read_timeout", + "name": "cooldown_period", "required": false, - "desc": "The maximum length of time an ingester's read-path request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.", + "desc": "How long the circuit breaker will stay in the open state before allowing some requests", + "fieldValue": null, + "fieldDefaultValue": 10000000000, + "fieldFlag": "ingester.read-circuit-breaker.cooldown-period", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "initial_delay", + "required": false, + "desc": "How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingester.read-circuit-breaker.initial-delay", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "request_timeout", + "required": false, + "desc": "The maximum duration of an ingester's request before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.", "fieldValue": null, "fieldDefaultValue": 30000000000, - "fieldFlag": "ingester.circuit-breaker.read-timeout", + "fieldFlag": "ingester.read-circuit-breaker.request-timeout", "fieldType": "duration", - "fieldCategory": "experiment" + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 2543c4b77ee..c9f1dce5b30 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1307,22 +1307,6 @@ Usage of ./cmd/mimir/mimir: After what time a series is considered to be inactive. (default 10m0s) -ingester.active-series-metrics-update-period duration How often to update active series metrics. 
(default 1m0s) - -ingester.circuit-breaker.cooldown-period duration - [experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 10s) - -ingester.circuit-breaker.enabled - [experimental] Enable circuit breaking when making requests to ingesters - -ingester.circuit-breaker.failure-execution-threshold uint - [experimental] How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures (default 100) - -ingester.circuit-breaker.failure-threshold-percentage uint - [experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10) - -ingester.circuit-breaker.initial-delay duration - [experimental] How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted. - -ingester.circuit-breaker.push-timeout duration - The maximum length of time an ingester's Push request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors. (default 2s) - -ingester.circuit-breaker.read-timeout duration - The maximum length of time an ingester's read-path request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors. (default 30s) - -ingester.circuit-breaker.thresholding-period duration - [experimental] Moving window of time that the percentage of failed requests is computed over (default 1m0s) -ingester.client.backoff-max-period duration Maximum delay when backing off. 
(default 10s) -ingester.client.backoff-min-period duration @@ -1417,8 +1401,36 @@ Usage of ./cmd/mimir/mimir: [experimental] Non-zero value enables out-of-order support for most recent samples that are within the time window in relation to the TSDB's maximum time, i.e., within [db.maxTime-timeWindow, db.maxTime]). The ingester will need more memory as a factor of rate of out-of-order samples being ingested and the number of series that are getting out-of-order samples. If query falls into this window, cached results will use value from -query-frontend.results-cache-ttl-for-out-of-order-time-window option to specify TTL for resulting cache entry. -ingester.owned-series-update-interval duration [experimental] How often to check for ring changes and possibly recompute owned series as a result of detected change. (default 15s) + -ingester.push-circuit-breaker.cooldown-period duration + [experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 10s) + -ingester.push-circuit-breaker.enabled + [experimental] Enable circuit breaking when making requests to ingesters + -ingester.push-circuit-breaker.failure-execution-threshold uint + [experimental] How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures (default 100) + -ingester.push-circuit-breaker.failure-threshold-percentage uint + [experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10) + -ingester.push-circuit-breaker.initial-delay duration + [experimental] How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted. + -ingester.push-circuit-breaker.request-timeout duration + [experimental] The maximum duration of an ingester's request before it triggers a circuit breaker. 
This configuration is used for circuit breakers only, and its timeouts aren't reported as errors. (default 2s) + -ingester.push-circuit-breaker.thresholding-period duration + [experimental] Moving window of time that the percentage of failed requests is computed over (default 1m0s) -ingester.rate-update-period duration Period with which to update the per-tenant ingestion rates. (default 15s) + -ingester.read-circuit-breaker.cooldown-period duration + [experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 10s) + -ingester.read-circuit-breaker.enabled + [experimental] Enable circuit breaking when making requests to ingesters + -ingester.read-circuit-breaker.failure-execution-threshold uint + [experimental] How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures (default 100) + -ingester.read-circuit-breaker.failure-threshold-percentage uint + [experimental] Max percentage of requests that can fail over period before the circuit breaker opens (default 10) + -ingester.read-circuit-breaker.initial-delay duration + [experimental] How long the circuit breaker should wait between an activation request and becoming effectively active. During that time both failures and successes will not be counted. + -ingester.read-circuit-breaker.request-timeout duration + [experimental] The maximum duration of an ingester's request before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors. (default 30s) + -ingester.read-circuit-breaker.thresholding-period duration + [experimental] Moving window of time that the percentage of failed requests is computed over (default 1m0s) -ingester.read-path-cpu-utilization-limit float [experimental] CPU utilization limit, as CPU cores, for CPU/memory utilization based read request limiting. Use 0 to disable it. 
-ingester.read-path-memory-utilization-limit uint diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index d99efd2e244..5812fdc60f6 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -389,10 +389,6 @@ Usage of ./cmd/mimir/mimir: Print basic help. -help-all Print help, also including advanced and experimental parameters. - -ingester.circuit-breaker.push-timeout duration - The maximum length of time an ingester's Push request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors. (default 2s) - -ingester.circuit-breaker.read-timeout duration - The maximum length of time an ingester's read-path request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors. (default 30s) -ingester.max-global-metadata-per-metric int The maximum number of metadata per metric, across the cluster. 0 to disable. 
-ingester.max-global-metadata-per-user int diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 5332ba7a6ef..b1a3e78e4d5 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -118,14 +118,20 @@ The following features are currently experimental: - `-ingester.use-ingester-owned-series-for-limits` - `-ingester.owned-series-update-interval` - Per-ingester circuit breaking based on requests timing out or hitting per-instance limits - - `-ingester.circuit-breaker.enabled` - - `-ingester.circuit-breaker.failure-threshold-percentage` - - `-ingester.circuit-breaker.failure-execution-threshold` - - `-ingester.circuit-breaker.thresholding-period` - - `-ingester.circuit-breaker.cooldown-period` - - `-ingester.circuit-breaker.initial-delay` - - `-ingester.circuit-breaker.push-timeout` - - `-ingester.circuit-breaker.read-timeout` + - `-ingester.push-circuit-breaker.enabled` + - `-ingester.push-circuit-breaker.failure-threshold-percentage` + - `-ingester.push-circuit-breaker.failure-execution-threshold` + - `-ingester.push-circuit-breaker.thresholding-period` + - `-ingester.push-circuit-breaker.cooldown-period` + - `-ingester.push-circuit-breaker.initial-delay` + - `-ingester.push-circuit-breaker.request-timeout` + - `-ingester.read-circuit-breaker.enabled` + - `-ingester.read-circuit-breaker.failure-threshold-percentage` + - `-ingester.read-circuit-breaker.failure-execution-threshold` + - `-ingester.read-circuit-breaker.thresholding-period` + - `-ingester.read-circuit-breaker.cooldown-period` + - `-ingester.read-circuit-breaker.initial-delay` + - `-ingester.read-circuit-breaker.request-timeout` - Ingester client - Per-ingester circuit breaking based on requests timing out or hitting per-instance limits - `-ingester.client.circuit-breaker.enabled` diff --git
a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 03f643c5d3e..c13d3240386 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1218,48 +1218,79 @@ instance_limits: # CLI flag: -ingester.owned-series-update-interval [owned_series_update_interval: | default = 15s] -circuit_breaker: +push_circuit_breaker: + # (experimental) Enable circuit breaking when making requests to ingesters + # CLI flag: -ingester.push-circuit-breaker.enabled + [enabled: | default = false] + + # (experimental) Max percentage of requests that can fail over period before + # the circuit breaker opens + # CLI flag: -ingester.push-circuit-breaker.failure-threshold-percentage + [failure_threshold_percentage: | default = 10] + + # (experimental) How many requests must have been executed in period for the + # circuit breaker to be eligible to open for the rate of failures + # CLI flag: -ingester.push-circuit-breaker.failure-execution-threshold + [failure_execution_threshold: | default = 100] + + # (experimental) Moving window of time that the percentage of failed requests + # is computed over + # CLI flag: -ingester.push-circuit-breaker.thresholding-period + [thresholding_period: | default = 1m] + + # (experimental) How long the circuit breaker will stay in the open state + # before allowing some requests + # CLI flag: -ingester.push-circuit-breaker.cooldown-period + [cooldown_period: | default = 10s] + + # (experimental) How long the circuit breaker should wait between an + # activation request and becoming effectively active. During that time both + # failures and successes will not be counted. + # CLI flag: -ingester.push-circuit-breaker.initial-delay + [initial_delay: | default = 0s] + + # (experimental) The maximum duration of an ingester's request before it + # triggers a circuit breaker. 
This configuration is used for circuit breakers + # only, and its timeouts aren't reported as errors. + # CLI flag: -ingester.push-circuit-breaker.request-timeout + [request_timeout: | default = 2s] + +read_circuit_breaker: # (experimental) Enable circuit breaking when making requests to ingesters - # CLI flag: -ingester.circuit-breaker.enabled + # CLI flag: -ingester.read-circuit-breaker.enabled [enabled: | default = false] # (experimental) Max percentage of requests that can fail over period before # the circuit breaker opens - # CLI flag: -ingester.circuit-breaker.failure-threshold-percentage + # CLI flag: -ingester.read-circuit-breaker.failure-threshold-percentage [failure_threshold_percentage: | default = 10] # (experimental) How many requests must have been executed in period for the # circuit breaker to be eligible to open for the rate of failures - # CLI flag: -ingester.circuit-breaker.failure-execution-threshold + # CLI flag: -ingester.read-circuit-breaker.failure-execution-threshold [failure_execution_threshold: | default = 100] # (experimental) Moving window of time that the percentage of failed requests # is computed over - # CLI flag: -ingester.circuit-breaker.thresholding-period + # CLI flag: -ingester.read-circuit-breaker.thresholding-period [thresholding_period: | default = 1m] # (experimental) How long the circuit breaker will stay in the open state # before allowing some requests - # CLI flag: -ingester.circuit-breaker.cooldown-period + # CLI flag: -ingester.read-circuit-breaker.cooldown-period [cooldown_period: | default = 10s] # (experimental) How long the circuit breaker should wait between an # activation request and becoming effectively active. During that time both # failures and successes will not be counted. 
- # CLI flag: -ingester.circuit-breaker.initial-delay + # CLI flag: -ingester.read-circuit-breaker.initial-delay [initial_delay: | default = 0s] - # (experiment) The maximum length of time an ingester's Push request can last - # before it triggers a circuit breaker. This configuration is used for circuit - # breakers only, and its timeouts aren't reported as errors. - # CLI flag: -ingester.circuit-breaker.push-timeout - [push_timeout: | default = 2s] - - # (experiment) The maximum length of time an ingester's read-path request can - # last before it triggers a circuit breaker. This configuration is used for - # circuit breakers only, and its timeouts aren't reported as errors. - # CLI flag: -ingester.circuit-breaker.read-timeout - [read_timeout: | default = 30s] + # (experimental) The maximum duration of an ingester's request before it + # triggers a circuit breaker. This configuration is used for circuit breakers + # only, and its timeouts aren't reported as errors. + # CLI flag: -ingester.read-circuit-breaker.request-timeout + [request_timeout: | default = 30s] ``` ### querier diff --git a/pkg/ingester/circuitbreaker.go b/pkg/ingester/circuitbreaker.go index 28911f74983..a5640b19dfc 100644 --- a/pkg/ingester/circuitbreaker.go +++ b/pkg/ingester/circuitbreaker.go @@ -26,6 +26,9 @@ const ( circuitBreakerResultOpen = "circuit_breaker_open" circuitBreakerDefaultPushTimeout = 2 * time.Second circuitBreakerDefaultReadTimeout = 30 * time.Second + circuitBreakerRequestTypeLabel = "request_type" + circuitBreakerPushRequestType = "push" + circuitBreakerReadRequestType = "read" ) type circuitBreakerMetrics struct { @@ -33,34 +36,37 @@ type circuitBreakerMetrics struct { circuitBreakerResults *prometheus.CounterVec } -func newCircuitBreakerMetrics(r prometheus.Registerer, currentStateFn func() circuitbreaker.State) *circuitBreakerMetrics { +func newCircuitBreakerMetrics(r prometheus.Registerer, currentState func() circuitbreaker.State, requestType string) 
*circuitBreakerMetrics { cbMetrics := &circuitBreakerMetrics{ circuitBreakerTransitions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_circuit_breaker_transitions_total", - Help: "Number of times the circuit breaker has entered a state.", + Name: "cortex_ingester_circuit_breaker_transitions_total", + Help: "Number of times the circuit breaker has entered a state.", + ConstLabels: map[string]string{circuitBreakerRequestTypeLabel: requestType}, }, []string{"state"}), circuitBreakerResults: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_circuit_breaker_results_total", - Help: "Results of executing requests via the circuit breaker.", + Name: "cortex_ingester_circuit_breaker_results_total", + Help: "Results of executing requests via the circuit breaker.", + ConstLabels: map[string]string{circuitBreakerRequestTypeLabel: requestType}, }, []string{"result"}), } - circuitBreakerCurrentStateGaugeFn := func(state circuitbreaker.State) prometheus.GaugeFunc { + circuitBreakerCurrentStateGauge := func(state circuitbreaker.State) prometheus.GaugeFunc { return promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_ingester_circuit_breaker_current_state", Help: "Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name.", - ConstLabels: map[string]string{"state": state.String()}, + ConstLabels: map[string]string{circuitBreakerRequestTypeLabel: requestType, "state": state.String()}, }, func() float64 { - if currentStateFn() == state { + if currentState() == state { return 1 } return 0 }) } for _, s := range []circuitbreaker.State{circuitbreaker.OpenState, circuitbreaker.HalfOpenState, circuitbreaker.ClosedState} { - circuitBreakerCurrentStateGaugeFn(s) + circuitBreakerCurrentStateGauge(s) // We initialize all possible states for the circuitBreakerTransitions metrics cbMetrics.circuitBreakerTransitions.WithLabelValues(s.String()) } + for _, r := range 
[]string{circuitBreakerResultSuccess, circuitBreakerResultError, circuitBreakerResultOpen} { // We initialize all possible results for the circuitBreakerResults metrics cbMetrics.circuitBreakerResults.WithLabelValues(r) @@ -75,63 +81,62 @@ type CircuitBreakerConfig struct { ThresholdingPeriod time.Duration `yaml:"thresholding_period" category:"experimental"` CooldownPeriod time.Duration `yaml:"cooldown_period" category:"experimental"` InitialDelay time.Duration `yaml:"initial_delay" category:"experimental"` - PushTimeout time.Duration `yaml:"push_timeout" category:"experiment"` - ReadTimeout time.Duration `yaml:"read_timeout" category:"experiment"` + RequestTimeout time.Duration `yaml:"request_timeout" category:"experimental"` testModeEnabled bool `yaml:"-"` } -func (cfg *CircuitBreakerConfig) RegisterFlags(f *flag.FlagSet) { - prefix := "ingester.circuit-breaker." +func (cfg *CircuitBreakerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet, defaultRequestDuration time.Duration) { f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Enable circuit breaking when making requests to ingesters") f.UintVar(&cfg.FailureThresholdPercentage, prefix+"failure-threshold-percentage", 10, "Max percentage of requests that can fail over period before the circuit breaker opens") f.UintVar(&cfg.FailureExecutionThreshold, prefix+"failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures") f.DurationVar(&cfg.ThresholdingPeriod, prefix+"thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over") f.DurationVar(&cfg.CooldownPeriod, prefix+"cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests") f.DurationVar(&cfg.InitialDelay, prefix+"initial-delay", 0, "How long the circuit breaker should wait between an activation request and becoming effectively active. 
During that time both failures and successes will not be counted.") - f.DurationVar(&cfg.PushTimeout, prefix+"push-timeout", circuitBreakerDefaultPushTimeout, "The maximum length of time an ingester's Push request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.") - f.DurationVar(&cfg.ReadTimeout, prefix+"read-timeout", circuitBreakerDefaultReadTimeout, "The maximum length of time an ingester's read-path request can last before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.") + f.DurationVar(&cfg.RequestTimeout, prefix+"request-timeout", defaultRequestDuration, "The maximum duration of an ingester's request before it triggers a circuit breaker. This configuration is used for circuit breakers only, and its timeouts aren't reported as errors.") } // circuitBreaker abstracts the ingester's server-side circuit breaker functionality. // A nil *circuitBreaker is a valid noop implementation. 
type circuitBreaker struct { - cfg CircuitBreakerConfig - logger log.Logger - metrics *circuitBreakerMetrics - active atomic.Bool - cb circuitbreaker.CircuitBreaker[any] + cfg CircuitBreakerConfig + requestType string + logger log.Logger + metrics *circuitBreakerMetrics + active atomic.Bool + cb circuitbreaker.CircuitBreaker[any] - // testRequestDelay is needed for testing purposes to simulate long lasting requests + // testRequestDelay is needed for testing purposes to simulate long-lasting requests testRequestDelay time.Duration } -func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *circuitBreaker { +func newCircuitBreaker(cfg CircuitBreakerConfig, registerer prometheus.Registerer, requestType string, logger log.Logger) *circuitBreaker { if !cfg.Enabled { return nil } active := atomic.NewBool(false) cb := circuitBreaker{ - cfg: cfg, - logger: logger, - active: *active, + cfg: cfg, + requestType: requestType, + logger: logger, + active: *active, } - circuitBreakerTransitionsCounterFn := func(metrics *circuitBreakerMetrics, state circuitbreaker.State) prometheus.Counter { + circuitBreakerTransitionsCounter := func(metrics *circuitBreakerMetrics, state circuitbreaker.State) prometheus.Counter { return metrics.circuitBreakerTransitions.WithLabelValues(state.String()) } cbBuilder := circuitbreaker.Builder[any](). WithDelay(cfg.CooldownPeriod). OnClose(func(event circuitbreaker.StateChangedEvent) { - circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.ClosedState).Inc() + circuitBreakerTransitionsCounter(cb.metrics, circuitbreaker.ClosedState).Inc() level.Info(logger).Log("msg", "circuit breaker is closed", "previous", event.OldState, "current", event.NewState) }). 
OnOpen(func(event circuitbreaker.StateChangedEvent) { - circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.OpenState).Inc() + circuitBreakerTransitionsCounter(cb.metrics, circuitbreaker.OpenState).Inc() level.Warn(logger).Log("msg", "circuit breaker is open", "previous", event.OldState, "current", event.NewState) }). OnHalfOpen(func(event circuitbreaker.StateChangedEvent) { - circuitBreakerTransitionsCounterFn(cb.metrics, circuitbreaker.HalfOpenState).Inc() + circuitBreakerTransitionsCounter(cb.metrics, circuitbreaker.HalfOpenState).Inc() level.Info(logger).Log("msg", "circuit breaker is half-open", "previous", event.OldState, "current", event.NewState) }) @@ -145,7 +150,7 @@ func newCircuitBreaker(cfg CircuitBreakerConfig, logger log.Logger, registerer p } cb.cb = cbBuilder.Build() - cb.metrics = newCircuitBreakerMetrics(registerer, cb.cb.State) + cb.metrics = newCircuitBreakerMetrics(registerer, cb.cb.State, requestType) return &cb } @@ -191,32 +196,29 @@ func (cb *circuitBreaker) activate() { }) } +func (cb *circuitBreaker) isOpen() bool { + if !cb.isActive() { + return false + } + return cb.cb.IsOpen() +} + // tryAcquirePermit tries to acquire a permit to use the circuit breaker and returns whether a permit was acquired. -// If it was possible to acquire a permit, success flag true and no error are returned. The acquired permit must be -// returned by a call to finishPushRequest. -// If it was not possible to acquire a permit, success flag false is returned. In this case no call to finishPushRequest -// is needed. If the permit was not acquired because of an error, that causing error is returned as well. -func (cb *circuitBreaker) tryAcquirePermit() (bool, error) { +// If it was possible to acquire a permit, it returns a function that should be called to release the acquired permit. +// If it was not possible, the causing error is returned. 
+func (cb *circuitBreaker) tryAcquirePermit() (func(time.Duration, error), error) { if !cb.isActive() { - return false, nil + return func(time.Duration, error) {}, nil } + if !cb.cb.TryAcquirePermit() { cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultOpen).Inc() - return false, newCircuitBreakerOpenError(cb.cb.RemainingDelay()) + return nil, newCircuitBreakerOpenError(cb.requestType, cb.cb.RemainingDelay()) } - return true, nil -} - -// finishPushRequest should be called to complete the push request executed upon a -// successfully acquired circuit breaker permit. -func (cb *circuitBreaker) finishPushRequest(duration time.Duration, pushErr error) { - _ = cb.finishRequest(duration, cb.cfg.PushTimeout, pushErr) -} -// finishReadRequest should be called to complete the read request executed upon a -// successfully acquired circuit breaker permit. -func (cb *circuitBreaker) finishReadRequest(readDuration time.Duration, readErr error) { - _ = cb.finishRequest(readDuration, cb.cfg.ReadTimeout, readErr) + return func(duration time.Duration, err error) { + _ = cb.finishRequest(duration, cb.cfg.RequestTimeout, err) + }, nil } // finishRequest completes a request executed upon a successfully acquired circuit breaker permit. 
@@ -253,3 +255,44 @@ func (cb *circuitBreaker) recordResult(errs ...error) error { cb.metrics.circuitBreakerResults.WithLabelValues(circuitBreakerResultSuccess).Inc() return nil } + +type ingesterCircuitBreaker struct { + push *circuitBreaker + read *circuitBreaker +} + +func newIngesterCircuitBreaker(pushCfg CircuitBreakerConfig, readCfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) ingesterCircuitBreaker { + return ingesterCircuitBreaker{ + push: newCircuitBreaker(pushCfg, registerer, circuitBreakerPushRequestType, logger), + read: newCircuitBreaker(readCfg, registerer, circuitBreakerReadRequestType, logger), + } +} + +func (cb *ingesterCircuitBreaker) activate() { + cb.push.activate() + cb.read.activate() +} + +// tryAcquirePushPermit tries to acquire a permit to use the push circuit breaker and returns whether a permit was acquired. +// If it was possible, tryAcquirePushPermit returns a function that should be called to release the acquired permit. +// If it was not possible, the causing error is returned. +func (cb *ingesterCircuitBreaker) tryAcquirePushPermit() (func(time.Duration, error), error) { + return cb.push.tryAcquirePermit() +} + +// tryAcquireReadPermit tries to acquire a permit to use the read circuit breaker and returns whether a permit was acquired. +// If it was possible, tryAcquireReadPermit returns a function that should be called to release the acquired permit. +// If it was not possible, the causing error is returned. +func (cb *ingesterCircuitBreaker) tryAcquireReadPermit() (func(time.Duration, error), error) { + // If the read circuit breaker is not active, we don't try to acquire a permit. + if !cb.read.isActive() { + return func(time.Duration, error) {}, nil + } + + // We don't want to allow read requests if the push circuit breaker is open. 
+ if cb.push.isOpen() { + return nil, newCircuitBreakerOpenError(cb.push.requestType, cb.push.cb.RemainingDelay()) + } + + return cb.read.tryAcquirePermit() +} diff --git a/pkg/ingester/circuitbreaker_test.go b/pkg/ingester/circuitbreaker_test.go index bc44cb5c77b..de57b77f8bf 100644 --- a/pkg/ingester/circuitbreaker_test.go +++ b/pkg/ingester/circuitbreaker_test.go @@ -19,7 +19,9 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/mimirpb" @@ -75,9 +77,8 @@ func TestCircuitBreaker_IsActive(t *testing.T) { require.False(t, cb.isActive()) - registry := prometheus.NewRegistry() cfg := CircuitBreakerConfig{Enabled: true, InitialDelay: 10 * time.Millisecond} - cb = newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb = newCircuitBreaker(cfg, prometheus.NewRegistry(), "test-request-type", log.NewNopLogger()) cb.activate() // When InitialDelay is set, circuit breaker is not immediately active. 
@@ -98,94 +99,74 @@ func TestCircuitBreaker_TryAcquirePermit(t *testing.T) { testCases := map[string]struct { initialDelay time.Duration circuitBreakerSetup func(*circuitBreaker) - expectedSuccess bool expectedCircuitBreakerError bool expectedMetrics string }{ - "if circuit breaker is not active, status false and no error are returned": { + "if circuit breaker is not active, finish function and no error are returned": { initialDelay: 1 * time.Minute, circuitBreakerSetup: func(cb *circuitBreaker) { cb.active.Store(false) }, - expectedSuccess: false, expectedCircuitBreakerError: false, }, - "if circuit breaker closed, status true and no error are returned": { + "if circuit breaker closed, finish function and no error are returned": { circuitBreakerSetup: func(cb *circuitBreaker) { cb.activate() cb.cb.Close() }, - expectedSuccess: true, expectedCircuitBreakerError: false, }, - "if circuit breaker open, status false and a circuitBreakerErrorOpen are returned": { + "if circuit breaker open, no finish function and a circuitBreakerErrorOpen are returned": { circuitBreakerSetup: func(cb *circuitBreaker) { cb.activate() cb.cb.Open() }, - expectedSuccess: false, expectedCircuitBreakerError: true, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 1 - # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 - # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + cortex_ingester_circuit_breaker_transitions_total{request_type="test-request-type",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="test-request-type",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="test-request-type",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
# TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 1 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 - cortex_ingester_circuit_breaker_current_state{state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="test-request-type",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="test-request-type",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="test-request-type",state="open"} 1 `, }, - "if circuit breaker half-open, status false and a circuitBreakerErrorOpen are returned": { + "if circuit breaker half-open, finish function and no error are returned": { circuitBreakerSetup: func(cb *circuitBreaker) { cb.activate() cb.cb.HalfOpen() }, - expectedSuccess: false, - expectedCircuitBreakerError: true, - expectedMetrics: ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. - # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 1 - # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. - # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 1 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 0 - # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
- # TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 0 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 1 - cortex_ingester_circuit_breaker_current_state{state="closed"} 0 - `, + expectedCircuitBreakerError: false, }, } for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() cfg := CircuitBreakerConfig{ - Enabled: true, - CooldownPeriod: 10 * time.Second, - testModeEnabled: true, + Enabled: true, + FailureThresholdPercentage: 20, + CooldownPeriod: 10 * time.Second, + testModeEnabled: true, } - cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb := newCircuitBreaker(cfg, registry, "test-request-type", log.NewNopLogger()) testCase.circuitBreakerSetup(cb) - status, err := cb.tryAcquirePermit() - require.Equal(t, testCase.expectedSuccess, status) + finish, err := cb.tryAcquirePermit() if testCase.expectedCircuitBreakerError { require.ErrorAs(t, err, &circuitBreakerOpenError{}) + require.Nil(t, finish) assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) } else { require.NoError(t, err) + require.NotNil(t, finish) } }) } @@ -206,9 +187,9 @@ func TestCircuitBreaker_RecordResult(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 1 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "erroneous execution not passing the failure check records a success": { @@ -217,9 +198,9 @@ func TestCircuitBreaker_RecordResult(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 1 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "erroneous execution passing the failure check records an error": { @@ -228,9 +209,9 @@ func TestCircuitBreaker_RecordResult(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "erroneous execution with multiple errors records the first error passing the failure check": { @@ -239,9 +220,9 @@ func TestCircuitBreaker_RecordResult(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, } @@ -249,7 +230,7 @@ func TestCircuitBreaker_RecordResult(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() - cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb := newCircuitBreaker(cfg, registry, "test-request-type", log.NewNopLogger()) cb.activate() err := cb.recordResult(testCase.errs...) 
require.Equal(t, testCase.expectedErr, err) @@ -278,9 +259,9 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 1 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "with circuit breaker not active, requestDuration lower than maxRequestDuration and no input error, finishRequest does nothing": { @@ -290,9 +271,9 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "with circuit breaker active, requestDuration higher than maxRequestDuration and no input error, finishRequest gives context deadline exceeded error": { @@ -303,9 +284,9 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "with circuit breaker not active, requestDuration higher than maxRequestDuration and no input error, finishRequest does nothing": { @@ -316,9 +297,35 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 + `, + }, + "with circuit breaker not active, requestDuration higher than maxRequestDuration and an input error relevant for circuit breakers, finishRequest does nothing": { + requestDuration: 3 * time.Second, + isActive: false, + err: context.DeadlineExceeded, + expectedErr: nil, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 + `, + }, + "with circuit breaker not active, requestDuration higher than maxRequestDuration and an input error irrelevant for circuit breakers, finishRequest does nothing": { + requestDuration: 3 * time.Second, + isActive: false, + err: context.Canceled, + expectedErr: nil, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "with circuit breaker active, requestDuration higher than maxRequestDuration and an input error relevant for circuit breakers, finishRequest gives the input error": { @@ -329,9 +336,9 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, "with circuit breaker active, requestDuration higher than maxRequestDuration and an input error irrelevant for circuit breakers, finishRequest gives context deadline exceeded error": { @@ -342,22 +349,9 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 - `, - }, - "with circuit breaker not active, requestDuration higher than maxRequestDuration and an input error different from context deadline exceeded, finishRequest does nothing": { - requestDuration: 3 * time.Second, - isActive: false, - err: newInstanceLimitReachedError("error"), - expectedErr: nil, - expectedMetrics: ` - # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. - # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="test-request-type",result="circuit_breaker_open"} 0 `, }, } @@ -365,10 +359,10 @@ func TestCircuitBreaker_FinishRequest(t *testing.T) { t.Run(testName, func(t *testing.T) { registry := prometheus.NewRegistry() cfg := CircuitBreakerConfig{ - Enabled: true, - PushTimeout: 2 * time.Second, + Enabled: true, + RequestTimeout: 2 * time.Second, } - cb := newCircuitBreaker(cfg, log.NewNopLogger(), registry) + cb := newCircuitBreaker(cfg, registry, "test-request-type", log.NewNopLogger()) cb.active.Store(testCase.isActive) err := cb.finishRequest(testCase.requestDuration, maxRequestDuration, testCase.err) if testCase.expectedErr == nil { @@ -422,12 +416,12 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { if initialDelayEnabled { 
initialDelay = 200 * time.Millisecond } - cfg.CircuitBreakerConfig = CircuitBreakerConfig{ + cfg.PushCircuitBreaker = CircuitBreakerConfig{ Enabled: true, FailureThresholdPercentage: uint(failureThreshold), CooldownPeriod: 10 * time.Second, InitialDelay: initialDelay, - PushTimeout: pushTimeout, + RequestTimeout: pushTimeout, testModeEnabled: true, } @@ -478,7 +472,7 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { for _, req := range reqs { ctx := user.InjectOrgID(context.Background(), userID) count++ - i.circuitBreaker.testRequestDelay = testCase.pushRequestDelay + i.circuitBreaker.push.testRequestDelay = testCase.pushRequestDelay err = i.PushToStorage(ctx, req) if initialDelayEnabled { if testCase.expectedErrorWhenCircuitBreakerClosed != nil { @@ -504,37 +498,37 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
# TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 0 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 - cortex_ingester_circuit_breaker_current_state{state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 ` } else { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 2 - cortex_ingester_circuit_breaker_results_total{result="error"} 2 - cortex_ingester_circuit_breaker_results_total{result="success"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 2 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 2 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
# TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 1 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 - cortex_ingester_circuit_breaker_current_state{state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 ` } assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) @@ -546,7 +540,7 @@ func TestIngester_PushToStorage_CircuitBreaker(t *testing.T) { func TestIngester_StartPushRequest_CircuitBreakerOpen(t *testing.T) { reg := prometheus.NewPedanticRegistry() cfg := defaultIngesterTestConfig(t) - cfg.CircuitBreakerConfig = CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second} + cfg.PushCircuitBreaker = CircuitBreakerConfig{Enabled: true, CooldownPeriod: 10 * time.Second} i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) require.NoError(t, err) @@ -561,12 +555,12 @@ func TestIngester_StartPushRequest_CircuitBreakerOpen(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") // If i's circuit breaker is closed, StartPushRequest is successful. - i.circuitBreaker.cb.Close() + i.circuitBreaker.push.cb.Close() _, err = i.StartPushRequest(ctx, 0) require.NoError(t, err) // If i's circuit breaker is open, StartPushRequest returns a circuitBreakerOpenError. - i.circuitBreaker.cb.Open() + i.circuitBreaker.push.cb.Open() _, err = i.StartPushRequest(ctx, 0) require.Error(t, err) require.ErrorAs(t, err, &circuitBreakerOpenError{}) @@ -579,19 +573,19 @@ func TestIngester_StartPushRequest_CircuitBreakerOpen(t *testing.T) { expectedMetrics := ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
- # TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 1 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 - cortex_ingester_circuit_breaker_current_state{state="closed"} 0 + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 ` assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) } @@ -606,100 +600,100 @@ func TestIngester_FinishPushRequest(t *testing.T) { err error expectedMetrics string }{ - "with a permit acquired, pushRequestDuration lower than PushTimeout and no input err, FinishPushRequest records a success": { + "with a permit acquired, pushRequestDuration lower than RequestTimeout and no input err, FinishPushRequest records a success": { pushRequestDuration: 1 * time.Second, acquiredCircuitBreakerPermit: true, err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 1 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 1 `, }, - "when a permit not acquired, pushRequestDuration lower than PushTimeout and no input err, FinishPushRequest does nothing": { + "when a permit not acquired, pushRequestDuration lower than RequestTimeout and no input err, FinishPushRequest does nothing": { pushRequestDuration: 1 * time.Second, acquiredCircuitBreakerPermit: false, err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, - "with a permit acquired, pushRequestDuration higher than PushTimeout and no input error, FinishPushRequest records a failure": { + "with a permit acquired, pushRequestDuration higher than RequestTimeout and no input error, FinishPushRequest records a failure": { pushRequestDuration: 3 * time.Second, acquiredCircuitBreakerPermit: true, err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, - "with a permit not acquired, pushRequestDuration higher than PushTimeout and no input error, FinishPushRequest does nothing": { + "with a permit not acquired, pushRequestDuration higher than RequestTimeout and no input error, FinishPushRequest does nothing": { pushRequestDuration: 3 * time.Second, acquiredCircuitBreakerPermit: false, err: nil, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, - "with a permit acquired, pushRequestDuration higher than PushTimeout and an input error relevant for the circuit breakers, FinishPushRequest records a failure": { + "with a permit acquired, pushRequestDuration higher than RequestTimeout and an input error relevant for the circuit breakers, FinishPushRequest records a failure": { pushRequestDuration: 3 * time.Second, acquiredCircuitBreakerPermit: true, err: newInstanceLimitReachedError("error"), expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, - "with a permit acquired, pushRequestDuration higher than PushTimeout and an input error irrelevant for the circuit breakers, FinishPushRequest records a failure": { + "with a permit acquired, pushRequestDuration higher than RequestTimeout and an input error irrelevant for the circuit breakers, FinishPushRequest records a failure": { pushRequestDuration: 3 * time.Second, acquiredCircuitBreakerPermit: true, err: context.Canceled, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 1 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, - "with a permit not acquired, pushRequestDuration higher than PushTimeout and an input error relevant for the circuit breakers, FinishPushRequest does nothing": { + "with a permit not acquired, pushRequestDuration higher than RequestTimeout and an input error relevant for the circuit breakers, FinishPushRequest does nothing": { pushRequestDuration: 3 * time.Second, acquiredCircuitBreakerPermit: false, err: newInstanceLimitReachedError("error"), expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, - "with a permit not acquired, pushRequestDuration higher than PushTimeout and an input error irrelevant for the circuit breakers, FinishPushRequest does nothing": { + "with a permit not acquired, pushRequestDuration higher than RequestTimeout and an input error irrelevant for the circuit breakers, FinishPushRequest does nothing": { pushRequestDuration: 3 * time.Second, acquiredCircuitBreakerPermit: false, err: context.Canceled, expectedMetrics: ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 `, }, } @@ -707,9 +701,9 @@ func TestIngester_FinishPushRequest(t *testing.T) { t.Run(testName, func(t *testing.T) { reg := prometheus.NewPedanticRegistry() cfg := defaultIngesterTestConfig(t) - cfg.CircuitBreakerConfig = CircuitBreakerConfig{ - Enabled: true, - PushTimeout: 2 * time.Second, + cfg.PushCircuitBreaker = CircuitBreakerConfig{ + Enabled: true, + RequestTimeout: 2 * time.Second, } i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) @@ -726,9 +720,13 @@ func TestIngester_FinishPushRequest(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") st := &pushRequestState{ - requestDuration: testCase.pushRequestDuration, - acquiredCircuitBreakerPermit: testCase.acquiredCircuitBreakerPermit, - pushErr: testCase.err, + requestDuration: testCase.pushRequestDuration, + requestFinish: func(duration time.Duration, err error) { + if testCase.acquiredCircuitBreakerPermit { + _ = i.circuitBreaker.push.finishRequest(duration, cfg.PushCircuitBreaker.RequestTimeout, err) + } + }, + pushErr: testCase.err, } ctx = context.WithValue(ctx, pushReqCtxKey, st) @@ -759,12 +757,12 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { if initialDelayEnabled { initialDelay = 200 * time.Millisecond } - cfg.CircuitBreakerConfig = CircuitBreakerConfig{ + cfg.PushCircuitBreaker = CircuitBreakerConfig{ Enabled: true, FailureThresholdPercentage: uint(failureThreshold), CooldownPeriod: 10 * time.Second, 
InitialDelay: initialDelay, - PushTimeout: pushTimeout, + RequestTimeout: pushTimeout, testModeEnabled: true, } @@ -788,8 +786,11 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { nil, mimirpb.API, ) + ctx, err = i.StartPushRequest(ctx, int64(req.Size())) + require.NoError(t, err) err = i.PushToStorage(ctx, req) require.NoError(t, err) + i.FinishPushRequest(ctx) count := 0 @@ -815,7 +816,7 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { for _, req := range reqs { ctx := user.InjectOrgID(context.Background(), userID) // Configure circuit breaker to delay push requests. - i.circuitBreaker.testRequestDelay = pushTimeout + i.circuitBreaker.push.testRequestDelay = pushTimeout count++ ctx, err = i.StartPushRequest(ctx, int64(req.Size())) @@ -826,7 +827,7 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { // less than failureThreshold deadline exceeded errors, it is still // closed. require.NoError(t, err) - require.Equal(t, circuitbreaker.ClosedState, i.circuitBreaker.cb.State()) + require.Equal(t, circuitbreaker.ClosedState, i.circuitBreaker.push.cb.State()) st, ok := ctx.Value(pushReqCtxKey).(*pushRequestState) require.True(t, ok) require.Equal(t, int64(req.Size()), st.requestSize) @@ -834,7 +835,7 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { require.NoError(t, err) i.FinishPushRequest(ctx) } else { - require.Equal(t, circuitbreaker.OpenState, i.circuitBreaker.cb.State()) + require.Equal(t, circuitbreaker.OpenState, i.circuitBreaker.push.cb.State()) require.Nil(t, ctx) require.ErrorAs(t, err, &circuitBreakerOpenError{}) } @@ -847,40 +848,909 @@ func TestIngester_Push_CircuitBreaker_DeadlineExceeded(t *testing.T) { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
# TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 0 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. # TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
# TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 0 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 - cortex_ingester_circuit_breaker_current_state{state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 ` } else { expectedMetrics = ` # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. # TYPE cortex_ingester_circuit_breaker_results_total counter - cortex_ingester_circuit_breaker_results_total{result="success"} 0 - cortex_ingester_circuit_breaker_results_total{result="error"} 2 - cortex_ingester_circuit_breaker_results_total{result="circuit_breaker_open"} 2 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 2 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 2 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 1 # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
# TYPE cortex_ingester_circuit_breaker_transitions_total counter - cortex_ingester_circuit_breaker_transitions_total{state="closed"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="half-open"} 0 - cortex_ingester_circuit_breaker_transitions_total{state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. # TYPE cortex_ingester_circuit_breaker_current_state gauge - cortex_ingester_circuit_breaker_current_state{state="open"} 1 - cortex_ingester_circuit_breaker_current_state{state="half-open"} 0 - cortex_ingester_circuit_breaker_current_state{state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 ` } assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) }) } } + +type mockedCircuitBreaker struct { + circuitbreaker.CircuitBreaker[any] + mock.Mock + + acquiredPermitCount *atomic.Int64 + recordSuccessCount *atomic.Int64 + recordFailureCount *atomic.Int64 +} + +func (cb *mockedCircuitBreaker) TryAcquirePermit() bool { + result := cb.CircuitBreaker.TryAcquirePermit() + if result { + cb.acquiredPermitCount.Inc() + } + return result +} + +func (cb *mockedCircuitBreaker) RecordSuccess() { + cb.CircuitBreaker.RecordSuccess() + cb.recordSuccessCount.Inc() + cb.acquiredPermitCount.Dec() +} + +func (cb *mockedCircuitBreaker) RecordFailure() { + cb.CircuitBreaker.RecordFailure() + cb.recordFailureCount.Inc() + 
cb.acquiredPermitCount.Dec() +} + +func TestPRCircuitBreaker_NewPRCircuitBreaker(t *testing.T) { + pushCfg := CircuitBreakerConfig{ + Enabled: true, + CooldownPeriod: 10 * time.Second, + testModeEnabled: true, + } + readCfg := CircuitBreakerConfig{ + Enabled: true, + CooldownPeriod: 10 * time.Second, + testModeEnabled: true, + } + registerer := prometheus.NewRegistry() + prCB := newIngesterCircuitBreaker(pushCfg, readCfg, log.NewNopLogger(), registerer) + require.NotNil(t, prCB) + require.NotNil(t, prCB.push) + require.NotNil(t, prCB.read) + + expectedMetrics := ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + ` + metricNames := []string{ + "cortex_ingester_circuit_breaker_results_total", + "cortex_ingester_circuit_breaker_transitions_total", + "cortex_ingester_circuit_breaker_current_state", + } + assert.NoError(t, testutil.GatherAndCompare(registerer, strings.NewReader(expectedMetrics), metricNames...)) +} + +func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) { + metricNames := []string{ + "cortex_ingester_circuit_breaker_results_total", + "cortex_ingester_circuit_breaker_transitions_total", + "cortex_ingester_circuit_breaker_current_state", + } + testCases := map[string]struct { + circuitBreakerSetup func(breaker ingesterCircuitBreaker) + 
expectedCircuitBreakerError bool + expectedMetrics string + }{ + "if push circuit breaker is not active, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.push.active.Store(false) + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + `, + }, + "if push circuit breaker is closed, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.push.activate() + cb.push.cb.Close() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + `, + }, + "if push circuit breaker is open, no finish function and a circuitBreakerErrorOpen are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.push.activate() + cb.push.cb.Open() + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 + `, + }, + "if push circuit breaker is half-open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.push.activate() + cb.push.cb.HalfOpen() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + `, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := CircuitBreakerConfig{ + Enabled: true, + FailureThresholdPercentage: 20, + CooldownPeriod: 10 * time.Second, + RequestTimeout: circuitBreakerDefaultPushTimeout, + testModeEnabled: true, + } + acquiredPermitCount := atomic.NewInt64(0) + recordedSuccessCount := atomic.NewInt64(0) + recordedFailureCount := atomic.NewInt64(0) + cb := newIngesterCircuitBreaker(cfg, CircuitBreakerConfig{}, log.NewNopLogger(), registry) + cb.push.cb = &mockedCircuitBreaker{ + CircuitBreaker: cb.push.cb, + acquiredPermitCount: acquiredPermitCount, + recordSuccessCount: recordedSuccessCount, + recordFailureCount: recordedFailureCount, + } + testCase.circuitBreakerSetup(cb) + finish, err := cb.tryAcquirePushPermit() + + if testCase.expectedCircuitBreakerError { + require.Nil(t, finish) + require.Error(t, err) + require.ErrorAs(t, err, &circuitBreakerOpenError{}) + assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) + } else { + require.NoError(t, err) + require.NotNil(t, finish) + var expectedVal int64 + if cb.push.isActive() { + expectedVal = 1 + } + require.Equal(t, expectedVal, acquiredPermitCount.Load()) + require.Equal(t, int64(0), recordedSuccessCount.Load()) + require.Equal(t, int64(0), recordedFailureCount.Load()) + finish(0, err) + require.Equal(t, int64(0), acquiredPermitCount.Load()) + require.Equal(t, expectedVal, recordedSuccessCount.Load()) + require.Equal(t, int64(0), recordedFailureCount.Load()) + assert.NoError(t, testutil.GatherAndCompare(registry, 
strings.NewReader(testCase.expectedMetrics), metricNames...)) + } + }) + } +} + +func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) { + metricNames := []string{ + "cortex_ingester_circuit_breaker_results_total", + "cortex_ingester_circuit_breaker_transitions_total", + "cortex_ingester_circuit_breaker_current_state", + } + testCases := map[string]struct { + circuitBreakerSetup func(breaker ingesterCircuitBreaker) + expectedCircuitBreakerError bool + expectedMetrics string + }{ + "if read circuit breaker is not active and push circuit breaker is not active, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.active.Store(false) + cb.push.active.Store(false) + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is not active and push circuit breaker is closed, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.active.Store(false) + cb.push.activate() + cb.push.cb.Close() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is not active and push circuit breaker is open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.active.Store(false) + cb.push.activate() + cb.push.cb.Open() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is not active and push circuit breaker is half-open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.active.Store(false) + cb.push.activate() + cb.push.cb.HalfOpen() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is closed and push circuit breaker is is not active, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Close() + cb.push.active.Store(false) + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is closed and push circuit breaker is closed, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Close() + cb.push.activate() + cb.push.cb.Close() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is closed and push circuit breaker is open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Close() + cb.push.activate() + cb.push.cb.Open() + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is closed and push circuit breaker is half-open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Close() + cb.push.activate() + cb.push.cb.HalfOpen() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is open and push circuit breaker is not active, no finish function and a circuitBreakerErrorOpen are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Open() + cb.push.active.Store(false) + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 1 + `, + }, + "if read circuit breaker is open and push circuit breaker is closed, no finish function and a circuitBreakerErrorOpen are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Open() + cb.push.activate() + cb.push.cb.Close() + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 1 + `, + }, + "if read circuit breaker is open and push circuit breaker is open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Open() + cb.push.activate() + cb.push.cb.Open() + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 1 + `, + }, + "if read circuit breaker is open and push circuit breaker is half-open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.Open() + cb.push.activate() + cb.push.cb.HalfOpen() + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 1 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 1 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 1 + `, + }, + "if read circuit breaker is half-open and push circuit breaker is not active, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.HalfOpen() + cb.push.active.Store(false) + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is half-open and push circuit breaker is closed, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.HalfOpen() + cb.push.activate() + cb.push.cb.Close() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is half-open and push circuit breaker is open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.HalfOpen() + cb.push.activate() + cb.push.cb.Open() + }, + expectedCircuitBreakerError: true, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. + # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 0 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. 
+ # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. + # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + "if read circuit breaker is half-open and push circuit breaker is half-open, finish function and no error are returned": { + circuitBreakerSetup: func(cb ingesterCircuitBreaker) { + cb.read.activate() + cb.read.cb.HalfOpen() + cb.push.activate() + cb.push.cb.HalfOpen() + }, + expectedCircuitBreakerError: false, + expectedMetrics: ` + # HELP cortex_ingester_circuit_breaker_results_total Results of executing requests via the circuit breaker. 
+ # TYPE cortex_ingester_circuit_breaker_results_total counter + cortex_ingester_circuit_breaker_results_total{request_type="push",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="error"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="circuit_breaker_open"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="push",result="success"} 0 + cortex_ingester_circuit_breaker_results_total{request_type="read",result="success"} 1 + # HELP cortex_ingester_circuit_breaker_transitions_total Number of times the circuit breaker has entered a state. + # TYPE cortex_ingester_circuit_breaker_transitions_total counter + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="half-open"} 1 + cortex_ingester_circuit_breaker_transitions_total{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_transitions_total{request_type="read",state="open"} 0 + # HELP cortex_ingester_circuit_breaker_current_state Boolean set to 1 whenever the circuit breaker is in a state corresponding to the label name. 
+ # TYPE cortex_ingester_circuit_breaker_current_state gauge + cortex_ingester_circuit_breaker_current_state{request_type="push",state="closed"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="closed"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="half-open"} 1 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="half-open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="push",state="open"} 0 + cortex_ingester_circuit_breaker_current_state{request_type="read",state="open"} 0 + `, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + registry := prometheus.NewRegistry() + pushCfg := CircuitBreakerConfig{ + Enabled: true, + FailureThresholdPercentage: 20, + CooldownPeriod: 10 * time.Second, + RequestTimeout: circuitBreakerDefaultPushTimeout, + testModeEnabled: true, + } + readCfg := CircuitBreakerConfig{ + Enabled: true, + FailureThresholdPercentage: 20, + CooldownPeriod: 10 * time.Second, + RequestTimeout: circuitBreakerDefaultReadTimeout, + testModeEnabled: true, + } + pushAcquiredPermitCount := atomic.NewInt64(0) + pushRecordedSuccessCount := atomic.NewInt64(0) + pushRecordedFailureCount := atomic.NewInt64(0) + readAcquiredPermitCount := atomic.NewInt64(0) + readRecordedSuccessCount := atomic.NewInt64(0) + readRecordedFailureCount := atomic.NewInt64(0) + cb := newIngesterCircuitBreaker(pushCfg, readCfg, log.NewNopLogger(), registry) + cb.push.cb = &mockedCircuitBreaker{ + CircuitBreaker: cb.push.cb, + acquiredPermitCount: pushAcquiredPermitCount, + recordSuccessCount: pushRecordedSuccessCount, + recordFailureCount: pushRecordedFailureCount, + } + cb.read.cb = &mockedCircuitBreaker{ + CircuitBreaker: cb.read.cb, + acquiredPermitCount: readAcquiredPermitCount, + recordSuccessCount: readRecordedSuccessCount, + recordFailureCount: readRecordedFailureCount, + } + testCase.circuitBreakerSetup(cb) + finish, err := 
cb.tryAcquireReadPermit() + + if testCase.expectedCircuitBreakerError { + require.Nil(t, finish) + require.Error(t, err) + require.ErrorAs(t, err, &circuitBreakerOpenError{}) + assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) + } else { + require.NoError(t, err) + require.NotNil(t, finish) + var expectedReadVal int64 + if cb.read.isActive() { + expectedReadVal = 1 + } + require.Equal(t, int64(0), pushAcquiredPermitCount.Load()) + require.Equal(t, int64(0), pushRecordedSuccessCount.Load()) + require.Equal(t, int64(0), pushRecordedFailureCount.Load()) + require.Equal(t, expectedReadVal, readAcquiredPermitCount.Load()) + require.Equal(t, int64(0), readRecordedSuccessCount.Load()) + require.Equal(t, int64(0), readRecordedFailureCount.Load()) + finish(0, err) + require.Equal(t, int64(0), pushAcquiredPermitCount.Load()) + require.Equal(t, int64(0), pushRecordedSuccessCount.Load()) + require.Equal(t, int64(0), pushRecordedFailureCount.Load()) + require.Equal(t, int64(0), readAcquiredPermitCount.Load()) + require.Equal(t, expectedReadVal, readRecordedSuccessCount.Load()) + require.Equal(t, int64(0), readRecordedFailureCount.Load()) + assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(testCase.expectedMetrics), metricNames...)) + } + }) + } +} diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go index c0c84b2cffc..5010947fc55 100644 --- a/pkg/ingester/errors.go +++ b/pkg/ingester/errors.go @@ -494,15 +494,16 @@ func (e ingesterPushGrpcDisabledError) errorCause() mimirpb.ErrorCause { var _ ingesterError = ingesterPushGrpcDisabledError{} type circuitBreakerOpenError struct { + requestType string remainingDelay time.Duration } -func newCircuitBreakerOpenError(remainingDelay time.Duration) circuitBreakerOpenError { - return circuitBreakerOpenError{remainingDelay: remainingDelay} +func newCircuitBreakerOpenError(requestType string, remainingDelay time.Duration) circuitBreakerOpenError 
{ + return circuitBreakerOpenError{requestType: requestType, remainingDelay: remainingDelay} } func (e circuitBreakerOpenError) Error() string { - return fmt.Sprintf("%s with remaining delay %s", circuitbreaker.ErrOpen.Error(), e.remainingDelay.String()) + return fmt.Sprintf("%s on %s request type with remaining delay %s", circuitbreaker.ErrOpen.Error(), e.requestType, e.remainingDelay.String()) } func (e circuitBreakerOpenError) errorCause() mimirpb.ErrorCause { diff --git a/pkg/ingester/errors_test.go b/pkg/ingester/errors_test.go index b56f565ee75..2226e91e038 100644 --- a/pkg/ingester/errors_test.go +++ b/pkg/ingester/errors_test.go @@ -248,8 +248,8 @@ func TestTooBusyError(t *testing.T) { func TestNewCircuitBreakerOpenError(t *testing.T) { remainingDelay := 1 * time.Second - expectedMsg := fmt.Sprintf("circuit breaker open with remaining delay %s", remainingDelay.String()) - err := newCircuitBreakerOpenError(remainingDelay) + expectedMsg := fmt.Sprintf("circuit breaker open on foo request type with remaining delay %s", remainingDelay.String()) + err := newCircuitBreakerOpenError("foo", remainingDelay) require.Error(t, err) require.EqualError(t, err, expectedMsg) checkIngesterError(t, err, mimirpb.CIRCUIT_BREAKER_OPEN, false) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index a88de4855eb..f01ea0c225f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -211,6 +211,9 @@ type Config struct { UpdateIngesterOwnedSeries bool `yaml:"track_ingester_owned_series" category:"experimental"` OwnedSeriesUpdateInterval time.Duration `yaml:"owned_series_update_interval" category:"experimental"` + PushCircuitBreaker CircuitBreakerConfig `yaml:"push_circuit_breaker"` + ReadCircuitBreaker CircuitBreakerConfig `yaml:"read_circuit_breaker"` + PushGrpcMethodEnabled bool `yaml:"push_grpc_method_enabled" category:"experimental" doc:"hidden"` // This config is dynamically injected because defined outside the ingester config. 
@@ -218,8 +221,6 @@ type Config struct { // This config can be overridden in tests. limitMetricsUpdatePeriod time.Duration `yaml:"-"` - - CircuitBreakerConfig CircuitBreakerConfig `yaml:"circuit_breaker" category:"experimental"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -228,7 +229,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { cfg.IngesterPartitionRing.RegisterFlags(f) cfg.DefaultLimits.RegisterFlags(f) cfg.ActiveSeriesMetrics.RegisterFlags(f) - cfg.CircuitBreakerConfig.RegisterFlags(f) + cfg.PushCircuitBreaker.RegisterFlagsWithPrefix("ingester.push-circuit-breaker.", f, circuitBreakerDefaultPushTimeout) + cfg.ReadCircuitBreaker.RegisterFlagsWithPrefix("ingester.read-circuit-breaker.", f, circuitBreakerDefaultReadTimeout) f.DurationVar(&cfg.MetadataRetainPeriod, "ingester.metadata-retain-period", 10*time.Minute, "Period at which metadata we have not seen will remain in memory before being deleted.") f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-tenant ingestion rates.") @@ -356,7 +358,7 @@ type Ingester struct { ingestPartitionID int32 ingestPartitionLifecycler *ring.PartitionInstanceLifecycler - circuitBreaker *circuitBreaker + circuitBreaker ingesterCircuitBreaker } func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { @@ -402,7 +404,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.activeGroups = activeGroupsCleanupService // We create a circuit breaker, which will be activated on a successful completion of starting. 
- i.circuitBreaker = newCircuitBreaker(cfg.CircuitBreakerConfig, logger, registerer) + i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer) if registerer != nil { promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ @@ -975,10 +977,10 @@ type ctxKey int var pushReqCtxKey ctxKey = 1 type pushRequestState struct { - requestSize int64 - requestDuration time.Duration - acquiredCircuitBreakerPermit bool - pushErr error + requestSize int64 + requestDuration time.Duration + requestFinish func(time.Duration, error) + pushErr error } func getPushRequestState(ctx context.Context) *pushRequestState { @@ -1004,9 +1006,7 @@ func (i *Ingester) FinishPushRequest(ctx context.Context) { if st.requestSize > 0 { i.inflightPushRequestsBytes.Sub(st.requestSize) } - if st.acquiredCircuitBreakerPermit { - i.circuitBreaker.finishPushRequest(st.requestDuration, st.pushErr) - } + st.requestFinish(st.requestDuration, st.pushErr) } // This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. Ingester.StartPushRequest via gRPC server's method limiter. @@ -1027,18 +1027,17 @@ func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (context return ctx, false, nil } - // We try to acquire a permit from the circuit breaker. + // We try to acquire a push permit from the circuit breaker. // If it is not possible, it is because the circuit breaker is open, and a circuitBreakerOpenError is returned. // If it is possible, a permit has to be released by recording either a success or a failure with the circuit // breaker. This is done by FinishPushRequest(). 
- acquiredCircuitBreakerPermit, err := i.circuitBreaker.tryAcquirePermit() + finish, err := i.circuitBreaker.tryAcquirePushPermit() if err != nil { return nil, false, err } - st := &pushRequestState{ - requestSize: reqSize, - acquiredCircuitBreakerPermit: acquiredCircuitBreakerPermit, + requestSize: reqSize, + requestFinish: finish, } ctx = context.WithValue(ctx, pushReqCtxKey, st) @@ -3791,15 +3790,13 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) { // function is returned. func (i *Ingester) startReadRequest() (func(error), error) { start := time.Now() - acquiredCircuitBreakerPermit, err := i.circuitBreaker.tryAcquirePermit() + finish, err := i.circuitBreaker.tryAcquireReadPermit() if err != nil { return nil, err } finishReadRequest := func(err error) { - if acquiredCircuitBreakerPermit { - i.circuitBreaker.finishReadRequest(time.Since(start), err) - } + finish(time.Since(start), err) } if err = i.checkAvailableForRead(); err != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f4684ccabd8..a43331a5e53 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -54,6 +54,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" + "go.uber.org/atomic" "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -84,8 +85,7 @@ func TestIngester_StartPushRequest(t *testing.T) { const reqSize = 10 instanceLimits := &InstanceLimits{} pushReqState := &pushRequestState{ - requestSize: 10, - acquiredCircuitBreakerPermit: true, + requestSize: 10, } ctx := context.Background() ctxWithPushReqState := context.WithValue(ctx, pushReqCtxKey, pushReqState) @@ -105,7 +105,7 @@ func TestIngester_StartPushRequest(t *testing.T) { setupIngester := func(tc testCase) *failingIngester { cfg := defaultIngesterTestConfig(t) - cfg.CircuitBreakerConfig = CircuitBreakerConfig{ + cfg.PushCircuitBreaker = 
CircuitBreakerConfig{ Enabled: true, InitialDelay: tc.cbInitialDelay, CooldownPeriod: 10 * time.Second, @@ -128,9 +128,9 @@ func TestIngester_StartPushRequest(t *testing.T) { require.Equal(t, expectedState, failingIng.lifecycler.State()) if tc.cbOpen { - failingIng.circuitBreaker.cb.Open() + failingIng.circuitBreaker.push.cb.Open() } else { - failingIng.circuitBreaker.cb.Close() + failingIng.circuitBreaker.push.cb.Close() } if tc.instanceLimitReached { @@ -229,103 +229,105 @@ func TestIngester_StartPushRequest(t *testing.T) { } func TestIngester_StartReadRequest(t *testing.T) { - type failureCause int - const ( - NONE failureCause = iota - UNAVAILABLE - OVERLOADED - ) - type testCase struct { - failingCause failureCause - cbOpen bool - cbInitialDelay time.Duration - verifyErr func(error) - expectedAcquiredCircuitBreakerPermit bool + setup func(*failingIngester) + verifyErr func(error) + expectedAcquiredPermitCount int } - utilizationLimiter := &fakeUtilizationBasedLimiter{limitingReason: "cpu"} + var ( + acquiredPermitCount *atomic.Int64 + recordedSuccessCount *atomic.Int64 + recordedFailureCount *atomic.Int64 + utilizationLimiter = &fakeUtilizationBasedLimiter{limitingReason: "cpu"} + ) setupIngester := func(tc testCase) *failingIngester { cfg := defaultIngesterTestConfig(t) - cfg.CircuitBreakerConfig = CircuitBreakerConfig{ + cfg.ReadCircuitBreaker = CircuitBreakerConfig{ Enabled: true, - InitialDelay: tc.cbInitialDelay, CooldownPeriod: 10 * time.Second, + RequestTimeout: 30 * time.Second, } - var ( - failingCause error - expectedState services.State - additionalSetup func(cause *failingIngester) - ) - switch tc.failingCause { - case UNAVAILABLE: - expectedState = services.Terminated - failingCause = newUnavailableError(expectedState) - case OVERLOADED: - expectedState = services.Running - additionalSetup = func(failingIng *failingIngester) { - failingIng.utilizationBasedLimiter = utilizationLimiter - } - case NONE: - expectedState = services.Running - } - 
failingIng := setupFailingIngester(t, cfg, failingCause) + failingIng := newFailingIngester(t, cfg, nil, nil) failingIng.startWaitAndCheck(context.Background(), t) - require.Equal(t, expectedState, failingIng.lifecycler.State()) - if additionalSetup != nil { - additionalSetup(failingIng) - } + require.Equal(t, services.Running, failingIng.lifecycler.State()) - if tc.cbOpen { - failingIng.circuitBreaker.cb.Open() - } else { - failingIng.circuitBreaker.cb.Close() + acquiredPermitCount = atomic.NewInt64(0) + recordedSuccessCount = atomic.NewInt64(0) + recordedFailureCount = atomic.NewInt64(0) + + failingIng.circuitBreaker.read.cb = &mockedCircuitBreaker{ + acquiredPermitCount: acquiredPermitCount, + recordSuccessCount: recordedSuccessCount, + recordFailureCount: recordedFailureCount, + CircuitBreaker: failingIng.circuitBreaker.read.cb, } + failingIng.circuitBreaker.read.cb.Close() return failingIng } testCases := map[string]testCase{ - "fail if ingester is not available for read": { - failingCause: UNAVAILABLE, + "fail if ingester is not available for read, and do not acquire a permit": { + setup: func(failingIng *failingIngester) { + services.StopAndAwaitTerminated(context.Background(), failingIng) //nolint:errcheck + }, + expectedAcquiredPermitCount: 0, verifyErr: func(err error) { require.ErrorAs(t, err, &unavailableError{}) }, }, - "fail if ingester is overloaded": { - failingCause: OVERLOADED, + "fail if ingester is overloaded, and do not acquire a permit": { + setup: func(failingIng *failingIngester) { + failingIng.utilizationBasedLimiter = utilizationLimiter + }, + expectedAcquiredPermitCount: 0, verifyErr: func(err error) { require.ErrorIs(t, err, errTooBusy) }, }, - "fail if circuit breaker is open": { - failingCause: NONE, - cbOpen: true, + "fail if circuit breaker is open, and do not acquire a permit": { + setup: func(failingIng *failingIngester) { + failingIng.circuitBreaker.read.cb.Open() + }, + expectedAcquiredPermitCount: 0, verifyErr: func(err error) { 
require.ErrorAs(t, err, &circuitBreakerOpenError{}) }, }, - "do not fail if circuit breaker is not active": { - failingCause: NONE, - cbInitialDelay: 1 * time.Minute, - expectedAcquiredCircuitBreakerPermit: false, + "do not fail if circuit breaker is not active, and do not acquire a permit": { + setup: func(failingIng *failingIngester) { + failingIng.circuitBreaker.read.active.Store(false) + }, + expectedAcquiredPermitCount: 0, }, - "do not fail if everything is ok": { - failingCause: NONE, - expectedAcquiredCircuitBreakerPermit: true, + "do not fail if everything is ok, and acquire a permit": { + expectedAcquiredPermitCount: 1, }, } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { failingIng := setupIngester(tc) + if tc.setup != nil { + tc.setup(failingIng) + } defer services.StopAndAwaitTerminated(context.Background(), failingIng) //nolint:errcheck finish, err := failingIng.startReadRequest() + require.Equal(t, int64(tc.expectedAcquiredPermitCount), acquiredPermitCount.Load()) if err == nil { + require.Nil(t, tc.verifyErr) require.NotNil(t, finish) - require.NotNil(t, finish) + + // Calling finish must release a potentially acquired permit + // and in that case record a success, and no failures. + expectedSuccessCount := acquiredPermitCount.Load() + finish(err) + require.Equal(t, int64(0), acquiredPermitCount.Load()) + require.Equal(t, expectedSuccessCount, recordedSuccessCount.Load()) + require.Equal(t, int64(0), recordedFailureCount.Load()) } else { require.NotNil(t, tc.verifyErr) tc.verifyErr(err)