-
Notifications
You must be signed in to change notification settings - Fork 251
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: fetch remainder of metadata if it is large #873
Changes from all commits
dd6bb46
9415fe7
2a94717
e55eab9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; | |
use arrow_select::concat::{concat, concat_batches}; | ||
use async_recursion::async_recursion; | ||
use byteorder::{ByteOrder, LittleEndian}; | ||
use bytes::{Bytes, BytesMut}; | ||
use futures::stream::{self, TryStreamExt}; | ||
use futures::StreamExt; | ||
use object_store::path::Path; | ||
|
@@ -52,11 +53,14 @@ use crate::{ | |
use super::object_store::ObjectStore; | ||
|
||
/// Read Manifest on URI. | ||
/// | ||
/// This only reads manifest files. It does not read data files. | ||
pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> { | ||
let file_size = object_store.inner.head(path).await?.size; | ||
const PREFETCH_SIZE: usize = 64 * 1024; | ||
let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize; | ||
let range = Range { | ||
start: std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize, | ||
start: initial_start, | ||
end: file_size, | ||
}; | ||
let buf = object_store.inner.get_range(path, range).await?; | ||
|
@@ -71,15 +75,43 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Ma | |
)); | ||
} | ||
let manifest_pos = LittleEndian::read_i64(&buf[buf.len() - 16..buf.len() - 8]) as usize; | ||
assert!( | ||
file_size - manifest_pos <= buf.len(), | ||
"Assert file_size - manifest_pos <= buf.len(), but got file_size = {}, manifest_pos = {}, buf.len() = {}", | ||
file_size, | ||
manifest_pos, | ||
buf.len() | ||
); | ||
let proto = | ||
pb::Manifest::decode(&buf[buf.len() - (file_size - manifest_pos) + 4..buf.len() - 16])?; | ||
let manifest_len = file_size - manifest_pos; | ||
|
||
let buf: Bytes = if manifest_len <= buf.len() { | ||
// The prefetch captured the entire manifest. We just need to trim the buffer. | ||
buf.slice(buf.len() - manifest_len..buf.len()) | ||
} else { | ||
// The prefetch only captured part of the manifest. We need to make an | ||
// additional range request to read the remainder. | ||
let mut buf2: BytesMut = object_store | ||
.inner | ||
.get_range( | ||
path, | ||
Range { | ||
start: manifest_pos, | ||
end: file_size - PREFETCH_SIZE, | ||
}, | ||
) | ||
.await? | ||
.into_iter() | ||
.collect(); | ||
buf2.extend_from_slice(&buf); | ||
buf2.freeze() | ||
}; | ||
|
||
let recorded_length = LittleEndian::read_u32(&buf[0..4]) as usize; | ||
// Need to trim the magic number at end and message length at beginning | ||
let buf = buf.slice(4..buf.len() - 16); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What does this There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, well, my comment ", and the non-manifest data at the beginning" isn't right or specific enough. I'm not sure yet; all I knew is that it was skipped previously. I guess it's the u32 message length here. Perhaps we should be reading that length and validating it? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added validation for that and a better comment 👍 |
||
|
||
if buf.len() != recorded_length { | ||
return Err(Error::IO(format!( | ||
"Invalid format: manifest length does not match. Expected {}, got {}", | ||
recorded_length, | ||
buf.len() | ||
))); | ||
} | ||
|
||
let proto = pb::Manifest::decode(buf)?; | ||
Ok(Manifest::from(proto)) | ||
} | ||
|
||
|
@@ -588,9 +620,11 @@ mod tests { | |
RecordBatchReader, StringArray, StructArray, UInt32Array, UInt8Array, | ||
}; | ||
use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema}; | ||
use rand::{distributions::Alphanumeric, Rng}; | ||
use tempfile::tempdir; | ||
use tokio::io::AsyncWriteExt; | ||
|
||
use crate::io::FileWriter; | ||
use crate::io::{write_manifest, FileWriter}; | ||
|
||
#[tokio::test] | ||
async fn read_with_row_id() { | ||
|
@@ -1182,4 +1216,47 @@ mod tests { | |
&Int64Array::from_iter_values(7..25) | ||
); | ||
} | ||
|
||
/// Test helper: writes a manifest to an in-memory object store and checks that | ||
/// `read_manifest` returns a value equal to the one that was written. | ||
/// | ||
/// * `prefix_size` - number of random alphanumeric bytes written to the file | ||
///   before the manifest; the reader is expected to ignore them. | ||
/// * `manifest_min_size` - length of the randomly generated field name placed | ||
///   in the schema; presumably this makes the serialized manifest at least | ||
///   this many bytes long (TODO confirm the name is stored verbatim). | ||
/// | ||
/// Panics (via `unwrap` / `assert_eq!`) if any write, read or comparison fails. | ||
async fn test_roundtrip_manifest(prefix_size: usize, manifest_min_size: usize) { | ||
let store = ObjectStore::memory(); | ||
let path = Path::from("/read_large_manifest"); | ||
|
||
let mut writer = store.create(&path).await.unwrap(); | ||
|
||
// Write prefix we should ignore | ||
let prefix: Vec<u8> = rand::thread_rng() | ||
.sample_iter(&Alphanumeric) | ||
.take(prefix_size) | ||
.collect(); | ||
writer.write_all(&prefix).await.unwrap(); | ||
|
||
// Random field name of `manifest_min_size` characters, used to inflate the | ||
// size of the manifest that gets written below. | ||
let long_name: String = rand::thread_rng() | ||
.sample_iter(&Alphanumeric) | ||
.take(manifest_min_size) | ||
.map(char::from) | ||
.collect(); | ||
|
||
// Single-column schema whose only field carries the long name. | ||
let arrow_schema = | ||
ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]); | ||
let schema = Schema::try_from(&arrow_schema).unwrap(); | ||
let mut manifest = Manifest::new(&schema, Arc::new(vec![])); | ||
// Write the manifest after the prefix, then the trailing magic/footer that | ||
// records where the manifest starts (`pos`). | ||
let pos = write_manifest(&mut writer, &mut manifest, None) | ||
.await | ||
.unwrap(); | ||
writer.write_magics(pos).await.unwrap(); | ||
writer.shutdown().await.unwrap(); | ||
|
||
let roundtripped_manifest = read_manifest(&store, &path).await.unwrap(); | ||
|
||
assert_eq!(manifest, roundtripped_manifest); | ||
|
||
// Remove the file so repeated calls can reuse the same path. | ||
store.inner.delete(&path).await.unwrap(); | ||
} | ||
|
||
/// Round-trips manifests of different sizes, with and without a leading | ||
/// prefix, through `test_roundtrip_manifest`. | ||
/// | ||
/// `read_manifest` prefetches the last 64 KiB of the file, so a 100_000-character | ||
/// field name is presumably enough to push the manifest past that window and | ||
/// force the second range request (TODO confirm the serialized size). | ||
#[tokio::test] | ||
async fn test_read_large_manifest() { | ||
// Large manifest starting at offset 0 (no prefix). | ||
test_roundtrip_manifest(0, 100_000).await; | ||
// Large manifest preceded by 1000 bytes of unrelated data. | ||
test_roundtrip_manifest(1000, 100_000).await; | ||
// Small manifest with a prefix; presumably fits entirely in the prefetch. | ||
test_roundtrip_manifest(1000, 1000).await; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does it require `PartialEq`? Do you need to sort them or store them in some containers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was for the sake of testing (`assert_eq!` requires this).