Skip to content

Commit

Permalink
feat(subscriber): support grpc-web and add grpc-web feature (#498)
Browse files Browse the repository at this point in the history
## Description

This pull request adds support for `grpc-web` to `console-subscriber`.
Once you enable this feature by calling the `enable_grpc_web` function,
you can connect the console-subscriber gRPC server using a browser
client.

## Explanation of Changes

1. Added a new feature called `grpc-web` which requires the `tonic-web`
   crate as a dependency.
2. A new API named `serve_with_grpc_web` has been introduced. It appears
   to be similar to the `serve_with` API. However, if we were to use the
   same API with `serve_with`, it would result in a bound issue. We
   attempted to combine `serve_with_grpc_web` and `serve_with`, but it
   would create a very complex trait bound for the function. Therefore,
   we decided to introduce a new API to address this problem.
4. Added a new example named `grpc_web` to show how to use the
   `into_parts` API to customize the CORS layer.

Ref #497

Signed-off-by: hi-rustin <[email protected]>
Co-authored-by: Hayden Stainsby <[email protected]>
  • Loading branch information
Rustin170506 and hds authored Feb 16, 2024
1 parent 28a27fc commit 4150253
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 2 deletions.
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ keywords = [
default = ["env-filter"]
parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]
env-filter = ["tracing-subscriber/env-filter"]
grpc-web = ["tonic-web"]

[dependencies]

crossbeam-utils = "0.8.7"
tokio = { version = "^1.21", features = ["sync", "time", "macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["net"] }
Expand All @@ -54,11 +54,20 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
crossbeam-channel = "0.5"

# Only for the web feature:
tonic-web = { version = "0.10.2", optional = true }

[dev-dependencies]
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
tower = { version = "0.4", default-features = false }
futures = "0.3"
http = "0.2"
tower-http = { version = "0.4", features = ["cors"] }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[example]]
name = "grpc_web"
required-features = ["grpc-web"]
122 changes: 122 additions & 0 deletions console-subscriber/examples/grpc_web.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! Example of using the console subscriber with tonic-web.
//! This example requires the `grpc-web` feature to be enabled.
//! Run with:
//! ```sh
//! cargo run --example grpc_web --features grpc-web
//! ```
use std::{thread, time::Duration};

use console_subscriber::{ConsoleLayer, ServerParts};
use http::header::HeaderName;
use tonic_web::GrpcWebLayer;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
["grpc-status", "grpc-message", "grpc-status-details-bin"];
const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
"x-grpc-web",
"content-type",
"x-user-agent",
"grpc-timeout",
"user-agent",
];

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
thread::Builder::new()
.name("subscriber".into())
.spawn(move || {
// Do not trace anything in this thread.
let _subscriber_guard =
tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default());
// Custom CORS configuration.
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
.allow_credentials(true)
.max_age(DEFAULT_MAX_AGE)
.expose_headers(
DEFAULT_EXPOSED_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
)
.allow_headers(
DEFAULT_ALLOW_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("console subscriber runtime initialization failed");
runtime.block_on(async move {
let ServerParts {
instrument_server,
aggregator,
..
} = server.into_parts();
tokio::spawn(aggregator.run());
let router = tonic::transport::Server::builder()
// Accept gRPC-Web requests and enable CORS.
.accept_http1(true)
.layer(cors)
.layer(GrpcWebLayer::new())
.add_service(instrument_server);
let serve = router.serve(std::net::SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
// 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
9999,
));
serve.await.expect("console subscriber server failed");
});
})
.expect("console subscriber could not spawn thread");
tracing_subscriber::registry().with(console_layer).init();

let task1 = tokio::task::Builder::new()
.name("task1")
.spawn(spawn_tasks(1, 10))
.unwrap();
let task2 = tokio::task::Builder::new()
.name("task2")
.spawn(spawn_tasks(10, 30))
.unwrap();

let result = tokio::try_join! {
task1,
task2,
};
result?;

Ok(())
}

#[tracing::instrument]
async fn spawn_tasks(min: u64, max: u64) {
loop {
for i in min..max {
tracing::trace!(i, "spawning wait task");
tokio::task::Builder::new()
.name("wait")
.spawn(wait(i))
.unwrap();

let sleep = Duration::from_secs(max) - Duration::from_secs(i);
tracing::trace!(?sleep, "sleeping...");
tokio::time::sleep(sleep).await;
}
}
}

#[tracing::instrument]
async fn wait(seconds: u64) {
tracing::debug!("waiting...");
tokio::time::sleep(Duration::from_secs(seconds)).await;
tracing::trace!("done!");
}
40 changes: 39 additions & 1 deletion console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct Builder {
/// Any scheduled times exceeding this duration will be clamped to this
/// value. Higher values will result in more memory usage.
pub(super) scheduled_duration_max: Duration,

/// Whether to enable the grpc-web support.
#[cfg(feature = "grpc-web")]
enable_grpc_web: bool,
}

impl Default for Builder {
Expand All @@ -71,6 +75,8 @@ impl Default for Builder {
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
#[cfg(feature = "grpc-web")]
enable_grpc_web: false,
}
}
}
Expand Down Expand Up @@ -268,6 +274,28 @@ impl Builder {
Self { self_trace, ..self }
}

/// Sets whether to enable the grpc-web support.
///
/// By default, this is `false`. If enabled, the console subscriber will
/// serve the gRPC-Web protocol in addition to the standard gRPC protocol.
/// This is useful for serving the console subscriber to web clients.
/// Please be aware that the current default server port is set to 6669.
/// However, certain browsers may restrict this port due to security reasons.
/// If you encounter issues with this, consider changing the port to an
/// alternative one that is not commonly blocked by browsers.
///
/// [`serve_with_grpc_web`] is used to provide more advanced configuration
/// for the gRPC-Web server.
///
/// [`serve_with_grpc_web`]: crate::Server::serve_with_grpc_web
#[cfg(feature = "grpc-web")]
pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self {
Self {
enable_grpc_web,
..self
}
}

/// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task.
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
Expand Down Expand Up @@ -481,6 +509,8 @@ impl Builder {
}

let self_trace = self.self_trace;
#[cfg(feature = "grpc-web")]
let enable_grpc_web = self.enable_grpc_web;

let (layer, server) = self.build();
let filter =
Expand All @@ -501,8 +531,16 @@ impl Builder {
.enable_time()
.build()
.expect("console subscriber runtime initialization failed");

runtime.block_on(async move {
#[cfg(feature = "grpc-web")]
if enable_grpc_web {
server
.serve_with_grpc_web(tonic::transport::Server::builder())
.await
.expect("console subscriber server failed");
return;
}

server
.serve()
.await
Expand Down
Loading

0 comments on commit 4150253

Please sign in to comment.