Remove EOF counter #747

Merged
merged 1 commit into from
Nov 26, 2024
16 changes: 3 additions & 13 deletions src/context/delta.rs
@@ -196,29 +196,19 @@ pub async fn plan_to_object_store(
 let mut multipart_upload = store.put_multipart(&file_name).await?;

 let error: object_store::Error;
-let mut eof_counter = 0;
 loop {
     match reader.read_buf(&mut part_buffer).await {
         Ok(0) if part_buffer.is_empty() => {
-            // We've reached EOF and there are no pending writes to flush.
-            // As per the docs size = 0 doesn't seem to guarantee that we've reached EOF, so we use
-            // a heuristic: if we encounter Ok(0) 3 times in a row it's safe to assume it's EOF.
-            // Another potential workaround is to use `stream_position` + `stream_len` to determine
-            // whether we've reached the end (`stream_len` is nightly-only experimental API atm)
-            eof_counter += 1;
-            if eof_counter >= 3 {
-                break;
-            } else {
-                continue;
-            }
+            // If part_buffer is empty, then it is not full
+            // According to the docs, if part_buffer is not full and size = 0, then we've reached EOF
+            break;
         }
         Ok(size)
             if size != 0
                 && part_buffer.len()
                     < PARTITION_FILE_MIN_PART_SIZE =>
         {
             // Keep filling the part buffer until it surpasses the minimum required size
-            eof_counter = 0;
             continue;
         }
         Ok(_) => {
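The reasoning in the new comment (a zero-length read into a buffer that still has room means EOF) can be illustrated with a minimal synchronous sketch. It is not the code from this PR: it uses `std::io::Read::read` with a small scratch buffer instead of the async `read_buf` call, and `MIN_PART_SIZE` and `read_parts` are hypothetical names chosen for illustration.

```rust
use std::io::{self, Read};

/// Hypothetical minimum part size, kept tiny for illustration.
const MIN_PART_SIZE: usize = 8;

/// Reads `reader` to the end and splits the bytes into parts of at least
/// `MIN_PART_SIZE` bytes each; only the final part may be shorter.
fn read_parts<R: Read>(mut reader: R) -> io::Result<Vec<Vec<u8>>> {
    let mut parts = Vec::new();
    let mut part_buffer: Vec<u8> = Vec::with_capacity(MIN_PART_SIZE);
    // The scratch buffer is never empty, so `Ok(0)` cannot mean "no room
    // left in the buffer"; it is treated as EOF without any retry counter.
    let mut chunk = [0u8; 3];
    loop {
        match reader.read(&mut chunk) {
            Ok(0) => {
                // EOF: flush whatever is still pending, then stop.
                if !part_buffer.is_empty() {
                    parts.push(std::mem::take(&mut part_buffer));
                }
                break;
            }
            Ok(n) => {
                part_buffer.extend_from_slice(&chunk[..n]);
                // Keep filling until the part reaches the minimum size.
                if part_buffer.len() >= MIN_PART_SIZE {
                    parts.push(std::mem::take(&mut part_buffer));
                }
            }
            // A read interrupted by a signal is retried.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(parts)
}
```

For example, calling `read_parts(&b"0123456789abcdefghij"[..])` on this 20-byte slice yields three parts; the last one holds the two bytes left over when EOF is reached.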