Skip to content

Commit

Permalink
feat: enforce max rows per group and file (#1093)
Browse files Browse the repository at this point in the history
* wip: start docs

* wip: start outlining compaction impl

* fix the chunker

* fix various tests

* adjust tests

* pr feedback
  • Loading branch information
wjones127 authored Jul 26, 2023
1 parent cf7ee43 commit d728044
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 89 deletions.
130 changes: 61 additions & 69 deletions rust/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use chrono::prelude::*;
use futures::stream::{self, StreamExt, TryStreamExt};
use log::warn;
use object_store::path::Path;
use uuid::Uuid;

mod feature_flags;
pub mod fragment;
Expand All @@ -41,19 +40,20 @@ mod write;
use self::feature_flags::{apply_feature_flags, can_read_dataset, can_write_dataset};
use self::fragment::FileFragment;
use self::scanner::Scanner;
use self::write::{reader_to_stream, write_fragments};
use crate::datatypes::Schema;
use crate::error::box_error;
use crate::format::{pb, Fragment, Index, Manifest};
use crate::io::object_store::ObjectStoreParams;
use crate::io::{
object_reader::{read_message, read_struct},
read_manifest, read_metadata_offset, write_manifest, FileWriter, ObjectStore,
read_manifest, read_metadata_offset, write_manifest, ObjectStore,
};
use crate::session::Session;
use crate::{Error, Result};
use hash_joiner::HashJoiner;
pub use scanner::ROW_ID;
pub use write::*;
pub use write::{WriteMode, WriteParams};

const LATEST_MANIFEST_NAME: &str = "_latest.manifest";
const VERSIONS_DIR: &str = "_versions";
Expand Down Expand Up @@ -96,17 +96,6 @@ impl From<&Manifest> for Version {
}
}

/// Open a [FileWriter] for a new data file.
///
/// The file is placed at `<base_dir>/<DATA_DIR>/<data_file_path>` and is
/// written with a clone of `schema`.
async fn new_file_writer(
    object_store: &ObjectStore,
    base_dir: &Path,
    data_file_path: &str,
    schema: &Schema,
) -> Result<FileWriter> {
    // Resolve the destination one path segment at a time.
    let data_dir = base_dir.child(DATA_DIR);
    let destination = data_dir.child(data_file_path);
    // The writer takes the schema by value, so hand it its own copy.
    let owned_schema = schema.clone();
    FileWriter::try_new(object_store, &destination, owned_schema).await
}

/// Get the manifest file path for a version.
fn manifest_path(base: &Path, version: u64) -> Path {
base.child(VERSIONS_DIR)
Expand Down Expand Up @@ -322,16 +311,7 @@ impl Dataset {
let latest_manifest_path = latest_manifest_path(&base);
let flag_dataset_exists = object_store.exists(&latest_manifest_path).await?;

let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?;
let mut peekable = batches.peekable();
if let Some(batch) = peekable.peek() {
if let Ok(b) = batch {
schema.set_dictionary(b)?;
} else {
return Err(Error::from(batch.as_ref().unwrap_err()));
}
}
schema.validate()?;
let (stream, schema) = reader_to_stream(batches)?;

// Running checks for the different write modes
// create + dataset already exists = error
Expand Down Expand Up @@ -411,52 +391,16 @@ impl Dataset {
vec![]
};

let mut writer = None;
let mut batches: Vec<RecordBatch> = Vec::new();
let mut num_rows: usize = 0;
for batch_result in peekable {
let batch: RecordBatch = batch_result?;
batches.push(batch.clone());
num_rows += batch.num_rows();
if num_rows >= params.max_rows_per_group {
// TODO: the max rows per group boundary is not accurately calculated yet.
if writer.is_none() {
writer = {
let file_path = format!("{}.lance", Uuid::new_v4());
let fragment = Fragment::with_file(fragment_id, &file_path, &schema);
fragments.push(fragment);
fragment_id += 1;
Some(new_file_writer(&object_store, &base, &file_path, &schema).await?)
}
};
let object_store = Arc::new(object_store);
let mut new_fragments =
write_fragments(object_store.clone(), &base, &schema, stream, params.clone()).await?;

writer.as_mut().unwrap().write(&batches).await?;
batches = Vec::new();
num_rows = 0;
}
if let Some(w) = writer.as_mut() {
if w.len() >= params.max_rows_per_file {
w.finish().await?;
writer = None;
}
}
}
if num_rows > 0 {
if writer.is_none() {
writer = {
let file_path = format!("{}.lance", Uuid::new_v4());
let fragment = Fragment::with_file(fragment_id, &file_path, &schema);
fragments.push(fragment);
Some(new_file_writer(&object_store, &base, &file_path, &schema).await?)
}
};
writer.as_mut().unwrap().write(&batches).await?;
// Assign IDs
for fragment in &mut new_fragments {
fragment.id = fragment_id;
fragment_id += 1;
}
if let Some(w) = writer.as_mut() {
// Drop the last writer.
w.finish().await?;
drop(writer);
};
fragments.extend(new_fragments);

let mut manifest = Manifest::new(&schema, Arc::new(fragments));
manifest.version = match dataset.as_ref() {
Expand All @@ -482,7 +426,7 @@ impl Dataset {
.await?;

Ok(Self {
object_store: Arc::new(object_store),
object_store,
base,
manifest: Arc::new(manifest.clone()),
session: Arc::new(Session::default()),
Expand Down Expand Up @@ -1089,6 +1033,12 @@ mod tests {
.try_collect::<Vec<_>>()
.await
.unwrap();

// The batch size matches the group size.
for batch in &actual_batches {
assert_eq!(batch.num_rows(), 10);
}

// sort
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
Expand Down Expand Up @@ -1192,6 +1142,48 @@ mod tests {
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
}

#[tokio::test]
async fn test_write_params() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    // Input: one non-nullable Int32 column "i" holding 0..num_rows in a single batch.
    let num_rows: usize = 1_000;
    let column = Field::new("i", DataType::Int32, false);
    let schema = Arc::new(ArrowSchema::new(vec![column]));
    let values = Int32Array::from_iter_values(0..num_rows as i32);
    let single_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap();
    let source = RecordBatchIterator::new(vec![single_batch].into_iter().map(Ok), schema.clone());

    // Limits under test; everything else keeps its default.
    let params = WriteParams {
        max_rows_per_file: 100,
        max_rows_per_group: 10,
        ..Default::default()
    };
    let dataset = Dataset::write(source, test_uri, Some(params)).await.unwrap();

    // Every input row must be present after the write.
    let total_rows = dataset.count_rows().await.unwrap();
    assert_eq!(total_rows, num_rows);

    // 1_000 rows at 100 rows per file: expect 10 fragments, each read back as
    // 10 batches of 10 rows.
    let fragments = dataset.get_fragments();
    assert_eq!(fragments.len(), 10);
    for frag in &fragments {
        let rows_in_fragment = frag.count_rows().await.unwrap();
        assert_eq!(rows_in_fragment, 100);

        let reader = frag.open(dataset.schema()).await.unwrap();
        let group_count = reader.num_batches();
        assert_eq!(group_count, 10);
        for group_idx in 0..group_count {
            assert_eq!(reader.num_rows_in_batch(group_idx), 10);
        }
    }
}

#[tokio::test]
async fn test_write_manifest() {
let test_dir = tempdir().unwrap();
Expand Down
39 changes: 24 additions & 15 deletions rust/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,24 @@ impl FileFragment {
pub async fn count_rows(&self) -> Result<usize> {
    // Build both futures up front without awaiting them so that `try_join`
    // can drive the two lookups concurrently.
    let total_rows = self.fragment_length();
    // Delegate to `count_deletions` instead of repeating the deletion-file
    // lookup inline; a second, shadowed future would never be polled.
    let deletion_count = self.count_deletions();

    // An error from either lookup is propagated to the caller.
    let (total_rows, deletion_count) =
        futures::future::try_join(total_rows, deletion_count).await?;

    // Rows remaining = fragment length minus the rows recorded as deleted.
    Ok(total_rows - deletion_count)
}

/// Count the entries in this fragment's deletion file.
///
/// Returns `0` when `read_deletion_file` yields `None`; any error from
/// reading the file is propagated unchanged.
pub(crate) async fn count_deletions(&self) -> Result<usize> {
    let dataset = &self.dataset;
    let deletions =
        read_deletion_file(&dataset.base, &self.metadata, dataset.object_store()).await?;
    Ok(deletions.map_or(0, |deleted| deleted.len()))
}

/// Get the number of physical rows in the fragment. This includes deleted rows.
///
/// If there are no deleted rows, this is equal to the number of rows in the
Expand Down Expand Up @@ -555,7 +560,7 @@ mod tests {

let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 2,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Expand All @@ -575,23 +580,27 @@ mod tests {
let mut scanner = fragment.scan();
let batches = scanner
.with_row_id()
.filter(" i < 110")
.filter(" i < 105")
.unwrap()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(batches.len(), 2);
assert_eq!(batches.len(), 3);

assert_eq!(
batches[0].column_by_name("i").unwrap().as_ref(),
&Int32Array::from_iter_values(80..100)
&Int32Array::from_iter_values(80..90)
);
assert_eq!(
batches[1].column_by_name("i").unwrap().as_ref(),
&Int32Array::from_iter_values(100..110)
&Int32Array::from_iter_values(90..100)
);
assert_eq!(
batches[2].column_by_name("i").unwrap().as_ref(),
&Int32Array::from_iter_values(100..105)
);
}

Expand Down Expand Up @@ -759,22 +768,22 @@ mod tests {
.unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();

assert_eq!(batches[0].schema().as_ref(), &(&new_projection).into());
assert_eq!(batches[1].schema().as_ref(), &(&new_projection).into());
let max_value_in_batch = if with_delete { 15 } else { 20 };
let expected_batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::Int32, true),
ArrowField::new("double_i", DataType::Int32, true),
])),
vec![
Arc::new(Int32Array::from_iter_values(0..max_value_in_batch)),
Arc::new(Int32Array::from_iter_values(10..max_value_in_batch)),
Arc::new(Int32Array::from_iter_values(
(0..(2 * max_value_in_batch)).step_by(2),
(20..(2 * max_value_in_batch)).step_by(2),
)),
],
)
.unwrap();
assert_eq!(batches[0], expected_batch);
assert_eq!(batches[1], expected_batch);
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ mod test {
.try_into_stream()
.await
.unwrap();
for expected_len in [8, 8, 4, 8, 8, 4] {
for expected_len in [8, 2, 8, 2, 8, 2, 8, 2, 8, 2] {
assert_eq!(
stream.next().await.unwrap().unwrap().num_rows(),
expected_len as usize
Expand Down
8 changes: 8 additions & 0 deletions rust/src/dataset/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ pub(crate) fn add_blanks(
return Ok(batch);
}

if batch.num_rows() == 0 {
// TODO: implement adding blanks for an empty batch.
// This is difficult because we need to create a batch for arbitrary schemas.
return Err(Error::NotSupported {
source: "Missing many rows in merge".into(),
});
}

let mut array_i = 0;
let selection_vector: Vec<u32> = row_id_range
.map(move |row_id| {
Expand Down
Loading

0 comments on commit d728044

Please sign in to comment.