Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GrpcStore retry first #616

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl ByteStreamServer {
let any_store = store.clone().inner_store(Some(digest)).as_any();
let maybe_grpc_store = any_store.downcast_ref::<Arc<GrpcStore>>();
if let Some(grpc_store) = maybe_grpc_store {
let stream = grpc_store.read(Request::new(read_request)).await?.into_inner();
let stream = grpc_store.read(Request::new(read_request)).await?;
return Ok(Response::new(Box::pin(stream)));
}

Expand Down
55 changes: 43 additions & 12 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::stream::{unfold, FuturesUnordered};
use futures::{future, Future, Stream, TryStreamExt};
use futures::{future, Future, Stream, StreamExt, TryStreamExt};
use nativelink_error::{error_if, make_input_err, Error, ResultExt};
use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient;
use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient;
Expand All @@ -45,7 +45,7 @@ use rand::rngs::OsRng;
use rand::Rng;
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::{transport, IntoRequest, Request, Response, Streaming};
use tonic::{transport, IntoRequest, Request, Response, Status, Streaming};
use tracing::error;
use uuid::Uuid;

Expand All @@ -65,6 +65,31 @@ pub struct GrpcStore {
retrier: Retrier,
}

/// Buffers the first response of a `GrpcStore::read` stream so that the first
/// read can occur within the retry loop. If the connection is established
/// fine but fetching the first message fails, the request can still be
/// retried before anything is returned to the caller.
struct FirstStream {
/// The first response fetched from the stream. The inner `Option` is `None`
/// when the stream hit EOF before yielding any message. The outer `Option`
/// should be populated on creation; it is taken the first time the stream is
/// polled, after which it is `None` and all reads are served by `stream`.
first_response: Option<Option<ReadResponse>>,
/// The underlying stream that responses are read from once `first_response`
/// has been consumed.
stream: Streaming<ReadResponse>,
}

/// Yields the buffered first response exactly once, then forwards every later
/// poll to the wrapped stream.
impl Stream for FirstStream {
    type Item = Result<ReadResponse, Status>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
        // Taking the value leaves `None` behind, so the buffered response can
        // only ever be handed out on the first poll.
        let buffered = self.first_response.take();
        match buffered {
            // A buffered `None` (EOF) becomes `Ready(None)`; a buffered message
            // is wrapped in `Ok` to match the stream's item type.
            Some(response) => std::task::Poll::Ready(response.map(Ok)),
            // Nothing buffered any more: delegate to the underlying stream.
            None => Pin::new(&mut self.stream).poll_next(cx),
        }
    }
}

impl GrpcStore {
pub async fn new(config: &nativelink_config::stores::GrpcStore) -> Result<Self, Error> {
let jitter_amt = config.retry.jitter;
Expand Down Expand Up @@ -233,7 +258,7 @@ impl GrpcStore {
pub async fn read(
&self,
grpc_request: impl IntoRequest<ReadRequest>,
) -> Result<Response<Streaming<ReadResponse>>, Error> {
) -> Result<impl Stream<Item = Result<ReadResponse, Status>>, Error> {
error_if!(
matches!(self.store_type, nativelink_config::stores::StoreType::ac),
"CAS operation on AC store"
Expand All @@ -253,11 +278,21 @@ impl GrpcStore {
);

self.perform_request(request, |request| async move {
self.bytestream_client
let mut response = self
.bytestream_client
.clone()
.read(Request::new(request))
.await
.err_tip(|| "in GrpcStore::read")
.err_tip(|| "in GrpcStore::read")?
.into_inner();
let first_response = response
.message()
.await
.err_tip(|| "Fetching first chunk in GrpcStore::read()")?;
Ok(FirstStream {
first_response: Some(first_response),
stream: response,
})
})
.await
}
Expand Down Expand Up @@ -640,21 +675,17 @@ impl Store for GrpcStore {
read_limit: length.unwrap_or(0) as i64,
}))
.await
.err_tip(|| "in GrpcStore::get_part()")?
.into_inner();
.err_tip(|| "in GrpcStore::get_part()")?;

loop {
let maybe_message = stream
.message()
.await
.err_tip(|| "While fetching message in GrpcStore::get_part()")?;
let Some(message) = maybe_message else {
let Some(maybe_message) = stream.next().await else {
writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")?;
break; // EOF.
};
let message = maybe_message.err_tip(|| "While fetching message in GrpcStore::get_part()")?;
if message.data.is_empty() {
writer
.send_eof()
Expand Down
7 changes: 6 additions & 1 deletion nativelink-util/src/buf_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ impl DropCloserReadHalf {
/// Receive a chunk of data.
pub async fn recv(&mut self) -> Result<Bytes, Error> {
let maybe_chunk = match self.partial.take() {
Some(result_bytes) => Some(result_bytes),
// `partial` is allowed to have empty bytes that represent EOF (as
// returned in the None case below), but `self.rx.recv()` should
// never respond with empty bytes as EOF. If `partial` is empty,
// then pass None to simulate the stream's version of EOF.
Some(Ok(result_bytes)) => (!result_bytes.is_empty()).then(|| Ok(result_bytes)),
Some(Err(cached_error)) => Some(Err(cached_error)),
None => self.rx.recv().await,
};
match maybe_chunk {
Expand Down