fix(ARC, networking): improve request settings #19101

Merged
12 commits merged on Nov 15, 2023
2 changes: 1 addition & 1 deletion src/components/validation/runner/config.rs
@@ -141,7 +141,7 @@ fn build_output_edge() -> (OutputEdge, impl Into<BoxedSink>) {
// we don't want to waste time performing retries, especially when the test
// harness is shutting down.
output_sink.batch.timeout_secs = Some(0.1);
- output_sink.request.retry_attempts = Some(0);
+ output_sink.request.retry_attempts = 0;

let output_edge = OutputEdge::from_address(output_listen_addr);

2 changes: 1 addition & 1 deletion src/components/validation/runner/telemetry.rs
@@ -52,7 +52,7 @@ impl Telemetry {
// disable retries, as we don't want to waste time performing retries,
// especially when the test harness is shutting down.
vector_sink.batch.timeout_secs = Some(0.1);
- vector_sink.request.retry_attempts = Some(0);
+ vector_sink.request.retry_attempts = 0;

config_builder.add_source(INTERNAL_LOGS_KEY, internal_logs);
config_builder.add_source(INTERNAL_METRICS_KEY, internal_metrics);
2 changes: 1 addition & 1 deletion src/sinks/appsignal/config.rs
@@ -102,7 +102,7 @@ impl AppsignalConfig {
let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);

let request_opts = self.request;
- let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default());
+ let request_settings = request_opts.into_settings();
let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);

let service = ServiceBuilder::new()
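The recurring change in these hunks is that call sites no longer merge the user-provided config against a defaults value via `unwrap_with`; they call `into_settings()` and the defaults come from the config type itself. A minimal standalone sketch of the two shapes, using simplified stand-in types rather than Vector's real definitions:

```rust
// Simplified stand-ins, not Vector's actual types: the old config carried
// Option fields and merged them against a defaults value at every call site;
// the new config carries plain fields and converts itself into settings.
#[derive(Clone, Copy, Debug, Default)]
struct OldTowerRequestConfig {
    retry_attempts: Option<usize>,
}

#[derive(Clone, Copy, Debug)]
struct TowerRequestSettings {
    retry_attempts: usize,
}

impl OldTowerRequestConfig {
    // Old shape: the caller supplies the defaults to fall back to.
    fn unwrap_with(&self, defaults: &OldTowerRequestConfig) -> TowerRequestSettings {
        TowerRequestSettings {
            retry_attempts: self
                .retry_attempts
                .or(defaults.retry_attempts)
                .unwrap_or(usize::MAX), // assumed final fallback
        }
    }
}

#[derive(Clone, Copy, Debug)]
struct NewTowerRequestConfig {
    retry_attempts: usize, // plain value; its default is baked into the type
}

impl Default for NewTowerRequestConfig {
    fn default() -> Self {
        Self { retry_attempts: usize::MAX } // assumed global default
    }
}

impl NewTowerRequestConfig {
    // New shape: no per-call-site defaults to pass around.
    fn into_settings(self) -> TowerRequestSettings {
        TowerRequestSettings { retry_attempts: self.retry_attempts }
    }
}

fn main() {
    let old = OldTowerRequestConfig { retry_attempts: Some(0) };
    let new = NewTowerRequestConfig { retry_attempts: 0 };
    let a = old.unwrap_with(&OldTowerRequestConfig::default());
    let b = new.into_settings();
    assert_eq!(a.retry_attempts, b.retry_attempts);
    println!("both resolve to {} retry attempts", b.retry_attempts);
}
```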
6 changes: 1 addition & 5 deletions src/sinks/aws_cloudwatch_logs/config.rs
@@ -25,7 +25,6 @@ use crate::{
},
util::{
http::RequestConfig, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings,
- TowerRequestConfig,
},
Healthcheck, VectorSink,
},
@@ -211,10 +210,7 @@ impl CloudwatchLogsSinkConfig {
impl SinkConfig for CloudwatchLogsSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let batcher_settings = self.batch.into_batcher_settings()?;
- let request_settings = self
-     .request
-     .tower
-     .unwrap_with(&TowerRequestConfig::default());
+ let request_settings = self.request.tower.into_settings();
let client = self.create_client(cx.proxy()).await?;
let smithy_client = self.create_smithy_client(cx.proxy()).await?;
let svc = ServiceBuilder::new()
9 changes: 2 additions & 7 deletions src/sinks/aws_cloudwatch_logs/service.rs
@@ -34,9 +34,7 @@ use crate::sinks::{
config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic,
sink::BatchCloudwatchRequest, CloudwatchKey,
},
- util::{
-     retries::FibonacciRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings,
- },
+ util::{retries::FibonacciRetryPolicy, EncodedLength, TowerRequestSettings},
};

type Svc = Buffer<
@@ -138,10 +136,7 @@ impl CloudwatchLogsPartitionSvc {
// https://github.com/awslabs/aws-sdk-rust/issues/537
smithy_client: SmithyClient,
) -> Self {
- let request_settings = config
-     .request
-     .tower
-     .unwrap_with(&TowerRequestConfig::default());
+ let request_settings = config.request.tower.into_settings();

Self {
config,
17 changes: 11 additions & 6 deletions src/sinks/aws_cloudwatch_metrics/mod.rs
@@ -36,6 +36,8 @@ use crate::{
tls::TlsConfig,
};

+ use super::util::service::TowerRequestConfigDefaults;

#[derive(Clone, Copy, Debug, Default)]
pub struct CloudWatchMetricsDefaultBatchSettings;

@@ -45,6 +47,13 @@ impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
const TIMEOUT_SECS: f64 = 1.0;
}

+ #[derive(Clone, Copy, Debug)]
+ pub struct CloudWatchMetricsTowerRequestConfigDefaults;
+
+ impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
+     const RATE_LIMIT_NUM: u64 = 150;
+ }

/// Configuration for the `aws_cloudwatch_metrics` sink.
#[configurable_component(sink(
"aws_cloudwatch_metrics",
@@ -79,7 +88,7 @@ pub struct CloudWatchMetricsSinkConfig {

#[configurable(derived)]
#[serde(default)]
- pub request: TowerRequestConfig,
+ pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,

#[configurable(derived)]
pub tls: Option<TlsConfig>,
@@ -225,11 +234,7 @@ impl CloudWatchMetricsSvc {
) -> crate::Result<VectorSink> {
let default_namespace = config.default_namespace.clone();
let batch = config.batch.into_batch_settings()?;
- let request_settings = config.request.unwrap_with(
-     &TowerRequestConfig::default()
-         .timeout_secs(30)
-         .rate_limit_num(150),
- );
+ let request_settings = config.request.into_settings();

let service = CloudWatchMetricsSvc { client };
let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
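Several sinks in this diff (CloudWatch Metrics, Azure Blob, Chronicle) now express their non-standard defaults as a marker type implementing `TowerRequestConfigDefaults`, instead of passing builder-style overrides to `unwrap_with` at the call site. A standalone sketch of the pattern; the trait shape, the extra constant, and the fallback numbers are assumptions for illustration, not Vector's actual definitions:

```rust
use std::marker::PhantomData;

// Assumed trait shape: each knob has a global default that a sink-specific
// marker type can override by redefining the constant.
pub trait TowerRequestConfigDefaults {
    const RATE_LIMIT_NUM: u64 = u64::MAX; // assumed global fallback
    const RETRY_ATTEMPTS: usize = usize::MAX; // assumed global fallback
}

#[derive(Clone, Copy, Debug)]
pub struct GlobalTowerRequestConfigDefaults;
impl TowerRequestConfigDefaults for GlobalTowerRequestConfigDefaults {}

// Mirrors the impl added for the aws_cloudwatch_metrics sink above: only the
// rate limit differs from the global defaults.
#[derive(Clone, Copy, Debug)]
pub struct CloudWatchMetricsTowerRequestConfigDefaults;
impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
    const RATE_LIMIT_NUM: u64 = 150;
}

// The config is generic over its defaults type; `Default` (and, in Vector,
// the serde defaults) pull per-field values from `D`'s constants.
#[derive(Clone, Copy, Debug)]
pub struct TowerRequestConfig<D: TowerRequestConfigDefaults = GlobalTowerRequestConfigDefaults> {
    pub rate_limit_num: u64,
    pub retry_attempts: usize,
    _d: PhantomData<D>,
}

impl<D: TowerRequestConfigDefaults> Default for TowerRequestConfig<D> {
    fn default() -> Self {
        Self {
            rate_limit_num: D::RATE_LIMIT_NUM,
            retry_attempts: D::RETRY_ATTEMPTS,
            _d: PhantomData,
        }
    }
}

fn main() {
    let cw = TowerRequestConfig::<CloudWatchMetricsTowerRequestConfigDefaults>::default();
    assert_eq!(cw.rate_limit_num, 150); // sink-specific default, no builder call needed

    let global: TowerRequestConfig = TowerRequestConfig::default();
    assert_eq!(global.retry_attempts, usize::MAX); // assumed global fallback
}
```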
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/config.rs
@@ -95,7 +95,7 @@ where
E: Send + 'static,
RT: RetryLogic<Response = KinesisResponse> + Default,
{
- let request_limits = config.request.unwrap_with(&TowerRequestConfig::default());
+ let request_limits = config.request.into_settings();

let region = config.region.region();
let service = ServiceBuilder::new()
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
@@ -52,8 +52,8 @@ async fn firehose_put_records() {
encoding: JsonSerializerConfig::default().into(), // required for ES destination w/ localstack
compression: Compression::None,
request: TowerRequestConfig {
- timeout_secs: Some(10),
- retry_attempts: Some(0),
+ timeout_secs: 10,
+ retry_attempts: 0,
..Default::default()
},
tls: None,
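In test code like the hunk above, the request overrides are now plain values instead of `Some(...)`, and `..Default::default()` fills the remaining knobs from the defaults type rather than leaving them as `None`. A small sketch with an assumed field set and assumed fallback values:

```rust
// Stand-in config with an assumed field set; the real TowerRequestConfig has
// more knobs and takes its defaults from a defaults type parameter.
#[derive(Clone, Copy, Debug)]
struct TowerRequestConfig {
    timeout_secs: u64,
    retry_attempts: usize,
    rate_limit_num: u64,
}

impl Default for TowerRequestConfig {
    fn default() -> Self {
        // Assumed stand-ins for the global defaults, which this diff does not show.
        Self {
            timeout_secs: 60,
            retry_attempts: usize::MAX,
            rate_limit_num: u64::MAX,
        }
    }
}

fn main() {
    // Mirrors the firehose integration test above: override two knobs, default the rest.
    let request = TowerRequestConfig {
        timeout_secs: 10,
        retry_attempts: 0,
        ..Default::default()
    };
    assert_eq!(request.rate_limit_num, u64::MAX);
    println!("{request:?}");
}
```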
2 changes: 1 addition & 1 deletion src/sinks/aws_s3/config.rs
@@ -201,7 +201,7 @@ impl S3SinkConfig {
// requests into in order to ship files to S3. We build this here in
// order to configure the client/service with retries, concurrency
// limits, rate limits, and whatever else the client should have.
- let request_limits = self.request.unwrap_with(&Default::default());
+ let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, S3RetryLogic)
.service(service);
4 changes: 1 addition & 3 deletions src/sinks/aws_s_s/sink.rs
@@ -40,9 +40,7 @@ where
}

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
- let request = self
-     .request
-     .unwrap_with(&TowerRequestConfig::default().timeout_secs(30));
+ let request = self.request.into_settings();
let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
let service = tower::ServiceBuilder::new()
.settings(request, retry_logic)
14 changes: 10 additions & 4 deletions src/sinks/azure_blob/config.rs
@@ -7,6 +7,7 @@ use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;

use super::request_builder::AzureBlobRequestOptions;
+ use crate::sinks::util::service::TowerRequestConfigDefaults;
use crate::{
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
@@ -24,6 +25,13 @@ use crate::{
Result,
};

+ #[derive(Clone, Copy, Debug)]
+ pub struct AzureBlobTowerRequestConfigDefaults;
+
+ impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
+     const RATE_LIMIT_NUM: u64 = 250;
+ }

/// Configuration for the `azure_blob` sink.
#[configurable_component(sink(
"azure_blob",
@@ -131,7 +139,7 @@ pub struct AzureBlobSinkConfig {

#[configurable(derived)]
#[serde(default)]
- pub request: TowerRequestConfig,
+ pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,

#[configurable(derived)]
#[serde(
@@ -202,9 +210,7 @@ const DEFAULT_FILENAME_APPEND_UUID: bool = true;

impl AzureBlobSinkConfig {
pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
- let request_limits = self
-     .request
-     .unwrap_with(&TowerRequestConfig::default().rate_limit_num(250));
+ let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, AzureBlobRetryLogic)
.service(AzureBlobService::new(client));
2 changes: 1 addition & 1 deletion src/sinks/azure_monitor_logs/config.rs
@@ -185,7 +185,7 @@ impl AzureMonitorLogsConfig {

let retry_logic =
HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status);
- let request_settings = self.request.unwrap_with(&Default::default());
+ let request_settings = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_settings, retry_logic)
.service(service);
2 changes: 1 addition & 1 deletion src/sinks/clickhouse/config.rs
@@ -95,7 +95,7 @@ impl SinkConfig for ClickhouseConfig {
self.date_time_best_effort,
);

- let request_limits = self.request.unwrap_with(&Default::default());
+ let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, ClickhouseRetryLogic::default())
.service(service);
6 changes: 3 additions & 3 deletions src/sinks/clickhouse/integration_tests.rs
@@ -50,7 +50,7 @@ async fn insert_events() {
compression: Compression::None,
batch,
request: TowerRequestConfig {
- retry_attempts: Some(1),
+ retry_attempts: 1,
..Default::default()
},
..Default::default()
@@ -100,7 +100,7 @@ async fn skip_unknown_fields() {
compression: Compression::None,
batch,
request: TowerRequestConfig {
- retry_attempts: Some(1),
+ retry_attempts: 1,
..Default::default()
},
..Default::default()
@@ -146,7 +146,7 @@ async fn insert_events_unix_timestamps() {
encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(),
batch,
request: TowerRequestConfig {
- retry_attempts: Some(1),
+ retry_attempts: 1,
..Default::default()
},
..Default::default()
2 changes: 1 addition & 1 deletion src/sinks/databend/config.rs
@@ -116,7 +116,7 @@ impl SinkConfig for DatabendConfig {
DatabendAPIClient::new(self.build_client(&cx)?, endpoint.clone(), auth.clone());
let healthcheck = select_one(health_client).boxed();

- let request_settings = self.request.unwrap_with(&TowerRequestConfig::default());
+ let request_settings = self.request.into_settings();
let batch_settings = self.batch.into_batcher_settings()?;

let database = config.database;
2 changes: 1 addition & 1 deletion src/sinks/datadog/events/config.rs
@@ -72,7 +72,7 @@ impl DatadogEventsConfig {
);

let request_opts = self.request;
- let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default());
+ let request_settings = request_opts.into_settings();
let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status);

let service = ServiceBuilder::new()
2 changes: 1 addition & 1 deletion src/sinks/datadog/logs/config.rs
@@ -104,7 +104,7 @@ impl DatadogLogsConfig {
dd_evp_origin: String,
) -> crate::Result<VectorSink> {
let default_api_key: Arc<str> = Arc::from(self.dd_common.default_api_key.inner());
- let request_limits = self.request.tower.unwrap_with(&Default::default());
+ let request_limits = self.request.tower.into_settings();

// We forcefully cap the provided batch configuration to the size/log line limits imposed by
// the Datadog Logs API, but we still allow them to be lowered if need be.
7 changes: 1 addition & 6 deletions src/sinks/datadog/metrics/config.rs
@@ -31,9 +31,6 @@ use crate::{
pub const MAXIMUM_PAYLOAD_COMPRESSED_SIZE: usize = 3_200_000;
pub const MAXIMUM_PAYLOAD_SIZE: usize = 62_914_560;

- // TODO: revisit our concurrency and batching defaults
- const DEFAULT_REQUEST_RETRY_ATTEMPTS: usize = 5;

#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogMetricsDefaultBatchSettings;

@@ -232,9 +229,7 @@ impl DatadogMetricsConfig {
let batcher_settings = self.batch.into_batcher_settings()?;

// TODO: revisit our concurrency and batching defaults
- let request_limits = self.request.unwrap_with(
-     &TowerRequestConfig::default().retry_attempts(DEFAULT_REQUEST_RETRY_ATTEMPTS),
- );
+ let request_limits = self.request.into_settings();

let endpoint_configuration = self.generate_metrics_endpoint_configuration()?;
let service = ServiceBuilder::new()
9 changes: 1 addition & 8 deletions src/sinks/datadog/traces/config.rs
@@ -41,9 +41,6 @@ pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0;

pub const PAYLOAD_LIMIT: usize = 3_200_000;

- const DEFAULT_REQUEST_RETRY_ATTEMPTS: usize = 5;
- const DEFAULT_REQUEST_RETRY_MAX_DURATION_SECS: u64 = 300;
Contributor Author:
Opted to remove this and use the new global default value of 30 seconds.


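For context on the comment above: the two deleted constants pinned the Datadog traces sink to 5 retry attempts and a 300-second retry ceiling; after this change those values come from the shared request defaults instead. A purely hypothetical sketch of that mapping; the constant names and the 30-second figure are taken from the review comment and are assumptions, not shown in this diff:

```rust
// Hypothetical illustration only: constant names and values are assumed.
trait TowerRequestConfigDefaults {
    const RETRY_MAX_DURATION_SECS: u64 = 30; // "new global default value of 30 seconds"
}

struct GlobalTowerRequestConfigDefaults;
impl TowerRequestConfigDefaults for GlobalTowerRequestConfigDefaults {}

fn main() {
    // The traces sink previously hard-coded 5 attempts / 300 s locally; it now
    // falls back to whatever the shared defaults provide.
    println!(
        "retry_max_duration_secs = {}",
        <GlobalTowerRequestConfigDefaults as TowerRequestConfigDefaults>::RETRY_MAX_DURATION_SECS
    );
}
```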
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogTracesDefaultBatchSettings;

@@ -130,11 +127,7 @@ impl DatadogTracesConfig {

pub fn build_sink(&self, client: HttpClient) -> crate::Result<VectorSink> {
let default_api_key: Arc<str> = Arc::from(self.dd_common.default_api_key.inner());
- let request_limits = self.request.unwrap_with(
-     &TowerRequestConfig::default()
-         .retry_attempts(DEFAULT_REQUEST_RETRY_ATTEMPTS)
-         .retry_max_duration_secs(DEFAULT_REQUEST_RETRY_MAX_DURATION_SECS),
- );
+ let request_limits = self.request.into_settings();
let endpoints = self.generate_traces_endpoint_configuration()?;

let batcher_settings = self
7 changes: 2 additions & 5 deletions src/sinks/elasticsearch/common.rs
@@ -19,7 +19,7 @@ use crate::{
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
},
util::auth::Auth,
- util::{http::RequestConfig, TowerRequestConfig, UriSerde},
+ util::{http::RequestConfig, UriSerde},
HealthcheckError,
},
tls::TlsSettings,
@@ -89,10 +89,7 @@ impl ElasticsearchCommon {

let mode = config.common_mode()?;

- let tower_request = config
-     .request
-     .tower
-     .unwrap_with(&TowerRequestConfig::default());
+ let tower_request = config.request.tower.into_settings();

let mut query_params = config.query.clone().unwrap_or_default();
query_params.insert(
7 changes: 2 additions & 5 deletions src/sinks/elasticsearch/config.rs
@@ -23,7 +23,7 @@ use crate::{
},
util::{
http::RequestConfig, service::HealthConfig, BatchConfig, Compression,
- RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig,
+ RealtimeSizeBasedDefaultBatchSettings,
},
Healthcheck, VectorSink,
},
@@ -482,10 +482,7 @@ impl SinkConfig for ElasticsearchConfig {

let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;

- let request_limits = self
-     .request
-     .tower
-     .unwrap_with(&TowerRequestConfig::default());
+ let request_limits = self.request.tower.into_settings();

let health_config = self.endpoint_health.clone().unwrap_or_default();

14 changes: 9 additions & 5 deletions src/sinks/gcp/chronicle_unstructured.rs
@@ -22,6 +22,7 @@ use vector_lib::{
};
use vrl::value::Kind;

+ use crate::sinks::util::service::TowerRequestConfigDefaults;
use crate::{
codecs::{self, EncodingConfig},
config::{GenerateConfig, SinkConfig, SinkContext},
Expand Down Expand Up @@ -97,6 +98,12 @@ impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings {
const TIMEOUT_SECS: f64 = 15.0;
}

+ #[derive(Clone, Copy, Debug)]
+ pub struct ChronicleUnstructuredTowerRequestConfigDefaults;
+
+ impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
+     const RATE_LIMIT_NUM: u64 = 1_000;
+ }
/// Configuration for the `gcp_chronicle_unstructured` sink.
#[configurable_component(sink(
"gcp_chronicle_unstructured",
@@ -132,7 +139,7 @@ pub struct ChronicleUnstructuredConfig {

#[configurable(derived)]
#[serde(default)]
- pub request: TowerRequestConfig,
+ pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,

#[configurable(derived)]
pub tls: Option<TlsConfig>,
Expand Down Expand Up @@ -235,10 +242,7 @@ impl ChronicleUnstructuredConfig {
) -> crate::Result<VectorSink> {
use crate::sinks::util::service::ServiceBuilderExt;

- let request = self.request.unwrap_with(&TowerRequestConfig {
-     rate_limit_num: Some(1000),
-     ..Default::default()
- });
+ let request = self.request.into_settings();

let batch_settings = self.batch.into_batcher_settings()?;
