diff --git a/python/python/tests/test_balanced.py b/python/python/tests/test_balanced.py
index e2713c38c9..a7d33bd3d0 100644
--- a/python/python/tests/test_balanced.py
+++ b/python/python/tests/test_balanced.py
@@ -271,21 +271,3 @@ def test_unsupported(balanced_dataset, big_val):
     balanced_dataset.merge_insert("idx").when_not_matched_insert_all().execute(
         make_table(0, 1, big_val)
     )
-
-
-# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
-# want to test partial appends. We need to make sure an append of
-# non-blob data is supported. In order to do this we need to make
-# sure a blob tx is created that marks the row ids as used so that
-# the two row id sequences stay in sync.
-#
-# def test_one_sided_append(balanced_dataset, tmp_path):
-#     # Write new data, but only to the idx column
-#     ds = lance.write_dataset(
-#         pa.table({"idx": pa.array(range(128, 256), pa.uint64())}),
-#         tmp_path / "test_ds",
-#         max_bytes_per_file=32 * 1024 * 1024,
-#         mode="append",
-#     )
-
-#     print(ds.to_table())
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index c46f5b53ed..6c62b50d59 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -119,6 +119,13 @@ def test_dataset_append(tmp_path: Path):
     with pytest.raises(OSError):
         lance.write_dataset(table2, base_dir, mode="append")
 
+    # But we can append subschemas
+    table3 = pa.Table.from_pydict({"colA": [4, 5, 6]})
+    dataset = lance.write_dataset(table3, base_dir, mode="append")
+    assert dataset.to_table() == pa.table(
+        {"colA": [1, 2, 3, 4, 5, 6], "colB": [4, 5, 6, None, None, None]}
+    )
+
 
 def test_dataset_from_record_batch_iterable(tmp_path: Path):
     base_dir = tmp_path / "test"
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index 38bd82c495..38324b0a97 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -532,7 +532,7 @@ impl Dataset {
         let dict = PyDict::new(py);
         let schema = self_.ds.schema();
 
-        let idx_schema = schema.project_by_ids(idx.fields.as_slice());
+        let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
 
         let is_vector = idx_schema
             .fields
diff --git a/python/src/debug.rs b/python/src/debug.rs
index 9cc575150b..8856c1fb28 100644
--- a/python/src/debug.rs
+++ b/python/src/debug.rs
@@ -58,7 +58,7 @@ impl PrettyPrintableFragment {
             .files
             .iter()
             .map(|file| {
-                let schema = schema.project_by_ids(&file.fields);
+                let schema = schema.project_by_ids(&file.fields, false);
                 PrettyPrintableDataFile {
                     path: file.path.clone(),
                     fields: file.fields.clone(),
diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs
index 54f4971324..e7d3f28a97 100644
--- a/rust/lance-core/src/datatypes.rs
+++ b/rust/lance-core/src/datatypes.rs
@@ -18,7 +18,10 @@ mod field;
 mod schema;
 
 use crate::{Error, Result};
-pub use field::{Encoding, Field, NullabilityComparison, SchemaCompareOptions, StorageClass};
+pub use field::{
+    Encoding, Field, NullabilityComparison, SchemaCompareOptions, StorageClass,
+    LANCE_STORAGE_CLASS_SCHEMA_META_KEY,
+};
 pub use schema::Schema;
 
 pub const COMPRESSION_META_KEY: &str = "lance-encoding:compression";
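The newly re-exported `LANCE_STORAGE_CLASS_SCHEMA_META_KEY` is the field-metadata key that marks a column as blob ("balanced") storage; the balanced-dataset test added later in this diff uses the same pattern. A minimal sketch of tagging a column this way — the schema and names here are illustrative, not part of the change:

```rust
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY;

// Illustrative only: mark `payload` as a blob-storage column via field metadata.
fn blob_schema() -> ArrowSchema {
    let payload = ArrowField::new("payload", DataType::LargeBinary, true).with_metadata(
        [(
            LANCE_STORAGE_CLASS_SCHEMA_META_KEY.to_string(),
            "blob".to_string(),
        )]
        .into(),
    );
    ArrowSchema::new(vec![ArrowField::new("idx", DataType::UInt64, false), payload])
}
```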
diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs
index a4a0b58d4f..91eade2fa7 100644
--- a/rust/lance-core/src/datatypes/field.rs
+++ b/rust/lance-core/src/datatypes/field.rs
@@ -5,7 +5,7 @@
 use std::{
     cmp::max,
-    collections::{HashMap, HashSet},
+    collections::HashMap,
     fmt::{self, Display},
     str::FromStr,
     sync::Arc,
 };
@@ -23,7 +23,10 @@
 use deepsize::DeepSizeOf;
 use lance_arrow::{bfloat16::ARROW_EXT_NAME_KEY, *};
 use snafu::{location, Location};
 
-use super::{Dictionary, LogicalType};
+use super::{
+    schema::{compare_fields, explain_fields_difference},
+    Dictionary, LogicalType,
+};
 use crate::{Error, Result};
 
 pub const LANCE_STORAGE_CLASS_SCHEMA_META_KEY: &str = "lance-schema:storage-class";
@@ -49,6 +52,14 @@ pub struct SchemaCompareOptions {
     pub compare_field_ids: bool,
     /// Should nullability be compared (default Strict)
     pub compare_nullability: NullabilityComparison,
+    /// Allow fields in the expected schema to be missing from the schema being tested if
+    /// they are nullable (default false)
+    ///
+    /// Fields in the schema being tested must always be present in the expected schema
+    /// regardless of this flag.
+    pub allow_missing_if_nullable: bool,
+    /// Allow out of order fields (default false)
+    pub ignore_field_order: bool,
 }
 
 /// Encoding enum.
 #[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
@@ -151,7 +162,7 @@ impl Field {
         self.storage_class
     }
 
-    fn explain_differences(
+    pub(crate) fn explain_differences(
        &self,
        expected: &Self,
        options: &SchemaCompareOptions,
@@ -210,61 +221,19 @@ impl Field {
                 self_name
             ));
         }
-        if self.children.len() != expected.children.len()
-            || !self
-                .children
-                .iter()
-                .zip(expected.children.iter())
-                .all(|(child, expected)| child.name == expected.name)
-        {
-            let self_children = self
-                .children
-                .iter()
-                .map(|child| child.name.clone())
-                .collect::<HashSet<_>>();
-            let expected_children = expected
-                .children
-                .iter()
-                .map(|child| child.name.clone())
-                .collect::<HashSet<_>>();
-            let missing = expected_children
-                .difference(&self_children)
-                .cloned()
-                .collect::<Vec<_>>();
-            let unexpected = self_children
-                .difference(&expected_children)
-                .cloned()
-                .collect::<Vec<_>>();
-            if missing.is_empty() && unexpected.is_empty() {
-                differences.push(format!(
-                    "`{}` field order mismatch, expected [{}] but was [{}]",
-                    self_name,
-                    expected
-                        .children
-                        .iter()
-                        .map(|child| child.name.clone())
-                        .collect::<Vec<_>>()
-                        .join(", "),
-                    self.children
-                        .iter()
-                        .map(|child| child.name.clone())
-                        .collect::<Vec<_>>()
-                        .join(", "),
-                ));
-            } else {
-                differences.push(format!(
-                    "`{}` had mismatched children, missing=[{}] unexpected=[{}]",
-                    self_name,
-                    missing.join(", "),
-                    unexpected.join(", ")
-                ));
-            }
-        } else {
-            differences.extend(self.children.iter().zip(expected.children.iter()).flat_map(
-                |(child, expected_child)| {
-                    child.explain_differences(expected_child, options, Some(&self_name))
-                },
-            ));
+        let children_differences = explain_fields_difference(
+            &self.children,
+            &expected.children,
+            options,
+            Some(&self_name),
+        );
+        if !children_differences.is_empty() {
+            let children_differences = format!(
+                "`{}` had mismatched children: {}",
+                self_name,
+                children_differences.join(", ")
+            );
+            differences.push(children_differences);
         }
         differences
     }
@@ -295,22 +264,13 @@ impl Field {
     }
 
     pub fn compare_with_options(&self, expected: &Self, options: &SchemaCompareOptions) -> bool {
-        if self.children.len() != expected.children.len() {
-            false
-        } else {
-            self.name == expected.name
-                && self.logical_type == expected.logical_type
-                && Self::compare_nullability(expected.nullable, self.nullable, options)
-                && self.children.len() == expected.children.len()
-                && self
-                    .children
-                    .iter()
-                    .zip(&expected.children)
-                    .all(|(left, right)| left.compare_with_options(right, options))
-                && (!options.compare_field_ids || self.id == expected.id)
-                && (!options.compare_dictionary || self.dictionary == expected.dictionary)
-                && (!options.compare_metadata || self.metadata == expected.metadata)
-        }
+        self.name == expected.name
+            && self.logical_type == expected.logical_type
+            && Self::compare_nullability(expected.nullable, self.nullable, options)
+            && compare_fields(&self.children, &expected.children, options)
+            && (!options.compare_field_ids || self.id == expected.id)
+            && (!options.compare_dictionary || self.dictionary == expected.dictionary)
+            && (!options.compare_metadata || self.metadata == expected.metadata)
     }
 
     pub fn extension_name(&self) -> Option<&str> {
@@ -476,13 +436,13 @@ impl Field {
     ///
     /// If the ids are `[2]`, then this will include the parent `0` and the
     /// child `3`.
-    pub(crate) fn project_by_ids(&self, ids: &[i32]) -> Option<Self> {
+    pub(crate) fn project_by_ids(&self, ids: &[i32], include_all_children: bool) -> Option<Self> {
         let children = self
             .children
             .iter()
-            .filter_map(|c| c.project_by_ids(ids))
+            .filter_map(|c| c.project_by_ids(ids, include_all_children))
             .collect::<Vec<_>>();
-        if ids.contains(&self.id) {
+        if ids.contains(&self.id) && (children.is_empty() || include_all_children) {
             Some(self.clone())
         } else if !children.is_empty() {
             Some(Self {
@@ -1177,7 +1137,10 @@ mod tests {
             .unwrap();
         assert_eq!(
             wrong_child.explain_difference(&expected, &opts),
-            Some("`a.b` should have nullable=true but nullable=false".to_string())
+            Some(
+                "`a` had mismatched children: `a.b` should have nullable=true but nullable=false"
+                    .to_string()
+            )
         );
 
         let mismatched_children: Field = ArrowField::new(
             "a",
             DataType::Struct(Fields::from(vec![
                 ArrowField::new("b", DataType::Int32, true),
                 ArrowField::new("d", DataType::Int32, true),
             ])),
             true,
         )
         .try_into()
         .unwrap();
         assert_eq!(
             mismatched_children.explain_difference(&expected, &opts),
-            Some("`a` had mismatched children, missing=[c] unexpected=[d]".to_string())
+            Some("`a` had mismatched children: fields did not match, missing=[a.c], unexpected=[a.d]".to_string())
         );
 
         let reordered_children: Field = ArrowField::new(
             "a",
             DataType::Struct(Fields::from(vec![
-                ArrowField::new("c", DataType::Int32, false),
+                ArrowField::new("c", DataType::Int32, true),
                 ArrowField::new("b", DataType::Int32, true),
             ])),
             true,
         )
         .try_into()
         .unwrap();
         assert_eq!(
             reordered_children.explain_difference(&expected, &opts),
-            Some("`a` field order mismatch, expected [b, c] but was [c, b]".to_string())
+            Some("`a` had mismatched children: fields in different order, expected: [b, c], actual: [c, b]".to_string())
         );
 
         let multiple_wrongs: Field = ArrowField::new(
             "c",
             DataType::Struct(Fields::from(vec![
                 ArrowField::new("b", DataType::Int32, true),
                 ArrowField::new("c", DataType::Float32, false),
             ])),
             true,
         )
         .try_into()
         .unwrap();
         assert_eq!(
             multiple_wrongs.explain_difference(&expected, &opts),
             Some(
-                "expected name 'a' but name was 'c', `c.c` should have type int32 but type was float"
+                "expected name 'a' but name was 'c', `c` had mismatched children: `c.c` should have type int32 but type was float"
                     .to_string()
             )
         );
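Taken together, the two new options are what let a subschema pass validation. A minimal sketch of how a caller might combine them — the helper and its name are illustrative, not part of this change:

```rust
use lance_core::datatypes::{NullabilityComparison, Schema, SchemaCompareOptions};

// Illustrative helper: does `incoming` match `dataset_schema` when nullable
// columns may be omitted and top-level field order is ignored?
fn is_compatible_subschema(incoming: &Schema, dataset_schema: &Schema) -> bool {
    let options = SchemaCompareOptions {
        allow_missing_if_nullable: true,
        ignore_field_order: true,
        // Nullability is validated against the actual data elsewhere.
        compare_nullability: NullabilityComparison::Ignore,
        ..Default::default()
    };
    incoming.compare_with_options(dataset_schema, &options)
}
```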
diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs
index 8c3077bb7d..4d4589ee13 100644
--- a/rust/lance-core/src/datatypes/schema.rs
+++ b/rust/lance-core/src/datatypes/schema.rs
@@ -60,15 +60,8 @@ impl<'a> Iterator for SchemaFieldIterPreOrder<'a> {
 
 impl Schema {
     pub fn compare_with_options(&self, expected: &Self, options: &SchemaCompareOptions) -> bool {
-        if self.fields.len() != expected.fields.len() {
-            false
-        } else {
-            self.fields
-                .iter()
-                .zip(&expected.fields)
-                .all(|(lhs, rhs)| lhs.compare_with_options(rhs, options))
-                && (!options.compare_metadata || self.metadata == expected.metadata)
-        }
+        compare_fields(&self.fields, &expected.fields, options)
+            && (!options.compare_metadata || self.metadata == expected.metadata)
     }
 
     pub fn explain_difference(
@@ -76,69 +69,21 @@ impl Schema {
         expected: &Self,
         options: &SchemaCompareOptions,
     ) -> Option<String> {
-        if self.fields.len() != expected.fields.len()
-            || !self
-                .fields
-                .iter()
-                .zip(expected.fields.iter())
-                .all(|(field, expected)| field.name == expected.name)
-        {
-            let self_fields = self
-                .fields
-                .iter()
-                .map(|f| f.name.clone())
-                .collect::<HashSet<_>>();
-            let expected_fields = expected
-                .fields
-                .iter()
-                .map(|f| f.name.clone())
-                .collect::<HashSet<_>>();
-            let missing = expected_fields
-                .difference(&self_fields)
-                .cloned()
-                .collect::<Vec<_>>();
-            let unexpected = self_fields
-                .difference(&expected_fields)
-                .cloned()
-                .collect::<Vec<_>>();
-            if missing.is_empty() && unexpected.is_empty() {
-                Some(format!(
-                    "fields in different order, expected: [{}], actual: [{}]",
-                    expected
-                        .fields
-                        .iter()
-                        .map(|f| f.name.clone())
-                        .collect::<Vec<_>>()
-                        .join(", "),
-                    self.fields
-                        .iter()
-                        .map(|f| f.name.clone())
-                        .collect::<Vec<_>>()
-                        .join(", "),
-                ))
-            } else {
-                Some(format!(
-                    "fields did not match, missing=[{}], unexpected=[{}]",
-                    missing.join(", "),
-                    unexpected.join(", ")
-                ))
+        let mut differences =
+            explain_fields_difference(&self.fields, &expected.fields, options, None);
+
+        if options.compare_metadata {
+            if let Some(difference) =
+                explain_metadata_difference(&self.metadata, &expected.metadata)
+            {
+                differences.push(difference);
             }
+        }
+
+        if differences.is_empty() {
+            None
         } else {
-            let differences = self
-                .fields
-                .iter()
-                .zip(expected.fields.iter())
-                .flat_map(|(field, expected)| field.explain_difference(expected, options))
-                .collect::<Vec<_>>();
-            if differences.is_empty() {
-                if options.compare_metadata && self.metadata != expected.metadata {
-                    Some("schema metadata did not match expected schema metadata".to_string())
-                } else {
-                    None
-                }
-            } else {
-                Some(differences.join(", "))
-            }
+            Some(differences.join(", "))
         }
     }
 
@@ -335,11 +280,16 @@ impl Schema {
     /// Returns a new schema that only contains the fields in `column_ids`.
     ///
     /// This projection can filter out both top-level and nested fields
-    pub fn project_by_ids(&self, column_ids: &[i32]) -> Self {
+    ///
+    /// If `include_all_children` is true, then if a parent field id is passed,
+    /// then all children of that field will be included in the projection
+    /// regardless of whether their ids were passed. If this is false, then
+    /// only the child fields with the passed ids will be included.
+    pub fn project_by_ids(&self, column_ids: &[i32], include_all_children: bool) -> Self {
         let filtered_fields = self
             .fields
             .iter()
-            .filter_map(|f| f.project_by_ids(column_ids))
+            .filter_map(|f| f.project_by_ids(column_ids, include_all_children))
             .collect();
         Self {
             fields: filtered_fields,
@@ -632,6 +582,162 @@ impl TryFrom<&Self> for Schema {
     }
 }
 
+pub fn compare_fields(
+    fields: &[Field],
+    expected: &[Field],
+    options: &SchemaCompareOptions,
+) -> bool {
+    if options.allow_missing_if_nullable || options.ignore_field_order {
+        let expected_names = expected
+            .iter()
+            .map(|f| f.name.as_str())
+            .collect::<HashSet<_>>();
+        for field in fields {
+            if !expected_names.contains(field.name.as_str()) {
+                // Extra field
+                return false;
+            }
+        }
+
+        let field_mapping = fields
+            .iter()
+            .enumerate()
+            .map(|(pos, f)| (f.name.as_str(), (f, pos)))
+            .collect::<HashMap<_, _>>();
+        let mut cumulative_position = 0;
+        for expected_field in expected {
+            if let Some((field, pos)) = field_mapping.get(expected_field.name.as_str()) {
+                if !field.compare_with_options(expected_field, options) {
+                    return false;
+                }
+                if !options.ignore_field_order && *pos < cumulative_position {
+                    return false;
+                }
+                cumulative_position = *pos;
+            } else if options.allow_missing_if_nullable && expected_field.nullable {
+                continue;
+            } else {
+                return false;
+            }
+        }
+        true
+    } else {
+        // Fast path: we can just zip
+        fields.len() == expected.len()
+            && fields
+                .iter()
+                .zip(expected.iter())
+                .all(|(lhs, rhs)| lhs.compare_with_options(rhs, options))
+    }
+}
+
+pub fn explain_fields_difference(
+    fields: &[Field],
+    expected: &[Field],
+    options: &SchemaCompareOptions,
+    path: Option<&str>,
+) -> Vec<String> {
+    let field_names = fields
+        .iter()
+        .map(|f| f.name.as_str())
+        .collect::<HashSet<_>>();
+    let expected_names = expected
+        .iter()
+        .map(|f| f.name.as_str())
+        .collect::<HashSet<_>>();
+
+    let prepend_path = |f: &str| {
+        if let Some(path) = path {
+            format!("{}.{}", path, f)
+        } else {
+            f.to_string()
+        }
+    };
+
+    // Check there are no extra fields or missing fields
+    let unexpected_fields = field_names
+        .difference(&expected_names)
+        .cloned()
+        .map(prepend_path)
+        .collect::<Vec<_>>();
+    let missing_fields = expected_names.difference(&field_names);
+    let missing_fields = if options.allow_missing_if_nullable {
+        missing_fields
+            .filter(|f| {
+                let expected_field = expected.iter().find(|ef| ef.name == **f).unwrap();
+                !expected_field.nullable
+            })
+            .cloned()
+            .map(prepend_path)
+            .collect::<Vec<_>>()
+    } else {
+        missing_fields
+            .cloned()
+            .map(prepend_path)
+            .collect::<Vec<_>>()
+    };
+
+    let mut differences = vec![];
+    if !missing_fields.is_empty() || !unexpected_fields.is_empty() {
+        differences.push(format!(
+            "fields did not match, missing=[{}], unexpected=[{}]",
+            missing_fields.join(", "),
+            unexpected_fields.join(", ")
+        ));
+    }
+
+    // Map the expected fields to position of field
+    let field_mapping = expected
+        .iter()
+        .filter_map(|ef| {
+            fields
+                .iter()
+                .position(|f| ef.name == f.name)
+                .map(|pos| (ef, pos))
+        })
+        .collect::<Vec<_>>();
+
+    // Check the fields are in the same order
+    if !options.ignore_field_order {
+        let fields_out_of_order = field_mapping.windows(2).any(|w| w[0].1 > w[1].1);
+        if fields_out_of_order {
+            let expected_order = expected.iter().map(|f| f.name.as_str()).collect::<Vec<_>>();
+            let actual_order = fields.iter().map(|f| f.name.as_str()).collect::<Vec<_>>();
+            differences.push(format!(
+                "fields in different order, expected: [{}], actual: [{}]",
+                expected_order.join(", "),
+                actual_order.join(", ")
+            ));
+        }
+    }
+
+    // Check for individual differences in the fields
+    for (expected_field, field_pos) in field_mapping.iter() {
+        let field = &fields[*field_pos];
+        debug_assert_eq!(field.name, expected_field.name);
+        let field_diffs = field.explain_differences(expected_field, options, path);
+        if !field_diffs.is_empty() {
+            differences.push(field_diffs.join(", "))
+        }
+    }
+
+    differences
+}
+
+fn explain_metadata_difference(
+    metadata: &HashMap<String, String>,
+    expected: &HashMap<String, String>,
+) -> Option<String> {
+    if metadata != expected {
+        Some(format!(
+            "metadata did not match, expected: {:?}, actual: {:?}",
+            expected, metadata
+        ))
+    } else {
+        None
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -691,7 +797,7 @@ mod tests {
         ]);
         let mut schema = Schema::try_from(&arrow_schema).unwrap();
         schema.set_field_id(None);
-        let projected = schema.project_by_ids(&[2, 4, 5]);
+        let projected = schema.project_by_ids(&[2, 4, 5], true);
 
         let expected_arrow_schema = ArrowSchema::new(vec![
             ArrowField::new(
                 "b",
                 DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                     "f1",
                     DataType::Utf8,
                     true,
                 )])),
                 true,
             ),
             ArrowField::new("c", DataType::Float64, false),
         ]);
         assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema);
 
-        let projected = schema.project_by_ids(&[2]);
+        let projected = schema.project_by_ids(&[2], true);
         let expected_arrow_schema = ArrowSchema::new(vec![ArrowField::new(
             "b",
             DataType::Struct(ArrowFields::from(vec![ArrowField::new(
                 "f1",
                 DataType::Utf8,
                 true,
             )])),
             true,
         )]);
         assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema);
 
-        let projected = schema.project_by_ids(&[1]);
+        let projected = schema.project_by_ids(&[1], true);
         let expected_arrow_schema = ArrowSchema::new(vec![ArrowField::new(
             "b",
             DataType::Struct(ArrowFields::from(vec![
                 ArrowField::new("f1", DataType::Utf8, true),
                 ArrowField::new("f2", DataType::Boolean, false),
             ])),
             true,
         )]);
         assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema);
+
+        let projected = schema.project_by_ids(&[1, 2], false);
+        let expected_arrow_schema = ArrowSchema::new(vec![ArrowField::new(
+            "b",
+            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                "f1",
+                DataType::Utf8,
+                true,
+            )])),
+            true,
+        )]);
+        assert_eq!(ArrowSchema::from(&projected), expected_arrow_schema);
     }
 
     #[test]
@@ -1096,6 +1214,115 @@
         ]);
         let mismatched = Schema::try_from(&mismatched).unwrap();
-        assert_eq!(mismatched.explain_difference(&expected, &SchemaCompareOptions::default()), Some("`b` had mismatched children, missing=[f2] unexpected=[], `c` should have nullable=false but nullable=true".to_string()));
+        assert_eq!(
+            mismatched.explain_difference(&expected, &SchemaCompareOptions::default()),
+            Some(
+                "`b` had mismatched children: fields did not match, missing=[b.f2], \
+                 unexpected=[], `c` should have nullable=false but nullable=true"
+                    .to_string()
+            )
+        );
+    }
+
+    #[test]
+    fn test_schema_difference_subschema() {
+        let expected = ArrowSchema::new(vec![
+            ArrowField::new("a", DataType::Int32, false),
+            ArrowField::new(
+                "b",
+                DataType::Struct(ArrowFields::from(vec![
+                    ArrowField::new("f1", DataType::Utf8, true),
+                    ArrowField::new("f2", DataType::Boolean, false),
+                    ArrowField::new("f3", DataType::Float32, false),
+                ])),
+                true,
+            ),
+            ArrowField::new("c", DataType::Float64, true),
+        ]);
+        let expected = Schema::try_from(&expected).unwrap();
+
+        // Can omit nullable fields and subfields
+        let subschema = ArrowSchema::new(vec![
+            ArrowField::new("a", DataType::Int32, false),
+            ArrowField::new(
+                "b",
+                DataType::Struct(ArrowFields::from(vec![
+                    ArrowField::new("f2", DataType::Boolean, false),
+                    ArrowField::new("f3", DataType::Float32, false),
+                ])),
+                true,
+            ),
+        ]);
+        let subschema = Schema::try_from(&subschema).unwrap();
+
+        assert!(!subschema.compare_with_options(&expected, &SchemaCompareOptions::default()));
+        assert_eq!(
+            subschema.explain_difference(&expected, &SchemaCompareOptions::default()),
+            Some(
+                "fields did not match, missing=[c], unexpected=[], `b` had mismatched \
+                 children: fields did not match, missing=[b.f1], unexpected=[]"
+                    .to_string()
+            )
+        );
+        let options = SchemaCompareOptions {
+            allow_missing_if_nullable: true,
+            ..Default::default()
+        };
+        assert!(subschema.compare_with_options(&expected, &options));
+        let res = subschema.explain_difference(&expected, &options);
+        assert!(res.is_none(), "Expected None, got {:?}", res);
+
+        // Omitting non-nullable fields should fail
+        let subschema = ArrowSchema::new(vec![ArrowField::new(
+            "b",
+            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                "f2",
+                DataType::Boolean,
+                false,
+            )])),
+            true,
+        )]);
+        let subschema = Schema::try_from(&subschema).unwrap();
+        assert!(!subschema.compare_with_options(&expected, &options));
+        assert_eq!(
+            subschema.explain_difference(&expected, &options),
+            Some(
+                "fields did not match, missing=[a], unexpected=[], `b` had mismatched \
+                 children: fields did not match, missing=[b.f3], unexpected=[]"
+                    .to_string()
+            )
+        );
+
+        let out_of_order = ArrowSchema::new(vec![
+            ArrowField::new("c", DataType::Float64, true),
+            ArrowField::new(
+                "b",
+                DataType::Struct(ArrowFields::from(vec![
+                    ArrowField::new("f3", DataType::Float32, false),
+                    ArrowField::new("f2", DataType::Boolean, false),
+                    ArrowField::new("f1", DataType::Utf8, true),
+                ])),
+                true,
+            ),
+            ArrowField::new("a", DataType::Int32, false),
+        ]);
+        let out_of_order = Schema::try_from(&out_of_order).unwrap();
+        assert!(!out_of_order.compare_with_options(&expected, &options));
+        assert_eq!(
+            subschema.explain_difference(&expected, &options),
+            Some(
+                "fields did not match, missing=[a], unexpected=[], `b` had mismatched \
+                 children: fields did not match, missing=[b.f3], unexpected=[]"
+                    .to_string()
+            )
+        );
+
+        let options = SchemaCompareOptions {
+            ignore_field_order: true,
+            ..Default::default()
+        };
+        assert!(out_of_order.compare_with_options(&expected, &options));
+        let res = out_of_order.explain_difference(&expected, &options);
+        assert!(res.is_none(), "Expected None, got {:?}", res);
+    }
 }
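To make the new `include_all_children` flag concrete, a small sketch mirroring the projection test above — the field ids follow that test's schema, so treat them as illustrative:

```rust
use lance_core::datatypes::Schema;

// `schema` has a struct field `b` (id 1) with children `f1` (id 2) and `f2` (id 3),
// matching the test above.
fn projection_demo(schema: &Schema) {
    // Passing just the parent id with include_all_children=true pulls in the
    // whole struct, children included.
    let whole_struct = schema.project_by_ids(&[1], true);
    // With include_all_children=false, naming the parent plus a subset of child
    // ids keeps only that subset -- here `b.f1` but not `b.f2`.
    let one_child = schema.project_by_ids(&[1, 2], false);
    println!("{:#?}\n{:#?}", whole_struct, one_child);
}
```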
diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs
index f9496a6498..1b2e7b1a25 100644
--- a/rust/lance-file/src/reader.rs
+++ b/rust/lance-file/src/reader.rs
@@ -349,7 +349,7 @@ impl FileReader {
                 }
             }
         }
-        meta.schema.project_by_ids(&stats_field_ids)
+        meta.schema.project_by_ids(&stats_field_ids, true)
     })
 }
diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs
index a8d56eba57..bb51ec9380 100644
--- a/rust/lance-file/src/writer.rs
+++ b/rust/lance-file/src/writer.rs
@@ -121,7 +121,7 @@ impl FileWriter {
         };
 
         let stats_collector = if !collect_stats_for_fields.is_empty() {
-            let stats_schema = schema.project_by_ids(&collect_stats_for_fields);
+            let stats_schema = schema.project_by_ids(&collect_stats_for_fields, true);
             statistics::StatisticsCollector::try_new(&stats_schema)
         } else {
             None
diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs
index 7e370bba9d..76125c3c94 100644
--- a/rust/lance-table/src/format/fragment.rs
+++ b/rust/lance-table/src/format/fragment.rs
@@ -91,7 +91,7 @@ impl DataFile {
     }
 
     pub fn schema(&self, full_schema: &Schema) -> Schema {
-        full_schema.project_by_ids(&self.fields)
+        full_schema.project_by_ids(&self.fields, false)
     }
 
     pub fn is_legacy_file(&self) -> bool {
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index d470f2f7ee..dde4788115 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -566,16 +566,20 @@ impl Dataset {
             params.enable_move_stable_row_ids = d.manifest.uses_move_stable_row_ids();
         }
         let m = d.manifest.as_ref();
-        schema.check_compatible(
-            &m.schema,
-            &SchemaCompareOptions {
-                compare_dictionary: true,
-                // array nullability is checked later, using actual data instead
-                // of the schema
-                compare_nullability: NullabilityComparison::Ignore,
-                ..Default::default()
-            },
-        )?;
+        let mut schema_cmp_opts = SchemaCompareOptions {
+            compare_dictionary: true,
+            // array nullability is checked later, using actual data instead
+            // of the schema
+            compare_nullability: NullabilityComparison::Ignore,
+            ..Default::default()
+        };
+        if m.blob_dataset_version.is_none() {
+            // Balanced datasets don't yet support schema evolution
+            schema_cmp_opts.ignore_field_order = true;
+            schema_cmp_opts.allow_missing_if_nullable = true;
+        }
+
+        schema.check_compatible(&m.schema, &schema_cmp_opts)?;
         // If appending, always use existing storage version
         storage_version = m.data_storage_format.lance_file_version()?;
     }
@@ -717,7 +721,7 @@ impl Dataset {
         params: Option<WriteParams>,
     ) -> Result<()> {
         // Force append mode
-        let params = WriteParams {
+        let mut params = WriteParams {
             mode: WriteMode::Append,
             ..params.unwrap_or_default()
         };
@@ -734,13 +738,31 @@ impl Dataset {
         let stream = reader_to_stream(batches);
 
         // Return Error if append and input schema differ
-        self.manifest.schema.check_compatible(
-            &schema,
-            &SchemaCompareOptions {
-                compare_dictionary: true,
-                ..Default::default()
-            },
-        )?;
+        let mut schema_cmp_opts = SchemaCompareOptions {
+            compare_dictionary: true,
+            // array nullability is checked later, using actual data instead
+            // of the schema
+            compare_nullability: NullabilityComparison::Ignore,
+            ..Default::default()
+        };
+        if self.manifest.blob_dataset_version.is_none() {
+            // Balanced datasets don't yet support schema evolution
+            schema_cmp_opts.ignore_field_order = true;
+            schema_cmp_opts.allow_missing_if_nullable = true;
+        }
+
+        schema.check_compatible(&self.manifest.schema, &schema_cmp_opts)?;
+
+        // If the dataset is already using (or not using) move stable row ids, we need to match
+        // and ignore whatever the user provided as input
+        if params.enable_move_stable_row_ids != self.manifest.uses_move_stable_row_ids() {
+            info!(
+                "Ignoring user provided move stable row ids setting of {}, dataset already has it set to {}",
+                params.enable_move_stable_row_ids,
+                self.manifest.uses_move_stable_row_ids()
+            );
+            params.enable_move_stable_row_ids = self.manifest.uses_move_stable_row_ids();
+        }
 
         let written_frags = write_fragments_internal(
             Some(self),
@@ -2058,6 +2080,7 @@ mod tests {
         DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
     };
     use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME};
+    use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY;
     use lance_datagen::{array, gen, BatchCount, Dimension, RowCount};
     use lance_file::version::LanceFileVersion;
     use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams};
@@ -4876,4 +4899,291 @@ mod tests {
             }
         }
     }
+
+    #[tokio::test]
+    async fn test_insert_subschema() {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("a", DataType::Int32, false),
+            ArrowField::new("b", DataType::Int32, true),
+        ]));
+        let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
+        let mut dataset = Dataset::write(empty_reader, "memory://", None)
+            .await
+            .unwrap();
+        dataset.validate().await.unwrap();
+
+        // If missing columns that aren't nullable, will return an error
+        // TODO: provide alternative default than null.
+        let just_b = Arc::new(schema.project(&[1]).unwrap());
+        let batch = RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
+            .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone());
+        let res = dataset.append(reader, None).await;
+        assert!(
+            matches!(res, Err(Error::SchemaMismatch { .. })),
+            "Expected Error::SchemaMismatch, got {:?}",
+            res
+        );
+
+        // If missing columns that are nullable, the write succeeds.
+        let just_a = Arc::new(schema.project(&[0]).unwrap());
+        let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
+            .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone());
+        dataset.append(reader, None).await.unwrap();
+        dataset.validate().await.unwrap();
+
+        // Looking at the fragments, there is no data file with the missing field
+        let fragments = dataset.get_fragments();
+        assert_eq!(fragments.len(), 1);
+        assert_eq!(fragments[0].metadata.files.len(), 1);
+        assert_eq!(&fragments[0].metadata.files[0].fields, &[0]);
+
+        // When reading back, columns that are missing are null
+        let data = dataset.scan().try_into_batch().await.unwrap();
+        let expected = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1])),
+                Arc::new(Int32Array::from(vec![None])),
+            ],
+        )
+        .unwrap();
+        assert_eq!(data, expected);
+
+        // Can still insert all columns
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![2])),
+                Arc::new(Int32Array::from(vec![3])),
+            ],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        dataset.append(reader, None).await.unwrap();
+        dataset.validate().await.unwrap();
+
+        // When reading back, only missing data is null, otherwise is filled in
+        let data = dataset.scan().try_into_batch().await.unwrap();
+        let expected = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(Int32Array::from(vec![None, Some(3)])),
+            ],
+        )
+        .unwrap();
+        assert_eq!(data, expected);
+
+        // Can run compaction. All files should now have all fields.
+        compact_files(&mut dataset, CompactionOptions::default(), None)
+            .await
+            .unwrap();
+        dataset.validate().await.unwrap();
+        let fragments = dataset.get_fragments();
+        assert_eq!(fragments.len(), 1);
+        assert_eq!(fragments[0].metadata.files.len(), 1);
+        assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 1]);
+
+        // Can scan and get expected data.
+        let data = dataset.scan().try_into_batch().await.unwrap();
+        assert_eq!(data, expected);
+    }
+
+    #[tokio::test]
+    async fn test_insert_nested_subschemas() {
+        // Test subschemas at struct level
+        // Test different orders
+        // Test the Dataset::write() path
+        // Test Take across fragments with different field id sets
+        let test_dir = tempdir().unwrap();
+        let test_uri = test_dir.path().to_str().unwrap();
+
+        let field_a = Arc::new(ArrowField::new("a", DataType::Int32, true));
+        let field_b = Arc::new(ArrowField::new("b", DataType::Int32, false));
+        let field_c = Arc::new(ArrowField::new("c", DataType::Int32, true));
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "s",
+            DataType::Struct(vec![field_a.clone(), field_b.clone(), field_c.clone()].into()),
+            true,
+        )]));
+        let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
+        let dataset = Dataset::write(empty_reader, test_uri, None).await.unwrap();
+        dataset.validate().await.unwrap();
+
+        let append_options = WriteParams {
+            mode: WriteMode::Append,
+            ..Default::default()
+        };
+        // Can insert b, a
+        let just_b_a = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "s",
+            DataType::Struct(vec![field_b.clone(), field_a.clone()].into()),
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            just_b_a.clone(),
+            vec![Arc::new(StructArray::from(vec![
+                (
+                    field_b.clone(),
+                    Arc::new(Int32Array::from(vec![1])) as ArrayRef,
+                ),
+                (field_a.clone(), Arc::new(Int32Array::from(vec![2]))),
+            ]))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b_a.clone());
+        let dataset = Dataset::write(reader, test_uri, Some(append_options.clone()))
+            .await
+            .unwrap();
+        dataset.validate().await.unwrap();
+        let fragments = dataset.get_fragments();
+        assert_eq!(fragments.len(), 1);
+        assert_eq!(fragments[0].metadata.files.len(), 1);
+        assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 2, 1]);
+        assert_eq!(&fragments[0].metadata.files[0].column_indices, &[0, 1, 2]);
+
+        // Can insert c, b
+        let just_c_b = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "s",
+            DataType::Struct(vec![field_c.clone(), field_b.clone()].into()),
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            just_c_b.clone(),
+            vec![Arc::new(StructArray::from(vec![
+                (
+                    field_c.clone(),
+                    Arc::new(Int32Array::from(vec![4])) as ArrayRef,
+                ),
+                (field_b.clone(), Arc::new(Int32Array::from(vec![3]))),
+            ]))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_c_b.clone());
+        let dataset = Dataset::write(reader, test_uri, Some(append_options.clone()))
+            .await
+            .unwrap();
+        dataset.validate().await.unwrap();
+        let fragments = dataset.get_fragments();
+        assert_eq!(fragments.len(), 2);
+        assert_eq!(fragments[1].metadata.files.len(), 1);
+        assert_eq!(&fragments[1].metadata.files[0].fields, &[0, 3, 2]);
+        assert_eq!(&fragments[1].metadata.files[0].column_indices, &[0, 1, 2]);
+
+        // Can't insert a, c (b is non-nullable)
+        let just_a_c = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "s",
+            DataType::Struct(vec![field_a.clone(), field_c.clone()].into()),
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            just_a_c.clone(),
+            vec![Arc::new(StructArray::from(vec![
+                (
+                    field_a.clone(),
+                    Arc::new(Int32Array::from(vec![5])) as ArrayRef,
+                ),
+                (field_c.clone(), Arc::new(Int32Array::from(vec![6]))),
+            ]))],
+        )
+        .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a_c.clone());
+        let res = Dataset::write(reader, test_uri, Some(append_options)).await;
+        assert!(
+            matches!(res, Err(Error::SchemaMismatch { .. })),
+            "Expected Error::SchemaMismatch, got {:?}",
+            res
+        );
+
+        // Can scan and get all data
+        let data = dataset.scan().try_into_batch().await.unwrap();
+        let expected = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StructArray::from(vec![
+                (
+                    field_a.clone(),
+                    Arc::new(Int32Array::from(vec![Some(2), None])) as ArrayRef,
+                ),
+                (field_b.clone(), Arc::new(Int32Array::from(vec![1, 3]))),
+                (
+                    field_c.clone(),
+                    Arc::new(Int32Array::from(vec![None, Some(4)])),
+                ),
+            ]))],
+        )
+        .unwrap();
+        assert_eq!(data, expected);
+
+        // Can call take and get rows from all three back in one batch
+        let result = dataset
+            .take(&[1, 0], Arc::new(dataset.schema().clone()))
+            .await
+            .unwrap();
+        let expected = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StructArray::from(vec![
+                (
+                    field_a.clone(),
+                    Arc::new(Int32Array::from(vec![None, Some(2)])) as ArrayRef,
+                ),
+                (field_b.clone(), Arc::new(Int32Array::from(vec![3, 1]))),
+                (
+                    field_c.clone(),
+                    Arc::new(Int32Array::from(vec![Some(4), None])),
+                ),
+            ]))],
+        )
+        .unwrap();
+        assert_eq!(result, expected);
+    }
+
+    #[tokio::test]
+    async fn test_insert_balanced_subschemas() {
+        // TODO: support this.
+        let test_dir = tempdir().unwrap();
+        let test_uri = test_dir.path().to_str().unwrap();
+
+        let field_a = ArrowField::new("a", DataType::Int32, true);
+        let field_b = ArrowField::new("b", DataType::Int64, true);
+        let schema = Arc::new(ArrowSchema::new(vec![
+            field_a.clone(),
+            field_b.clone().with_metadata(
+                [(
+                    LANCE_STORAGE_CLASS_SCHEMA_META_KEY.to_string(),
+                    "blob".to_string(),
+                )]
+                .into(),
+            ),
+        ]));
+        let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
+        let options = WriteParams {
+            enable_move_stable_row_ids: true,
+            enable_v2_manifest_paths: true,
+            ..Default::default()
+        };
+        let mut dataset = Dataset::write(empty_reader, test_uri, Some(options))
+            .await
+            .unwrap();
+        dataset.validate().await.unwrap();
+
+        // Insert left side
+        let just_a = Arc::new(ArrowSchema::new(vec![field_a.clone()]));
+        let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
+            .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone());
+        let result = dataset.append(reader, None).await;
+        assert!(result.is_err());
+        assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
+
+        // Insert right side
+        let just_b = Arc::new(ArrowSchema::new(vec![field_b.clone()]));
+        let batch = RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int64Array::from(vec![2]))])
+            .unwrap();
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone());
+        let result = dataset.append(reader, None).await;
+        assert!(result.is_err());
+        assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
+    }
 }
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index d2d6d9e604..def1b1ae2b 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -12,7 +12,9 @@ use std::sync::Arc;
 
 use arrow::compute::concat_batches;
 use arrow_array::cast::as_primitive_array;
-use arrow_array::{RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array};
+use arrow_array::{
+    new_null_array, RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array,
+};
 use arrow_schema::Schema as ArrowSchema;
 use datafusion::logical_expr::Expr;
 use datafusion::scalar::ScalarValue;
@@ -388,6 +390,101 @@ mod v2_adapter {
     }
 }
 
+/// A reader where all rows are null. Used when there are fields that have no
+/// data files in a fragment.
+#[derive(Debug, Clone)]
+struct NullReader {
+    schema: Arc<Schema>,
+    num_rows: u32,
+}
+
+impl NullReader {
+    fn new(schema: Arc<Schema>, num_rows: u32) -> Self {
+        Self { schema, num_rows }
+    }
+
+    fn batch(projection: Arc<ArrowSchema>, num_rows: usize) -> RecordBatch {
+        let columns = projection
+            .fields()
+            .iter()
+            .map(|f| new_null_array(f.data_type(), num_rows))
+            .collect::<Vec<_>>();
+        RecordBatch::try_new(projection, columns).unwrap()
+    }
+}
+
+#[async_trait::async_trait]
+impl GenericFileReader for NullReader {
+    fn read_range_tasks(
+        &self,
+        range: Range<u64>,
+        batch_size: u32,
+        projection: Arc<Schema>,
+    ) -> Result<ReadBatchTaskStream> {
+        let mut remaining_rows = range.end - range.start;
+        let projection: Arc<ArrowSchema> = Arc::new(projection.as_ref().into());
+
+        let task_iter = std::iter::from_fn(move || {
+            if remaining_rows == 0 {
+                return None;
+            }
+
+            let num_rows = remaining_rows.min(batch_size as u64) as usize;
+            remaining_rows -= num_rows as u64;
+            let batch = Self::batch(projection.clone(), num_rows);
+            let task = ReadBatchTask {
+                task: futures::future::ready(Ok(batch)).boxed(),
+                num_rows: num_rows as u32,
+            };
+            Some(task)
+        });
+
+        Ok(futures::stream::iter(task_iter).boxed())
+    }
+
+    fn read_all_tasks(
+        &self,
+        batch_size: u32,
+        projection: Arc<Schema>,
+    ) -> Result<ReadBatchTaskStream> {
+        self.read_range_tasks(0..self.num_rows as u64, batch_size, projection)
+    }
+
+    fn take_all_tasks(
+        &self,
+        indices: &[u32],
+        batch_size: u32,
+        projection: Arc<Schema>,
+    ) -> Result<ReadBatchTaskStream> {
+        let num_rows = indices.len() as u64;
+        self.read_range_tasks(0..num_rows, batch_size, projection)
+    }
+
+    fn projection(&self) -> &Arc<Schema> {
+        &self.schema
+    }
+
+    fn len(&self) -> u32 {
+        self.num_rows
+    }
+
+    fn clone_box(&self) -> Box<dyn GenericFileReader> {
+        Box::new(self.clone())
+    }
+
+    fn is_legacy(&self) -> bool {
+        false
+    }
+
+    fn as_legacy_opt(&self) -> Option<&FileReader> {
+        None
+    }
+
+    fn as_legacy_opt_mut(&mut self) -> Option<&mut FileReader> {
+        None
+    }
+}
+
 #[derive(Debug, Default)]
 pub struct FragReadConfig {
     // Add the row id column
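`NullReader` leans entirely on arrow's `new_null_array`, which can build an all-null column for any data type, nested structs included. A standalone sketch of the same trick, with an illustrative schema:

```rust
use std::sync::Arc;

use arrow_array::{new_null_array, RecordBatch};
use arrow_schema::{DataType, Field, Fields, Schema};

// Build a 4-row batch where every value, including struct children, is null.
fn all_null_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "s",
        DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
        true,
    )]));
    let columns = schema
        .fields()
        .iter()
        .map(|f| new_null_array(f.data_type(), 4))
        .collect::<Vec<_>>();
    RecordBatch::try_new(schema, columns).unwrap()
}
```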
@@ -713,6 +810,23 @@ impl FileFragment {
             }
         }
 
+        // This should return immediately on modern datasets.
+        let num_rows = self.count_rows().await?;
+
+        // Check if there are any fields that are not in any data files
+        let field_ids_in_files = opened_files
+            .iter()
+            .flat_map(|r| r.projection().fields_pre_order().map(|f| f.id))
+            .filter(|id| *id >= 0)
+            .collect::<HashSet<_>>();
+        let mut missing_fields = projection.field_ids();
+        missing_fields.retain(|f| !field_ids_in_files.contains(f) && *f >= 0);
+        if !missing_fields.is_empty() {
+            let missing_projection = projection.project_by_ids(&missing_fields, true);
+            let null_reader = NullReader::new(Arc::new(missing_projection), num_rows as u32);
+            opened_files.push(Box::new(null_reader));
+        }
+
         Ok(opened_files)
     }
 
@@ -871,24 +985,6 @@ impl FileFragment {
             ));
         }
 
-        for field in self.schema().fields_pre_order() {
-            if !seen_fields.contains(&field.id) {
-                return Err(Error::corrupt_file(
-                    self.dataset
-                        .data_dir()
-                        .child(self.metadata.files[0].path.as_str()),
-                    format!(
-                        "Field {} is missing in fragment {}\nField: {:#?}\nFragment: {:#?}",
-                        field.id,
-                        self.id(),
-                        field,
-                        self.metadata()
-                    ),
-                    location!(),
-                ));
-            }
-        }
-
         for data_file in &self.metadata.files {
             data_file.validate(&self.dataset.data_dir())?;
         }
@@ -1109,7 +1205,6 @@ impl FileFragment {
         projection: &Schema,
         with_row_address: bool,
     ) -> Result<RecordBatch> {
-        println!("Fragment take (offsets={:?}", row_offsets);
         let reader = self
             .open(
                 projection,
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 0f4037ed2b..6e37dd5704 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -704,6 +704,11 @@ async fn rewrite_files(
     if let Some(max_bytes_per_file) = options.max_bytes_per_file {
         params.max_bytes_per_file = max_bytes_per_file;
     }
+
+    if dataset.manifest.uses_move_stable_row_ids() {
+        params.enable_move_stable_row_ids = true;
+    }
+
     let new_fragments = write_fragments_internal(
         Some(dataset.as_ref()),
         dataset.object_store.clone(),
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index fb3aa5a037..b42aaaaa32 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -528,7 +528,7 @@ pub(super) async fn alter_columns(
         .map(|(_old, new)| new.id)
         .collect::<Vec<_>>();
     // This schema contains the exact field ids we want to write the new fields with.
-    let new_col_schema = new_schema.project_by_ids(&new_ids);
+    let new_col_schema = new_schema.project_by_ids(&new_ids, true);
 
     let mapper = move |batch: &RecordBatch| {
         let mut fields = Vec::with_capacity(cast_fields.len());
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index f28edee8a3..bcbf685284 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -309,22 +309,23 @@ pub async fn write_fragments_internal(
             schema.check_compatible(
                 dataset.schema(),
                 &SchemaCompareOptions {
-                    // We don't care if the user claims their data is nullable / non-nullable.
-                    // We will verify against the actual data in the writer.
+                    // We don't care if the user claims their data is nullable / non-nullable. We will
+                    // verify against the actual data.
                     compare_nullability: NullabilityComparison::Ignore,
+                    allow_missing_if_nullable: true,
+                    ignore_field_order: true,
+                    compare_dictionary: true,
                     ..Default::default()
                 },
             )?;
-            // Use the schema from the dataset, because it has the correct
-            // field ids. Use the storage version from the dataset, ignoring
-            // any version from the user.
-            (
-                dataset.schema().clone(),
-                dataset
-                    .manifest()
-                    .data_storage_format
-                    .lance_file_version()?,
-            )
+            // Project from the dataset schema, because it has the correct field ids.
+            let write_schema = dataset.schema().project_by_schema(&schema)?;
+            // Use the storage version from the dataset, ignoring any version from the user.
+            let data_storage_version = dataset
+                .manifest()
+                .data_storage_format
+                .lance_file_version()?;
+            (write_schema, data_storage_version)
         }
         WriteMode::Overwrite => {
             // Overwrite, use the schema from the data. If the user specified
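The important move above is that appended files are written with the dataset's existing field ids rather than freshly assigned ones; `project_by_schema` performs that mapping. A hypothetical sketch of roughly the same resolution expressed with the `project_by_ids` API from this diff — top-level names only, and the helper itself is illustrative:

```rust
use lance_core::datatypes::Schema;

// Hypothetical: line up an incoming subschema with the dataset schema so the
// resulting projection carries the dataset's field ids.
fn resolve_write_schema(dataset_schema: &Schema, incoming: &Schema) -> Schema {
    let ids = incoming
        .fields
        .iter()
        .filter_map(|f| dataset_schema.field(&f.name).map(|df| df.id))
        .collect::<Vec<_>>();
    dataset_schema.project_by_ids(&ids, true)
}
```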
diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs
index aa313a1405..9e0c822bbb 100644
--- a/rust/lance/src/index.rs
+++ b/rust/lance/src/index.rs
@@ -800,7 +800,7 @@ impl DatasetIndexInternalExt for Dataset {
         let schema = self.schema();
         let mut indexed_fields = Vec::new();
         for index in indices.iter().filter(|idx| {
-            let idx_schema = schema.project_by_ids(idx.fields.as_slice());
+            let idx_schema = schema.project_by_ids(idx.fields.as_slice(), true);
             let is_vector_index = idx_schema
                 .fields
                 .iter()
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index 3b462c75ce..36553a6e85 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -350,6 +350,10 @@ fn fix_schema(manifest: &mut Manifest) -> Result<()> {
     for (old_field_id, new_field_id) in &old_field_id_mapping {
         let field = manifest.schema.mut_field_by_id(*old_field_id).unwrap();
         field.id = *new_field_id;
+
+        if let Some(local_field) = manifest.local_schema.mut_field_by_id(*old_field_id) {
+            local_field.id = *new_field_id;
+        }
     }
 
     // Drop data files that are no longer in use.
diff --git a/rust/lance/src/io/exec/projection.rs b/rust/lance/src/io/exec/projection.rs
index 00cb350725..09b0d3fbcf 100644
--- a/rust/lance/src/io/exec/projection.rs
+++ b/rust/lance/src/io/exec/projection.rs
@@ -306,7 +306,7 @@ mod tests {
         ];
 
         for projection in projections {
-            let projected_schema = lance_schema.project_by_ids(projection);
+            let projected_schema = lance_schema.project_by_ids(projection, true);
             let projected_arrow_schema = (&projected_schema).into();
 
             let result = apply_to_batch(sample_data.clone(), &projected_arrow_schema)
diff --git a/rust/lance/src/io/exec/pushdown_scan.rs b/rust/lance/src/io/exec/pushdown_scan.rs
index f4881714c0..92bdf48132 100644
--- a/rust/lance/src/io/exec/pushdown_scan.rs
+++ b/rust/lance/src/io/exec/pushdown_scan.rs
@@ -456,7 +456,8 @@ impl FragmentScanner {
             .collect();
 
         let remainder_batch = if !remaining_fields.is_empty() {
-            let remaining_projection = self.projection.project_by_ids(&remaining_fields);
+            let remaining_projection =
+                self.projection.project_by_ids(&remaining_fields, true);
             Some(
                 self.reader
                     .legacy_read_batch_projected(
@@ -859,7 +860,7 @@ mod test {
         let fragments = dataset.fragments().clone();
 
         // [x.b, y.a]
-        let projection = Arc::new(dataset.schema().clone().project_by_ids(&[2, 4]));
+        let projection = Arc::new(dataset.schema().clone().project_by_ids(&[2, 4], true));
 
         let predicate = col("x")
             .field_newstyle("a")
@@ -889,7 +890,7 @@ mod test {
         assert_eq!(results[0].schema().as_ref(), expected_schema.as_ref());
 
         // Also try where projection is same as filter columns
-        let projection = Arc::new(dataset.schema().clone().project_by_ids(&[1, 5]));
+        let projection = Arc::new(dataset.schema().clone().project_by_ids(&[1, 5], true));
         let exec = LancePushdownScanExec::try_new(
             dataset.clone(),
             fragments,
@@ -1182,7 +1183,12 @@ mod test {
         let scan = LancePushdownScanExec::try_new(
             dataset.clone(),
             dataset.fragments().clone(),
-            Arc::new(dataset.schema().clone().project_by_ids(&projection_indices)),
+            Arc::new(
+                dataset
+                    .schema()
+                    .clone()
+                    .project_by_ids(&projection_indices, true),
+            ),
             predicate,
             scan_config,
         )