Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make codec::decode::Streaming a trait for easier testability #462

Open
stefanhoelzl opened this issue Sep 24, 2020 · 10 comments
Open

make codec::decode::Streaming a trait for easier testability #462

stefanhoelzl opened this issue Sep 24, 2020 · 10 comments
Labels
A-tonic C-enhancement Category: New feature or request

Comments

@stefanhoelzl
Copy link

stefanhoelzl commented Sep 24, 2020

Feature Request

make codec::decode::Streaming a trait.
This will allow easier mocking needed for testing of grpc calls using client streaming.

Crates

tonic
tonic-build

Motivation

currently when I have a grpc call with client streaming like ByteStream::Write I get a rust function with this signature:

async fn write(&self, request: Request<Streaming<WriteRequest>>) -> Result<Response<WriteResponse>, Status>

If I want to test this function I would need to create a Streaming object which is quite complicated.
It would be much easier if the generated function write would be generic accepting a argument implementing the trait Stream and Streaming would implement this.

Proposal

Adding a new trait

trait Stream {
    type Item;
    async fn message<'_>(&'_ mut self) -> Result<Option<Self::Item>, Status>;
    async fn trailers<'_>(&'_ mut self) -> Result<Option<MetadataMap>, Status>;
}

and generate function definitions for ByteStream::write with such a signature

async fn write<S: Stream<Item = WriteRequest >>(&self, request: Request<S>);

This would allow to create simple Mocks which implement Stream for testing.
And otherwise Streaming is still used, which must then implement Stream.

Alternatives

Please let me know if you know alternative how testing can be simplified.

@alce
Copy link
Collaborator

alce commented Sep 25, 2020

Not exactly what you are after but maybe you could delegate complex logic to a function similar to the one you propose, and write most of your tests against that:

impl BytestreamService for Foo {
    async fn write(
        &self,
        request: Request<Streaming<WriteRequest>>,
    ) -> Result<Response<WriteResponse>, Status> {
        do_write(request).await // if you need the request
        do_write2(request.into_inner()).await // or just the stream
    }
}

async fn do_write<S>(request: Request<S>) -> Result<Response<WriteResponse>, Status>
where
    S: Stream<Item = Result<WriteRequest, Status>> + Unpin,
{
    //..
}

async fn do_write2<S>(stream: S) -> Result<Response<WriteResponse>, Status>
where
    S: Stream<Item = Result<WriteRequest, Status>> + Unpin,
{
    //..
}

@stefanhoelzl
Copy link
Author

@alce yes, this workaround I am currently using.
But I would prefer to fix tonic and not needing this workaround :)

@stefanhoelzl
Copy link
Author

Put up a Draft-PR how this could work!

There are two tests I may need some guidance how to solve this (see PR).

This will also break backwards-compatibility, but maybe it can be put behind a feature flag?

@LucioFranco
Copy link
Member

I don't know if moving stream to a trait like this is the correct solution. I think adding a way to pass in complete messages to the stream struct would be better.

@LucioFranco LucioFranco added this to the 0.4 milestone Nov 27, 2020
@allada
Copy link

allada commented Dec 28, 2020

FWIW, I ran into this recently when writing a bazel remote execution server in rust. Everything was going so smooth until I started pulling my hair out trying to find the easiest way to wire up mpsc (or similar) stream to test my code.

@allada
Copy link

allada commented Dec 28, 2020

After lots of document & code reading I finally was able to work around this issue. The key came from an undocumented public function. Probably not good for long-term, but works for now at least and allows me to write my tests before writing my code to a satisfactory amount.

Here's the code for anyone that would like a work-around:

use std::convert::TryFrom;

use bytestream_server::ByteStreamServer;
use prost::{bytes::Bytes, Message};
use tonic::{
    codec::Codec, // Needed for .decoder().
    codec::ProstCodec,
    transport::Body,
    Request,
    Streaming,
};

use proto::google::bytestream::{
    byte_stream_server::ByteStream, // Needed to call .write().
    WriteRequest,
};

#[cfg(test)]
pub mod write_tests {
    use super::*;

    // Utility to encode our proto into GRPC stream format.
    fn encode<T: Message>(proto: &T) -> Result<Bytes, Box<dyn std::error::Error>> {
        use bytes::{BufMut, BytesMut};
        let mut buf = BytesMut::new();
        // See below comment on spec.
        use std::mem::size_of;
        const PREFIX_BYTES: usize = size_of::<u8>() + size_of::<u32>();
        for _ in 0..PREFIX_BYTES {
            // Advance our buffer first.
            // We will backfill it once we know the size of the message.
            buf.put_u8(0);
        }
        proto.encode(&mut buf)?;
        let len = buf.len() - PREFIX_BYTES;
        {
            let mut buf = &mut buf[0..PREFIX_BYTES];
            // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#:~:text=Compressed-Flag
            // for more details on spec.
            // Compressed-Flag -> 0 / 1 # encoded as 1 byte unsigned integer.
            buf.put_u8(0);
            // Message-Length -> {length of Message} # encoded as 4 byte unsigned integer (big endian).
            buf.put_u32(len as u32);
            // Message -> *{binary octet}.
        }

        Ok(buf.freeze())
    }

    #[tokio::test]
    pub async fn chunked_stream_receives_all_data() -> Result<(), Box<dyn std::error::Error>> {
        let bs_server = YourByteStreamServer::default();

        // Setup stream.
        let (mut tx, join_handle) = {
            let (tx, body) = Body::channel();
            let mut codec = ProstCodec::<WriteRequest, WriteRequest>::default();
            // Note: This is an undocumented function.
            let stream = Streaming::new_request(codec.decoder(), body);

            let join_handle = tokio::spawn(async move {
                let response_future = bs_server.write(Request::new(stream));
                response_future.await
            });
            (tx, join_handle)
        };

        // Send data.
        let data_size = {
            let raw_data = "1234".as_bytes();
            let data_size = raw_data.len();
            // Chunk our data into two chunks to simulate something a client
            // might do.
            const BYTE_SPLIT_OFFSET: usize = 2;

            tx.send_data(encode(&WriteRequest {
                resource_name: "foobar".to_string(),
                write_offset: 0,
                finish_write: false,
                data: raw_data[..BYTE_SPLIT_OFFSET].to_vec(),
            })?)
            .await?;

            tx.send_data(encode(&WriteRequest {
                resource_name: "foobar".to_string(),
                write_offset: 0,
                finish_write: true,
                data: raw_data[..BYTE_SPLIT_OFFSET].to_vec(),
            })?)
            .await?;
            let _ = tx; // Emulate sender-side stream hangup.
            data_size
        };
        // Check results of server.
        {
            // One for spawn() future and one for result.
            let server_result = join_handle.await??;
            let committed_size = usize::try_from(server_result.into_inner().committed_size)
                .or(Err("Cant convert i64 to usize"))?;
            assert_eq!(committed_size as usize, data_size);
        }
        Ok(())
    }
}

@LucioFranco
Copy link
Member

Yeah, I agree there should be an easier way. Let me take a think here.

@davidpdrsn davidpdrsn added A-tonic C-enhancement Category: New feature or request labels Feb 13, 2021
@LucioFranco LucioFranco removed this from the 0.5 milestone May 12, 2021
@Will-Low
Copy link
Contributor

Will-Low commented Feb 3, 2023

I stumbled across this issue as well, when trying to mock out a Response<Streaming<T>>. Is there any known good workaround for this? The only thing I could think of is sending the request to a separate test server running in the background and passing the value back to the caller.

@LucioFranco
Copy link
Member

There is a mock example you can use that uses an in-memory connection.

@wngr
Copy link

wngr commented Apr 18, 2023

Based on @allada's example, the following is a bit more concise and good enough for testing:

    fn setup_stream<T: prost::Message + Default + 'static>(
        s: impl Stream<Item = T> + Send + 'static,
    ) -> Request<Streaming<T>> {
        let body = Body::wrap_stream(s.map(|m| Ok::<Bytes, EncodeError>(m.encode_to_vec().into())));
        let mut codec = ProstCodec::<T, T>::default();
        Request::new(Streaming::new_request(codec.decoder(), body, None))
    }

yotamofek added a commit to yotamofek/tonic that referenced this issue Jan 2, 2025
Adds an option for server trait generation that causes client-streaming
methods to receive a generic `impl IntoStreamingRequest` param
instead of a concrete `Request<Streaming<...>>` param.
This allows for testing trait implementions with any type implementing `Stream`,
and not having to go through the trouble of encoding a `Streaming`
object.

Fixes hyperium#462
yotamofek added a commit to yotamofek/tonic that referenced this issue Jan 2, 2025
Adds an option for server trait generation that causes client-streaming
methods to receive a generic `impl IntoStreamingRequest` param
instead of a concrete `Request<Streaming<...>>` param.
This allows for testing trait implementions with any type implementing `Stream`,
and not having to go through the trouble of encoding a `Streaming`
object.

Fixes hyperium#462
yotamofek added a commit to yotamofek/tonic that referenced this issue Jan 5, 2025
Adds an option for server trait generation that causes client-streaming
methods to receive a generic `impl IntoStreamingRequest` param
instead of a concrete `Request<Streaming<...>>` param.
This allows for testing trait implementions with any type implementing `Stream`,
and not having to go through the trouble of encoding a `Streaming`
object.

Fixes hyperium#462
yotamofek added a commit to yotamofek/tonic that referenced this issue Jan 7, 2025
Adds an option for server trait generation that causes client-streaming
methods to receive a generic `Request<impl Stream>` param
instead of a concrete `Request<Streaming<...>>` param.
This allows for testing trait implementions with any type implementing `Stream`,
and not having to go through the trouble of encoding a `Streaming`
object.

Fixes hyperium#462
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tonic C-enhancement Category: New feature or request
Projects
None yet
Development

No branches or pull requests

7 participants