Skip to content

Commit

Permalink
Refactor buf_channel (TraceMachina#849)
Browse files Browse the repository at this point in the history
Major rewrite of how DropCloserReadHalf and DropCloserWriteHalf work.
* Simplified DropCloserReadHalf::consume()
* DropCloserWriteHalf will no longer wait for eof to be received
* Added utility func DropCloserWriteHalf::bind()
* DropCloserReadHalf can now be configured to hold a queue of
  recent data to aid with places that might need retry.
* EOF signal is now an AtomicBool instead of a tokio::sync::oneshot channel.
* DropCloserReadHalf::try_reset_stream added to put queue back onto
  head of stream if able.

closes TraceMachina#824
  • Loading branch information
allada authored and chinchaun committed May 6, 2024
1 parent 3883cfb commit 0558fb9
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 395 deletions.
115 changes: 60 additions & 55 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,65 +309,71 @@ impl ByteStreamServer {

Ok(Response::new(Box::pin(unfold(state, move |state| async {
let mut state = state?; // If None our stream is done.
loop {
tokio::select! {
read_result = state.rx.consume(Some(state.max_bytes_per_stream)) => {
match read_result {
Ok(bytes) => {
if bytes.is_empty() {
// EOF.
return Some((Ok(ReadResponse { ..Default::default() }), None));
}
if bytes.len() > state.max_bytes_per_stream {
let err = make_err!(Code::Internal, "Returned store size was larger than read size");
return Some((Err(err.into()), None));
}
let response = ReadResponse { data: bytes };
info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: {:?}", response);
return Some((Ok(response), Some(state)))
}
Err(mut e) => {
// We may need to propagate the error from reading the data through first.
// For example, the NotFound error will come through `get_part_fut`, and
// will not be present in `e`, but we need to ensure we pass NotFound error
// code or the client won't know why it failed.
let get_part_result = if let Some(result) = state.maybe_get_part_result {
result
} else {
// This should never be `future::pending()` if maybe_get_part_result is
// not set.
state.get_part_fut.await
};
if let Err(err) = get_part_result {
e = err.merge(e);
let mut response = ReadResponse::default();
{
let consume_fut = state.rx.consume(Some(state.max_bytes_per_stream));
tokio::pin!(consume_fut);
loop {
tokio::select! {
read_result = &mut consume_fut => {
match read_result {
Ok(bytes) => {
if bytes.is_empty() {
// EOF.
return Some((Ok(response), None));
}
if bytes.len() > state.max_bytes_per_stream {
let err = make_err!(Code::Internal, "Returned store size was larger than read size");
return Some((Err(err.into()), None));
}
response.data = bytes;
info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: {:?}", response);
break;
}
if e.code == Code::NotFound {
// Trim the error code. Not Found is quite common and we don't want to send a large
// error (debug) message for something that is common. We resize to just the last
// message as it will be the most relevant.
e.messages.truncate(1);
Err(mut e) => {
// We may need to propagate the error from reading the data through first.
// For example, the NotFound error will come through `get_part_fut`, and
// will not be present in `e`, but we need to ensure we pass NotFound error
// code or the client won't know why it failed.
let get_part_result = if let Some(result) = state.maybe_get_part_result {
result
} else {
// This should never be `future::pending()` if maybe_get_part_result is
// not set.
state.get_part_fut.await
};
if let Err(err) = get_part_result {
e = err.merge(e);
}
if e.code == Code::NotFound {
// Trim the error code. Not Found is quite common and we don't want to send a large
// error (debug) message for something that is common. We resize to just the last
// message as it will be the most relevant.
e.messages.truncate(1);
}
info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: Error {:?}", e);
return Some((Err(e.into()), None))
}
info!("\x1b[0;31mBytestream Read Chunk Resp\x1b[0m: Error {:?}", e);
return Some((Err(e.into()), None))
}
}
},
result = &mut state.get_part_fut => {
state.maybe_get_part_result = Some(result);
// It is non-deterministic on which future will finish in what order.
// It is also possible that the `state.rx.take()` call above may not be able to
// respond even though the publishing future is done.
// Because of this we set the writing future to pending so it never finishes.
// The `state.rx.take()` future will eventually finish and return either the
// data or an error.
// An EOF will terminate the `state.rx.take()` future, but we are also protected
// because we are dropping the writing future, it will drop the `tx` channel
// which will eventually propagate an error to the `state.rx.take()` future if
// the EOF was not sent due to some other error.
state.get_part_fut = Box::pin(pending());
},
},
result = &mut state.get_part_fut => {
state.maybe_get_part_result = Some(result);
// It is non-deterministic on which future will finish in what order.
// It is also possible that the `state.rx.consume()` call above may not be able to
// respond even though the publishing future is done.
// Because of this we set the writing future to pending so it never finishes.
// The `state.rx.consume()` future will eventually finish and return either the
// data or an error.
// An EOF will terminate the `state.rx.consume()` future, but we are also protected
// because we are dropping the writing future, it will drop the `tx` channel
// which will eventually propagate an error to the `state.rx.consume()` future if
// the EOF was not sent due to some other error.
state.get_part_fut = Box::pin(pending());
},
}
}
}
Some((Ok(response), Some(state)))
}))))
}

Expand Down Expand Up @@ -474,7 +480,6 @@ impl ByteStreamServer {
if write_request.finish_write {
// Gracefully close our stream.
tx.send_eof()
.await
.err_tip(|| "Failed to send EOF in ByteStream::write")?;
return Ok(());
}
Expand Down
3 changes: 0 additions & 3 deletions nativelink-store/src/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ impl Store for CompressionStore {
.await
.err_tip(|| "Failed to write footer to inner store in compression store")?;
tx.send_eof()
.await
.err_tip(|| "Failed writing EOF in compression store update")?;
}

Expand All @@ -398,7 +397,6 @@ impl Store for CompressionStore {
if is_zero_digest(&digest) {
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}
Expand Down Expand Up @@ -599,7 +597,6 @@ impl Store for CompressionStore {

writer
.send_eof()
.await
.err_tip(|| "Failed to send eof in compression store write")?;
Ok(())
};
Expand Down
9 changes: 2 additions & 7 deletions nativelink-store/src/dedup_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use bincode::config::{FixintEncoding, WithOtherIntEncoding};
use bincode::{DefaultOptions, Options};
use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::fastcdc::FastCDC;
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use serde::{Deserialize, Serialize};
use tokio_util::codec::FramedRead;
use tokio_util::io::StreamReader;
use tracing::warn;

// NOTE: If these change update the comments in `stores.rs` to reflect
Expand All @@ -47,7 +48,6 @@ pub struct DedupStore {
content_store: Arc<dyn Store>,
fast_cdc_decoder: FastCDC,
max_concurrent_fetch_per_get: usize,
upload_normal_size: usize,
bincode_options: WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
}

Expand Down Expand Up @@ -82,9 +82,6 @@ impl DedupStore {
content_store,
fast_cdc_decoder: FastCDC::new(min_size, normal_size, max_size),
max_concurrent_fetch_per_get,
// We add 30% because the normal_size is not super accurate and we'd prefer to
// over estimate than under estimate.
upload_normal_size: (normal_size * 13) / 10,
bincode_options: DefaultOptions::new().with_fixint_encoding(),
}
}
Expand Down Expand Up @@ -237,7 +234,6 @@ impl Store for DedupStore {
if length == Some(0) {
writer
.send_eof()
.await
.err_tip(|| "Failed to write EOF out from get_part dedup")?;
return Ok(());
}
Expand Down Expand Up @@ -342,7 +338,6 @@ impl Store for DedupStore {
// Finish our stream by writing our EOF and shutdown the stream.
writer
.send_eof()
.await
.err_tip(|| "Failed to write EOF out from get_part dedup")?;
Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,11 @@ impl Store for FastSlowStore {
.err_tip(|| "Failed to read buffer in fastslow store")?;
if buffer.is_empty() {
// EOF received.
fast_tx.send_eof().await.err_tip(|| {
fast_tx.send_eof().err_tip(|| {
"Failed to write eof to fast store in fast_slow store update"
})?;
slow_tx
.send_eof()
.await
.err_tip(|| "Failed to write eof to writer in fast_slow store update")?;
return Result::<(), Error>::Ok(());
}
Expand Down Expand Up @@ -307,7 +306,7 @@ impl Store for FastSlowStore {
// Write out our EOF.
// We are dropped as soon as we send_eof to writer_pin, so
// we wait until we've finished all of our joins to do that.
let fast_res = fast_tx.send_eof().await;
let fast_res = fast_tx.send_eof();
return Ok::<_, Error>((fast_res, writer_pin));
}

Expand Down Expand Up @@ -340,7 +339,7 @@ impl Store for FastSlowStore {
fast_eof_res
.merge(fast_res)
.merge(slow_res)
.merge(writer_pin.send_eof().await)
.merge(writer_pin.send_eof())
}
Err(err) => fast_res.merge(slow_res).merge(Err(err)),
}
Expand Down
14 changes: 6 additions & 8 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{join, Future, TryFutureExt};
use futures::{Future, TryFutureExt};
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{
make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
Expand Down Expand Up @@ -716,9 +716,9 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
continue;
}
let (mut tx, rx) = make_buf_channel_pair();
let update_fut = self.update(*digest, rx, UploadSizeInfo::ExactSize(0));
let (update_result, send_eof_result) = join!(update_fut, tx.send_eof());
update_result
let send_eof_result = tx.send_eof();
self.update(*digest, rx, UploadSizeInfo::ExactSize(0))
.await
.err_tip(|| format!("Failed to create zero file for digest {digest:?}"))
.merge(
send_eof_result
Expand Down Expand Up @@ -808,7 +808,6 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
.err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}
Expand All @@ -823,8 +822,8 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
let read_limit = length.unwrap_or(usize::MAX) as u64;
let mut resumeable_temp_file = entry.read_file_part(offset as u64, read_limit).await?;

let mut buf = BytesMut::with_capacity(length.unwrap_or(self.read_buffer_size));
loop {
let mut buf = BytesMut::with_capacity(self.read_buffer_size);
resumeable_temp_file
.as_reader()
.await
Expand All @@ -841,7 +840,7 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
// because it is waiting for a file descriptor to open before receiving data.
// Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
// next iteration.
let buf_content = buf.split().freeze();
let buf_content = buf.freeze();
loop {
let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
tokio::pin!(sleep_fn);
Expand All @@ -866,7 +865,6 @@ impl<Fe: FileEntry> Store for FilesystemStore<Fe> {
}
writer
.send_eof()
.await
.err_tip(|| "Filed to send EOF in filesystem store get_part")?;

Ok(())
Expand Down
4 changes: 1 addition & 3 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ impl GrpcStore {
}
writer
.send_eof()
.await
.err_tip(|| "Failed to write EOF in grpc store get_action_result_as_part")?;
Ok(())
}
Expand Down Expand Up @@ -639,7 +638,7 @@ impl Store for GrpcStore {

// Shortcut for empty blobs.
if digest.size_bytes == 0 {
return writer.send_eof().await;
return writer.send_eof();
}

let resource_name = format!(
Expand Down Expand Up @@ -700,7 +699,6 @@ impl Store for GrpcStore {
let eof_result = local_state
.writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")
.map_or_else(RetryResult::Err, RetryResult::Ok);
return Some((eof_result, local_state));
Expand Down
11 changes: 2 additions & 9 deletions nativelink-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,13 @@ impl Store for MemoryStore {
self: Pin<&Self>,
digest: DigestInfo,
mut reader: DropCloserReadHalf,
size_info: UploadSizeInfo,
_size_info: UploadSizeInfo,
) -> Result<(), Error> {
let max_size = match size_info {
UploadSizeInfo::ExactSize(sz) => sz,
UploadSizeInfo::MaxSize(sz) => sz,
};

// Internally Bytes might hold a reference to more data than just our data. To prevent
// this potential case, we make a full copy of our data for long-term storage.
let final_buffer = {
let buffer = reader
.consume(Some(max_size))
.consume(None)
.await
.err_tip(|| "Failed to collect all bytes from reader in memory_store::update")?;
let mut new_buffer = BytesMut::with_capacity(buffer.len());
Expand All @@ -133,7 +128,6 @@ impl Store for MemoryStore {
if is_zero_digest(&digest) {
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}
Expand All @@ -158,7 +152,6 @@ impl Store for MemoryStore {
}
writer
.send_eof()
.await
.err_tip(|| "Failed to write EOF in memory store get_part")?;
Ok(())
}
Expand Down
9 changes: 1 addition & 8 deletions nativelink-store/src/s3_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl Store for S3Store {
// multipart upload requests.
if max_size < MIN_MULTIPART_SIZE {
let (body, content_length) = if let UploadSizeInfo::ExactSize(sz) = upload_size {
reader.set_close_after_size(sz as u64);
(
ByteStream::new(SdkBody::from(
reader
Expand Down Expand Up @@ -465,7 +464,6 @@ impl Store for S3Store {
if is_zero_digest(&digest) {
writer
.send_eof()
.await
.err_tip(|| "Failed to send zero EOF in filesystem store get_part_ref")?;
return Ok(());
}
Expand All @@ -475,11 +473,6 @@ impl Store for S3Store {
.map_or(Some(None), |length| Some(offset.checked_add(length)))
.err_tip(|| "Integer overflow protection triggered")?;

// S3 drops connections when a stream is done. This means that we can't
// run the EOF error check. It's safe to disable it since S3 can be
// trusted to handle incomplete data properly.
writer.set_ignore_eof();

self.retrier
.retry(unfold(writer, move |writer| async move {
let result = self
Expand Down Expand Up @@ -547,7 +540,7 @@ impl Store for S3Store {
}
}
}
if let Err(e) = writer.send_eof().await {
if let Err(e) = writer.send_eof() {
return Some((
RetryResult::Err(make_input_err!(
"Failed to send EOF to consumer in S3: {e}"
Expand Down
Loading

0 comments on commit 0558fb9

Please sign in to comment.