make codec::decode::Streaming a trait for easier testability #462
Comments
Not exactly what you are after, but maybe you could delegate the complex logic to a function similar to the one you propose, and write most of your tests against that:
impl BytestreamService for Foo {
async fn write(
&self,
request: Request<Streaming<WriteRequest>>,
) -> Result<Response<WriteResponse>, Status> {
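// Use one of the two calls below, not both: each consumes `request`.
// Pass the whole request if you need it (e.g. for metadata):
do_write(request).await
// Or pass just the stream:
// do_write2(request.into_inner()).await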
}
}
async fn do_write<S>(request: Request<S>) -> Result<Response<WriteResponse>, Status>
where
S: Stream<Item = Result<WriteRequest, Status>> + Unpin,
{
//..
}
async fn do_write2<S>(stream: S) -> Result<Response<WriteResponse>, Status>
where
S: Stream<Item = Result<WriteRequest, Status>> + Unpin,
{
//..
}
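To illustrate why delegating to a generic function helps, here is a minimal std-only sketch. It is not tonic code: `Iterator` stands in for `futures::Stream` so no async runtime is needed, and `WriteRequest`, `WriteResponse`, and `Status` are simplified hypothetical stand-ins for the generated types. The body of `do_write` (summing chunk sizes until `finish_write`) is likewise an assumed example of "complex logic".

```rust
// Hypothetical, simplified stand-ins for the generated prost/tonic types.
#[derive(Debug, Clone, PartialEq)]
pub struct WriteRequest {
    pub data: Vec<u8>,
    pub finish_write: bool,
}

#[derive(Debug, PartialEq)]
pub struct WriteResponse {
    pub committed_size: i64,
}

#[derive(Debug, PartialEq)]
pub struct Status(pub String);

// Core logic, generic over the message source. `IntoIterator` plays the role
// that `Stream<Item = Result<WriteRequest, Status>>` plays in the real code.
pub fn do_write<I>(messages: I) -> Result<WriteResponse, Status>
where
    I: IntoIterator<Item = Result<WriteRequest, Status>>,
{
    let mut committed: i64 = 0;
    for message in messages {
        // Propagate any error carried by the source.
        let message = message?;
        committed += message.data.len() as i64;
        if message.finish_write {
            return Ok(WriteResponse { committed_size: committed });
        }
    }
    // The source ended without a message marked `finish_write`.
    Err(Status("stream ended before finish_write".to_string()))
}
```

Because the function only depends on the item type, a test can pass a plain `Vec` of results instead of constructing a `Streaming` value. In the async version the same idea applies with any in-memory type that implements `Stream`.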
@alce yes, this is the workaround I am currently using.
I put up a draft PR showing how this could work. There are two tests where I may need some guidance (see PR). This will also break backwards compatibility, but maybe it can be put behind a feature flag?
I don't know if moving stream to a trait like this is the correct solution. I think adding a way to pass in complete messages to the stream struct would be better.
FWIW, I ran into this recently when writing a Bazel remote execution server in Rust. Everything was going so smoothly until I started pulling my hair out trying to find the easiest way to wire up
After a lot of reading through documentation and code, I was finally able to work around this issue. The key was an undocumented public function. It is probably not a good long-term solution, but it works for now and lets me write my tests to a satisfactory degree before writing my code. Here's the code for anyone who would like a workaround:
use std::convert::TryFrom;
use bytestream_server::ByteStreamServer;
use prost::{bytes::Bytes, Message};
use tonic::{
codec::Codec, // Needed for .decoder().
codec::ProstCodec,
transport::Body,
Request,
Streaming,
};
use proto::google::bytestream::{
byte_stream_server::ByteStream, // Needed to call .write().
WriteRequest,
};
#[cfg(test)]
pub mod write_tests {
use super::*;
// Utility to encode our proto into GRPC stream format.
fn encode<T: Message>(proto: &T) -> Result<Bytes, Box<dyn std::error::Error>> {
use bytes::{BufMut, BytesMut};
let mut buf = BytesMut::new();
// See below comment on spec.
use std::mem::size_of;
const PREFIX_BYTES: usize = size_of::<u8>() + size_of::<u32>();
for _ in 0..PREFIX_BYTES {
// Advance our buffer first.
// We will backfill it once we know the size of the message.
buf.put_u8(0);
}
proto.encode(&mut buf)?;
let len = buf.len() - PREFIX_BYTES;
{
let mut buf = &mut buf[0..PREFIX_BYTES];
// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#:~:text=Compressed-Flag
// for more details on spec.
// Compressed-Flag -> 0 / 1 # encoded as 1 byte unsigned integer.
buf.put_u8(0);
// Message-Length -> {length of Message} # encoded as 4 byte unsigned integer (big endian).
buf.put_u32(len as u32);
// Message -> *{binary octet}.
}
Ok(buf.freeze())
}
#[tokio::test]
pub async fn chunked_stream_receives_all_data() -> Result<(), Box<dyn std::error::Error>> {
let bs_server = YourByteStreamServer::default();
// Setup stream.
let (mut tx, join_handle) = {
let (tx, body) = Body::channel();
let mut codec = ProstCodec::<WriteRequest, WriteRequest>::default();
// Note: This is an undocumented function.
let stream = Streaming::new_request(codec.decoder(), body);
let join_handle = tokio::spawn(async move {
let response_future = bs_server.write(Request::new(stream));
response_future.await
});
(tx, join_handle)
};
// Send data.
let data_size = {
let raw_data = "1234".as_bytes();
let data_size = raw_data.len();
// Chunk our data into two chunks to simulate something a client
// might do.
const BYTE_SPLIT_OFFSET: usize = 2;
tx.send_data(encode(&WriteRequest {
resource_name: "foobar".to_string(),
write_offset: 0,
finish_write: false,
data: raw_data[..BYTE_SPLIT_OFFSET].to_vec(),
})?)
.await?;
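tx.send_data(encode(&WriteRequest {
resource_name: "foobar".to_string(),
// The second chunk starts where the first one ended.
write_offset: BYTE_SPLIT_OFFSET as i64,
finish_write: true,
data: raw_data[BYTE_SPLIT_OFFSET..].to_vec(),
})?)
.await?;
// Emulate sender-side stream hangup. `let _ = tx;` would not drop the sender.
drop(tx);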
data_size
};
// Check results of server.
{
// One for spawn() future and one for result.
let server_result = join_handle.await??;
let committed_size = usize::try_from(server_result.into_inner().committed_size)
.or(Err("Can't convert i64 to usize"))?;
assert_eq!(committed_size, data_size);
}
Ok(())
}
}
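The length-prefix framing that the `encode` helper above backfills can also be sketched with the standard library alone. This is a minimal illustration of the layout described in the gRPC over HTTP/2 spec (1-byte Compressed-Flag, 4-byte big-endian Message-Length, then the message bytes); the names `frame` and `unframe` are hypothetical helpers, not part of tonic or prost, and the payload is assumed to be an already-serialized message.

```rust
/// Size of the gRPC message prefix: 1 flag byte + 4 length bytes.
const PREFIX_BYTES: usize = 5;

/// Frame an already-serialized message: Compressed-Flag (0 = uncompressed),
/// then the payload length as a big-endian u32, then the payload itself.
pub fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(PREFIX_BYTES + payload.len());
    out.push(0);
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split one frame off the front of `buf`, returning `(payload, rest)`.
/// Returns `None` if `buf` does not yet contain a complete frame.
pub fn unframe(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    if buf.len() < PREFIX_BYTES {
        return None;
    }
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    let end = PREFIX_BYTES.checked_add(len)?;
    if buf.len() < end {
        return None;
    }
    Some((&buf[PREFIX_BYTES..end], &buf[end..]))
}
```

Writing the prefix before the payload, as here, avoids the backfill step, at the cost of needing the serialized length up front.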
Yeah, I agree there should be an easier way. Let me think about this.
I stumbled across this issue as well, when trying to mock out a
There is a mock example you can use that uses an in-memory connection. |
Based on @allada's example, the following is a bit more concise and good enough for testing:
fn setup_stream<T: prost::Message + Default + 'static>(
s: impl Stream<Item = T> + Send + 'static,
) -> Request<Streaming<T>> {
let body = Body::wrap_stream(s.map(|m| Ok::<Bytes, EncodeError>(m.encode_to_vec().into())));
let mut codec = ProstCodec::<T, T>::default();
Request::new(Streaming::new_request(codec.decoder(), body, None))
}
Adds an option for server trait generation that causes client-streaming methods to receive a generic `impl IntoStreamingRequest` param instead of a concrete `Request<Streaming<...>>` param. This allows for testing trait implementations with any type implementing `Stream`, and not having to go through the trouble of encoding a `Streaming` object. Fixes hyperium#462
Adds an option for server trait generation that causes client-streaming methods to receive a generic `Request<impl Stream>` param instead of a concrete `Request<Streaming<...>>` param. This allows for testing trait implementations with any type implementing `Stream`, and not having to go through the trouble of encoding a `Streaming` object. Fixes hyperium#462
Feature Request
make codec::decode::Streaming a trait.
This will allow easier mocking needed for testing of grpc calls using client streaming.
Crates
tonic
tonic-build
Motivation
Currently, when I have a gRPC call with client streaming like ByteStream::Write, I get a Rust function with this signature:
If I want to test this function, I need to create a Streaming object, which is quite complicated.
It would be much easier if the generated function write were generic, accepting an argument that implements the trait Stream, with Streaming implementing it.
Proposal
Adding a new trait
and generating function definitions for ByteStream::write with such a signature
This would allow creating simple mocks that implement Stream for testing.
Otherwise Streaming is still used, which must then implement Stream.
Alternatives
Please let me know if you know of an alternative way to simplify testing.