fix: fetch remainder of metadata if it is large (#873)
* test: add failing test for reading manifest

* add some changes

* fix: fetch the remainder of metadata if it's larger than prefetch buf

* feat: validate length of the manifest messages
wjones127 authored May 17, 2023
1 parent 841ee20 commit 6af670a
Showing 3 changed files with 90 additions and 13 deletions.
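
For orientation before the per-file diff: the changed reader locates the manifest through a fixed-size footer. As the new code reads it, the last 16 bytes of the file are the footer, the first 8 of those bytes hold a little-endian offset to the start of the manifest message, the message begins with a 4-byte little-endian length, and everything between that length prefix and the footer is the protobuf-encoded manifest (the remaining footer bytes, including the trailing magic number, are not interpreted by this function). The sketch below restates that layout in isolation; it is a minimal, std-only illustration that assumes the whole file is already in memory, and locate_manifest is a hypothetical helper, not part of the crate.

// Hypothetical helper: given the complete file bytes, return the slice that
// holds the protobuf-encoded manifest, validating the recorded length the
// same way the new read_manifest does.
fn locate_manifest(file: &[u8]) -> Result<&[u8], String> {
    if file.len() < 16 {
        return Err("Invalid format: file size is smaller than 16 bytes".to_string());
    }
    let len = file.len();

    // Bytes [len - 16, len - 8) of the footer are the little-endian offset of
    // the manifest message.
    let manifest_pos = i64::from_le_bytes(file[len - 16..len - 8].try_into().unwrap()) as usize;

    // The message starts with a 4-byte little-endian length prefix ...
    let recorded_len =
        u32::from_le_bytes(file[manifest_pos..manifest_pos + 4].try_into().unwrap()) as usize;
    // ... and the body runs from just past that prefix up to the footer.
    let body = &file[manifest_pos + 4..len - 16];

    if body.len() != recorded_len {
        return Err(format!(
            "Invalid format: manifest length does not match. Expected {recorded_len}, got {}",
            body.len()
        ));
    }
    Ok(body)
}

read_manifest below performs the same arithmetic and the same length check, but over at most two ranged get_range calls against object storage instead of a single in-memory slice.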
2 changes: 1 addition & 1 deletion rust/src/format/fragment.rs
@@ -66,7 +66,7 @@ impl From<&pb::DataFile> for DataFile {
 ///
 /// A fragment is a set of files which represent the different columns of the same rows.
 /// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct Fragment {
     /// Fragment ID
     pub id: u64,
2 changes: 1 addition & 1 deletion rust/src/format/manifest.rs
@@ -32,7 +32,7 @@ use crate::format::{pb, ProtoStruct};
 /// * Version
 /// * Fragments.
 /// * Indices.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct Manifest {
     /// Dataset schema.
     pub schema: Schema,
99 changes: 88 additions & 11 deletions rust/src/io/reader.rs
@@ -31,6 +31,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
 use arrow_select::concat::{concat, concat_batches};
 use async_recursion::async_recursion;
 use byteorder::{ByteOrder, LittleEndian};
+use bytes::{Bytes, BytesMut};
 use futures::stream::{self, TryStreamExt};
 use futures::StreamExt;
 use object_store::path::Path;
@@ -52,11 +53,14 @@ use crate::{
 use super::object_store::ObjectStore;

 /// Read Manifest on URI.
+///
+/// This only reads manifest files. It does not read data files.
 pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
     let file_size = object_store.inner.head(path).await?.size;
     const PREFETCH_SIZE: usize = 64 * 1024;
+    let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize;
     let range = Range {
-        start: std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize,
+        start: initial_start,
         end: file_size,
     };
     let buf = object_store.inner.get_range(path, range).await?;
@@ -71,15 +75,43 @@ pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Ma
         ));
     }
     let manifest_pos = LittleEndian::read_i64(&buf[buf.len() - 16..buf.len() - 8]) as usize;
-    assert!(
-        file_size - manifest_pos <= buf.len(),
-        "Assert file_size - manifest_pos <= buf.len(), but got file_size = {}, manifest_pos = {}, buf.len() = {}",
-        file_size,
-        manifest_pos,
-        buf.len()
-    );
-    let proto =
-        pb::Manifest::decode(&buf[buf.len() - (file_size - manifest_pos) + 4..buf.len() - 16])?;
+    let manifest_len = file_size - manifest_pos;
+
+    let buf: Bytes = if manifest_len <= buf.len() {
+        // The prefetch captured the entire manifest. We just need to trim the buffer.
+        buf.slice(buf.len() - manifest_len..buf.len())
+    } else {
+        // The prefetch only captured part of the manifest. We need to make an
+        // additional range request to read the remainder.
+        let mut buf2: BytesMut = object_store
+            .inner
+            .get_range(
+                path,
+                Range {
+                    start: manifest_pos,
+                    end: file_size - PREFETCH_SIZE,
+                },
+            )
+            .await?
+            .into_iter()
+            .collect();
+        buf2.extend_from_slice(&buf);
+        buf2.freeze()
+    };
+
+    let recorded_length = LittleEndian::read_u32(&buf[0..4]) as usize;
+    // Need to trim the magic number at end and message length at beginning
+    let buf = buf.slice(4..buf.len() - 16);
+
+    if buf.len() != recorded_length {
+        return Err(Error::IO(format!(
+            "Invalid format: manifest length does not match. Expected {}, got {}",
+            recorded_length,
+            buf.len()
+        )));
+    }
+
+    let proto = pb::Manifest::decode(buf)?;
     Ok(Manifest::from(proto))
 }

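The control flow above, restated with the async object-store plumbing stripped away so the two read paths are easier to follow. This is a sketch only: fetch_range is a hypothetical callback standing in for object_store.inner.get_range, and the 64 KiB prefetch size is the constant used in the diff.

// Sketch of the prefetch-then-fetch-remainder strategy over a generic
// byte-range reader rather than the crate's ObjectStore.
fn read_tail_object(
    file_size: usize,
    object_start: usize, // offset where the trailing object (here, the manifest) begins
    fetch_range: impl Fn(std::ops::Range<usize>) -> Vec<u8>,
) -> Vec<u8> {
    const PREFETCH_SIZE: usize = 64 * 1024;

    // One speculative read of the last PREFETCH_SIZE bytes (or the whole file
    // if it is smaller than that).
    let prefetch_start = file_size.saturating_sub(PREFETCH_SIZE);
    let tail = fetch_range(prefetch_start..file_size);

    let object_len = file_size - object_start;
    if object_len <= tail.len() {
        // The prefetch already covers the object: just trim the front of the tail.
        tail[tail.len() - object_len..].to_vec()
    } else {
        // The object begins before the prefetched region: fetch the missing
        // prefix with a second ranged read and splice the two buffers together.
        let mut out = fetch_range(object_start..prefetch_start);
        out.extend_from_slice(&tail);
        out
    }
}

The new tests below exercise both branches: a 100,000-character field name pushes the manifest well past the 64 KiB prefetch and forces the second read, while the 1,000-character case fits within a single prefetch.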
@@ -588,9 +620,11 @@ mod tests {
         RecordBatchReader, StringArray, StructArray, UInt32Array, UInt8Array,
     };
     use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
+    use rand::{distributions::Alphanumeric, Rng};
     use tempfile::tempdir;
+    use tokio::io::AsyncWriteExt;

-    use crate::io::FileWriter;
+    use crate::io::{write_manifest, FileWriter};

     #[tokio::test]
     async fn read_with_row_id() {
@@ -1182,4 +1216,47 @@
             &Int64Array::from_iter_values(7..25)
         );
     }
+
+    async fn test_roundtrip_manifest(prefix_size: usize, manifest_min_size: usize) {
+        let store = ObjectStore::memory();
+        let path = Path::from("/read_large_manifest");
+
+        let mut writer = store.create(&path).await.unwrap();
+
+        // Write prefix we should ignore
+        let prefix: Vec<u8> = rand::thread_rng()
+            .sample_iter(&Alphanumeric)
+            .take(prefix_size)
+            .collect();
+        writer.write_all(&prefix).await.unwrap();
+
+        let long_name: String = rand::thread_rng()
+            .sample_iter(&Alphanumeric)
+            .take(manifest_min_size)
+            .map(char::from)
+            .collect();
+
+        let arrow_schema =
+            ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]);
+        let schema = Schema::try_from(&arrow_schema).unwrap();
+        let mut manifest = Manifest::new(&schema, Arc::new(vec![]));
+        let pos = write_manifest(&mut writer, &mut manifest, None)
+            .await
+            .unwrap();
+        writer.write_magics(pos).await.unwrap();
+        writer.shutdown().await.unwrap();
+
+        let roundtripped_manifest = read_manifest(&store, &path).await.unwrap();
+
+        assert_eq!(manifest, roundtripped_manifest);
+
+        store.inner.delete(&path).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_read_large_manifest() {
+        test_roundtrip_manifest(0, 100_000).await;
+        test_roundtrip_manifest(1000, 100_000).await;
+        test_roundtrip_manifest(1000, 1000).await;
+    }
 }
