Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fetch remainder of metadata if it is large #873

Merged
merged 4 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/src/format/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl From<&pb::DataFile> for DataFile {
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it require PartialEq, do you need to sort / storing them in some containers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was for the sake of testing (assert_eq requires this).

pub struct Fragment {
/// Fragment ID
pub id: u64,
Expand Down
2 changes: 1 addition & 1 deletion rust/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::format::{pb, ProtoStruct};
/// * Version
/// * Fragments.
/// * Indices.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct Manifest {
/// Dataset schema.
pub schema: Schema,
Expand Down
99 changes: 88 additions & 11 deletions rust/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use arrow_select::concat::{concat, concat_batches};
use async_recursion::async_recursion;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use futures::stream::{self, TryStreamExt};
use futures::StreamExt;
use object_store::path::Path;
Expand All @@ -52,11 +53,14 @@ use crate::{
use super::object_store::ObjectStore;

/// Read Manifest on URI.
///
/// This only reads manifest files. It does not read data files.
pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
let file_size = object_store.inner.head(path).await?.size;
const PREFETCH_SIZE: usize = 64 * 1024;
let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize;
let range = Range {
start: std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize,
start: initial_start,
end: file_size,
};
let buf = object_store.inner.get_range(path, range).await?;
Expand All @@ -71,15 +75,43 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Ma
));
}
let manifest_pos = LittleEndian::read_i64(&buf[buf.len() - 16..buf.len() - 8]) as usize;
assert!(
file_size - manifest_pos <= buf.len(),
"Assert file_size - manifest_pos <= buf.len(), but got file_size = {}, manifest_pos = {}, buf.len() = {}",
file_size,
manifest_pos,
buf.len()
);
let proto =
pb::Manifest::decode(&buf[buf.len() - (file_size - manifest_pos) + 4..buf.len() - 16])?;
let manifest_len = file_size - manifest_pos;

let buf: Bytes = if manifest_len <= buf.len() {
// The prefetch catpured the entire manifest. We just need to trim the buffer.
buf.slice(buf.len() - manifest_len..buf.len())
} else {
// The prefetch only captured part of the manifest. We need to make an
// additional range request to read the remainder.
let mut buf2: BytesMut = object_store
.inner
.get_range(
path,
Range {
start: manifest_pos,
end: file_size - PREFETCH_SIZE,
},
)
.await?
.into_iter()
.collect();
buf2.extend_from_slice(&buf);
buf2.freeze()
};

let recorded_length = LittleEndian::read_u32(&buf[0..4]) as usize;
// Need to trim the magic number at end and message length at beginning
let buf = buf.slice(4..buf.len() - 16);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this 4 stand for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm well my comment ", and the non-manifest data at the beginning" isn't right / specific. I'm not sure yet; all I knew is that it was skipped previously. I guess it's the u32 message length here:

https://github.com/eto-ai/lance/blob/d3f6f6c31c909da22c0b7d486bbc35ec101f6079/rust/src/io/object_writer.rs#L68

Perhaps we should be reading that length and validating it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added validation for that and a better comment 👍


if buf.len() != recorded_length {
return Err(Error::IO(format!(
"Invalid format: manifest length does not match. Expected {}, got {}",
recorded_length,
buf.len()
)));
}

let proto = pb::Manifest::decode(buf)?;
Ok(Manifest::from(proto))
}

Expand Down Expand Up @@ -588,9 +620,11 @@ mod tests {
RecordBatchReader, StringArray, StructArray, UInt32Array, UInt8Array,
};
use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
use rand::{distributions::Alphanumeric, Rng};
use tempfile::tempdir;
use tokio::io::AsyncWriteExt;

use crate::io::FileWriter;
use crate::io::{write_manifest, FileWriter};

#[tokio::test]
async fn read_with_row_id() {
Expand Down Expand Up @@ -1182,4 +1216,47 @@ mod tests {
&Int64Array::from_iter_values(7..25)
);
}

async fn test_roundtrip_manifest(prefix_size: usize, manifest_min_size: usize) {
let store = ObjectStore::memory();
let path = Path::from("/read_large_manifest");

let mut writer = store.create(&path).await.unwrap();

// Write prefix we should ignore
let prefix: Vec<u8> = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(prefix_size)
.collect();
writer.write_all(&prefix).await.unwrap();

let long_name: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(manifest_min_size)
.map(char::from)
.collect();

let arrow_schema =
ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let mut manifest = Manifest::new(&schema, Arc::new(vec![]));
let pos = write_manifest(&mut writer, &mut manifest, None)
.await
.unwrap();
writer.write_magics(pos).await.unwrap();
writer.shutdown().await.unwrap();

let roundtripped_manifest = read_manifest(&store, &path).await.unwrap();

assert_eq!(manifest, roundtripped_manifest);

store.inner.delete(&path).await.unwrap();
}

#[tokio::test]
async fn test_read_large_manifest() {
test_roundtrip_manifest(0, 100_000).await;
test_roundtrip_manifest(1000, 100_000).await;
test_roundtrip_manifest(1000, 1000).await;
}
}