diff --git a/rust/src/format/fragment.rs b/rust/src/format/fragment.rs index 8894b3bc80..2080bcdf22 100644 --- a/rust/src/format/fragment.rs +++ b/rust/src/format/fragment.rs @@ -66,7 +66,7 @@ impl From<&pb::DataFile> for DataFile { /// /// A fragment is a set of files which represent the different columns of the same rows. /// If column exists in the schema, but the related file does not exist, treat this column as `nulls`. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Fragment { /// Fragment ID pub id: u64, diff --git a/rust/src/format/manifest.rs b/rust/src/format/manifest.rs index 47f256540f..b2b66034ce 100644 --- a/rust/src/format/manifest.rs +++ b/rust/src/format/manifest.rs @@ -32,7 +32,7 @@ use crate::format::{pb, ProtoStruct}; /// * Version /// * Fragments. /// * Indices. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Manifest { /// Dataset schema. pub schema: Schema, diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs index 4b73832f3c..ea34c35c43 100644 --- a/rust/src/io/reader.rs +++ b/rust/src/io/reader.rs @@ -31,6 +31,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use arrow_select::concat::{concat, concat_batches}; use async_recursion::async_recursion; use byteorder::{ByteOrder, LittleEndian}; +use bytes::{Bytes, BytesMut}; use futures::stream::{self, TryStreamExt}; use futures::StreamExt; use object_store::path::Path; @@ -52,11 +53,14 @@ use crate::{ use super::object_store::ObjectStore; /// Read Manifest on URI. +/// +/// This only reads manifest files. It does not read data files. pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result { let file_size = object_store.inner.head(path).await?.size; const PREFETCH_SIZE: usize = 64 * 1024; + let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize; let range = Range { - start: std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize, + start: initial_start, end: file_size, }; let buf = object_store.inner.get_range(path, range).await?; @@ -71,15 +75,43 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(prefix_size) + .collect(); + writer.write_all(&prefix).await.unwrap(); + + let long_name: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(manifest_min_size) + .map(char::from) + .collect(); + + let arrow_schema = + ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let mut manifest = Manifest::new(&schema, Arc::new(vec![])); + let pos = write_manifest(&mut writer, &mut manifest, None) + .await + .unwrap(); + writer.write_magics(pos).await.unwrap(); + writer.shutdown().await.unwrap(); + + let roundtripped_manifest = read_manifest(&store, &path).await.unwrap(); + + assert_eq!(manifest, roundtripped_manifest); + + store.inner.delete(&path).await.unwrap(); + } + + #[tokio::test] + async fn test_read_large_manifest() { + test_roundtrip_manifest(0, 100_000).await; + test_roundtrip_manifest(1000, 100_000).await; + test_roundtrip_manifest(1000, 1000).await; + } }