From 76d141ad485b9d79dbdea4f912cdd0843071a6f8 Mon Sep 17 00:00:00 2001 From: Chris Staite Date: Wed, 17 Jan 2024 17:57:27 +0000 Subject: [PATCH] GrpcStore retry first We see the first request in a GrpcStore request failing but this bypasses the retry logic because it's already passed out of the retry function. Read the first message from the upstream GrpcStore within the retry logic, only exit from the retry logic after the first message was received. --- nativelink-service/src/bytestream_server.rs | 2 +- nativelink-store/src/grpc_store.rs | 55 ++++++++++++++++----- nativelink-util/src/buf_channel.rs | 7 ++- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index adb8b450a..7ea4dce46 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -251,7 +251,7 @@ impl ByteStreamServer { let any_store = store.clone().inner_store(Some(digest)).as_any(); let maybe_grpc_store = any_store.downcast_ref::>(); if let Some(grpc_store) = maybe_grpc_store { - let stream = grpc_store.read(Request::new(read_request)).await?.into_inner(); + let stream = grpc_store.read(Request::new(read_request)).await?; return Ok(Response::new(Box::pin(stream))); } diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index a5fbf9b83..f4b30efd9 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -20,7 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use bytes::BytesMut; use futures::stream::{unfold, FuturesUnordered}; -use futures::{future, Future, Stream, TryStreamExt}; +use futures::{future, Future, Stream, StreamExt, TryStreamExt}; use nativelink_error::{error_if, make_input_err, Error, ResultExt}; use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient; use 
nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_client::ContentAddressableStorageClient; @@ -45,7 +45,7 @@ use rand::rngs::OsRng; use rand::Rng; use tokio::time::sleep; use tonic::transport::Channel; -use tonic::{transport, IntoRequest, Request, Response, Streaming}; +use tonic::{transport, IntoRequest, Request, Response, Status, Streaming}; use tracing::error; use uuid::Uuid; @@ -65,6 +65,31 @@ pub struct GrpcStore { retrier: Retrier, } +/// This provides a buffer for the first response from GrpcStore.read in order +/// to allow the first read to occur within the retry loop. That means that if +/// the connection establishes fine, but reading the first byte of the file +/// fails we have the ability to retry before returning to the caller. +struct FirstStream { + /// Contains the first response from the stream (which could be an EOF, + /// hence the nested Option). This should be populated on creation and + /// returned as the first result from the stream. Subsequent reads from the + /// stream will use the encapsulated stream. + first_response: Option>, + /// The stream to get responses from when first_response is None. 
+ stream: Streaming, +} + +impl Stream for FirstStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + if let Some(first_response) = self.first_response.take() { + return std::task::Poll::Ready(first_response.map(Ok)); + } + Pin::new(&mut self.stream).poll_next(cx) + } +} + impl GrpcStore { pub async fn new(config: &nativelink_config::stores::GrpcStore) -> Result { let jitter_amt = config.retry.jitter; @@ -233,7 +258,7 @@ impl GrpcStore { pub async fn read( &self, grpc_request: impl IntoRequest, - ) -> Result>, Error> { + ) -> Result>, Error> { error_if!( matches!(self.store_type, nativelink_config::stores::StoreType::ac), "CAS operation on AC store" @@ -253,11 +278,21 @@ impl GrpcStore { ); self.perform_request(request, |request| async move { - self.bytestream_client + let mut response = self + .bytestream_client .clone() .read(Request::new(request)) .await - .err_tip(|| "in GrpcStore::read") + .err_tip(|| "in GrpcStore::read")? + .into_inner(); + let first_response = response + .message() + .await + .err_tip(|| "Fetching first chunk in GrpcStore::read()")?; + Ok(FirstStream { + first_response: Some(first_response), + stream: response, + }) }) .await } @@ -640,21 +675,17 @@ impl Store for GrpcStore { read_limit: length.unwrap_or(0) as i64, })) .await - .err_tip(|| "in GrpcStore::get_part()")? - .into_inner(); + .err_tip(|| "in GrpcStore::get_part()")?; loop { - let maybe_message = stream - .message() - .await - .err_tip(|| "While fetching message in GrpcStore::get_part()")?; - let Some(message) = maybe_message else { + let Some(maybe_message) = stream.next().await else { writer .send_eof() .await .err_tip(|| "Could not send eof in GrpcStore::get_part()")?; break; // EOF. 
}; + let message = maybe_message.err_tip(|| "While fetching message in GrpcStore::get_part()")?; if message.data.is_empty() { writer .send_eof() diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs index 535a15e32..296830b57 100644 --- a/nativelink-util/src/buf_channel.rs +++ b/nativelink-util/src/buf_channel.rs @@ -163,7 +163,12 @@ impl DropCloserReadHalf { /// Receive a chunk of data. pub async fn recv(&mut self) -> Result { let maybe_chunk = match self.partial.take() { - Some(result_bytes) => Some(result_bytes), + // `partial` is allowed to have empty bytes that represent EOF (as + // returned in the None case below), but `self.rx.recv()` should + // never respond with empty bytes as EOF. If `partial` is empty, + // then pass None to simulate the stream's version of EOF. + Some(Ok(result_bytes)) => (!result_bytes.is_empty()).then(|| Ok(result_bytes)), + Some(Err(cached_error)) => Some(Err(cached_error)), None => self.rx.recv().await, }; match maybe_chunk {