From dd6bb468e57e185494d3a87196f301fe7e7e32c0 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 16 May 2023 10:32:00 -0700 Subject: [PATCH 1/4] test: add failing test for reading manifest --- rust/src/io/reader.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs index 4b73832f3c..524ade2acc 100644 --- a/rust/src/io/reader.rs +++ b/rust/src/io/reader.rs @@ -1182,4 +1182,30 @@ mod tests { &Int64Array::from_iter_values(7..25) ); } + + #[tokio::test] + async fn test_read_large_manifest() { + // Create a very long name, so the manifest is larger than the default buffer size + // let long_name: String = rand::thread_rng() + // .sample_iter(&Alphanumeric) + // .take(7) + // .map(char::from) + // .collect(); + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("test", DataType::Int64, false)]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let columns: Vec = vec![Arc::new(Int64Array::from_iter_values(0..100))]; + let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap(); + + let store = ObjectStore::memory(); + let path = Path::from("/read_large_manifest"); + let mut file_writer = FileWriter::try_new(&store, &path, schema.clone()) + .await + .unwrap(); + file_writer.write(&[batch]).await.unwrap(); + file_writer.finish().await.unwrap(); + + let manifest = read_manifest(&store, &path).await.unwrap(); + assert_eq!(schema, manifest.schema); + } } From 9415fe7ef609669e24478b5f46824639164c5c35 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 16 May 2023 11:07:58 -0700 Subject: [PATCH 2/4] add some changes --- rust/src/io/reader.rs | 46 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs index 524ade2acc..ea294e74dd 100644 --- a/rust/src/io/reader.rs +++ b/rust/src/io/reader.rs @@ -31,6 +31,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use arrow_select::concat::{concat, concat_batches}; use async_recursion::async_recursion; use byteorder::{ByteOrder, LittleEndian}; +use bytes::{Bytes, BytesMut}; use futures::stream::{self, TryStreamExt}; use futures::StreamExt; use object_store::path::Path; @@ -55,8 +56,9 @@ use super::object_store::ObjectStore; pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result { let file_size = object_store.inner.head(path).await?.size; const PREFETCH_SIZE: usize = 64 * 1024; + let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize; let range = Range { - start: std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize, + start: initial_start, end: file_size, }; let buf = object_store.inner.get_range(path, range).await?; @@ -71,16 +73,51 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result Date: Tue, 16 May 2023 13:04:02 -0700 Subject: [PATCH 3/4] fix: fetch the remainder of metadata if it's larger than prefetch buf --- rust/src/format/fragment.rs | 2 +- rust/src/format/manifest.rs | 2 +- rust/src/io/reader.rs | 128 +++++++++++++++++++----------------- 3 files changed, 68 insertions(+), 64 deletions(-) diff --git a/rust/src/format/fragment.rs b/rust/src/format/fragment.rs index 8894b3bc80..2080bcdf22 100644 --- a/rust/src/format/fragment.rs +++ b/rust/src/format/fragment.rs @@ -66,7 +66,7 @@ impl From<&pb::DataFile> for DataFile { /// /// A fragment is a set of files which represent the different columns of the same rows. /// If column exists in the schema, but the related file does not exist, treat this column as `nulls`. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Fragment { /// Fragment ID pub id: u64, diff --git a/rust/src/format/manifest.rs b/rust/src/format/manifest.rs index 47f256540f..b2b66034ce 100644 --- a/rust/src/format/manifest.rs +++ b/rust/src/format/manifest.rs @@ -32,7 +32,7 @@ use crate::format::{pb, ProtoStruct}; /// * Version /// * Fragments. /// * Indices. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Manifest { /// Dataset schema. pub schema: Schema, diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs index ea294e74dd..017844737f 100644 --- a/rust/src/io/reader.rs +++ b/rust/src/io/reader.rs @@ -53,6 +53,8 @@ use crate::{ use super::object_store::ObjectStore; /// Read Manifest on URI. +/// +/// This only reads manifest files. It does not read data files. pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result { let file_size = object_store.inner.head(path).await?.size; const PREFETCH_SIZE: usize = 64 * 1024; @@ -74,50 +76,34 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result = vec![Arc::new(Int64Array::from_iter_values(0..100))]; - let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap(); - + async fn test_roundtrip_manifest(prefix_size: usize, manifest_min_size: usize) { let store = ObjectStore::memory(); let path = Path::from("/read_large_manifest"); - let mut file_writer = FileWriter::try_new(&store, &path, schema.clone()) + + let mut writer = store.create(&path).await.unwrap(); + + // Write prefix we should ignore + let prefix: Vec = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(prefix_size) + .collect(); + writer.write_all(&prefix).await.unwrap(); + + let long_name: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(manifest_min_size) + .map(char::from) + .collect(); + + let arrow_schema = + ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let mut manifest = Manifest::new(&schema, Arc::new(vec![])); + let pos = write_manifest(&mut writer, &mut manifest, None) .await .unwrap(); - file_writer.write(&[batch]).await.unwrap(); - file_writer.finish().await.unwrap(); + writer.write_magics(pos).await.unwrap(); + writer.shutdown().await.unwrap(); + + let roundtripped_manifest = read_manifest(&store, &path).await.unwrap(); - let manifest = read_manifest(&store, &path).await.unwrap(); - assert_eq!(schema, manifest.schema); + assert_eq!(manifest, roundtripped_manifest); + + store.inner.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn test_read_large_manifest() { + test_roundtrip_manifest(0, 100_000).await; + test_roundtrip_manifest(1000, 100_000).await; + test_roundtrip_manifest(1000, 1000).await; } } From e55eab93a54cac6c7f41eeae5e2f1a3f4833c7ea Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 16 May 2023 16:01:53 -0700 Subject: [PATCH 4/4] feat: validate length of the manifest messages --- rust/src/io/reader.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs index 017844737f..ea34c35c43 100644 --- a/rust/src/io/reader.rs +++ b/rust/src/io/reader.rs @@ -99,9 +99,18 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result