Skip to content

Commit

Permalink
Nuke non-delta streams (#965)
Browse files Browse the repository at this point in the history
Delta has been out long enough
  • Loading branch information
Jake-Shadle authored May 28, 2024
1 parent 6f16b57 commit fe21f08
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 598 deletions.
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

299 changes: 0 additions & 299 deletions crates/xds/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
*/

use std::{
collections::HashSet,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
time::Duration,
};

use futures::StreamExt;
use rand::Rng;
use tokio::sync::{broadcast, Mutex};
use tonic::transport::{channel::Channel as TonicChannel, Endpoint, Error as TonicError};
use tracing::Instrument;
use tryhard::{
Expand All @@ -48,12 +46,9 @@ use crate::{

// gRPC client for the aggregated discovery service, carried over a tonic channel.
type AdsGrpcClient = AggregatedDiscoveryServiceClient<TonicChannel>;
// gRPC client for the aggregated control plane discovery service, carried over a tonic channel.
type MdsGrpcClient = AggregatedControlPlaneDiscoveryServiceClient<TonicChannel>;
// Shared, async-locked set of `(resource type, resource names)` subscriptions.
// Entries are added by `aggregated_subscribe` and replayed by `refresh_resources`.
type SubscribedResources = Arc<Mutex<HashSet<(ResourceType, Vec<String>)>>>;

/// [`Client`] specialised for the aggregated discovery service.
pub type AdsClient = Client<AdsGrpcClient>;
/// [`BidirectionalStream`] specialised for the aggregated discovery service.
pub type AdsStream = BidirectionalStream<AdsGrpcClient>;
/// [`Client`] specialised for the aggregated control plane discovery service.
pub type MdsClient = Client<MdsGrpcClient>;
/// [`BidirectionalStream`] specialised for the aggregated control plane discovery service.
pub type MdsStream = BidirectionalStream<MdsGrpcClient>;

/// Interval (30 seconds) used as the idle timeout while waiting for the next
/// xDS response, and passed to the control plane created for MDS streams.
pub(crate) const IDLE_REQUEST_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -221,14 +216,6 @@ impl<C: ServiceClient> Client<C> {
}

impl MdsClient {
pub fn mds_client_stream<C: crate::config::Configuration>(
&self,
config: Arc<C>,
is_healthy: Arc<AtomicBool>,
) -> MdsStream {
MdsStream::mds_client_stream(self, config, is_healthy)
}

pub async fn delta_stream<C: crate::config::Configuration>(
self,
config: Arc<C>,
Expand Down Expand Up @@ -412,15 +399,6 @@ impl Drop for DeltaSubscription {
}

impl AdsClient {
/// Starts a new stream to the xDS management server.
pub fn xds_client_stream<C: crate::config::Configuration>(
&self,
config: Arc<C>,
is_healthy: Arc<AtomicBool>,
) -> AdsStream {
AdsStream::xds_client_stream(self, config, is_healthy)
}

/// Attempts to start a new delta stream to the xDS management server, if the
/// management server does not support delta xDS we return the client as an error
pub async fn delta_subscribe<C: crate::config::Configuration>(
Expand Down Expand Up @@ -524,283 +502,6 @@ impl AdsClient {
}
}

/// An active xDS gRPC management stream.
///
/// Owns the background task created by `connect`; the task is aborted when
/// this value is dropped.
pub struct BidirectionalStream<C: ServiceClient> {
// Identifier used as the node id in outgoing discovery requests.
identifier: Arc<str>,
// Sending half of the broadcast channel whose receivers feed the gRPC request stream.
requests: broadcast::Sender<C::Request>,
// Handle of the spawned background task that drives the stream.
handle_discovery_response: tokio::task::JoinHandle<Result<()>>,
// Subscriptions shared with the background task so they can be re-sent.
subscribed_resources: SubscribedResources,
}

// Constructors and subscription helpers specific to the ADS flavour of the stream.
impl AdsStream {
/// Creates a stream whose background task (spawned through `Self::connect`)
/// keeps a request/response stream open against the management server.
///
/// The task loops forever: it opens a stream fed by the broadcast receiver,
/// applies every received resource to `config`, acknowledges responses, and
/// reconnects with backoff whenever the stream cannot be opened, errors, or
/// terminates. `is_healthy` is set to `true` after each acknowledged response
/// and to `false` once an established stream is lost.
///
/// The task only returns (with an error) when reconnecting, refreshing the
/// subscriptions, or sending on the request channel fails.
pub fn xds_client_stream<C: crate::config::Configuration>(
Client {
client,
identifier,
management_servers,
..
}: &AdsClient,
config: Arc<C>,
is_healthy: Arc<AtomicBool>,
) -> Self {
// Owned copies so the spawned task does not borrow from the `AdsClient`.
let mut client = client.clone();
let identifier = identifier.clone();
let management_servers = management_servers.clone();

Self::connect(
identifier.clone(),
move |(mut requests, mut rx), subscribed_resources| async move {
tracing::trace!("starting xDS client stream task");
// Outer loop: one iteration per (re)connection attempt.
loop {
let config = config.clone();
tracing::trace!("connecting to grpc stream");
let result = client
.stream_requests(
// Errors only happen if the stream is behind, which
// we don't care about, we only want the latest
// state of the world.
tokio_stream::wrappers::BroadcastStream::from(rx)
.filter_map(|result| futures::future::ready(result.ok())),
)
.in_current_span()
.await
.map(|streaming| streaming.into_inner());

let stream = match result {
Ok(stream) => stream,
Err(error) => {
// Opening the stream failed: reconnect, take a fresh
// receiver (the old one was consumed above), re-send the
// known subscriptions and retry from the top.
tracing::warn!(%error, "stream broken");
client = AdsClient::connect_with_backoff(&management_servers)
.await?
.0;
rx = requests.subscribe();
Self::refresh_resources(
&identifier,
&subscribed_resources,
&mut requests,
)
.await?;
continue;
}
};

tracing::trace!("creating discovery response handler");
// Each resource in a response is applied to `config`; the resulting
// stream yields the acknowledgements to send back.
let mut stream = handle_discovery_responses(
(&*identifier).into(),
stream,
move |resource| config.apply(resource),
);

// Inner loop: process responses until the stream errors or ends.
loop {
// Wait at most `IDLE_REQUEST_INTERVAL` for the next response.
let next_response =
tokio::time::timeout(IDLE_REQUEST_INTERVAL, stream.next());

match next_response.await {
Ok(Some(Ok(ack))) => {
is_healthy.store(true, Ordering::SeqCst);
tracing::trace!("received ack");
// The ack goes out on the same request channel that feeds the stream.
requests.send(ack)?;
continue;
}
Ok(Some(Err(error))) => {
tracing::warn!(%error, "xds stream error");
break;
}
Ok(None) => {
tracing::warn!("xDS stream terminated");
break;
}
Err(_) => {
// Timed out without a response: re-send every subscription.
tracing::debug!(
"exceeded idle request interval sending new requests"
);
Self::refresh_resources(
&identifier,
&subscribed_resources,
&mut requests,
)
.await?;
}
}
}

// The established stream was lost: mark unhealthy, reconnect and
// re-send the subscriptions before the next outer iteration.
is_healthy.store(false, Ordering::SeqCst);
tracing::info!("Lost connection to xDS, retrying");
client = AdsClient::connect_with_backoff(&management_servers)
.await?
.0;
rx = requests.subscribe();
Self::refresh_resources(&identifier, &subscribed_resources, &mut requests)
.await?;
}
},
)
}

/// Records `(resource_type, names)` in the shared subscription set and
/// immediately sends a discovery request for it.
///
/// Returns an error if the request cannot be sent on the request channel.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn aggregated_subscribe(
&mut self,
resource_type: ResourceType,
names: &[String],
) -> Result<()> {
// Remember the subscription so `refresh_resources` can replay it later.
self.subscribed_resources
.lock()
.await
.insert((resource_type, names.to_vec()));
Self::discovery_request_without_cache(
&self.identifier,
&mut self.requests,
resource_type,
names,
)
}
}

impl MdsStream {
    /// Creates a stream whose background task (spawned through `Self::connect`)
    /// serves this process's resources over a stream opened on the MDS client.
    ///
    /// Each iteration of the task:
    /// 1. queues an initial [`DiscoveryResponse`] carrying only this client's
    ///    control-plane identifier,
    /// 2. opens the gRPC stream, fed by the broadcast receiver,
    /// 3. builds a control plane from `config` and lets it produce responses
    ///    for the incoming stream, forwarding each one on the request channel,
    /// 4. once the response stream ends, marks the stream unhealthy,
    ///    reconnects with backoff and starts over.
    ///
    /// `is_healthy` is set to `true` once the resource stream is established
    /// and to `false` when it ends.
    ///
    /// # Errors
    ///
    /// The background task finishes with an error if opening the stream,
    /// producing a response, sending on the request channel, or reconnecting
    /// fails. Reconnect failures are propagated rather than panicking, which
    /// matches the error handling of `AdsStream::xds_client_stream`.
    pub fn mds_client_stream<C: crate::config::Configuration>(
        Client {
            client,
            identifier,
            management_servers,
            ..
        }: &MdsClient,
        config: Arc<C>,
        is_healthy: Arc<AtomicBool>,
    ) -> Self {
        // Owned copies so the spawned task does not borrow from the `MdsClient`.
        let mut client = client.clone();
        let identifier = identifier.clone();
        let management_servers = management_servers.clone();

        Self::connect(
            identifier.clone(),
            move |(requests, mut rx), _| async move {
                tracing::trace!("starting relay client stream task");

                loop {
                    let initial_response = DiscoveryResponse {
                        control_plane: Some(crate::core::ControlPlane {
                            identifier: (&*identifier).into(),
                        }),
                        ..<_>::default()
                    };
                    tracing::trace!("sending initial mds response");
                    // Best effort: a failed send is deliberately ignored here.
                    let _ = requests.send(initial_response);
                    let stream = client
                        .stream_requests(
                            // Errors only happen if the stream is behind, which
                            // we don't care about, we only want the latest
                            // state of the world.
                            tokio_stream::wrappers::BroadcastStream::from(rx)
                                .filter_map(|result| futures::future::ready(result.ok())),
                        )
                        .in_current_span()
                        .await?
                        .into_inner();

                    let control_plane = super::server::ControlPlane::from_arc(
                        config.clone(),
                        IDLE_REQUEST_INTERVAL,
                    );
                    let mut stream = control_plane.stream_resources(stream).await?;
                    is_healthy.store(true, Ordering::SeqCst);

                    // Forward every generated response until the stream ends.
                    while let Some(result) = stream.next().await {
                        let response = result?;
                        tracing::debug!("received discovery response");
                        requests.send(response)?;
                    }

                    is_healthy.store(false, Ordering::SeqCst);

                    tracing::warn!("lost connection to relay server, retrying");
                    // Propagate a reconnect failure instead of panicking in the task.
                    client = MdsClient::connect_with_backoff(&management_servers)
                        .await?
                        .0;
                    // The previous receiver was consumed by the stream above.
                    rx = requests.subscribe();
                }
            },
        )
    }
}

impl<C: ServiceClient> BidirectionalStream<C> {
pub fn connect<F>(
identifier: Arc<str>,
response_task: impl FnOnce(
(
broadcast::Sender<C::Request>,
broadcast::Receiver<C::Request>,
),
SubscribedResources,
) -> F,
) -> Self
where
F: std::future::Future<Output = crate::Result<()>> + Send + 'static,
{
let (requests, rx) = broadcast::channel::<C::Request>(12);
let subscribed_resources: SubscribedResources = <_>::default();

tracing::trace!("spawning stream background task");
let handle_discovery_response = tokio::spawn({
let requests = requests.clone();
let subscribed_resources = subscribed_resources.clone();
(response_task)((requests, rx), subscribed_resources)
.instrument(tracing::trace_span!("handle_discovery_response"))
});

Self {
identifier,
requests,
handle_discovery_response,
subscribed_resources,
}
}

pub(crate) fn requests(&self) -> broadcast::Sender<C::Request> {
self.requests.clone()
}

async fn refresh_resources(
identifier: &str,
subscribed_resources: &SubscribedResources,
requests: &mut broadcast::Sender<DiscoveryRequest>,
) -> Result<()> {
for (resource, names) in subscribed_resources.lock().await.iter() {
Self::discovery_request_without_cache(identifier, requests, *resource, names)?;
}

Ok(())
}

pub(crate) fn discovery_request_without_cache(
identifier: &str,
requests: &mut broadcast::Sender<DiscoveryRequest>,
resource_type: ResourceType,
names: &[String],
) -> Result<()> {
let request = DiscoveryRequest {
node: Some(Node {
id: identifier.into(),
user_agent_name: "quilkin".into(),
..Node::default()
}),
resource_names: names.to_vec(),
type_url: resource_type.type_url().into(),
..DiscoveryRequest::default()
};

tracing::trace!(r#type=%resource_type, ?names, "sending discovery request");
requests.send(request).map_err(From::from).map(drop)
}
}

impl<C: ServiceClient> Drop for BidirectionalStream<C> {
    /// Aborts the background task when the stream is dropped.
    fn drop(&mut self) {
        let Self {
            handle_discovery_response: background_task,
            ..
        } = self;
        background_task.abort();
    }
}

#[derive(Debug, thiserror::Error)]
enum RpcSessionError {
#[error("Invalid endpoint. \n {0}")]
Expand Down
Loading

0 comments on commit fe21f08

Please sign in to comment.