Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(app/inbound): address server::conn::Http deprecations #3432

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 36 additions & 47 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use linkerd_app_core::{
errors::respond::L5D_PROXY_ERROR,
identity, io, metrics,
proxy::http,
svc::{self, NewService, Param},
svc::{self, http::TracingExecutor, NewService, Param},
tls,
transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr},
NameAddr, ProxyRuntime,
Expand Down Expand Up @@ -47,9 +47,7 @@ where

#[tokio::test(flavor = "current_thread")]
async fn unmeshed_http1_hello_world() {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
let _trace = trace_init();
Expand Down Expand Up @@ -88,9 +86,7 @@ async fn unmeshed_http1_hello_world() {
#[tokio::test(flavor = "current_thread")]
async fn downgrade_origin_form() {
// Reproduces https://github.com/linkerd/linkerd2/issues/5298
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
Expand Down Expand Up @@ -131,9 +127,7 @@ async fn downgrade_origin_form() {

#[tokio::test(flavor = "current_thread")]
async fn downgrade_absolute_form() {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http1_only(true);
let server = hyper::server::conn::http1::Builder::new();
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
Expand Down Expand Up @@ -262,9 +256,7 @@ async fn http1_connect_timeout_meshed_response_error_header() {

// Build a mock connect that sleeps longer than the default inbound
// connect timeout.
#[allow(deprecated)] // linkerd/linkerd2#8733
let server = hyper::server::conn::Http::new();
let connect = support::connect().endpoint(Target::addr(), connect_timeout(server));
let connect = support::connect().endpoint(Target::addr(), connect_timeout());

// Build a client using the connect that always sleeps so that responses
// are GATEWAY_TIMEOUT.
Expand Down Expand Up @@ -309,9 +301,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {

// Build a mock connect that sleeps longer than the default inbound
// connect timeout.
#[allow(deprecated)] // linkerd/linkerd2#8733
let server = hyper::server::conn::Http::new();
let connect = support::connect().endpoint(Target::addr(), connect_timeout(server));
let connect = support::connect().endpoint(Target::addr(), connect_timeout());

// Build a client using the connect that always sleeps so that responses
// are GATEWAY_TIMEOUT.
Expand Down Expand Up @@ -527,9 +517,7 @@ async fn grpc_response_class() {

// Build a mock connector serves a gRPC server that returns errors.
let connect = {
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut server = hyper::server::conn::Http::new();
server.http2_only(true);
let server = hyper::server::conn::http2::Builder::new(TracingExecutor);
support::connect().endpoint_fn_boxed(
Target::addr(),
grpc_status_server(server, tonic::Code::Unknown),
Expand Down Expand Up @@ -606,9 +594,8 @@ async fn grpc_response_class() {
}

#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn hello_server(
http: hyper::server::conn::Http,
server: hyper::server::conn::http1::Builder,
Comment on lines -611 to +598
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this...

) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
let span = tracing::info_span!("hello_server", ?endpoint);
Expand All @@ -620,7 +607,8 @@ fn hello_server(
Ok::<_, io::Error>(Response::new(Body::from("Hello world!")))
});
tokio::spawn(
http.serve_connection(server_io, hello_svc)
server
.serve_connection(server_io, hello_svc)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
Expand All @@ -630,7 +618,7 @@ fn hello_server(
#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn grpc_status_server(
http: hyper::server::conn::Http,
server: hyper::server::conn::http2::Builder<TracingExecutor>,
Comment on lines -633 to +621
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and this, are the other core of this branch. we can tweak these types, without having to do any additional refactoring.

i'm punting the client end of this change into a separate branch, as mentioned in the description, because we currently share common helper functions for the http/1 and http/2 clients.

status: tonic::Code,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
Expand All @@ -639,26 +627,30 @@ fn grpc_status_server(
tracing::info!("mock connecting");
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(
http.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert("grpc-status", (status as u32).to_string().parse().unwrap());
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
server
.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert(
"grpc-status",
(status as u32).to_string().parse().unwrap(),
);
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
}
Expand All @@ -675,10 +667,7 @@ fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
}

#[tracing::instrument]
#[allow(deprecated)] // linkerd/linkerd2#8733
fn connect_timeout(
http: hyper::server::conn::Http,
) -> Box<dyn FnMut(Remote<ServerAddr>) -> ConnectFuture + Send> {
fn connect_timeout() -> Box<dyn FnMut(Remote<ServerAddr>) -> ConnectFuture + Send> {
Comment on lines -678 to +670
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like #3421, this is another exercise in dodging deprecation work by... removing things! 🙂

Box::new(move |endpoint| {
let span = tracing::info_span!("connect_timeout", ?endpoint);
Box::pin(
Expand Down
Loading