Skip to content

Commit

Permalink
Uplink connections now reuse reqwest client.
Browse files Browse the repository at this point in the history
Fixes #3333
  • Loading branch information
bryn committed Aug 29, 2023
1 parent 9316de0 commit 895fb8d
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions apollo-router/src/uplink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use std::time::Instant;

use futures::Stream;
use futures::StreamExt;
use graphql_client::QueryBody;
use thiserror::Error;
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -169,6 +170,17 @@ where
{
let query = query_name::<Query>();
let (sender, receiver) = channel(2);
let client = match reqwest::Client::builder()
.timeout(uplink_config.timeout)
.build()
{
Ok(client) => client,
Err(err) => {
tracing::error!("unable to create client to query uplink: {err}", err = err);
return futures::stream::empty().boxed();
}
};

let task = async move {
let mut last_id = None;
let mut endpoints = uplink_config.endpoints.unwrap_or_default();
Expand All @@ -181,13 +193,7 @@ where

let query_body = Query::build_query(variables.into());

match fetch::<Query, Response>(
&query_body,
&mut endpoints.iter(),
uplink_config.timeout,
)
.await
{
match fetch::<Query, Response>(&client, &query_body, &mut endpoints.iter()).await {
Ok(response) => {
tracing::info!(
counter.apollo_router_uplink_fetch_count_total = 1,
Expand Down Expand Up @@ -255,13 +261,13 @@ where
};
drop(tokio::task::spawn(task.with_current_subscriber()));

ReceiverStream::new(receiver)
ReceiverStream::new(receiver).boxed()
}

pub(crate) async fn fetch<Query, Response>(
client: &reqwest::Client,
request_body: &QueryBody<Query::Variables>,
urls: &mut impl Iterator<Item = &Url>,
timeout: Duration,
) -> Result<UplinkResponse<Response>, Error>
where
Query: graphql_client::GraphQLQuery,
Expand All @@ -272,7 +278,7 @@ where
let query = query_name::<Query>();
for url in urls {
let now = Instant::now();
match http_request::<Query>(url.as_str(), request_body, timeout).await {
match http_request::<Query>(client, url.as_str(), request_body).await {
Ok(response) => {
let response = response.data.map(Into::into);
match &response {
Expand Down Expand Up @@ -352,14 +358,13 @@ fn query_name<Query>() -> &'static str {
}

async fn http_request<Query>(
client: &reqwest::Client,
url: &str,
request_body: &QueryBody<Query::Variables>,
timeout: Duration,
) -> Result<graphql_client::Response<Query::ResponseData>, reqwest::Error>
where
Query: graphql_client::GraphQLQuery,
{
let client = reqwest::Client::builder().timeout(timeout).build()?;
// It is possible that istio-proxy is re-configuring networking beneath us. If it is, we'll see an error something like this:
// level: "ERROR"
// message: "fetch failed from all endpoints"
Expand Down

0 comments on commit 895fb8d

Please sign in to comment.