diff --git a/Cargo.lock b/Cargo.lock index d805f2974e..3a7190b189 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -740,9 +740,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.1.9" +version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297b64446175a73987cedc3c438d79b2a654d0fff96f65ff530fbe039347644c" +checksum = "caf6cfe2881cb1fcbba9ae946fb9a6480d3b7a714ca84c74925014a89ef3387a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -771,9 +771,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.1.8" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8587ae17c8e967e4b05a62d495be2fb7701bec52a97f7acfe8a29f938384c8" +checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -781,39 +781,11 @@ dependencies = [ "zeroize", ] -[[package]] -name = "aws-lc-rs" -version = "1.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7d844e282b4b56750b2d4e893b2205581ded8709fddd2b6aa5418c150ca877" -dependencies = [ - "aws-lc-sys", - "mirai-annotations", - "paste", - "untrusted 0.7.1", - "zeroize", -] - -[[package]] -name = "aws-lc-sys" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3a2c29203f6bf296d01141cc8bb9dbd5ecd4c27843f2ee0767bcd5985a927da" -dependencies = [ - "bindgen", - "cc", - "cmake", - "dunce", - "fs_extra", - "libc", - "paste", -] - [[package]] name = "aws-runtime" -version = "1.1.8" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b13dc54b4b49f8288532334bba8f87386a40571c47c37b1304979b556dc613c8" +checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -835,9 +807,9 @@ dependencies = [ [[package]] name = "aws-sdk-lambda" -version = "1.19.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5059a0d0e5ac4465bd7455652b3881e25e2260050301ec89a34b736ad0276864" +checksum = "9971a1def5081ef7f0278d51122dadc0031fdf9d74704abb38e0ffbc0bcd338f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -858,9 +830,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.18.0" +version = "1.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "019a07902c43b03167ea5df0182f0cb63fae89f9a9682c44d18cf2e4a042cb34" +checksum = "fc3ef4ee9cdd19ec6e8b10d963b79637844bbf41c31177b77a188eaa941e69f7" dependencies = [ "aws-credential-types", "aws-runtime", @@ -880,9 +852,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.18.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04c46ee08a48a7f4eaa4ad201dcc1dd537b49c50859d14d4510e00ad9d3f9af2" +checksum = "527f3da450ea1f09f95155dba6153bd0d83fe0923344a12e1944dfa5d0b32064" dependencies = [ "aws-credential-types", "aws-runtime", @@ -902,9 +874,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.18.0" +version = "1.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f752ac730125ca6017f72f9db5ec1772c9ecc664f87aa7507a7d81b023c23713" +checksum = "94316606a4aa2cb7a302388411b8776b3fbd254e8506e2dc43918286d8212e9b" dependencies = [ "aws-credential-types", "aws-runtime", @@ -925,9 +897,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.0" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11d6f29688a4be9895c0ba8bef861ad0c0dac5c15e9618b9b7a6c233990fc263" +checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -1021,17 +993,13 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.1", "httparse", - "hyper 0.14.30", - "hyper-rustls 0.24.1", "once_cell", "pin-project-lite", "pin-utils", - "rustls 0.21.11", "tokio", "tracing", ] @@ -1081,24 +1049,23 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.7" +version = "0.60.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "872c68cf019c0e4afc5de7753c4f7288ce4b71663212771bf5e4542eb9346ca9" +checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.1.8" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dbf2f3da841a8930f159163175cf6a3d16ddde517c1b0fba7aa776822800f40" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "http 0.2.12", "rustc_version", "tracing", ] @@ -1265,15 +1232,12 @@ dependencies = [ "itertools 0.12.1", "lazy_static", "lazycell", - "log", - "prettyplease", "proc-macro2", "quote", "regex", "rustc-hash", "shlex", "syn 2.0.65", - "which", ] [[package]] @@ -2466,12 +2430,6 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "dunce" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" - [[package]] name = "dyn-clone" version = "1.0.17" @@ -2718,12 +2676,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fs_extra" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" - [[package]] name = "fsevent-sys" version = "4.1.0" @@ -3926,12 +3878,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mirai-annotations" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" - [[package]] name = "mlua" version = "0.8.10" @@ -4614,7 +4560,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22fb7a8b4570b74080587c5f3e187553375d18e72a38c72ca7f70a065972c65d" dependencies = [ "async-trait", - "aws-lc-rs", "bytes", "chrono", "derive-new", @@ -4623,6 +4568,7 @@ dependencies = [ "md5", "postgres-types", "rand", + "ring", "thiserror", "tokio", "tokio-rustls 0.26.0", @@ -6003,7 +5949,10 @@ dependencies = [ "aws-credential-types", "aws-sdk-lambda", "aws-sdk-sts", + "aws-smithy-async", "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", "base64 0.22.0", "bs58", "bytes", @@ -6015,17 +5964,15 @@ dependencies = [ "http-body-util", "http-serde 2.1.1", "humantime", - "hyper 0.14.30", "hyper 1.4.1", - "hyper-rustls 0.24.1", "hyper-rustls 0.27.2", "hyper-util", "jsonwebtoken", "once_cell", "pem", + "pin-project-lite", "restate-types", "ring", - "rustls 0.21.11", "rustls 0.23.11", "schemars", "serde", @@ -6033,6 +5980,7 @@ dependencies = [ "serde_with", "tempfile", "thiserror", + "tower", "tower-service", "tracing", ] @@ -6414,7 +6362,7 @@ dependencies = [ "getrandom", "libc", "spin", - "untrusted 0.9.0", + "untrusted", "windows-sys 0.52.0", ] @@ -6490,9 +6438,9 @@ version = "0.23.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4828ea528154ae444e5a642dbb7d5623354030dc9822b83fd9bb79683c7399d0" dependencies = [ - "aws-lc-rs", "log", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki 0.102.5", "subtle", @@ -6556,7 +6504,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ "ring", - "untrusted 0.9.0", + "untrusted", ] [[package]] @@ -6565,10 +6513,9 @@ version = "0.102.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" dependencies = [ - "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted 0.9.0", + "untrusted", ] [[package]] @@ -6641,7 +6588,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ "ring", - "untrusted 0.9.0", + "untrusted", ] [[package]] @@ -7930,12 +7877,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "untrusted" version = "0.9.0" @@ -8437,20 +8378,6 @@ name = "zeroize" version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.65", -] [[package]] name = "zip" diff --git a/crates/service-client/Cargo.toml b/crates/service-client/Cargo.toml index 31962f69cb..783dfa47c8 100644 --- a/crates/service-client/Cargo.toml +++ b/crates/service-client/Cargo.toml @@ -13,11 +13,10 @@ options_schema = ["dep:schemars", "restate-types/schemars"] [dependencies] arc-swap = { workspace = true } -aws-config = { version = "1.1.9", features = ["sso"] } -aws-credential-types = "1.1.8" -aws-sdk-lambda = "1.19.0" -aws-sdk-sts = "1.18.0" -aws-smithy-runtime = "1.1.8" +aws-config = { version = "1.5.4", default-features = false, features = ["rt-tokio", "sso"] } +aws-credential-types = {version = "1.2.0", default-features = false} +aws-sdk-lambda = {version = "1.36.0", default-features = false, features = ["rt-tokio"]} +aws-sdk-sts = {version = "1.35.0", default-features = false, features = ["rt-tokio"]} base64 = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } @@ -29,12 +28,8 @@ http = "1.1.0" http-serde = "2.0.0" http-body-util = "0.1.2" hyper = { version = "1.4.1", features = ["http1", "http2", "client"] } -# TODO Remove once bumped lambda -hyper-0-14 = { package = "hyper", version = "0.14.30" } hyper-util = { version = "0.1.6", features = ["client-legacy"] } -hyper-rustls = { version = "0.27.2", features = ["http2"] } -# TODO Remove once bumped lambda -hyper-rustls-0-24 = { package = "hyper-rustls", version = "=0.24.1", features = ["http2"] } +hyper-rustls = { version = "0.27.2", default-features = false, features = ["http1", "http2", "ring", "native-tokio", "tls12", "logging"] } humantime = { workspace = true } jsonwebtoken = { version = "9.1.0" } once_cell = { workspace = true } @@ -42,15 +37,21 @@ pem = { version = "3.0.3" } tower-service = { version = "0.3" } ring = { version = "0.17.8" } restate-types = { workspace = true } -rustls = "0.23.11" -# TODO Remove once bumped lambda -rustls_0_21 = { version = "0.21.11", package = "rustls" } +rustls = {version = "0.23.11", default-features = false, features = ["ring"]} schemars = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } thiserror = { workspace = true } +tower = { workspace = true } tracing = { workspace = true } +# dependencies for aws_hyper_client.rs until the aws hyper 1.0 connector supports setting nodelay... +aws-smithy-async = {version = "1.2.1", default-features = false} +aws-smithy-runtime = {version = "1.6.2", default-features = false} +aws-smithy-runtime-api = {version = "1.7.1", default-features = false} +aws-smithy-types = { version = "1.2.0", default-features = false} +pin-project-lite = "0.2.13" + [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/service-client/src/aws_hyper_client.rs b/crates/service-client/src/aws_hyper_client.rs new file mode 100644 index 0000000000..e5160865bb --- /dev/null +++ b/crates/service-client/src/aws_hyper_client.rs @@ -0,0 +1,754 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Copied from https://github.com/smithy-lang/smithy-rs/blob/release-2024-07-16/rust-runtime/aws-smithy-experimental/src/hyper_1_0.rs +//! necessary to set nodelay(true) on http connector, which has a significant Lambda speedup +//! https://github.com/smithy-lang/smithy-rs/issues/3769 is the issue which will enable removing +//! this file. +//! License Apache-2.0 + +use aws_smithy_async::future::timeout::TimedOutError; +use aws_smithy_async::rt::sleep::{default_async_sleep, SharedAsyncSleep}; +use aws_smithy_runtime_api::box_error::BoxError; +use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata; +use aws_smithy_runtime_api::client::http::{ + HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient, + SharedHttpConnector, +}; +use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse}; +use aws_smithy_runtime_api::client::result::ConnectorError; +use aws_smithy_runtime_api::client::runtime_components::{ + RuntimeComponents, RuntimeComponentsBuilder, +}; +use aws_smithy_types::body::SdkBody; +use aws_smithy_types::config_bag::ConfigBag; +use aws_smithy_types::error::display::DisplayErrorContext; +use aws_smithy_types::retry::ErrorKind; +use client::connect::Connection; +use h2::Reason; +use http::Uri; +use hyper::rt::{Read, Write}; +use hyper_util::client::legacy as client; +use hyper_util::client::legacy::connect::Connect; +use hyper_util::rt::TokioExecutor; +use rustls::crypto::CryptoProvider; +use std::borrow::Cow; +use std::collections::HashMap; +use std::error::Error; +use std::fmt; +use std::sync::RwLock; +use std::time::Duration; + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +#[non_exhaustive] +pub enum CryptoMode { + Ring, +} + +impl CryptoMode { + fn provider(self) -> CryptoProvider { + match self { + CryptoMode::Ring => rustls::crypto::ring::default_provider(), + } + } +} + +#[allow(unused_imports)] +mod cached_connectors { + use client::connect::HttpConnector; + use hyper_util::client::legacy as client; + use hyper_util::client::legacy::connect::dns::GaiResolver; + + use crate::aws_hyper_client::build_connector::make_tls; + use crate::aws_hyper_client::{CryptoMode, Inner}; + + pub(crate) static HTTPS_NATIVE_ROOTS_RING: once_cell::sync::Lazy< + hyper_rustls::HttpsConnector, + > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::Ring.provider())); + + pub(super) fn cached_https(mode: Inner) -> hyper_rustls::HttpsConnector { + match mode { + Inner::Standard(CryptoMode::Ring) => HTTPS_NATIVE_ROOTS_RING.clone(), + } + } +} + +mod build_connector { + use client::connect::HttpConnector; + use hyper_util::client::legacy as client; + use rustls::crypto::CryptoProvider; + use std::sync::Arc; + + fn restrict_ciphers(base: CryptoProvider) -> CryptoProvider { + let suites = &[ + rustls::CipherSuite::TLS13_AES_256_GCM_SHA384, + rustls::CipherSuite::TLS13_AES_128_GCM_SHA256, + // TLS1.2 suites + rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + rustls::CipherSuite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, + ]; + let supported_suites = suites + .iter() + .flat_map(|suite| { + base.cipher_suites + .iter() + .find(|s| &s.suite() == suite) + .cloned() + }) + .collect::>(); + CryptoProvider { + cipher_suites: supported_suites, + ..base + } + } + + pub(crate) fn make_tls( + resolver: R, + crypto_provider: CryptoProvider, + ) -> hyper_rustls::HttpsConnector> { + use hyper_rustls::ConfigBuilderExt; + let mut base_connector = HttpConnector::new_with_resolver(resolver); + base_connector.enforce_http(false); + // RESTATE ADDITION + base_connector.set_nodelay(true); + // END RESTATE ADDITION + hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config( + rustls::ClientConfig::builder_with_provider(Arc::new(restrict_ciphers(crypto_provider))) + .with_safe_default_protocol_versions() + .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.") + .with_native_roots().expect("error with TLS configuration.") + .with_no_client_auth() + ) + .https_or_http() + .enable_http1() + .enable_http2() + .wrap_connector(base_connector) + } +} + +/// [`HttpConnector`] that uses [`hyper`] to make HTTP requests. +/// +/// This connector also implements socket connect and read timeouts. +/// +/// This shouldn't be used directly in most cases. +/// See the docs on [`HyperClientBuilder`] for examples of how +/// to customize the Hyper client. +#[derive(Debug)] +pub struct HyperConnector { + adapter: Box, +} + +impl HyperConnector { + /// Builder for a Hyper connector. + pub fn builder() -> HyperConnectorBuilder { + Default::default() + } +} + +impl HttpConnector for HyperConnector { + fn call(&self, request: HttpRequest) -> HttpConnectorFuture { + self.adapter.call(request) + } +} + +/// Builder for [`HyperConnector`]. +#[derive(Default, Debug)] +pub struct HyperConnectorBuilder { + connector_settings: Option, + sleep_impl: Option, + client_builder: Option, + #[allow(unused)] + crypto: Crypto, +} + +#[derive(Default)] +#[non_exhaustive] +pub struct CryptoUnset {} + +pub struct CryptoProviderSelected { + crypto_provider: Inner, +} + +#[derive(Clone)] +enum Inner { + Standard(CryptoMode), +} + +impl HyperConnectorBuilder { + /// Create a [`HyperConnector`] from this builder and a given connector. + pub(crate) fn build(self, tcp_connector: C) -> HyperConnector + where + C: Send + Sync + 'static, + C: Clone, + C: tower::Service, + C::Response: Read + Write + Connection + Send + Sync + Unpin, + C: Connect, + C::Future: Unpin + Send + 'static, + C::Error: Into, + { + let client_builder = + self.client_builder + .unwrap_or(hyper_util::client::legacy::Builder::new( + TokioExecutor::new(), + )); + let sleep_impl = self.sleep_impl.or_else(default_async_sleep); + let (connect_timeout, read_timeout) = self + .connector_settings + .map(|c| (c.connect_timeout(), c.read_timeout())) + .unwrap_or((None, None)); + + let connector = match connect_timeout { + Some(duration) => timeout_middleware::ConnectTimeout::new( + tcp_connector, + sleep_impl + .clone() + .expect("a sleep impl must be provided in order to have a connect timeout"), + duration, + ), + None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector), + }; + let base = client_builder.build(connector); + let read_timeout = match read_timeout { + Some(duration) => timeout_middleware::HttpReadTimeout::new( + base, + sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"), + duration, + ), + None => timeout_middleware::HttpReadTimeout::no_timeout(base), + }; + HyperConnector { + adapter: Box::new(Adapter { + client: read_timeout, + }), + } + } + + /// Set the async sleep implementation used for timeouts + /// + /// Calling this is only necessary for testing or to use something other than + /// [`default_async_sleep`]. + pub fn set_sleep_impl(&mut self, sleep_impl: Option) -> &mut Self { + self.sleep_impl = sleep_impl; + self + } + + /// Configure the HTTP settings for the `HyperAdapter` + pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self { + self.connector_settings = Some(connector_settings); + self + } + + /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client. + /// + /// This enables changing settings like forcing HTTP2 and modifying other default client behavior. + pub(crate) fn hyper_builder( + mut self, + hyper_builder: hyper_util::client::legacy::Builder, + ) -> Self { + self.set_hyper_builder(Some(hyper_builder)); + self + } + + /// Override the Hyper client [`Builder`](hyper_util::client::legacy::Builder) used to construct this client. + /// + /// This enables changing settings like forcing HTTP2 and modifying other default client behavior. + pub(crate) fn set_hyper_builder( + &mut self, + hyper_builder: Option, + ) -> &mut Self { + self.client_builder = hyper_builder; + self + } +} + +/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector` +/// +/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`HyperConnector::builder`]. +struct Adapter { + client: timeout_middleware::HttpReadTimeout< + hyper_util::client::legacy::Client, SdkBody>, + >, +} + +impl fmt::Debug for Adapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Adapter") + .field("client", &"** hyper client **") + .finish() + } +} + +impl HttpConnector for Adapter +where + C: Clone + Send + Sync + 'static, + C: tower::Service, + C::Response: Connection + Read + Write + Unpin + 'static, + timeout_middleware::ConnectTimeout: Connect, + C::Future: Unpin + Send + 'static, + C::Error: Into, +{ + fn call(&self, request: HttpRequest) -> HttpConnectorFuture { + let request = match request.try_into_http1x() { + Ok(request) => request, + Err(err) => { + return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into()))); + } + }; + /*let capture_connection = capture_connection(&mut request); + if let Some(capture_smithy_connection) = + request.extensions().get::() + { + capture_smithy_connection + .set_connection_retriever(move || extract_smithy_connection(&capture_connection)); + }*/ + let mut client = self.client.clone(); + use tower::Service; + let fut = client.call(request); + HttpConnectorFuture::new(async move { + let response = fut + .await + .map_err(downcast_error)? + .map(SdkBody::from_body_1_x); + match HttpResponse::try_from(response) { + Ok(response) => Ok(response), + Err(err) => Err(ConnectorError::other(err.into(), None)), + } + }) + } +} + +/// Downcast errors coming out of hyper into an appropriate `ConnectorError` +fn downcast_error(err: BoxError) -> ConnectorError { + // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout + if find_source::(err.as_ref()).is_some() { + return ConnectorError::timeout(err); + } + // is the top of chain error actually already a `ConnectorError`? return that directly + let err = match err.downcast::() { + Ok(connector_error) => return *connector_error, + Err(box_error) => box_error, + }; + // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific + // error classifications + let err = match find_source::(err.as_ref()) { + Some(hyper_error) => return to_connector_error(hyper_error)(err), + None => err, + }; + + // otherwise, we have no idea! + ConnectorError::other(err, None) +} + +/// Convert a [`hyper::Error`] into a [`ConnectorError`] +fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError { + if err.is_timeout() || find_source::(err).is_some() { + return ConnectorError::timeout; + } + if err.is_user() { + return ConnectorError::user; + } + if err.is_closed() || err.is_canceled() || find_source::(err).is_some() { + return ConnectorError::io; + } + // We sometimes receive this from S3: hyper::Error(IncompleteMessage) + if err.is_incomplete_message() { + return |err: BoxError| ConnectorError::other(err, Some(ErrorKind::TransientError)); + } + + if let Some(h2_err) = find_source::(err) { + if h2_err.is_go_away() + || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM)) + { + return ConnectorError::io; + } + } + + tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue."); + |err: BoxError| ConnectorError::other(err, None) +} + +fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> { + let mut next = Some(err); + while let Some(err) = next { + if let Some(matching_err) = err.downcast_ref::() { + return Some(matching_err); + } + next = err.source(); + } + None +} + +// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to any +// runtime components that are used—sleep_impl as a base (unless we prohibit overriding sleep impl) +// If we decide to put a DnsResolver in RuntimeComponents, then we'll need to handle that as well. +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +struct CacheKey { + connect_timeout: Option, + read_timeout: Option, +} + +impl From<&HttpConnectorSettings> for CacheKey { + fn from(value: &HttpConnectorSettings) -> Self { + Self { + connect_timeout: value.connect_timeout(), + read_timeout: value.read_timeout(), + } + } +} + +struct HyperClient { + connector_cache: RwLock>, + client_builder: hyper_util::client::legacy::Builder, + tcp_connector_fn: F, +} + +impl fmt::Debug for HyperClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HyperClient") + .field("connector_cache", &self.connector_cache) + .field("client_builder", &self.client_builder) + .finish() + } +} + +impl HttpClient for HyperClient +where + F: Fn() -> C + Send + Sync, + C: Clone + Send + Sync + 'static, + C: tower::Service, + C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static, + C::Future: Unpin + Send + 'static, + C::Error: Into, +{ + fn http_connector( + &self, + settings: &HttpConnectorSettings, + components: &RuntimeComponents, + ) -> SharedHttpConnector { + let key = CacheKey::from(settings); + let mut connector = self.connector_cache.read().unwrap().get(&key).cloned(); + if connector.is_none() { + let mut cache = self.connector_cache.write().unwrap(); + // Short-circuit if another thread already wrote a connector to the cache for this key + if !cache.contains_key(&key) { + let mut builder = HyperConnector::builder() + .hyper_builder(self.client_builder.clone()) + .connector_settings(settings.clone()); + builder.set_sleep_impl(components.sleep_impl()); + + let start = components.time_source().map(|ts| ts.now()); + let tcp_connector = (self.tcp_connector_fn)(); + let end = components.time_source().map(|ts| ts.now()); + if let (Some(start), Some(end)) = (start, end) { + if let Ok(elapsed) = end.duration_since(start) { + tracing::debug!("new TCP connector created in {:?}", elapsed); + } + } + let connector = SharedHttpConnector::new(builder.build(tcp_connector)); + cache.insert(key.clone(), connector); + } + connector = cache.get(&key).cloned(); + } + + connector.expect("cache populated above") + } + + fn validate_base_client_config( + &self, + _: &RuntimeComponentsBuilder, + _: &ConfigBag, + ) -> Result<(), BoxError> { + // Initialize the TCP connector at this point so that native certs load + // at client initialization time instead of upon first request. We do it + // here rather than at construction so that it won't run if this is not + // the selected HTTP client for the base config (for example, if this was + // the default HTTP client, and it was overridden by a later plugin). + let _ = (self.tcp_connector_fn)(); + Ok(()) + } + + fn connector_metadata(&self) -> Option { + Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("1.x")))) + } +} + +/// Builder for a hyper-backed [`HttpClient`] implementation. +/// +/// This builder can be used to customize the underlying TCP connector used, as well as +/// hyper client configuration. +/// +/// # Examples +/// +/// Construct a Hyper client with the RusTLS TLS implementation. +/// This can be useful when you want to share a Hyper connector between multiple +/// generated Smithy clients. +#[derive(Clone, Default, Debug)] +pub struct HyperClientBuilder { + client_builder: Option, + crypto_provider: Crypto, +} + +impl HyperClientBuilder { + /// Create a hyper client using RusTLS for TLS + /// + /// The trusted certificates will be loaded later when this becomes the selected + /// HTTP client for a Smithy client. + pub fn build_https(self) -> SharedHttpClient { + let crypto = self.crypto_provider.crypto_provider; + build_with_fn(self.client_builder, move || { + cached_connectors::cached_https(crypto.clone()) + }) + } +} + +impl HyperClientBuilder { + /// Creates a new builder. + pub fn new() -> Self { + Self::default() + } + + pub fn crypto_mode(self, provider: CryptoMode) -> HyperClientBuilder { + HyperClientBuilder { + client_builder: self.client_builder, + crypto_provider: CryptoProviderSelected { + crypto_provider: Inner::Standard(provider), + }, + } + } +} + +fn build_with_fn( + client_builder: Option, + tcp_connector_fn: F, +) -> SharedHttpClient +where + F: Fn() -> C + Send + Sync + 'static, + C: Clone + Send + Sync + 'static, + C: tower::Service, + C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static, + C::Future: Unpin + Send + 'static, + C::Error: Into, + C: Connect, +{ + SharedHttpClient::new(HyperClient { + connector_cache: RwLock::new(HashMap::new()), + client_builder: client_builder + .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())), + tcp_connector_fn, + }) +} + +mod timeout_middleware { + use std::error::Error; + use std::fmt::Formatter; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::time::Duration; + + use http::Uri; + use pin_project_lite::pin_project; + + use aws_smithy_async::future::timeout::{TimedOutError, Timeout}; + use aws_smithy_async::rt::sleep::Sleep; + use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep}; + use aws_smithy_runtime_api::box_error::BoxError; + + #[derive(Debug)] + pub(crate) struct HttpTimeoutError { + kind: &'static str, + duration: Duration, + } + + impl std::fmt::Display for HttpTimeoutError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{} timeout occurred after {:?}", + self.kind, self.duration + ) + } + } + + impl Error for HttpTimeoutError { + // We implement the `source` function as returning a `TimedOutError` because when `downcast_error` + // or `find_source` is called with an `HttpTimeoutError` (or another error wrapping an `HttpTimeoutError`) + // this method will be checked to determine if it's a timeout-related error. + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&TimedOutError) + } + } + + /// Timeout wrapper that will timeout on the initial TCP connection + /// + /// # Stability + /// This interface is unstable. + #[derive(Clone, Debug)] + pub(super) struct ConnectTimeout { + inner: I, + timeout: Option<(SharedAsyncSleep, Duration)>, + } + + impl ConnectTimeout { + /// Create a new `ConnectTimeout` around `inner`. + /// + /// Typically, `I` will implement [`hyper_util::client::legacy::connect::Connect`]. + pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self { + Self { + inner, + timeout: Some((sleep, timeout)), + } + } + + pub(crate) fn no_timeout(inner: I) -> Self { + Self { + inner, + timeout: None, + } + } + } + + #[derive(Clone, Debug)] + pub(crate) struct HttpReadTimeout { + inner: I, + timeout: Option<(SharedAsyncSleep, Duration)>, + } + + impl HttpReadTimeout { + /// Create a new `HttpReadTimeout` around `inner`. + /// + /// Typically, `I` will implement [`tower::Service>`]. + pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self { + Self { + inner, + timeout: Some((sleep, timeout)), + } + } + + pub(crate) fn no_timeout(inner: I) -> Self { + Self { + inner, + timeout: None, + } + } + } + + pin_project! { + /// Timeout future for Tower services + /// + /// Timeout future to handle timing out, mapping errors, and the possibility of not timing out + /// without incurring an additional allocation for each timeout layer. + #[project = MaybeTimeoutFutureProj] + pub enum MaybeTimeoutFuture { + Timeout { + #[pin] + timeout: Timeout, + error_type: &'static str, + duration: Duration, + }, + NoTimeout { + #[pin] + future: F + } + } + } + + impl Future for MaybeTimeoutFuture + where + F: Future>, + E: Into, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (timeout_future, kind, &mut duration) = match self.project() { + MaybeTimeoutFutureProj::NoTimeout { future } => { + return future.poll(cx).map_err(|err| err.into()); + } + MaybeTimeoutFutureProj::Timeout { + timeout, + error_type, + duration, + } => (timeout, error_type, duration), + }; + match timeout_future.poll(cx) { + Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())), + Poll::Ready(Err(_timeout)) => { + Poll::Ready(Err(HttpTimeoutError { kind, duration }.into())) + } + Poll::Pending => Poll::Pending, + } + } + } + + impl tower::Service for ConnectTimeout + where + I: tower::Service, + I::Error: Into, + { + type Response = I::Response; + type Error = BoxError; + type Future = MaybeTimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|err| err.into()) + } + + fn call(&mut self, req: Uri) -> Self::Future { + match &self.timeout { + Some((sleep, duration)) => { + let sleep = sleep.sleep(*duration); + MaybeTimeoutFuture::Timeout { + timeout: Timeout::new(self.inner.call(req), sleep), + error_type: "HTTP connect", + duration: *duration, + } + } + None => MaybeTimeoutFuture::NoTimeout { + future: self.inner.call(req), + }, + } + } + } + + impl tower::Service> for HttpReadTimeout + where + I: tower::Service>, + I::Error: Send + Sync + Error + 'static, + { + type Response = I::Response; + type Error = BoxError; + type Future = MaybeTimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|err| err.into()) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + match &self.timeout { + Some((sleep, duration)) => { + let sleep = sleep.sleep(*duration); + MaybeTimeoutFuture::Timeout { + timeout: Timeout::new(self.inner.call(req), sleep), + error_type: "HTTP read", + duration: *duration, + } + } + None => MaybeTimeoutFuture::NoTimeout { + future: self.inner.call(req), + }, + } + } + } +} diff --git a/crates/service-client/src/lambda.rs b/crates/service-client/src/lambda.rs index f403b1643b..e587c1ff55 100644 --- a/crates/service-client/src/lambda.rs +++ b/crates/service-client/src/lambda.rs @@ -9,9 +9,9 @@ // by the Apache License, Version 2.0. //! Some parts copied from https://github.com/awslabs/aws-sdk-rust/blob/0.55.x/sdk/aws-config/src/sts/assume_role.rs -//! Some parts copied from https://github.com/awslabs/aws-sdk-rust/blob/0.55.x/sdk/aws-smithy-client/src/conns.rs //! License Apache-2.0 +use crate::aws_hyper_client::{CryptoMode, HyperClientBuilder}; use crate::utils::ErrorExt; use arc_swap::ArcSwap; use assume_role::AssumeRoleProvider; @@ -20,7 +20,6 @@ use aws_sdk_lambda::config::Region; use aws_sdk_lambda::error::{DisplayErrorContext, SdkError}; use aws_sdk_lambda::operation::invoke::InvokeError; use aws_sdk_lambda::primitives::Blob; -use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder; use base64::display::Base64Display; use base64::Engine; use bytes::Bytes; @@ -31,9 +30,6 @@ use http::uri::PathAndQuery; use http::{HeaderMap, HeaderValue, Method, Response}; use http_body_util::{BodyExt, Full}; use hyper::body::Body; -use hyper_0_14::client::HttpConnector; -use hyper_rustls_0_24::{ConfigBuilderExt, HttpsConnector, HttpsConnectorBuilder}; -use once_cell::sync::Lazy; use restate_types::config::AwsOptions; use restate_types::identifiers::LambdaARN; use serde::ser::Error as _; @@ -84,39 +80,6 @@ struct LambdaClientInner { assume_role_external_id: Option, } -/// Copied from `aws_smithy_runtime::client::http::HTTPS_NATIVE_ROOTS` but with SO_NODELAY set -static HTTPS_NATIVE_ROOTS: Lazy> = Lazy::new(|| { - let mut http = HttpConnector::new(); - // HttpConnector won't enforce scheme, but HttpsConnector will - http.enforce_http(false); - // Set SO_NODELAY, which we have found significantly improves Lambda invocation latency - http.set_nodelay(true); - HttpsConnectorBuilder::new() - .with_tls_config( - rustls_0_21::ClientConfig::builder() - .with_cipher_suites(&[ - // TLS1.3 suites - rustls_0_21::cipher_suite::TLS13_AES_256_GCM_SHA384, - rustls_0_21::cipher_suite::TLS13_AES_128_GCM_SHA256, - // TLS1.2 suites - rustls_0_21::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - rustls_0_21::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - rustls_0_21::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - rustls_0_21::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - rustls_0_21::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, - ]) - .with_safe_default_kx_groups() - .with_safe_default_protocol_versions() - .expect("Error with the TLS configuration. Please file a bug report under https://github.com/restatedev/restate/issues.") - .with_native_roots() - .with_no_client_auth() - ) - .https_or_http() - .enable_http1() - .enable_http2() - .wrap_connector(http) -}); - impl LambdaClient { pub fn new( profile_name: Option, @@ -128,7 +91,11 @@ impl LambdaClient { if let Some(profile_name) = profile_name { config = config.profile_name(profile_name); }; - config = config.http_client(HyperClientBuilder::new().build(HTTPS_NATIVE_ROOTS.clone())); + config = config.http_client( + HyperClientBuilder::new() + .crypto_mode(CryptoMode::Ring) + .build_https(), + ); let inner = async move { let config = config.load().await; diff --git a/crates/service-client/src/lib.rs b/crates/service-client/src/lib.rs index 296db7ee64..9315f3c6ee 100644 --- a/crates/service-client/src/lib.rs +++ b/crates/service-client/src/lib.rs @@ -33,6 +33,7 @@ use std::future; use std::future::Future; use std::sync::Arc; +mod aws_hyper_client; mod http; mod lambda; mod proxy; diff --git a/crates/storage-query-postgres/Cargo.toml b/crates/storage-query-postgres/Cargo.toml index a525643ac2..ea7bf89475 100644 --- a/crates/storage-query-postgres/Cargo.toml +++ b/crates/storage-query-postgres/Cargo.toml @@ -31,7 +31,7 @@ datafusion-expr = { workspace = true } derive_builder = { workspace = true } futures = { workspace = true } paste = { workspace = true} -pgwire = "0.23" +pgwire = {version = "0.23", default-features = false, features = ["server-api-ring"]} prost = {workspace = true} schemars = { workspace = true, optional = true } serde = { workspace = true }