From 3ce61d9860528dd4a13f719774d5c649198fb55c Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 30 Oct 2019 15:00:31 -0400 Subject: [PATCH] fix(codec): Enforce encoders/decoders are `Sync` (#84) Closes #81 --- tonic-build/src/server.rs | 4 ++-- tonic-examples/src/routeguide/server.rs | 5 +++-- tonic-interop/src/server.rs | 5 +++-- tonic/Cargo.toml | 9 +++++---- tonic/src/body.rs | 10 +++++----- tonic/src/client/grpc.rs | 20 ++++++++++---------- tonic/src/codec/decode.rs | 21 ++++++++++++--------- tonic/src/codec/encode.rs | 12 +++++++----- tonic/src/codec/mod.rs | 4 ++-- tonic/src/request.rs | 6 +++--- tonic/src/server/grpc.rs | 19 ++++++++++--------- 11 files changed, 62 insertions(+), 53 deletions(-) diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index ce8a7a550..9442eb262 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -124,7 +124,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream { quote! { #stream_doc - type #stream: Stream> + Send + 'static; + type #stream: Stream> + Send + Sync + 'static; #method_doc async fn #name(&self, request: tonic::Request<#req_message>) @@ -142,7 +142,7 @@ fn generate_trait_methods(service: &Service, proto_path: &str) -> TokenStream { quote! { #stream_doc - type #stream: Stream> + Send + 'static; + type #stream: Stream> + Send + Sync + 'static; #method_doc async fn #name(&self, request: tonic::Request>) diff --git a/tonic-examples/src/routeguide/server.rs b/tonic-examples/src/routeguide/server.rs index b6ec19ba8..40d1022dc 100644 --- a/tonic-examples/src/routeguide/server.rs +++ b/tonic-examples/src/routeguide/server.rs @@ -108,7 +108,8 @@ impl server::RouteGuide for RouteGuide { Ok(Response::new(summary)) } - type RouteChatStream = Pin> + Send + 'static>>; + type RouteChatStream = + Pin> + Send + Sync + 'static>>; async fn route_chat( &self, @@ -138,7 +139,7 @@ impl server::RouteGuide for RouteGuide { Ok(Response::new(Box::pin(output) as Pin< - Box> + Send + 'static>, + Box> + Send + Sync + 'static>, >)) } } diff --git a/tonic-interop/src/server.rs b/tonic-interop/src/server.rs index 86c10fd8b..15e7145f4 100644 --- a/tonic-interop/src/server.rs +++ b/tonic-interop/src/server.rs @@ -12,8 +12,9 @@ pub struct TestService; type Result = std::result::Result, Status>; type Streaming = Request>; -type Stream = - Pin> + Send + 'static>>; +type Stream = Pin< + Box> + Send + Sync + 'static>, +>; #[tonic::async_trait] impl pb::server::TestService for TestService { diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 80a230e89..662ef9fda 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -41,10 +41,6 @@ tls = [] name = "bench_main" harness = false -[dev-dependencies] -rand = "0.7.2" -criterion = "0.3" - [dependencies] bytes = "0.4" futures-core-preview = "=0.3.0-alpha.19" @@ -83,6 +79,11 @@ openssl1 = { package = "openssl", version = "0.10", optional = true } # rustls tokio-rustls = { version = "=0.12.0-alpha.4", optional = true } +[dev-dependencies] +static_assertions = "1.0" +rand = "0.7.2" +criterion = "0.3" + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 41930edab..ee0a662ad 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -15,7 +15,7 @@ use std::{ pub(crate) type BytesBuf = ::Buf; /// A trait alias for [`http_body::Body`]. -pub trait Body: sealed::Sealed { +pub trait Body: sealed::Sealed + Send + Sync { /// The body data type. type Data: Buf; /// The errors produced from the body. @@ -45,7 +45,7 @@ pub trait Body: sealed::Sealed { impl Body for T where - T: HttpBody, + T: HttpBody + Send + Sync + 'static, T::Error: Into, { type Data = T::Data; @@ -83,7 +83,7 @@ mod sealed { /// A type erased http body. pub struct BoxBody { - inner: Pin + Send + 'static>>, + inner: Pin + Send + Sync + 'static>>, } struct MapBody(B); @@ -92,7 +92,7 @@ impl BoxBody { /// Create a new `BoxBody` mapping item and error to the default types. pub fn new(inner: B) -> Self where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, { BoxBody { inner: Box::pin(inner), @@ -102,7 +102,7 @@ impl BoxBody { /// Create a new `BoxBody` mapping item and error to the default types. pub fn map_from(inner: B) -> Self where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into, B::Error: Into, { diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 09560f3fd..bcc6cbd2b 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -62,8 +62,8 @@ impl Grpc { ::Error: Into, ::Data: Into, C: Codec, - M1: Send + 'static, - M2: Send + 'static, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, { let request = request.map(|m| stream::once(future::ready(m))); self.client_streaming(request, path, codec).await @@ -81,10 +81,10 @@ impl Grpc { T::ResponseBody: Body + HttpBody + Send + 'static, ::Error: Into, ::Data: Into, - S: Stream + Send + 'static, + S: Stream + Send + Sync + 'static, C: Codec, - M1: Send + 'static, - M2: Send + 'static, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, { let (mut parts, body) = self.streaming(request, path, codec).await?.into_parts(); @@ -115,8 +115,8 @@ impl Grpc { ::Error: Into, ::Data: Into, C: Codec, - M1: Send + 'static, - M2: Send + 'static, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, { let request = request.map(|m| stream::once(future::ready(m))); self.streaming(request, path, codec).await @@ -134,10 +134,10 @@ impl Grpc { T::ResponseBody: Body + HttpBody + Send + 'static, ::Data: Into, ::Error: Into, - S: Stream + Send + 'static, + S: Stream + Send + Sync + 'static, C: Codec, - M1: Send + 'static, - M2: Send + 'static, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, { let mut parts = Parts::default(); parts.path_and_query = Some(path); diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 153a9b6da..fba1ba281 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -19,7 +19,7 @@ const BUFFER_SIZE: usize = 8 * 1024; /// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface /// to fetch the message stream and trailing metadata pub struct Streaming { - decoder: Box + Send + 'static>, + decoder: Box + Send + Sync + 'static>, body: BoxBody, state: State, direction: Direction, @@ -45,40 +45,40 @@ enum Direction { impl Streaming { pub(crate) fn new_response(decoder: D, body: B, status_code: StatusCode) -> Self where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into, B::Error: Into, - D: Decoder + Send + 'static, + D: Decoder + Send + Sync + 'static, { Self::new(decoder, body, Direction::Response(status_code)) } pub(crate) fn new_empty(decoder: D, body: B) -> Self where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into, B::Error: Into, - D: Decoder + Send + 'static, + D: Decoder + Send + Sync + 'static, { Self::new(decoder, body, Direction::EmptyResponse) } pub(crate) fn new_request(decoder: D, body: B) -> Self where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into, B::Error: Into, - D: Decoder + Send + 'static, + D: Decoder + Send + Sync + 'static, { Self::new(decoder, body, Direction::Request) } fn new(decoder: D, body: B, direction: Direction) -> Self where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into, B::Error: Into, - D: Decoder + Send + 'static, + D: Decoder + Send + Sync + 'static, { Self { decoder: Box::new(decoder), @@ -291,3 +291,6 @@ impl fmt::Debug for Streaming { f.debug_struct("Streaming").finish() } } + +#[cfg(test)] +static_assertions::assert_impl_all!(Streaming<()>: Send, Sync); diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 7d91b4b94..5a0f94b1d 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -16,8 +16,9 @@ pub(crate) fn encode_server( source: U, ) -> EncodeBody>> where - T: Encoder, - U: Stream>, + T: Encoder + Send + Sync + 'static, + T::Item: Send + Sync, + U: Stream> + Send + Sync + 'static, { let stream = encode(encoder, source).into_stream(); EncodeBody::new_server(stream) @@ -28,8 +29,9 @@ pub(crate) fn encode_client( source: U, ) -> EncodeBody>> where - T: Encoder, - U: Stream, + T: Encoder + Send + Sync + 'static, + T::Item: Send + Sync, + U: Stream + Send + Sync + 'static, { let stream = encode(encoder, source.map(|x| Ok(x))).into_stream(); EncodeBody::new_client(stream) @@ -88,7 +90,7 @@ pub(crate) struct EncodeBody { impl EncodeBody where - S: Stream>, + S: Stream> + Send + Sync + 'static, { pub(crate) fn new_client(inner: S) -> Self { Self { diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index 93bf5db6d..17f20d31d 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -28,9 +28,9 @@ pub trait Codec: Default { type Decode: Send + 'static; /// The encoder that can encode a message. - type Encoder: Encoder + Send + 'static; + type Encoder: Encoder + Send + Sync + 'static; /// The encoder that can decode a message. - type Decoder: Decoder + Send + 'static; + type Decoder: Decoder + Send + Sync + 'static; /// Fetch the encoder. fn encoder(&mut self) -> Self::Encoder; diff --git a/tonic/src/request.rs b/tonic/src/request.rs index 2042e135f..4863555af 100644 --- a/tonic/src/request.rs +++ b/tonic/src/request.rs @@ -77,7 +77,7 @@ pub trait IntoRequest: sealed::Sealed { /// ``` pub trait IntoStreamingRequest: sealed::Sealed { /// The RPC request stream type - type Stream: Stream + Send + 'static; + type Stream: Stream + Send + Sync + 'static; /// The RPC request type type Message; @@ -182,7 +182,7 @@ impl IntoRequest for Request { impl IntoStreamingRequest for T where - T: Stream + Send + 'static, + T: Stream + Send + Sync + 'static, { type Stream = T; type Message = T::Item; @@ -194,7 +194,7 @@ where impl IntoStreamingRequest for Request where - T: Stream + Send + 'static, + T: Stream + Send + Sync + 'static, { type Stream = T; type Message = T::Item; diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 9fca3e4a0..fc2f7bcdc 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -26,6 +26,7 @@ pub struct Grpc { impl Grpc where T: Codec, + T::Encode: Sync, { /// Creates a new gRPC client with the provided [`Codec`]. pub fn new(codec: T) -> Self { @@ -40,7 +41,7 @@ where ) -> http::Response where S: UnaryService, - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into + Send, B::Error: Into + Send, { @@ -70,8 +71,8 @@ where ) -> http::Response where S: ServerStreamingService, - S::ResponseStream: Send + 'static, - B: Body + Send + 'static, + S::ResponseStream: Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Data: Into + Send, B::Error: Into + Send, { @@ -95,7 +96,7 @@ where ) -> http::Response where S: ClientStreamingService, - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into + Send + 'static, B::Error: Into + Send + 'static, { @@ -115,8 +116,8 @@ where ) -> http::Response where S: StreamingService + Send, - S::ResponseStream: Send + 'static, - B: Body + Send + 'static, + S::ResponseStream: Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Data: Into + Send, B::Error: Into + Send, { @@ -130,7 +131,7 @@ where request: http::Request, ) -> Result, Status> where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into + Send, B::Error: Into + Send, { @@ -158,7 +159,7 @@ where request: http::Request, ) -> Request> where - B: Body + Send + 'static, + B: Body + Send + Sync + 'static, B::Data: Into + Send, B::Error: Into + Send, { @@ -170,7 +171,7 @@ where response: Result, Status>, ) -> http::Response where - B: TryStream + Send + 'static, + B: TryStream + Send + Sync + 'static, { match response { Ok(r) => {