From 4f331d4cc5869fe44407bf14b9b335230acf33e4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 24 Oct 2024 11:34:16 -0700 Subject: [PATCH 01/19] start to accept subschemas --- rust/lance-core/src/datatypes/field.rs | 91 +++------ rust/lance-core/src/datatypes/schema.rs | 257 ++++++++++++++++++------ rust/lance/src/dataset.rs | 90 +++++++++ 3 files changed, 313 insertions(+), 125 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index a4a0b58d4f..3fa53283bf 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -5,7 +5,7 @@ use std::{ cmp::max, - collections::{HashMap, HashSet}, + collections::HashMap, fmt::{self, Display}, str::FromStr, sync::Arc, @@ -23,7 +23,7 @@ use deepsize::DeepSizeOf; use lance_arrow::{bfloat16::ARROW_EXT_NAME_KEY, *}; use snafu::{location, Location}; -use super::{Dictionary, LogicalType}; +use super::{schema::explain_fields_difference, Dictionary, LogicalType}; use crate::{Error, Result}; pub const LANCE_STORAGE_CLASS_SCHEMA_META_KEY: &str = "lance-schema:storage-class"; @@ -49,6 +49,10 @@ pub struct SchemaCompareOptions { pub compare_field_ids: bool, /// Should nullability be compared (default Strict) pub compare_nullability: NullabilityComparison, + /// Allow fields to be missing if they are nullable (default false) + pub allow_missing_if_nullable: bool, + /// Allow out of order fields (default false) + pub ignore_field_order: bool, } /// Encoding enum. 
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)] @@ -151,7 +155,7 @@ impl Field { self.storage_class } - fn explain_differences( + pub(crate) fn explain_differences( &self, expected: &Self, options: &SchemaCompareOptions, @@ -210,61 +214,19 @@ impl Field { self_name )); } - if self.children.len() != expected.children.len() - || !self - .children - .iter() - .zip(expected.children.iter()) - .all(|(child, expected)| child.name == expected.name) - { - let self_children = self - .children - .iter() - .map(|child| child.name.clone()) - .collect::>(); - let expected_children = expected - .children - .iter() - .map(|child| child.name.clone()) - .collect::>(); - let missing = expected_children - .difference(&self_children) - .cloned() - .collect::>(); - let unexpected = self_children - .difference(&expected_children) - .cloned() - .collect::>(); - if missing.is_empty() && unexpected.is_empty() { - differences.push(format!( - "`{}` field order mismatch, expected [{}] but was [{}]", - self_name, - expected - .children - .iter() - .map(|child| child.name.clone()) - .collect::>() - .join(", "), - self.children - .iter() - .map(|child| child.name.clone()) - .collect::>() - .join(", "), - )); - } else { - differences.push(format!( - "`{}` had mismatched children, missing=[{}] unexpected=[{}]", - self_name, - missing.join(", "), - unexpected.join(", ") - )); - } - } else { - differences.extend(self.children.iter().zip(expected.children.iter()).flat_map( - |(child, expected_child)| { - child.explain_differences(expected_child, options, Some(&self_name)) - }, - )); + let children_differences = explain_fields_difference( + &self.children, + &expected.children, + options, + Some(&self_name), + ); + if !children_differences.is_empty() { + let children_differences = format!( + "`{}` had mismatched children: {}", + self_name, + children_differences.join(", ") + ); + differences.push(children_differences); } differences } @@ -1177,7 +1139,10 @@ mod tests { .unwrap(); assert_eq!( 
wrong_child.explain_difference(&expected, &opts), - Some("`a.b` should have nullable=true but nullable=false".to_string()) + Some( + "`a` had mismatched children: `a.b` should have nullable=true but nullable=false" + .to_string() + ) ); let mismatched_children: Field = ArrowField::new( @@ -1192,13 +1157,13 @@ mod tests { .unwrap(); assert_eq!( mismatched_children.explain_difference(&expected, &opts), - Some("`a` had mismatched children, missing=[c] unexpected=[d]".to_string()) + Some("`a` had mismatched children: fields did not match, missing=[a.c], unexpected=[a.d]".to_string()) ); let reordered_children: Field = ArrowField::new( "a", DataType::Struct(Fields::from(vec![ - ArrowField::new("c", DataType::Int32, false), + ArrowField::new("c", DataType::Int32, true), ArrowField::new("b", DataType::Int32, true), ])), true, @@ -1207,7 +1172,7 @@ mod tests { .unwrap(); assert_eq!( reordered_children.explain_difference(&expected, &opts), - Some("`a` field order mismatch, expected [b, c] but was [c, b]".to_string()) + Some("`a` had mismatched children: fields in different order, expected: [b, c], actual: [c, b]".to_string()) ); let multiple_wrongs: Field = ArrowField::new( @@ -1223,7 +1188,7 @@ mod tests { assert_eq!( multiple_wrongs.explain_difference(&expected, &opts), Some( - "expected name 'a' but name was 'c', `c.c` should have type int32 but type was float" + "expected name 'a' but name was 'c', `c` had mismatched children: `c.c` should have type int32 but type was float" .to_string() ) ); diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 8c3077bb7d..4103f11bde 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -76,69 +76,21 @@ impl Schema { expected: &Self, options: &SchemaCompareOptions, ) -> Option { - if self.fields.len() != expected.fields.len() - || !self - .fields - .iter() - .zip(expected.fields.iter()) - .all(|(field, expected)| field.name == expected.name) 
- { - let self_fields = self - .fields - .iter() - .map(|f| f.name.clone()) - .collect::>(); - let expected_fields = expected - .fields - .iter() - .map(|f| f.name.clone()) - .collect::>(); - let missing = expected_fields - .difference(&self_fields) - .cloned() - .collect::>(); - let unexpected = self_fields - .difference(&expected_fields) - .cloned() - .collect::>(); - if missing.is_empty() && unexpected.is_empty() { - Some(format!( - "fields in different order, expected: [{}], actual: [{}]", - expected - .fields - .iter() - .map(|f| f.name.clone()) - .collect::>() - .join(", "), - self.fields - .iter() - .map(|f| f.name.clone()) - .collect::>() - .join(", "), - )) - } else { - Some(format!( - "fields did not match, missing=[{}], unexpected=[{}]", - missing.join(", "), - unexpected.join(", ") - )) + let mut differences = + explain_fields_difference(&self.fields, &expected.fields, options, None); + + if options.compare_metadata { + if let Some(difference) = + explain_metadata_difference(&self.metadata, &expected.metadata) + { + differences.push(difference); } + } + + if differences.is_empty() { + None } else { - let differences = self - .fields - .iter() - .zip(expected.fields.iter()) - .flat_map(|(field, expected)| field.explain_difference(expected, options)) - .collect::>(); - if differences.is_empty() { - if options.compare_metadata && self.metadata != expected.metadata { - Some("schema metadata did not match expected schema metadata".to_string()) - } else { - None - } - } else { - Some(differences.join(", ")) - } + Some(differences.join(", ")) } } @@ -632,6 +584,113 @@ impl TryFrom<&Self> for Schema { } } +pub fn explain_fields_difference( + fields: &[Field], + expected: &[Field], + options: &SchemaCompareOptions, + path: Option<&str>, +) -> Vec { + let field_names = fields + .iter() + .map(|f| f.name.as_str()) + .collect::>(); + let expected_names = expected + .iter() + .map(|f| f.name.as_str()) + .collect::>(); + + let prepend_path = |f: &str| { + if let 
Some(path) = path { + format!("{}.{}", path, f) + } else { + f.to_string() + } + }; + + // Check there are no extra fields or missing fields + let unexpected_fields = field_names + .difference(&expected_names) + .cloned() + .map(prepend_path) + .collect::>(); + let missing_fields = expected_names.difference(&field_names); + let missing_fields = if options.allow_missing_if_nullable { + missing_fields + .filter(|f| { + let expected_field = expected.iter().find(|ef| ef.name == **f).unwrap(); + !expected_field.nullable + }) + .cloned() + .map(prepend_path) + .collect::>() + } else { + missing_fields + .cloned() + .map(prepend_path) + .collect::>() + }; + + let mut differences = vec![]; + if !missing_fields.is_empty() || !unexpected_fields.is_empty() { + differences.push(format!( + "fields did not match, missing=[{}], unexpected=[{}]", + missing_fields.join(", "), + unexpected_fields.join(", ") + )); + } + + // Map the expected fields to position of field + let field_mapping = expected + .iter() + .filter_map(|ef| { + fields + .iter() + .position(|f| ef.name == f.name) + .map(|pos| (ef, pos)) + }) + .collect::>(); + + // Check the fields are in the same order + if !options.ignore_field_order { + let fields_out_of_order = field_mapping.windows(2).any(|w| w[0].1 > w[1].1); + if fields_out_of_order { + let expected_order = expected.iter().map(|f| f.name.as_str()).collect::>(); + let actual_order = fields.iter().map(|f| f.name.as_str()).collect::>(); + differences.push(format!( + "fields in different order, expected: [{}], actual: [{}]", + expected_order.join(", "), + actual_order.join(", ") + )); + } + } + + // Check for individual differences in the fields + for (expected_field, field_pos) in field_mapping.iter() { + let field = &fields[*field_pos]; + debug_assert_eq!(field.name, expected_field.name); + let field_diffs = field.explain_differences(expected_field, options, path); + if !field_diffs.is_empty() { + differences.push(field_diffs.join(", ")) + } + } + + 
differences +} + +fn explain_metadata_difference( + metadata: &HashMap, + expected: &HashMap, +) -> Option { + if metadata != expected { + Some(format!( + "metadata did not match, expected: {:?}, actual: {:?}", + expected, metadata + )) + } else { + None + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -1096,6 +1155,80 @@ mod tests { ]); let mismatched = Schema::try_from(&mismatched).unwrap(); - assert_eq!(mismatched.explain_difference(&expected, &SchemaCompareOptions::default()), Some("`b` had mismatched children, missing=[f2] unexpected=[], `c` should have nullable=false but nullable=true".to_string())); + assert_eq!( + mismatched.explain_difference(&expected, &SchemaCompareOptions::default()), + Some( + "`b` had mismatched children: fields did not match, missing=[b.f2], \ + unexpected=[], `c` should have nullable=false but nullable=true" + .to_string() + ) + ); + } + + #[test] + fn test_schema_difference_subschema() { + let expected = ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("f1", DataType::Utf8, true), + ArrowField::new("f2", DataType::Boolean, false), + ArrowField::new("f3", DataType::Float32, false), + ])), + true, + ), + ArrowField::new("c", DataType::Float64, true), + ]); + let expected = Schema::try_from(&expected).unwrap(); + + // Can omit nullable fields and subfields + let subschema = ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("f2", DataType::Boolean, false), + ArrowField::new("f3", DataType::Float32, false), + ])), + true, + ), + ]); + let subschema = Schema::try_from(&subschema).unwrap(); + + assert_eq!( + subschema.explain_difference(&expected, &SchemaCompareOptions::default()), + Some( + "fields did not match, missing=[c], unexpected=[], `b` had mismatched \ + children: fields did not match, missing=[b.f1], 
unexpected=[]" + .to_string() + ) + ); + let options = SchemaCompareOptions { + allow_missing_if_nullable: true, + ..Default::default() + }; + let res = subschema.explain_difference(&expected, &options); + assert!(res.is_none(), "Expected None, got {:?}", res); + + // Omitting non-nullable fields should fail + let subschema = ArrowSchema::new(vec![ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ArrowField::new( + "f2", + DataType::Boolean, + false, + )])), + true, + )]); + let subschema = Schema::try_from(&subschema).unwrap(); + assert_eq!( + subschema.explain_difference(&expected, &options), + Some( + "fields did not match, missing=[a], unexpected=[], `b` had mismatched \ + children: fields did not match, missing=[b.f3], unexpected=[]" + .to_string() + ) + ); } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index d470f2f7ee..a2dae817dc 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -4876,4 +4876,94 @@ mod tests { } } } + + #[tokio::test] + async fn test_insert_subschema() { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("b", DataType::Int32, true), + ])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + let mut dataset = Dataset::write(empty_reader, "memory://", None) + .await + .unwrap(); + + // If missing columns that aren't nullable, will return an error + // TODO: provide alternative default than null. + let just_b = Arc::new(schema.project(&[1]).unwrap()); + let batch = RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone()); + let res = dataset.append(reader, None).await; + assert!( + matches!(res, Err(Error::SchemaMismatch { .. })), + "Expected Error::SchemaMismatch, got {:?}", + res + ); + + // If missing columns that are nullable, the write succeeds. 
+ let just_a = Arc::new(schema.project(&[0]).unwrap()); + let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); + dataset.append(reader, None).await.unwrap(); + // TODO: do we also need to check the write() path, since it seems separate? + + // Looking at the fragments, there is no data file with the missing field + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + assert_eq!(fragments[0].metadata.files.len(), 1); + assert_eq!(&fragments[0].metadata.files[0].fields, &[0]); + + // When reading back, columns that are missing are null + let data = dataset.scan().try_into_batch().await.unwrap(); + let expected = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(Int32Array::from(vec![None])), + ], + ) + .unwrap(); + assert_eq!(data, expected); + + // Can still insert all columns + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![2])), + Arc::new(Int32Array::from(vec![3])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + dataset.append(reader, None).await.unwrap(); + + // When reading back, only missing data is null, otherwise is filled in + let data = dataset.scan().try_into_batch().await.unwrap(); + let expected = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(Int32Array::from(vec![None, Some(3)])), + ], + ) + .unwrap(); + assert_eq!(data, expected); + + // Can run compaction. All files should now have all fields. + compact_files(&mut dataset, CompactionOptions::default(), None) + .await + .unwrap(); + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + assert_eq!(fragments[0].metadata.files.len(), 1); + assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 1]); + + // Can scan and get expected data. 
+ let data = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(data, expected); + } + + // TODO: test nested fields and subschemas } From ea20a60ab8695f1f2eed7065cfe46afa32dee78e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 24 Oct 2024 12:16:42 -0700 Subject: [PATCH 02/19] finish subschema checks --- rust/lance-core/src/datatypes/field.rs | 28 +++----- rust/lance-core/src/datatypes/schema.rs | 95 ++++++++++++++++++++++--- 2 files changed, 97 insertions(+), 26 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 3fa53283bf..8a0289318b 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -23,7 +23,10 @@ use deepsize::DeepSizeOf; use lance_arrow::{bfloat16::ARROW_EXT_NAME_KEY, *}; use snafu::{location, Location}; -use super::{schema::explain_fields_difference, Dictionary, LogicalType}; +use super::{ + schema::{compare_fields, explain_fields_difference}, + Dictionary, LogicalType, +}; use crate::{Error, Result}; pub const LANCE_STORAGE_CLASS_SCHEMA_META_KEY: &str = "lance-schema:storage-class"; @@ -257,22 +260,13 @@ impl Field { } pub fn compare_with_options(&self, expected: &Self, options: &SchemaCompareOptions) -> bool { - if self.children.len() != expected.children.len() { - false - } else { - self.name == expected.name - && self.logical_type == expected.logical_type - && Self::compare_nullability(expected.nullable, self.nullable, options) - && self.children.len() == expected.children.len() - && self - .children - .iter() - .zip(&expected.children) - .all(|(left, right)| left.compare_with_options(right, options)) - && (!options.compare_field_ids || self.id == expected.id) - && (!options.compare_dictionary || self.dictionary == expected.dictionary) - && (!options.compare_metadata || self.metadata == expected.metadata) - } + self.name == expected.name + && self.logical_type == expected.logical_type + && 
Self::compare_nullability(expected.nullable, self.nullable, options) + && compare_fields(&self.children, &expected.children, options) + && (!options.compare_field_ids || self.id == expected.id) + && (!options.compare_dictionary || self.dictionary == expected.dictionary) + && (!options.compare_metadata || self.metadata == expected.metadata) } pub fn extension_name(&self) -> Option<&str> { diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 4103f11bde..c76198a2fd 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -60,15 +60,8 @@ impl<'a> Iterator for SchemaFieldIterPreOrder<'a> { impl Schema { pub fn compare_with_options(&self, expected: &Self, options: &SchemaCompareOptions) -> bool { - if self.fields.len() != expected.fields.len() { - false - } else { - self.fields - .iter() - .zip(&expected.fields) - .all(|(lhs, rhs)| lhs.compare_with_options(rhs, options)) - && (!options.compare_metadata || self.metadata == expected.metadata) - } + compare_fields(&self.fields, &expected.fields, options) + && (!options.compare_metadata || self.metadata == expected.metadata) } pub fn explain_difference( @@ -584,6 +577,55 @@ impl TryFrom<&Self> for Schema { } } +pub fn compare_fields( + fields: &[Field], + expected: &[Field], + options: &SchemaCompareOptions, +) -> bool { + if options.allow_missing_if_nullable || options.ignore_field_order { + let expected_names = expected + .iter() + .map(|f| f.name.as_str()) + .collect::>(); + for field in fields { + if !expected_names.contains(field.name.as_str()) { + // Extra field + return false; + } + } + + let field_mapping = fields + .iter() + .enumerate() + .map(|(pos, f)| (f.name.as_str(), (f, pos))) + .collect::>(); + let mut cumulative_position = 0; + for expected_field in expected { + if let Some((field, pos)) = field_mapping.get(expected_field.name.as_str()) { + if !field.compare_with_options(expected_field, options) { + return false; + 
} + if !options.ignore_field_order && *pos < cumulative_position { + return false; + } + cumulative_position = *pos; + } else if options.allow_missing_if_nullable && expected_field.nullable { + continue; + } else { + return false; + } + } + true + } else { + // Fast path: we can just zip + fields.len() == expected.len() + && fields + .iter() + .zip(expected.iter()) + .all(|(lhs, rhs)| lhs.compare_with_options(rhs, options)) + } +} + pub fn explain_fields_difference( fields: &[Field], expected: &[Field], @@ -1196,6 +1238,7 @@ mod tests { ]); let subschema = Schema::try_from(&subschema).unwrap(); + assert!(!subschema.compare_with_options(&expected, &SchemaCompareOptions::default())); assert_eq!( subschema.explain_difference(&expected, &SchemaCompareOptions::default()), Some( @@ -1208,6 +1251,7 @@ mod tests { allow_missing_if_nullable: true, ..Default::default() }; + assert!(subschema.compare_with_options(&expected, &options)); let res = subschema.explain_difference(&expected, &options); assert!(res.is_none(), "Expected None, got {:?}", res); @@ -1222,6 +1266,7 @@ mod tests { true, )]); let subschema = Schema::try_from(&subschema).unwrap(); + assert!(!subschema.compare_with_options(&expected, &options)); assert_eq!( subschema.explain_difference(&expected, &options), Some( @@ -1230,5 +1275,37 @@ mod tests { .to_string() ) ); + + let out_of_order = ArrowSchema::new(vec![ + ArrowField::new("c", DataType::Float64, true), + ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("f3", DataType::Float32, false), + ArrowField::new("f2", DataType::Boolean, false), + ArrowField::new("f1", DataType::Utf8, true), + ])), + true, + ), + ArrowField::new("a", DataType::Int32, false), + ]); + let out_of_order = Schema::try_from(&out_of_order).unwrap(); + assert!(!out_of_order.compare_with_options(&expected, &options)); + assert_eq!( + subschema.explain_difference(&expected, &options), + Some( + "fields did not match, missing=[a], unexpected=[], `b` had 
mismatched \ + children: fields did not match, missing=[b.f3], unexpected=[]" + .to_string() + ) + ); + + let options = SchemaCompareOptions { + ignore_field_order: true, + ..Default::default() + }; + assert!(out_of_order.compare_with_options(&expected, &options)); + let res = out_of_order.explain_difference(&expected, &options); + assert!(res.is_none(), "Expected None, got {:?}", res); } } From ece60a2ba8981eaa817d8a7d5c22cb152a4bdca9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 24 Oct 2024 14:46:46 -0700 Subject: [PATCH 03/19] get top-level subschema inserts working --- rust/lance/src/dataset.rs | 6 +- rust/lance/src/dataset/fragment.rs | 113 ++++++++++++++++++++++++++++- rust/lance/src/dataset/write.rs | 28 +++---- 3 files changed, 131 insertions(+), 16 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index a2dae817dc..46265c251a 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -734,10 +734,12 @@ impl Dataset { let stream = reader_to_stream(batches); // Return Error if append and input schema differ - self.manifest.schema.check_compatible( - &schema, + schema.check_compatible( + &self.manifest.schema, &SchemaCompareOptions { compare_dictionary: true, + ignore_field_order: true, + allow_missing_if_nullable: true, ..Default::default() }, )?; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index d2d6d9e604..9eb9358b99 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -12,7 +12,9 @@ use std::sync::Arc; use arrow::compute::concat_batches; use arrow_array::cast::as_primitive_array; -use arrow_array::{RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array}; +use arrow_array::{ + new_null_array, RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array, +}; use arrow_schema::Schema as ArrowSchema; use datafusion::logical_expr::Expr; use datafusion::scalar::ScalarValue; @@ -388,6 +390,101 @@ mod 
v2_adapter { } } +/// A reader where all rows are null. Used when there are fields that have no +/// data files in a fragment. +#[derive(Debug, Clone)] +struct NullReader { + schema: Arc, + num_rows: u32, +} + +impl NullReader { + fn new(schema: Arc, num_rows: u32) -> Self { + Self { schema, num_rows } + } + + fn batch(projection: Arc, num_rows: usize) -> RecordBatch { + let columns = projection + .fields() + .iter() + .map(|f| new_null_array(f.data_type(), num_rows)) + .collect::>(); + RecordBatch::try_new(projection, columns).unwrap() + } +} + +#[async_trait::async_trait] +impl GenericFileReader for NullReader { + fn read_range_tasks( + &self, + range: Range, + batch_size: u32, + projection: Arc, + ) -> Result { + let mut remaining_rows = range.end - range.start; + let projection: Arc = Arc::new(projection.as_ref().into()); + + let task_iter = std::iter::from_fn(move || { + if remaining_rows == 0 { + return None; + } + + let num_rows = remaining_rows.min(batch_size as u64) as usize; + remaining_rows -= num_rows as u64; + let batch = Self::batch(projection.clone(), num_rows); + let task = ReadBatchTask { + task: futures::future::ready(Ok(batch)).boxed(), + num_rows: num_rows as u32, + }; + Some(task) + }); + + Ok(futures::stream::iter(task_iter).boxed()) + } + + fn read_all_tasks( + &self, + batch_size: u32, + projection: Arc, + ) -> Result { + self.read_range_tasks(0..self.num_rows as u64, batch_size, projection) + } + + fn take_all_tasks( + &self, + indices: &[u32], + batch_size: u32, + projection: Arc, + ) -> Result { + let num_rows = indices.len() as u64; + self.read_range_tasks(0..num_rows, batch_size, projection) + } + + fn projection(&self) -> &Arc { + &self.schema + } + + fn len(&self) -> u32 { + self.num_rows + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn is_legacy(&self) -> bool { + false + } + + fn as_legacy_opt(&self) -> Option<&FileReader> { + None + } + + fn as_legacy_opt_mut(&mut self) -> Option<&mut FileReader> { + None + 
} +} + #[derive(Debug, Default)] pub struct FragReadConfig { // Add the row id column @@ -713,6 +810,20 @@ impl FileFragment { } } + // Check if there are any fields that are not in any data files + let field_ids_in_files = opened_files + .iter() + .flat_map(|r| r.projection().fields_pre_order().map(|f| f.id)) + .filter(|id| *id >= 0) + .collect::>(); + let mut missing_fields = projection.field_ids(); + missing_fields.retain(|f| !field_ids_in_files.contains(f) && *f >= 0); + if !missing_fields.is_empty() { + let missing_projection = projection.project_by_ids(&missing_fields); + let null_reader = NullReader::new(Arc::new(missing_projection), opened_files[0].len()); + opened_files.push(Box::new(null_reader)); + } + Ok(opened_files) } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index f28edee8a3..d2e338aa39 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::borrow::Cow; use std::sync::Arc; use arrow_array::{RecordBatch, RecordBatchReader}; @@ -231,7 +232,7 @@ pub async fn do_write_fragments( .boxed() }; - let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version); + let writer_generator = WriterGenerator::new(object_store, base_dir, &schema, storage_version); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); @@ -309,22 +310,23 @@ pub async fn write_fragments_internal( schema.check_compatible( dataset.schema(), &SchemaCompareOptions { - // We don't care if the user claims their data is nullable / non-nullable. - // We will verify against the actual data in the writer. + // We don't care if the user claims their data is nullable / non-nullable. We will + // verify against the actual data. 
compare_nullability: NullabilityComparison::Ignore, + allow_missing_if_nullable: true, + ignore_field_order: true, + compare_dictionary: true, ..Default::default() }, )?; - // Use the schema from the dataset, because it has the correct - // field ids. Use the storage version from the dataset, ignoring - // any version from the user. - ( - dataset.schema().clone(), - dataset - .manifest() - .data_storage_format - .lance_file_version()?, - ) + // Project from the dataset schema, because it has the correct field ids. + let write_schema = dataset.schema().project_by_schema(&schema)?; + // Use the storage version from the dataset, ignoring any version from the user. + let data_storage_version = dataset + .manifest() + .data_storage_format + .lance_file_version()?; + (write_schema, data_storage_version) } WriteMode::Overwrite => { // Overwrite, use the schema from the data. If the user specified From bda2649cee161a74cf9683c99f618c5e622fe10c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 24 Oct 2024 15:40:40 -0700 Subject: [PATCH 04/19] add test for column order and nested subschema --- rust/lance/src/dataset.rs | 128 +++++++++++++++++++++++++++++++++++++- 1 file changed, 126 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 46265c251a..c54b94f377 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -4909,7 +4909,6 @@ mod tests { .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); dataset.append(reader, None).await.unwrap(); - // TODO: do we also need to check the write() path, since it seems separate? 
// Looking at the fragments, there is no data file with the missing field let fragments = dataset.get_fragments(); @@ -4967,5 +4966,130 @@ mod tests { assert_eq!(data, expected); } - // TODO: test nested fields and subschemas + #[tokio::test] + async fn test_insert_nested_subschemas() { + // Test subschemas at struct level + // Test different orders + // Test the Dataset::write() path + // Test Take across fragments with different field id sets + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let field_a = Arc::new(ArrowField::new("a", DataType::Int32, false)); + let field_b = Arc::new(ArrowField::new("b", DataType::Int32, true)); + let field_c = Arc::new(ArrowField::new("c", DataType::Int32, false)); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "s", + DataType::Struct(vec![field_a.clone(), field_b.clone(), field_c.clone()].into()), + true, + )])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + Dataset::write(empty_reader, test_uri, None).await.unwrap(); + + let append_options = WriteParams { + mode: WriteMode::Append, + ..Default::default() + }; + // Can insert b, a + let just_ba = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "s", + DataType::Struct(vec![field_a.clone(), field_b.clone()].into()), + true, + )])); + let batch = RecordBatch::try_new( + just_ba.clone(), + vec![Arc::new(StructArray::from(vec![ + ( + field_b.clone(), + Arc::new(Int32Array::from(vec![1])) as ArrayRef, + ), + (field_a.clone(), Arc::new(Int32Array::from(vec![2]))), + ]))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_ba.clone()); + let dataset = Dataset::write(reader, test_uri, Some(append_options.clone())) + .await + .unwrap(); + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + assert_eq!(fragments[0].metadata.files.len(), 1); + assert_eq!(&fragments[0].metadata.files[0].fields, &[2, 1]); + + // Can insert c, b + let just_cb = 
Arc::new(ArrowSchema::new(vec![ArrowField::new( + "s", + DataType::Struct(vec![field_b.clone(), field_c.clone()].into()), + true, + )])); + let batch = RecordBatch::try_new( + just_cb.clone(), + vec![Arc::new(StructArray::from(vec![ + ( + field_b.clone(), + Arc::new(Int32Array::from(vec![3])) as ArrayRef, + ), + (field_c.clone(), Arc::new(Int32Array::from(vec![4]))), + ]))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_cb.clone()); + let dataset = Dataset::write(reader, test_uri, Some(append_options.clone())) + .await + .unwrap(); + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 2); + assert_eq!(fragments[1].metadata.files.len(), 1); + assert_eq!(&fragments[1].metadata.files[0].fields, &[3, 2]); + + // Can't insert a, c (b is non-nullable) + let just_ac = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "s", + DataType::Struct(vec![field_a.clone(), field_c.clone()].into()), + true, + )])); + let batch = RecordBatch::try_new( + just_ac.clone(), + vec![Arc::new(StructArray::from(vec![ + ( + field_a.clone(), + Arc::new(Int32Array::from(vec![5])) as ArrayRef, + ), + (field_c.clone(), Arc::new(Int32Array::from(vec![6]))), + ]))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_ac.clone()); + let res = Dataset::write(reader, test_uri, Some(append_options)).await; + assert!( + matches!(res, Err(Error::SchemaMismatch { .. 
})), + "Expected Error::SchemaMismatch, got {:?}", + res + ); + + // Can call take and get rows from all three back in one batch + let result = dataset + .take(&[1, 2, 0], Arc::new(dataset.schema().clone())) + .await + .unwrap(); + let expected = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StructArray::from(vec![ + ( + field_a.clone(), + Arc::new(Int32Array::from(vec![2, 1, 5])) as ArrayRef, + ), + ( + field_b.clone(), + Arc::new(Int32Array::from(vec![Some(1), Some(3), None])), + ), + ( + field_c.clone(), + Arc::new(Int32Array::from(vec![None, Some(4), Some(6)])), + ), + ]))], + ) + .unwrap(); + assert_eq!(result, expected); + } } From c955f828b6a8a6c7eb3cdca436bd48baa604ad61 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 1 Nov 2024 11:19:12 -0700 Subject: [PATCH 05/19] get python test passing --- python/python/tests/test_dataset.py | 7 +++++++ rust/lance/src/dataset.rs | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index c46f5b53ed..6c62b50d59 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -119,6 +119,13 @@ def test_dataset_append(tmp_path: Path): with pytest.raises(OSError): lance.write_dataset(table2, base_dir, mode="append") + # But we can append subschemas + table3 = pa.Table.from_pydict({"colA": [4, 5, 6]}) + dataset = lance.write_dataset(table3, base_dir, mode="append") + assert dataset.to_table() == pa.table( + {"colA": [1, 2, 3, 4, 5, 6], "colB": [4, 5, 6, None, None, None]} + ) + def test_dataset_from_record_batch_iterable(tmp_path: Path): base_dir = tmp_path / "test" diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c54b94f377..5038ded578 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -573,6 +573,8 @@ impl Dataset { // array nullability is checked later, using actual data instead // of the schema compare_nullability: NullabilityComparison::Ignore, + 
ignore_field_order: true, + allow_missing_if_nullable: true, ..Default::default() }, )?; @@ -738,6 +740,9 @@ impl Dataset { &self.manifest.schema, &SchemaCompareOptions { compare_dictionary: true, + // array nullability is checked later, using actual data instead + // of the schema + compare_nullability: NullabilityComparison::Ignore, ignore_field_order: true, allow_missing_if_nullable: true, ..Default::default() From 272df86b99e169a327e4fbcb4bdd5e44e01cf05a Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 1 Nov 2024 15:40:21 -0700 Subject: [PATCH 06/19] fix issues in test --- rust/lance/src/dataset.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 5038ded578..0ee604811a 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -4980,9 +4980,9 @@ mod tests { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let field_a = Arc::new(ArrowField::new("a", DataType::Int32, false)); - let field_b = Arc::new(ArrowField::new("b", DataType::Int32, true)); - let field_c = Arc::new(ArrowField::new("c", DataType::Int32, false)); + let field_a = Arc::new(ArrowField::new("a", DataType::Int32, true)); + let field_b = Arc::new(ArrowField::new("b", DataType::Int32, false)); + let field_c = Arc::new(ArrowField::new("c", DataType::Int32, true)); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "s", DataType::Struct(vec![field_a.clone(), field_b.clone(), field_c.clone()].into()), @@ -4998,7 +4998,7 @@ mod tests { // Can insert b, a let just_ba = Arc::new(ArrowSchema::new(vec![ArrowField::new( "s", - DataType::Struct(vec![field_a.clone(), field_b.clone()].into()), + DataType::Struct(vec![field_b.clone(), field_a.clone()].into()), true, )])); let batch = RecordBatch::try_new( @@ -5019,22 +5019,22 @@ mod tests { let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 1); 
assert_eq!(fragments[0].metadata.files.len(), 1); - assert_eq!(&fragments[0].metadata.files[0].fields, &[2, 1]); + assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 2, 1]); // Can insert c, b let just_cb = Arc::new(ArrowSchema::new(vec![ArrowField::new( "s", - DataType::Struct(vec![field_b.clone(), field_c.clone()].into()), + DataType::Struct(vec![field_c.clone(), field_b.clone()].into()), true, )])); let batch = RecordBatch::try_new( just_cb.clone(), vec![Arc::new(StructArray::from(vec![ ( - field_b.clone(), - Arc::new(Int32Array::from(vec![3])) as ArrayRef, + field_c.clone(), + Arc::new(Int32Array::from(vec![4])) as ArrayRef, ), - (field_c.clone(), Arc::new(Int32Array::from(vec![4]))), + (field_b.clone(), Arc::new(Int32Array::from(vec![3]))), ]))], ) .unwrap(); @@ -5045,7 +5045,7 @@ mod tests { let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 2); assert_eq!(fragments[1].metadata.files.len(), 1); - assert_eq!(&fragments[1].metadata.files[0].fields, &[3, 2]); + assert_eq!(&fragments[1].metadata.files[0].fields, &[0, 3, 2]); // Can't insert a, c (b is non-nullable) let just_ac = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -5074,7 +5074,7 @@ mod tests { // Can call take and get rows from all three back in one batch let result = dataset - .take(&[1, 2, 0], Arc::new(dataset.schema().clone())) + .take(&[1, 0], Arc::new(dataset.schema().clone())) .await .unwrap(); let expected = RecordBatch::try_new( @@ -5082,15 +5082,15 @@ mod tests { vec![Arc::new(StructArray::from(vec![ ( field_a.clone(), - Arc::new(Int32Array::from(vec![2, 1, 5])) as ArrayRef, + Arc::new(Int32Array::from(vec![None, Some(2)])) as ArrayRef, ), ( field_b.clone(), - Arc::new(Int32Array::from(vec![Some(1), Some(3), None])), + Arc::new(Int32Array::from(vec![Some(3), Some(1)])), ), ( field_c.clone(), - Arc::new(Int32Array::from(vec![None, Some(4), Some(6)])), + Arc::new(Int32Array::from(vec![Some(4), None])), ), ]))], ) From d3c235ba848cef02a232f4312011075dc283fc51 Mon Sep 
17 00:00:00 2001 From: Will Jones Date: Mon, 4 Nov 2024 15:57:43 -0800 Subject: [PATCH 07/19] update test --- rust/lance/src/dataset.rs | 42 ++++++++++++++++++++++++++------- rust/lance/src/dataset/write.rs | 3 +-- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 0ee604811a..3f74c6be2b 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -4996,13 +4996,13 @@ mod tests { ..Default::default() }; // Can insert b, a - let just_ba = Arc::new(ArrowSchema::new(vec![ArrowField::new( + let just_b_a = Arc::new(ArrowSchema::new(vec![ArrowField::new( "s", DataType::Struct(vec![field_b.clone(), field_a.clone()].into()), true, )])); let batch = RecordBatch::try_new( - just_ba.clone(), + just_b_a.clone(), vec![Arc::new(StructArray::from(vec![ ( field_b.clone(), @@ -5012,7 +5012,7 @@ mod tests { ]))], ) .unwrap(); - let reader = RecordBatchIterator::new(vec![Ok(batch)], just_ba.clone()); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b_a.clone()); let dataset = Dataset::write(reader, test_uri, Some(append_options.clone())) .await .unwrap(); @@ -5020,15 +5020,16 @@ mod tests { assert_eq!(fragments.len(), 1); assert_eq!(fragments[0].metadata.files.len(), 1); assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 2, 1]); + assert_eq!(&fragments[0].metadata.files[0].column_indices, &[0, 1, 2]); // Can insert c, b - let just_cb = Arc::new(ArrowSchema::new(vec![ArrowField::new( + let just_c_b = Arc::new(ArrowSchema::new(vec![ArrowField::new( "s", DataType::Struct(vec![field_c.clone(), field_b.clone()].into()), true, )])); let batch = RecordBatch::try_new( - just_cb.clone(), + just_c_b.clone(), vec![Arc::new(StructArray::from(vec![ ( field_c.clone(), @@ -5038,7 +5039,7 @@ mod tests { ]))], ) .unwrap(); - let reader = RecordBatchIterator::new(vec![Ok(batch)], just_cb.clone()); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_c_b.clone()); let dataset = 
Dataset::write(reader, test_uri, Some(append_options.clone())) .await .unwrap(); @@ -5046,15 +5047,16 @@ mod tests { assert_eq!(fragments.len(), 2); assert_eq!(fragments[1].metadata.files.len(), 1); assert_eq!(&fragments[1].metadata.files[0].fields, &[0, 3, 2]); + assert_eq!(&fragments[1].metadata.files[0].column_indices, &[0, 1, 2]); // Can't insert a, c (b is non-nullable) - let just_ac = Arc::new(ArrowSchema::new(vec![ArrowField::new( + let just_a_c = Arc::new(ArrowSchema::new(vec![ArrowField::new( "s", DataType::Struct(vec![field_a.clone(), field_c.clone()].into()), true, )])); let batch = RecordBatch::try_new( - just_ac.clone(), + just_a_c.clone(), vec![Arc::new(StructArray::from(vec![ ( field_a.clone(), @@ -5064,7 +5066,7 @@ mod tests { ]))], ) .unwrap(); - let reader = RecordBatchIterator::new(vec![Ok(batch)], just_ac.clone()); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a_c.clone()); let res = Dataset::write(reader, test_uri, Some(append_options)).await; assert!( matches!(res, Err(Error::SchemaMismatch { .. 
})), @@ -5072,6 +5074,28 @@ mod tests { res ); + // Can scan and get all data + let data = dataset.scan().try_into_batch().await.unwrap(); + let expected = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StructArray::from(vec![ + ( + field_a.clone(), + Arc::new(Int32Array::from(vec![2, 5])) as ArrayRef, + ), + ( + field_b.clone(), + Arc::new(Int32Array::from(vec![Some(1), None])), + ), + ( + field_c.clone(), + Arc::new(Int32Array::from(vec![None, Some(6)])), + ), + ]))], + ) + .unwrap(); + assert_eq!(data, expected); + // Can call take and get rows from all three back in one batch let result = dataset .take(&[1, 0], Arc::new(dataset.schema().clone())) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index d2e338aa39..64e5d82b44 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::borrow::Cow; use std::sync::Arc; use arrow_array::{RecordBatch, RecordBatchReader}; @@ -232,7 +231,7 @@ pub async fn do_write_fragments( .boxed() }; - let writer_generator = WriterGenerator::new(object_store, base_dir, &schema, storage_version); + let writer_generator = WriterGenerator::new(object_store, base_dir, schema, storage_version); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); From 492d3fa4de7d80d752514036c2b91c77e564b950 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 4 Nov 2024 17:12:34 -0800 Subject: [PATCH 08/19] fix nested fields --- rust/lance-core/src/datatypes/field.rs | 6 +++--- rust/lance-core/src/datatypes/schema.rs | 22 +++++++++++++++++----- rust/lance-file/src/reader.rs | 2 +- rust/lance-file/src/writer.rs | 2 +- rust/lance-table/src/format/fragment.rs | 2 +- rust/lance/src/dataset.rs | 14 ++++---------- rust/lance/src/dataset/fragment.rs | 2 +- rust/lance/src/dataset/schema_evolution.rs | 2 +- rust/lance/src/index.rs | 2 
+- rust/lance/src/io/exec/projection.rs | 2 +- rust/lance/src/io/exec/pushdown_scan.rs | 14 ++++++++++---- 11 files changed, 41 insertions(+), 29 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 8a0289318b..7cff24932b 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -432,13 +432,13 @@ impl Field { /// /// If the ids are `[2]`, then this will include the parent `0` and the /// child `3`. - pub(crate) fn project_by_ids(&self, ids: &[i32]) -> Option { + pub(crate) fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option { let children = self .children .iter() - .filter_map(|c| c.project_by_ids(ids)) + .filter_map(|c| c.project_by_ids(ids, include_all_children)) .collect::>(); - if ids.contains(&self.id) { + if ids.contains(&self.id) && (children.is_empty() || include_all_children) { Some(self.clone()) } else if !children.is_empty() { Some(Self { diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index c76198a2fd..8f5074d67f 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -280,11 +280,11 @@ impl Schema { /// Returns a new schema that only contains the fields in `column_ids`. 
/// /// This projection can filter out both top-level and nested fields - pub fn project_by_ids(&self, column_ids: &[i32]) -> Self { + pub fn project_by_ids(&self, column_ids: &[i32], include_all_children: bool) -> Self { let filtered_fields = self .fields .iter() - .filter_map(|f| f.project_by_ids(column_ids)) + .filter_map(|f| f.project_by_ids(column_ids, include_all_children)) .collect(); Self { fields: filtered_fields, @@ -792,7 +792,7 @@ mod tests { ]); let mut schema = Schema::try_from(&arrow_schema).unwrap(); schema.set_field_id(None); - let projected = schema.project_by_ids(&[2, 4, 5]); + let projected = schema.project_by_ids(&[2, 4, 5], true); let expected_arrow_schema = ArrowSchema::new(vec![ ArrowField::new( @@ -807,7 +807,7 @@ mod tests { ]); assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema); - let projected = schema.project_by_ids(&[2]); + let projected = schema.project_by_ids(&[2], true); let expected_arrow_schema = ArrowSchema::new(vec![ArrowField::new( "b", DataType::Struct(ArrowFields::from(vec![ArrowField::new( @@ -819,7 +819,7 @@ mod tests { )]); assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema); - let projected = schema.project_by_ids(&[1]); + let projected = schema.project_by_ids(&[1], true); let expected_arrow_schema = ArrowSchema::new(vec![ArrowField::new( "b", DataType::Struct(ArrowFields::from(vec![ @@ -830,6 +830,18 @@ mod tests { true, )]); assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema); + + let projected = schema.project_by_ids(&[1, 2], false); + let expected_arrow_schema = ArrowSchema::new(vec![ArrowField::new( + "b", + DataType::Struct(ArrowFields::from(vec![ArrowField::new( + "f1", + DataType::Utf8, + true, + )])), + true, + )]); + assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema); } #[test] diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index f9496a6498..1b2e7b1a25 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs 
@@ -349,7 +349,7 @@ impl FileReader { } } } - meta.schema.project_by_ids(&stats_field_ids) + meta.schema.project_by_ids(&stats_field_ids, true) }) } diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index a8d56eba57..bb51ec9380 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -121,7 +121,7 @@ impl FileWriter { }; let stats_collector = if !collect_stats_for_fields.is_empty() { - let stats_schema = schema.project_by_ids(&collect_stats_for_fields); + let stats_schema = schema.project_by_ids(&collect_stats_for_fields, true); statistics::StatisticsCollector::try_new(&stats_schema) } else { None diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index 7e370bba9d..76125c3c94 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -91,7 +91,7 @@ impl DataFile { } pub fn schema(&self, full_schema: &Schema) -> Schema { - full_schema.project_by_ids(&self.fields) + full_schema.project_by_ids(&self.fields, false) } pub fn is_legacy_file(&self) -> bool { diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3f74c6be2b..9b3a906c1f 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -5081,15 +5081,12 @@ mod tests { vec![Arc::new(StructArray::from(vec![ ( field_a.clone(), - Arc::new(Int32Array::from(vec![2, 5])) as ArrayRef, - ), - ( - field_b.clone(), - Arc::new(Int32Array::from(vec![Some(1), None])), + Arc::new(Int32Array::from(vec![Some(2), None])) as ArrayRef, ), + (field_b.clone(), Arc::new(Int32Array::from(vec![1, 3]))), ( field_c.clone(), - Arc::new(Int32Array::from(vec![None, Some(6)])), + Arc::new(Int32Array::from(vec![None, Some(4)])), ), ]))], ) @@ -5108,10 +5105,7 @@ mod tests { field_a.clone(), Arc::new(Int32Array::from(vec![None, Some(2)])) as ArrayRef, ), - ( - field_b.clone(), - Arc::new(Int32Array::from(vec![Some(3), Some(1)])), - ), + (field_b.clone(), 
Arc::new(Int32Array::from(vec![3, 1]))), ( field_c.clone(), Arc::new(Int32Array::from(vec![Some(4), None])), diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 9eb9358b99..cf8e1621ab 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -819,7 +819,7 @@ impl FileFragment { let mut missing_fields = projection.field_ids(); missing_fields.retain(|f| !field_ids_in_files.contains(f) && *f >= 0); if !missing_fields.is_empty() { - let missing_projection = projection.project_by_ids(&missing_fields); + let missing_projection = projection.project_by_ids(&missing_fields, true); let null_reader = NullReader::new(Arc::new(missing_projection), opened_files[0].len()); opened_files.push(Box::new(null_reader)); } diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index fb3aa5a037..b42aaaaa32 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -528,7 +528,7 @@ pub(super) async fn alter_columns( .map(|(_old, new)| new.id) .collect::>(); // This schema contains the exact field ids we want to write the new fields with. 
- let new_col_schema = new_schema.project_by_ids(&new_ids); + let new_col_schema = new_schema.project_by_ids(&new_ids, true); let mapper = move |batch: &RecordBatch| { let mut fields = Vec::with_capacity(cast_fields.len()); diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index aa313a1405..9e0c822bbb 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -800,7 +800,7 @@ impl DatasetIndexInternalExt for Dataset { let schema = self.schema(); let mut indexed_fields = Vec::new(); for index in indices.iter().filter(|idx| { - let idx_schema = schema.project_by_ids(idx.fields.as_slice()); + let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true); let is_vector_index = idx_schema .fields .iter() diff --git a/rust/lance/src/io/exec/projection.rs b/rust/lance/src/io/exec/projection.rs index 00cb350725..09b0d3fbcf 100644 --- a/rust/lance/src/io/exec/projection.rs +++ b/rust/lance/src/io/exec/projection.rs @@ -306,7 +306,7 @@ mod tests { ]; for projection in projections { - let projected_schema = lance_schema.project_by_ids(projection); + let projected_schema = lance_schema.project_by_ids(projection, true); let projected_arrow_schema = (&projected_schema).into(); let result = apply_to_batch(sample_data.clone(), &projected_arrow_schema) diff --git a/rust/lance/src/io/exec/pushdown_scan.rs b/rust/lance/src/io/exec/pushdown_scan.rs index f4881714c0..92bdf48132 100644 --- a/rust/lance/src/io/exec/pushdown_scan.rs +++ b/rust/lance/src/io/exec/pushdown_scan.rs @@ -456,7 +456,8 @@ impl FragmentScanner { .collect(); let remainder_batch = if !remaining_fields.is_empty() { - let remaining_projection = self.projection.project_by_ids(&remaining_fields); + let remaining_projection = + self.projection.project_by_ids(&remaining_fields, true); Some( self.reader .legacy_read_batch_projected( @@ -859,7 +860,7 @@ mod test { let fragments = dataset.fragments().clone(); // [x.b, y.a] - let projection = Arc::new(dataset.schema().clone().project_by_ids(&[2, 
4])); + let projection = Arc::new(dataset.schema().clone().project_by_ids(&[2, 4], true)); let predicate = col("x") .field_newstyle("a") @@ -889,7 +890,7 @@ mod test { assert_eq!(results[0].schema().as_ref(), expected_schema.as_ref()); // Also try where projection is same as filter columns - let projection = Arc::new(dataset.schema().clone().project_by_ids(&[1, 5])); + let projection = Arc::new(dataset.schema().clone().project_by_ids(&[1, 5], true)); let exec = LancePushdownScanExec::try_new( dataset.clone(), fragments, @@ -1182,7 +1183,12 @@ mod test { let scan = LancePushdownScanExec::try_new( dataset.clone(), dataset.fragments().clone(), - Arc::new(dataset.schema().clone().project_by_ids(&projection_indices)), + Arc::new( + dataset + .schema() + .clone() + .project_by_ids(&projection_indices, true), + ), predicate, scan_config, ) From f6c27d1f2bf135c1cd30c903c5600b7111297ef4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 4 Nov 2024 17:23:58 -0800 Subject: [PATCH 09/19] fix python --- python/src/dataset.rs | 2 +- python/src/debug.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 38bd82c495..38324b0a97 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -532,7 +532,7 @@ impl Dataset { let dict = PyDict::new(py); let schema = self_.ds.schema(); - let idx_schema = schema.project_by_ids(idx.fields.as_slice()); + let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true); let is_vector = idx_schema .fields diff --git a/python/src/debug.rs b/python/src/debug.rs index 9cc575150b..8856c1fb28 100644 --- a/python/src/debug.rs +++ b/python/src/debug.rs @@ -58,7 +58,7 @@ impl PrettyPrintableFragment { .files .iter() .map(|file| { - let schema = schema.project_by_ids(&file.fields); + let schema = schema.project_by_ids(&file.fields, false); PrettyPrintableDataFile { path: file.path.clone(), fields: file.fields.clone(), From f916af75b20cdf6657d9b9fe8b4470e7a5c12172 Mon Sep 17 
00:00:00 2001 From: Will Jones Date: Mon, 4 Nov 2024 18:25:10 -0800 Subject: [PATCH 10/19] fix for migration test --- rust/lance/src/io/commit.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 3b462c75ce..36553a6e85 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -350,6 +350,10 @@ fn fix_schema(manifest: &mut Manifest) -> Result<()> { for (old_field_id, new_field_id) in &old_field_id_mapping { let field = manifest.schema.mut_field_by_id(*old_field_id).unwrap(); field.id = *new_field_id; + + if let Some(local_field) = manifest.local_schema.mut_field_by_id(*old_field_id) { + local_field.id = *new_field_id; + } } // Drop data files that are no longer in use. From 478a523f21eaf2f8f10d8389a85176f3a689be15 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 6 Nov 2024 15:35:58 -0800 Subject: [PATCH 11/19] add initial test --- python/python/tests/test_balanced.py | 18 ---- rust/lance-core/src/datatypes.rs | 5 +- rust/lance/src/dataset.rs | 133 ++++++++++++++++++++++++++- rust/lance/src/dataset/fragment.rs | 5 +- 4 files changed, 140 insertions(+), 21 deletions(-) diff --git a/python/python/tests/test_balanced.py b/python/python/tests/test_balanced.py index e2713c38c9..a7d33bd3d0 100644 --- a/python/python/tests/test_balanced.py +++ b/python/python/tests/test_balanced.py @@ -271,21 +271,3 @@ def test_unsupported(balanced_dataset, big_val): balanced_dataset.merge_insert("idx").when_not_matched_insert_all().execute( make_table(0, 1, big_val) ) - - -# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will -# want to test partial appends. We need to make sure an append of -# non-blob data is supported. In order to do this we need to make -# sure a blob tx is created that marks the row ids as used so that -# the two row id sequences stay in sync. 
-# -# def test_one_sided_append(balanced_dataset, tmp_path): -# # Write new data, but only to the idx column -# ds = lance.write_dataset( -# pa.table({"idx": pa.array(range(128, 256), pa.uint64())}), -# tmp_path / "test_ds", -# max_bytes_per_file=32 * 1024 * 1024, -# mode="append", -# ) - -# print(ds.to_table()) diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index 54f4971324..e7d3f28a97 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -18,7 +18,10 @@ mod field; mod schema; use crate::{Error, Result}; -pub use field::{Encoding, Field, NullabilityComparison, SchemaCompareOptions, StorageClass}; +pub use field::{ + Encoding, Field, NullabilityComparison, SchemaCompareOptions, StorageClass, + LANCE_STORAGE_CLASS_SCHEMA_META_KEY, +}; pub use schema::Schema; pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression"; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 9b3a906c1f..124d423a40 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -719,7 +719,7 @@ impl Dataset { params: Option, ) -> Result<()> { // Force append mode - let params = WriteParams { + let mut params = WriteParams { mode: WriteMode::Append, ..params.unwrap_or_default() }; @@ -749,6 +749,17 @@ impl Dataset { }, )?; + // If the dataset is already using (or not using) move stable row ids, we need to match + // and ignore whatever the user provided as input + if params.enable_move_stable_row_ids != self.manifest.uses_move_stable_row_ids() { + info!( + "Ignoring user provided move stable row ids setting of {}, dataset already has it set to {}", + params.enable_move_stable_row_ids, + self.manifest.uses_move_stable_row_ids() + ); + params.enable_move_stable_row_ids = self.manifest.uses_move_stable_row_ids(); + } + let written_frags = write_fragments_internal( Some(self), self.object_store.clone(), @@ -2065,6 +2076,7 @@ mod tests { DataType, Field as ArrowField, Fields as ArrowFields, 
Schema as ArrowSchema, }; use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; + use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; use lance_file::version::LanceFileVersion; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; @@ -5115,4 +5127,123 @@ mod tests { .unwrap(); assert_eq!(result, expected); } + + #[tokio::test] + async fn test_insert_balanced_subschemas() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let field_a = ArrowField::new("a", DataType::Int32, true); + let field_b = ArrowField::new("b", DataType::Int64, true); + let schema = Arc::new(ArrowSchema::new(vec![ + field_a.clone(), + field_b.clone().with_metadata( + [( + LANCE_STORAGE_CLASS_SCHEMA_META_KEY.to_string(), + "blob".to_string(), + )] + .into(), + ), + ])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + let options = WriteParams { + enable_move_stable_row_ids: true, + ..Default::default() + }; + let mut dataset = Dataset::write(empty_reader, test_uri, Some(options)) + .await + .unwrap(); + + // Insert left side + let just_a = Arc::new(ArrowSchema::new(vec![field_a.clone()])); + let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); + dataset.append(reader, None).await.unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + let blob_dataset = Dataset::open(&format!("{}/_blobs", test_uri)) + .await + .unwrap(); + let blob_dataset = blob_dataset + .checkout_version(dataset.manifest.blob_dataset_version.unwrap()) + .await + .unwrap(); + let blob_fragments = blob_dataset.get_fragments(); + assert_eq!(blob_fragments.len(), 0); + + // Insert right side + let just_b = Arc::new(ArrowSchema::new(vec![field_b.clone()])); + let batch = 
RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int64Array::from(vec![2]))]) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone()); + dataset.append(reader, None).await.unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 2); + let blob_dataset = blob_dataset + .checkout_version(dataset.manifest.blob_dataset_version.unwrap()) + .await + .unwrap(); + let blob_fragments = blob_dataset.get_fragments(); + assert_eq!(blob_fragments.len(), 1); + + // insert both + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![3])), + Arc::new(Int64Array::from(vec![4])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + dataset.append(reader, None).await.unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 3); + let blob_dataset = blob_dataset + .checkout_version(dataset.manifest.blob_dataset_version.unwrap()) + .await + .unwrap(); + let blob_fragments = blob_dataset.get_fragments(); + assert_eq!(blob_fragments.len(), 2); + + // Assert scan results is correct + let data = dataset.scan().try_into_batch().await.unwrap(); + let expected = RecordBatch::try_new( + just_a.clone(), + vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))], + ) + .unwrap(); + assert_eq!(data, expected); + + // TODO: Scanning columns with non-default storage class is not yet supported + // let data = dataset.scan().project(&["a", "b"]).unwrap().try_into_batch().await.unwrap(); + // let expected = RecordBatch::try_new( + // schema.clone(), + // vec![ + // Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])), + // Arc::new(Int64Array::from(vec![None, Some(2), Some(4)])), + // ], + // ) + // .unwrap(); + // assert_eq!(data, expected); + + // TODO: mapping from row addresses to row ids + // let result = dataset + // .take(&[1, 2, 0], dataset.schema().clone()) + // .await + // .unwrap(); + // let expected = 
RecordBatch::try_new( + // schema.clone(), + // vec![ + // Arc::new(Int32Array::from(vec![None, Some(3), Some(1)])), + // Arc::new(Int64Array::from(vec![Some(2), Some(4), None])), + // ], + // ) + // .unwrap(); + // assert_eq!(result, expected); + } } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index cf8e1621ab..c94901448a 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -810,6 +810,9 @@ impl FileFragment { } } + // This should return immediately on modern datasets. + let num_rows = self.count_rows().await?; + // Check if there are any fields that are not in any data files let field_ids_in_files = opened_files .iter() @@ -820,7 +823,7 @@ impl FileFragment { missing_fields.retain(|f| !field_ids_in_files.contains(f) && *f >= 0); if !missing_fields.is_empty() { let missing_projection = projection.project_by_ids(&missing_fields, true); - let null_reader = NullReader::new(Arc::new(missing_projection), opened_files[0].len()); + let null_reader = NullReader::new(Arc::new(missing_projection), num_rows as u32); opened_files.push(Box::new(null_reader)); } From c09e67b275c48e1631ab4e883a259b8f6369acee Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 6 Nov 2024 15:54:03 -0800 Subject: [PATCH 12/19] add failing test for take --- rust/lance/src/dataset.rs | 14 ++++++++++++++ rust/lance/src/dataset/fragment.rs | 1 - rust/lance/src/dataset/take.rs | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 124d423a40..9f552c95ca 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -5245,5 +5245,19 @@ mod tests { // ) // .unwrap(); // assert_eq!(result, expected); + + let result = dataset + .take_rows(&[1, 2, 0], dataset.schema().clone()) + .await + .unwrap(); + let expected = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![None, Some(3), Some(1)])), + 
Arc::new(Int64Array::from(vec![Some(2), Some(4), None])), + ], + ) + .unwrap(); + assert_eq!(result, expected); } } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index c94901448a..c55d5bdb34 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1223,7 +1223,6 @@ impl FileFragment { projection: &Schema, with_row_address: bool, ) -> Result { - println!("Fragment take (offsets={:?}", row_offsets); let reader = self .open( projection, diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index c390bbd45c..ab58f4fcac 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -302,6 +302,7 @@ async fn zip_takes( let mut all_cols = Vec::with_capacity(local.num_columns() + sibling.num_columns()); all_cols.extend(local.columns().iter().cloned()); all_cols.extend(sibling.columns().iter().cloned()); + dbg!(&all_cols); let mut all_fields = Vec::with_capacity(local.num_columns() + sibling.num_columns()); all_fields.extend(local.schema().fields().iter().cloned()); From 711f0a0a18fe395ad1bfca8144973cb4fc79f360 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 09:56:26 -0800 Subject: [PATCH 13/19] fix handling blobs --- rust/lance/src/dataset.rs | 7 ++++--- rust/lance/src/dataset/blob.rs | 6 +++--- rust/lance/src/dataset/take.rs | 1 - rust/lance/src/dataset/write.rs | 16 ++++++++++++++-- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 9f552c95ca..c2e05664b9 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -5148,6 +5148,7 @@ mod tests { let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); let options = WriteParams { enable_move_stable_row_ids: true, + enable_v2_manifest_paths: true, ..Default::default() }; let mut dataset = Dataset::write(empty_reader, test_uri, Some(options)) @@ -5171,7 +5172,7 @@ mod tests { .await .unwrap(); 
let blob_fragments = blob_dataset.get_fragments(); - assert_eq!(blob_fragments.len(), 0); + assert_eq!(blob_fragments.len(), 1); // Insert right side let just_b = Arc::new(ArrowSchema::new(vec![field_b.clone()])); @@ -5187,7 +5188,7 @@ mod tests { .await .unwrap(); let blob_fragments = blob_dataset.get_fragments(); - assert_eq!(blob_fragments.len(), 1); + assert_eq!(blob_fragments.len(), 2); // insert both let batch = RecordBatch::try_new( @@ -5208,7 +5209,7 @@ mod tests { .await .unwrap(); let blob_fragments = blob_dataset.get_fragments(); - assert_eq!(blob_fragments.len(), 2); + assert_eq!(blob_fragments.len(), 3); // Assert scan results is correct let data = dataset.scan().try_into_batch().await.unwrap(); diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 67f8c7081b..db4d8fa30c 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -239,11 +239,11 @@ pub trait BlobStreamExt: Sized { /// /// The second stream may be None (if there are no fields with the blob storage class) /// or it contains all fields with the blob storage class. 
- fn extract_blob_stream(self, schema: &Schema) -> (Self, Option); + fn extract_blob_stream(self, schema: &Schema, has_blob: bool) -> (Self, Option); } impl BlobStreamExt for SendableRecordBatchStream { - fn extract_blob_stream(self, schema: &Schema) -> (Self, Option) { + fn extract_blob_stream(self, schema: &Schema, has_blob: bool) -> (Self, Option) { let mut indices_with_blob = Vec::with_capacity(schema.fields.len()); let mut indices_without_blob = Vec::with_capacity(schema.fields.len()); for (idx, field) in schema.fields.iter().enumerate() { @@ -253,7 +253,7 @@ impl BlobStreamExt for SendableRecordBatchStream { indices_without_blob.push(idx); } } - if indices_with_blob.is_empty() { + if !has_blob { (self, None) } else { let left_schema = Arc::new(self.schema().project(&indices_without_blob).unwrap()); diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index ab58f4fcac..c390bbd45c 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -302,7 +302,6 @@ async fn zip_takes( let mut all_cols = Vec::with_capacity(local.num_columns() + sibling.num_columns()); all_cols.extend(local.columns().iter().cloned()); all_cols.extend(sibling.columns().iter().cloned()); - dbg!(&all_cols); let mut all_fields = Vec::with_capacity(local.num_columns() + sibling.num_columns()); all_fields.extend(local.schema().fields().iter().cloned()); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 64e5d82b44..b73a370600 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -309,7 +309,7 @@ pub async fn write_fragments_internal( schema.check_compatible( dataset.schema(), &SchemaCompareOptions { - // We don't if the user claims their data is nullable / non-nullable. We will + // We don't care if the user claims their data is nullable / non-nullable. We will // verify against the actual data. 
compare_nullability: NullabilityComparison::Ignore, allow_missing_if_nullable: true, @@ -348,7 +348,19 @@ pub async fn write_fragments_internal( let data_schema = schema.project_by_schema(data.schema().as_ref())?; - let (data, blob_data) = data.extract_blob_stream(&data_schema); + let has_blob = { + let schema_to_check = if let Some(dataset) = dataset { + dataset.schema() + } else { + &data_schema + }; + schema_to_check + .fields + .iter() + .any(|f| f.storage_class() == StorageClass::Blob) + }; + + let (data, blob_data) = data.extract_blob_stream(&data_schema, has_blob); // Some params we borrow from the normal write, some we override let blob_write_params = WriteParams { From 48b6aa3e4e0d2abbed1084b55402125c8fbf3058 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 10:03:52 -0800 Subject: [PATCH 14/19] cleanup --- rust/lance/src/dataset/blob.rs | 61 ++++++++++++++++----------------- rust/lance/src/dataset/write.rs | 7 +++- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index db4d8fa30c..ddba3b66a0 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -239,11 +239,11 @@ pub trait BlobStreamExt: Sized { /// /// The second stream may be None (if there are no fields with the blob storage class) /// or it contains all fields with the blob storage class. 
- fn extract_blob_stream(self, schema: &Schema, has_blob: bool) -> (Self, Option); + fn extract_blob_stream(self, schema: &Schema) -> (Self, Self); } impl BlobStreamExt for SendableRecordBatchStream { - fn extract_blob_stream(self, schema: &Schema, has_blob: bool) -> (Self, Option) { + fn extract_blob_stream(self, schema: &Schema) -> (Self, Self) { let mut indices_with_blob = Vec::with_capacity(schema.fields.len()); let mut indices_without_blob = Vec::with_capacity(schema.fields.len()); for (idx, field) in schema.fields.iter().enumerate() { @@ -253,36 +253,33 @@ impl BlobStreamExt for SendableRecordBatchStream { indices_without_blob.push(idx); } } - if !has_blob { - (self, None) - } else { - let left_schema = Arc::new(self.schema().project(&indices_without_blob).unwrap()); - let right_schema = Arc::new(self.schema().project(&indices_with_blob).unwrap()); - - let (left, right) = ShareableRecordBatchStream(self) - .boxed() - // If we are working with blobs then we are probably working with rather large batches - // We don't want to read too far ahead. 
- .share(Capacity::Bounded(1)); - - let left = left.map(move |batch| match batch { - CloneableResult(Ok(batch)) => { - CloneableResult(Ok(batch.project(&indices_without_blob).unwrap())) - } - CloneableResult(Err(err)) => CloneableResult(Err(err)), - }); - - let right = right.map(move |batch| match batch { - CloneableResult(Ok(batch)) => { - CloneableResult(Ok(batch.project(&indices_with_blob).unwrap())) - } - CloneableResult(Err(err)) => CloneableResult(Err(err)), - }); - - let left = ShareableRecordBatchStreamAdapter::new(left_schema, left); - let right = ShareableRecordBatchStreamAdapter::new(right_schema, right); - (Box::pin(left), Some(Box::pin(right))) - } + + let left_schema = Arc::new(self.schema().project(&indices_without_blob).unwrap()); + let right_schema = Arc::new(self.schema().project(&indices_with_blob).unwrap()); + + let (left, right) = ShareableRecordBatchStream(self) + .boxed() + // If we are working with blobs then we are probably working with rather large batches + // We don't want to read too far ahead. 
+ .share(Capacity::Bounded(1)); + + let left = left.map(move |batch| match batch { + CloneableResult(Ok(batch)) => { + CloneableResult(Ok(batch.project(&indices_without_blob).unwrap())) + } + CloneableResult(Err(err)) => CloneableResult(Err(err)), + }); + + let right = right.map(move |batch| match batch { + CloneableResult(Ok(batch)) => { + CloneableResult(Ok(batch.project(&indices_with_blob).unwrap())) + } + CloneableResult(Err(err)) => CloneableResult(Err(err)), + }); + + let left = ShareableRecordBatchStreamAdapter::new(left_schema, left); + let right = ShareableRecordBatchStreamAdapter::new(right_schema, right); + (Box::pin(left), Box::pin(right)) } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index b73a370600..7e1c943f93 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -360,7 +360,12 @@ pub async fn write_fragments_internal( .any(|f| f.storage_class() == StorageClass::Blob) }; - let (data, blob_data) = data.extract_blob_stream(&data_schema, has_blob); + let (data, blob_data) = if has_blob { + let (data, blob_data) = data.extract_blob_stream(&data_schema); + (data, Some(blob_data)) + } else { + (data, None) + }; // Some params we borrow from the normal write, some we override let blob_write_params = WriteParams { From ce74c63cb99312c104b2176af4e105dbca06eaad Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 10:13:00 -0800 Subject: [PATCH 15/19] pr feedback --- rust/lance-core/src/datatypes/field.rs | 9 ++++++++- rust/lance-core/src/datatypes/schema.rs | 5 +++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 7cff24932b..82ac2f8049 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -52,7 +52,11 @@ pub struct SchemaCompareOptions { pub compare_field_ids: bool, /// Should nullability be compared (default Strict) pub 
compare_nullability: NullabilityComparison, - /// Allow fields to be missing if they are nullable (default false) + /// Allow fields in the expected schema to be missing from the schema being tested if + /// they are nullable (default false) + /// + /// Fields in the schema being tested must always be present in the expected schema + /// regardless of this flag. pub allow_missing_if_nullable: bool, /// Allow out of order fields (default false) pub ignore_field_order: bool, @@ -433,6 +437,9 @@ impl Field { /// If the ids are `[2]`, then this will include the parent `0` and the /// child `3`. pub(crate) fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option { + if !ids.contains(&self.id) { + return None; + } let children = self .children .iter() diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 8f5074d67f..4d4589ee13 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -280,6 +280,11 @@ impl Schema { /// Returns a new schema that only contains the fields in `column_ids`. /// /// This projection can filter out both top-level and nested fields + /// + /// If `include_all_children` is true, then if a parent field id is passed, + /// then all children of that field will be included in the projection + /// regardless of whether their ids were passed. If this is false, then + /// only the child fields with the passed ids will be included. 
pub fn project_by_ids(&self, column_ids: &[i32], include_all_children: bool) -> Self { let filtered_fields = self .fields From 22e8e8d0e176822011da46ce9cf7383d7f109592 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 11:49:35 -0800 Subject: [PATCH 16/19] revert early return --- rust/lance-core/src/datatypes/field.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 82ac2f8049..91eade2fa7 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -437,9 +437,6 @@ impl Field { /// If the ids are `[2]`, then this will include the parent `0` and the /// child `3`. pub(crate) fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option { - if !ids.contains(&self.id) { - return None; - } let children = self .children .iter() From e1ec1889ec87ef4bcfe623efd6e77123fefc84fe Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 13:17:05 -0800 Subject: [PATCH 17/19] try to fix test --- rust/lance/src/dataset/optimize.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 0f4037ed2b..6e37dd5704 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -704,6 +704,11 @@ async fn rewrite_files( if let Some(max_bytes_per_file) = options.max_bytes_per_file { params.max_bytes_per_file = max_bytes_per_file; } + + if dataset.manifest.uses_move_stable_row_ids() { + params.enable_move_stable_row_ids = true; + } + let new_fragments = write_fragments_internal( Some(dataset.as_ref()), dataset.object_store.clone(), From 59a70d731586f74243016557e13e07bf02872990 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 13:46:40 -0800 Subject: [PATCH 18/19] validate the dataset --- rust/lance/src/dataset.rs | 21 ++++++++++++++++++++- rust/lance/src/dataset/fragment.rs | 18 ------------------ 2 files changed, 
20 insertions(+), 19 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index c2e05664b9..eed495de0e 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -4906,6 +4906,7 @@ mod tests { let mut dataset = Dataset::write(empty_reader, "memory://", None) .await .unwrap(); + dataset.validate().await.unwrap(); // If missing columns that aren't nullable, will return an error // TODO: provide alternative default than null. @@ -4926,6 +4927,7 @@ mod tests { .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); // Looking at the fragments, there is no data file with the missing field let fragments = dataset.get_fragments(); @@ -4956,6 +4958,7 @@ mod tests { .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); // When reading back, only missing data is null, otherwise is filled in let data = dataset.scan().try_into_batch().await.unwrap(); @@ -4973,6 +4976,7 @@ mod tests { compact_files(&mut dataset, CompactionOptions::default(), None) .await .unwrap(); + dataset.validate().await.unwrap(); let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 1); assert_eq!(fragments[0].metadata.files.len(), 1); @@ -5001,7 +5005,8 @@ mod tests { true, )])); let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); - Dataset::write(empty_reader, test_uri, None).await.unwrap(); + let dataset = Dataset::write(empty_reader, test_uri, None).await.unwrap(); + dataset.validate().await.unwrap(); let append_options = WriteParams { mode: WriteMode::Append, @@ -5028,6 +5033,7 @@ mod tests { let dataset = Dataset::write(reader, test_uri, Some(append_options.clone())) .await .unwrap(); + dataset.validate().await.unwrap(); let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 1); 
assert_eq!(fragments[0].metadata.files.len(), 1); @@ -5055,6 +5061,7 @@ mod tests { let dataset = Dataset::write(reader, test_uri, Some(append_options.clone())) .await .unwrap(); + dataset.validate().await.unwrap(); let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 2); assert_eq!(fragments[1].metadata.files.len(), 1); @@ -5154,6 +5161,7 @@ mod tests { let mut dataset = Dataset::write(empty_reader, test_uri, Some(options)) .await .unwrap(); + dataset.validate().await.unwrap(); // Insert left side let just_a = Arc::new(ArrowSchema::new(vec![field_a.clone()])); @@ -5161,6 +5169,7 @@ mod tests { .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 1); @@ -5180,6 +5189,7 @@ mod tests { .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone()); dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 2); @@ -5201,6 +5211,7 @@ mod tests { .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); dataset.append(reader, None).await.unwrap(); + dataset.validate().await.unwrap(); let fragments = dataset.get_fragments(); assert_eq!(fragments.len(), 3); @@ -5260,5 +5271,13 @@ mod tests { ) .unwrap(); assert_eq!(result, expected); + + // Make sure we can compact and still do those things + let metrics = compact_files(&mut dataset, CompactionOptions::default(), None) + .await + .unwrap(); + assert_eq!(metrics.fragments_removed, 3); + assert_eq!(metrics.fragments_added, 1); + dataset.validate().await.unwrap(); } } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index c55d5bdb34..def1b1ae2b 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -985,24 +985,6 @@ impl 
FileFragment { )); } - for field in self.schema().fields_pre_order() { - if !seen_fields.contains(&field.id) { - return Err(Error::corrupt_file( - self.dataset - .data_dir() - .child(self.metadata.files[0].path.as_str()), - format!( - "Field {} is missing in fragment {}\nField: {:#?}\nFragment: {:#?}", - field.id, - self.id(), - field, - self.metadata() - ), - location!(), - )); - } - } - for data_file in &self.metadata.files { data_file.validate(&self.dataset.data_dir())?; } From c897c09fa668edccc6862ec412704cf1cb22ab54 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 7 Nov 2024 14:18:20 -0800 Subject: [PATCH 19/19] cleanup --- rust/lance/src/dataset.rs | 164 +++++++------------------------- rust/lance/src/dataset/blob.rs | 61 ++++++------ rust/lance/src/dataset/write.rs | 19 +--- 3 files changed, 68 insertions(+), 176 deletions(-) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index eed495de0e..dde4788115 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -566,18 +566,20 @@ impl Dataset { params.enable_move_stable_row_ids = d.manifest.uses_move_stable_row_ids(); } let m = d.manifest.as_ref(); - schema.check_compatible( - &m.schema, - &SchemaCompareOptions { - compare_dictionary: true, - // array nullability is checked later, using actual data instead - // of the schema - compare_nullability: NullabilityComparison::Ignore, - ignore_field_order: true, - allow_missing_if_nullable: true, - ..Default::default() - }, - )?; + let mut schema_cmp_opts = SchemaCompareOptions { + compare_dictionary: true, + // array nullability is checked later, using actual data instead + // of the schema + compare_nullability: NullabilityComparison::Ignore, + ..Default::default() + }; + if m.blob_dataset_version.is_none() { + // Balanced datasets don't yet support schema evolution + schema_cmp_opts.ignore_field_order = true; + schema_cmp_opts.allow_missing_if_nullable = true; + } + + schema.check_compatible(&m.schema, &schema_cmp_opts)?; // 
If appending, always use existing storage version storage_version = m.data_storage_format.lance_file_version()?; } @@ -736,18 +738,20 @@ impl Dataset { let stream = reader_to_stream(batches); // Return Error if append and input schema differ - schema.check_compatible( - &self.manifest.schema, - &SchemaCompareOptions { - compare_dictionary: true, - // array nullability is checked later, using actual data instead - // of the schema - compare_nullability: NullabilityComparison::Ignore, - ignore_field_order: true, - allow_missing_if_nullable: true, - ..Default::default() - }, - )?; + let mut schema_cmp_opts = SchemaCompareOptions { + compare_dictionary: true, + // array nullability is checked later, using actual data instead + // of the schema + compare_nullability: NullabilityComparison::Ignore, + ..Default::default() + }; + if self.manifest.blob_dataset_version.is_none() { + // Balanced datasets don't yet support schema evolution + schema_cmp_opts.ignore_field_order = true; + schema_cmp_opts.allow_missing_if_nullable = true; + } + + schema.check_compatible(&self.manifest.schema, &schema_cmp_opts)?; // If the dataset is already using (or not using) move stable row ids, we need to match // and ignore whatever the user provided as input @@ -5137,6 +5141,7 @@ mod tests { #[tokio::test] async fn test_insert_balanced_subschemas() { + // TODO: support this. 
let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -5168,116 +5173,17 @@ mod tests { let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone()); - dataset.append(reader, None).await.unwrap(); - dataset.validate().await.unwrap(); - - let fragments = dataset.get_fragments(); - assert_eq!(fragments.len(), 1); - let blob_dataset = Dataset::open(&format!("{}/_blobs", test_uri)) - .await - .unwrap(); - let blob_dataset = blob_dataset - .checkout_version(dataset.manifest.blob_dataset_version.unwrap()) - .await - .unwrap(); - let blob_fragments = blob_dataset.get_fragments(); - assert_eq!(blob_fragments.len(), 1); + let result = dataset.append(reader, None).await; + assert!(result.is_err()); + assert!(matches!(result, Err(Error::SchemaMismatch { .. }))); // Insert right side let just_b = Arc::new(ArrowSchema::new(vec![field_b.clone()])); let batch = RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int64Array::from(vec![2]))]) .unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone()); - dataset.append(reader, None).await.unwrap(); - dataset.validate().await.unwrap(); - - let fragments = dataset.get_fragments(); - assert_eq!(fragments.len(), 2); - let blob_dataset = blob_dataset - .checkout_version(dataset.manifest.blob_dataset_version.unwrap()) - .await - .unwrap(); - let blob_fragments = blob_dataset.get_fragments(); - assert_eq!(blob_fragments.len(), 2); - - // insert both - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![3])), - Arc::new(Int64Array::from(vec![4])), - ], - ) - .unwrap(); - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - dataset.append(reader, None).await.unwrap(); - dataset.validate().await.unwrap(); - - let fragments = dataset.get_fragments(); - assert_eq!(fragments.len(), 3); - let blob_dataset = blob_dataset 
- .checkout_version(dataset.manifest.blob_dataset_version.unwrap()) - .await - .unwrap(); - let blob_fragments = blob_dataset.get_fragments(); - assert_eq!(blob_fragments.len(), 3); - - // Assert scan results is correct - let data = dataset.scan().try_into_batch().await.unwrap(); - let expected = RecordBatch::try_new( - just_a.clone(), - vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))], - ) - .unwrap(); - assert_eq!(data, expected); - - // TODO: Scanning columns with non-default storage class is not yet supported - // let data = dataset.scan().project(&["a", "b"]).unwrap().try_into_batch().await.unwrap(); - // let expected = RecordBatch::try_new( - // schema.clone(), - // vec![ - // Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])), - // Arc::new(Int64Array::from(vec![None, Some(2), Some(4)])), - // ], - // ) - // .unwrap(); - // assert_eq!(data, expected); - - // TODO: mapping from row addresses to row ids - // let result = dataset - // .take(&[1, 2, 0], dataset.schema().clone()) - // .await - // .unwrap(); - // let expected = RecordBatch::try_new( - // schema.clone(), - // vec![ - // Arc::new(Int32Array::from(vec![None, Some(3), Some(1)])), - // Arc::new(Int64Array::from(vec![Some(2), Some(4), None])), - // ], - // ) - // .unwrap(); - // assert_eq!(result, expected); - - let result = dataset - .take_rows(&[1, 2, 0], dataset.schema().clone()) - .await - .unwrap(); - let expected = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![None, Some(3), Some(1)])), - Arc::new(Int64Array::from(vec![Some(2), Some(4), None])), - ], - ) - .unwrap(); - assert_eq!(result, expected); - - // Make sure we can compact and still do those things - let metrics = compact_files(&mut dataset, CompactionOptions::default(), None) - .await - .unwrap(); - assert_eq!(metrics.fragments_removed, 3); - assert_eq!(metrics.fragments_added, 1); - dataset.validate().await.unwrap(); + let result = dataset.append(reader, None).await; + 
assert!(result.is_err()); + assert!(matches!(result, Err(Error::SchemaMismatch { .. }))); } } diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index ddba3b66a0..67f8c7081b 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -239,11 +239,11 @@ pub trait BlobStreamExt: Sized { /// /// The second stream may be None (if there are no fields with the blob storage class) /// or it contains all fields with the blob storage class. - fn extract_blob_stream(self, schema: &Schema) -> (Self, Self); + fn extract_blob_stream(self, schema: &Schema) -> (Self, Option); } impl BlobStreamExt for SendableRecordBatchStream { - fn extract_blob_stream(self, schema: &Schema) -> (Self, Self) { + fn extract_blob_stream(self, schema: &Schema) -> (Self, Option) { let mut indices_with_blob = Vec::with_capacity(schema.fields.len()); let mut indices_without_blob = Vec::with_capacity(schema.fields.len()); for (idx, field) in schema.fields.iter().enumerate() { @@ -253,33 +253,36 @@ impl BlobStreamExt for SendableRecordBatchStream { indices_without_blob.push(idx); } } - - let left_schema = Arc::new(self.schema().project(&indices_without_blob).unwrap()); - let right_schema = Arc::new(self.schema().project(&indices_with_blob).unwrap()); - - let (left, right) = ShareableRecordBatchStream(self) - .boxed() - // If we are working with blobs then we are probably working with rather large batches - // We don't want to read too far ahead. 
- .share(Capacity::Bounded(1)); - - let left = left.map(move |batch| match batch { - CloneableResult(Ok(batch)) => { - CloneableResult(Ok(batch.project(&indices_without_blob).unwrap())) - } - CloneableResult(Err(err)) => CloneableResult(Err(err)), - }); - - let right = right.map(move |batch| match batch { - CloneableResult(Ok(batch)) => { - CloneableResult(Ok(batch.project(&indices_with_blob).unwrap())) - } - CloneableResult(Err(err)) => CloneableResult(Err(err)), - }); - - let left = ShareableRecordBatchStreamAdapter::new(left_schema, left); - let right = ShareableRecordBatchStreamAdapter::new(right_schema, right); - (Box::pin(left), Box::pin(right)) + if indices_with_blob.is_empty() { + (self, None) + } else { + let left_schema = Arc::new(self.schema().project(&indices_without_blob).unwrap()); + let right_schema = Arc::new(self.schema().project(&indices_with_blob).unwrap()); + + let (left, right) = ShareableRecordBatchStream(self) + .boxed() + // If we are working with blobs then we are probably working with rather large batches + // We don't want to read too far ahead. 
+ .share(Capacity::Bounded(1)); + + let left = left.map(move |batch| match batch { + CloneableResult(Ok(batch)) => { + CloneableResult(Ok(batch.project(&indices_without_blob).unwrap())) + } + CloneableResult(Err(err)) => CloneableResult(Err(err)), + }); + + let right = right.map(move |batch| match batch { + CloneableResult(Ok(batch)) => { + CloneableResult(Ok(batch.project(&indices_with_blob).unwrap())) + } + CloneableResult(Err(err)) => CloneableResult(Err(err)), + }); + + let left = ShareableRecordBatchStreamAdapter::new(left_schema, left); + let right = ShareableRecordBatchStreamAdapter::new(right_schema, right); + (Box::pin(left), Some(Box::pin(right))) + } } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 7e1c943f93..bcbf685284 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -348,24 +348,7 @@ pub async fn write_fragments_internal( let data_schema = schema.project_by_schema(data.schema().as_ref())?; - let has_blob = { - let schema_to_check = if let Some(dataset) = dataset { - dataset.schema() - } else { - &data_schema - }; - schema_to_check - .fields - .iter() - .any(|f| f.storage_class() == StorageClass::Blob) - }; - - let (data, blob_data) = if has_blob { - let (data, blob_data) = data.extract_blob_stream(&data_schema); - (data, Some(blob_data)) - } else { - (data, None) - }; + let (data, blob_data) = data.extract_blob_stream(&data_schema); // Some params we borrow from the normal write, some we override let blob_write_params = WriteParams {