From 4d50cac50b8e1dadd4d066cf1facff0e5f18f781 Mon Sep 17 00:00:00 2001 From: rmeng Date: Wed, 22 Jan 2025 14:15:35 -0800 Subject: [PATCH] feat: allow replacement of entire datafile when the schema lines up correctly --- protos/transaction.proto | 9 +- rust/lance/src/dataset.rs | 268 +++++++++++++++++++++++++- rust/lance/src/dataset/transaction.rs | 195 +++++++++++++++++-- 3 files changed, 457 insertions(+), 15 deletions(-) diff --git a/protos/transaction.proto b/protos/transaction.proto index 9959c5e75a..a052e037b5 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -173,6 +173,12 @@ message Transaction { } } + // An operation that replaces the data in a region of the table with new data. + message DataReplacement { + repeated DataFragment old_fragments = 1; + repeated DataFile new_datafiles = 2; + } + // The operation of this transaction. oneof operation { Append append = 100; @@ -186,6 +192,7 @@ message Transaction { Update update = 108; Project project = 109; UpdateConfig update_config = 110; + DataReplacement data_replacement = 111; } // An operation to apply to the blob dataset @@ -193,4 +200,4 @@ message Transaction { Append blob_append = 200; Overwrite blob_overwrite = 202; } -} \ No newline at end of file +} diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 5d8d0ddf3b..3647d7deca 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1744,12 +1744,13 @@ mod tests { use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; + use lance_file::v2::writer::FileWriter; use lance_file::version::LanceFileVersion; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_table::feature_flags; - use lance_table::format::WriterVersion; + use lance_table::format::{DataFile, WriterVersion}; use lance_table::io::commit::RenameCommitHandler; use lance_table::io::deletion::read_deletion_file; use lance_testing::datagen::generate_random_array; @@ -5148,4 +5149,269 @@ mod tests { assert!(result.is_err()); assert!(matches!(result, Err(Error::SchemaMismatch { .. 
})));
     }
+
+    #[tokio::test]
+    async fn test_datafile_replacement() {
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "a",
+            DataType::Int32,
+            true,
+        )]));
+        let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
+        let dataset = Arc::new(
+            Dataset::write(empty_reader, "memory://", None)
+                .await
+                .unwrap(),
+        );
+        dataset.validate().await.unwrap();
+
+        // An empty replacement should commit a new manifest and change nothing
+        let mut dataset = Dataset::commit(
+            WriteDestination::Dataset(dataset.clone()),
+            Operation::DataReplacement {
+                old_fragments: vec![],
+                new_datafiles: vec![],
+            },
+            Some(1),
+            None,
+            None,
+            Arc::new(Default::default()),
+            false,
+        )
+        .await
+        .unwrap();
+        dataset.validate().await.unwrap();
+
+        assert_eq!(dataset.version().version, 2);
+        assert_eq!(dataset.get_fragments().len(), 0);
+
+        // try the same thing on a non-empty dataset
+        let vals: Int32Array = vec![1, 2, 3].into();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap();
+        dataset
+            .append(
+                RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
+                None,
+            )
+            .await
+            .unwrap();
+
+        let dataset = Dataset::commit(
+            WriteDestination::Dataset(Arc::new(dataset)),
+            Operation::DataReplacement {
+                old_fragments: vec![],
+                new_datafiles: vec![],
+            },
+            Some(3),
+            None,
+            None,
+            Arc::new(Default::default()),
+            false,
+        )
+        .await
+        .unwrap();
+        dataset.validate().await.unwrap();
+
+        assert_eq!(dataset.version().version, 4);
+        assert_eq!(dataset.get_fragments().len(), 1);
+
+        let batch = dataset.scan().try_into_batch().await.unwrap();
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values(),
+            &[1, 2, 3]
+        );
+
+        // write a new datafile
+        let object_writer = dataset
+            .object_store
+            .create(&Path::try_from("data/test.lance").unwrap())
+            .await
+            .unwrap();
+        let mut writer = FileWriter::try_new(
+            object_writer,
+            schema.as_ref().try_into().unwrap(),
+            Default::default(),
+        )
+        .unwrap();
+
+        let vals: Int32Array = vec![4, 5, 6].into();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap();
+        writer.write_batch(&batch).await.unwrap();
+        writer.finish().await.unwrap();
+
+        // find the datafile we want to replace
+        let frag = dataset.get_fragment(0).unwrap();
+        let data_file = frag.data_file_for_field(0).unwrap();
+        let mut new_data_file = data_file.clone();
+        new_data_file.path = "test.lance".to_string();
+        let fragments = dataset
+            .get_fragments()
+            .into_iter()
+            .map(|f| f.metadata)
+            .collect();
+
+        let dataset = Dataset::commit(
+            WriteDestination::Dataset(Arc::new(dataset)),
+            Operation::DataReplacement {
+                old_fragments: fragments,
+                new_datafiles: vec![new_data_file],
+            },
+            Some(5),
+            None,
+            None,
+            Arc::new(Default::default()),
+            false,
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(dataset.version().version, 5);
+        assert_eq!(dataset.get_fragments().len(), 1);
+
+        let batch = dataset.scan().try_into_batch().await.unwrap();
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values(),
+            &[4, 5, 6]
+        );
+    }
+
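+    // The test below exercises partial replacement: a new datafile holding only the
+    // all-null column "b" (field id 1, stored at column index 0 of the new file) is
+    // spliced into the fragment alongside the original file for column "a".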
+    #[tokio::test]
+    async fn test_datafile_partial_replacement() {
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "a",
+            DataType::Int32,
+            true,
+        )]));
+        let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
+        let mut dataset = Dataset::write(empty_reader, "memory://", None)
+            .await
+            .unwrap();
+        dataset.validate().await.unwrap();
+
+        let vals: Int32Array = vec![1, 2, 3].into();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap();
+        dataset
+            .append(
+                RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
+                None,
+            )
+            .await
+            .unwrap();
+
+        let fragment = dataset.get_fragments().pop().unwrap().metadata;
+
+        let extended_schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("a", DataType::Int32, true),
+            ArrowField::new("b", DataType::Int32, true),
+        ]));
+
+        // add an all-null column
+        let dataset = Dataset::commit(
+            WriteDestination::Dataset(Arc::new(dataset)),
+            Operation::Merge {
+                fragments: vec![fragment],
+                schema: extended_schema.as_ref().try_into().unwrap(),
+            },
+            Some(2),
+            None,
+            None,
+            Arc::new(Default::default()),
+            false,
+        )
+        .await
+        .unwrap();
+
+        let partial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "b",
+            DataType::Int32,
+            true,
+        )]));
+
+        // write a new datafile
+        let object_writer = dataset
+            .object_store
+            .create(&Path::try_from("data/test.lance").unwrap())
+            .await
+            .unwrap();
+        let mut writer = FileWriter::try_new(
+            object_writer,
+            partial_schema.as_ref().try_into().unwrap(),
+            Default::default(),
+        )
+        .unwrap();
+
+        let vals: Int32Array = vec![4, 5, 6].into();
+        let batch = RecordBatch::try_new(partial_schema.clone(), vec![Arc::new(vals)]).unwrap();
+        writer.write_batch(&batch).await.unwrap();
+        writer.finish().await.unwrap();
+
+        // describe the new datafile that will replace the all-null column
+        let new_data_file = DataFile {
+            path: "test.lance".to_string(),
+            // the second column in the dataset
+            fields: vec![1],
+            // is located in the first column of this datafile
+            column_indices: vec![0],
+            file_major_version: 2,
+            file_minor_version: 0,
+        };
+
+        let fragments = dataset
+            .get_fragments()
+            .into_iter()
+            .map(|f| f.metadata)
+            .collect();
+
+        let dataset = Dataset::commit(
+            WriteDestination::Dataset(Arc::new(dataset)),
+            Operation::DataReplacement {
+                old_fragments: fragments,
+                new_datafiles: vec![new_data_file],
+            },
+            Some(3),
+            None,
+            None,
+            Arc::new(Default::default()),
+            false,
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(dataset.version().version, 4);
+        assert_eq!(dataset.get_fragments().len(), 1);
+        assert_eq!(dataset.get_fragments()[0].metadata.files.len(), 2);
+
+        let batch = dataset.scan().try_into_batch().await.unwrap();
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values(),
+            &[1, 2, 3]
+        );
+        assert_eq!(
+            batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .values(),
+            &[4, 5, 6]
+        );
+    }
 }
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index e390ecddcc..d1820424fe 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -22,22 +22,27 @@
 //! a conflict. Some operations have additional conditions that must be met for
 //! them to be compatible.
 //!
-//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig |
-//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------|
-//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ |
-//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ |
-//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) |
-//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
-//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ |
-//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ |
-//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
-//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) |
+//! |                  | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | DataReplacement |
+//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|--------------|-----------------|
+//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ |
+//! | Delete / Update | ✅ | 1️⃣ | ❌ | ✅ | 1️⃣ | ❌ | ❌ | ✅ | ✅ |
+//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | ✅ |
+//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | 3️⃣ |
+//! | Rewrite | ✅ | 1️⃣ | ❌ | ❌ | 1️⃣ | ❌ | ❌ | ✅ | 3️⃣ |
+//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ |
+//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
+//! | UpdateConfig | ✅ | ✅ | 2️⃣ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | ✅ |
+//! | DataReplacement | ✅ | ✅ | ❌ | 3️⃣ | 3️⃣ | ✅ | ❌* | ✅ | 3️⃣ |
 //!
-//! (1) Delete, update, and rewrite are compatible with each other and themselves only if
+//! 1️⃣ Delete, update, and rewrite are compatible with each other and themselves only if
 //! they affect distinct fragments. Otherwise, they conflict.
-//! (2) Operations that mutate the config conflict if one of the operations upserts a key
+//! 2️⃣ Operations that mutate the config conflict if one of the operations upserts a key
 //! that if referenced by another concurrent operation or if both operations modify the schema
 //! metadata or the same field metadata.
+//! 3️⃣ DataReplacement on a column without an index is compatible with any operation as long as
+//! the operation does not modify the region of the column being replaced.
+//! * This could become allowed in the future.
+//!

 use std::{
     collections::{HashMap, HashSet},
@@ -51,7 +56,7 @@ use lance_io::object_store::ObjectStore;
 use lance_table::{
     format::{
         pb::{self, IndexMetadata},
-        DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
+        DataFile, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
     },
     io::{
         commit::CommitHandler,
@@ -136,6 +141,25 @@ pub enum Operation {
         /// Indices that have been updated with the new row addresses
         rewritten_indices: Vec<RewrittenIndex>,
     },
+    /// Replace the data in a column of the dataset with new data. This is used for
+    /// null-column population, where an entirely null column is replaced with a
+    /// new column that has data.
+    ///
+    /// This operation only allows replacing files that contain the same schema.
+    /// E.g. if the original files contain columns A, B, C and the new files contain
+    /// only columns A, B then the operation is not allowed, as we would need to split
+    /// the original files into two files, one with columns A, B and the other with column C.
+    ///
+    /// Corollary to the above: the operation also does not allow replacing file layouts
+    /// that are not uniform across all fragments.
+    /// E.g. if the fragments being replaced contain files with different schema layouts on
+    /// the column being replaced, the operation is not allowed.
+    /// Say frag_1: [A] [B, C] and frag_2: [A, B] [C]; if we try to replace column A
+    /// with a new column A, the operation is not allowed.
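+    ///
+    /// A sketch of how a caller might backfill an all-null column `b` (field id 1)
+    /// from a separately written file (the path and ids here are illustrative only):
+    ///
+    /// ```ignore
+    /// let new_datafile = DataFile {
+    ///     path: "filled.lance".to_string(),
+    ///     // field id 1 of the dataset schema ...
+    ///     fields: vec![1],
+    ///     // ... is stored at column index 0 of the new file
+    ///     column_indices: vec![0],
+    ///     file_major_version: 2,
+    ///     file_minor_version: 0,
+    /// };
+    /// let op = Operation::DataReplacement {
+    ///     old_fragments: dataset.get_fragments().into_iter().map(|f| f.metadata).collect(),
+    ///     new_datafiles: vec![new_datafile],
+    /// };
+    /// ```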
} => "UpdateConfig", + Self::DataReplacement { .. } => "DataReplacement", } } } @@ -370,6 +398,7 @@ impl Transaction { Operation::ReserveFragments { .. } => false, Operation::Project { .. } => false, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => false, _ => true, }, Operation::Rewrite { .. } => match &other.operation { @@ -385,6 +414,13 @@ impl Transaction { } Operation::Project { .. } => false, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { + old_fragments, + new_datafiles, + } => { + // TODO: check that the fragments being replaced are not part of the groups + true + } _ => true, }, // Restore always succeeds @@ -411,6 +447,10 @@ impl Transaction { // if the rewrite changed more than X% of row ids. Operation::Rewrite { .. } => true, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => { + // TODO: check that the new indices isn't on the column being replaced + true + } _ => true, }, Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { @@ -467,6 +507,29 @@ impl Transaction { Operation::UpdateConfig { .. } => false, _ => true, }, + Operation::DataReplacement { new_datafiles, .. } => match &other.operation { + Operation::Append { .. } + | Operation::Delete { .. } + | Operation::Update { .. } + | Operation::Merge { .. } + | Operation::UpdateConfig { .. } => false, + Operation::CreateIndex { new_indices, .. } => { + // TODO: check that the new indices isn't on the column being replaced + true + } + Operation::Rewrite { groups, .. } => { + // TODO: check that the fragments being replaced are not part of the groups + true + } + Operation::DataReplacement { + old_fragments, + new_datafiles, + } => { + // TODO: check cell conflicts + true + } + _ => true, + }, } } @@ -744,6 +807,90 @@ impl Transaction { Operation::Restore { .. } => { unreachable!() } + Operation::DataReplacement { + old_fragments, + new_datafiles, + } => { + // 0. check we have the same number of old fragments as new data files + if old_fragments.len() != new_datafiles.len() { + return Err(Error::invalid_input( + "Number of old fragments must match number of new data files", + location!(), + )); + } + + // 1. make sure the new files all have the same fields / or empty + if new_datafiles + .iter() + .map(|f| f.fields.clone()) + .collect::>() + .len() + > 1 + { + // TODO: better error message to explain what's wrong + return Err(Error::invalid_input( + "All new data files must have the same fields", + location!(), + )); + } + + // 2. check that the fragments being modified have isomorphic layouts along the columns being replaced + // 3. 
+                // 3. add modified fragments to final_fragments
+                for (frag, new_file) in old_fragments.into_iter().zip(new_datafiles) {
+                    let mut new_frag = frag.clone();
+
+                    // TODO: check that the new file and the fragment have the same length
+
+                    let mut columns_covered = HashSet::new();
+                    for file in &mut new_frag.files {
+                        if file.fields == new_file.fields
+                            && file.file_major_version == new_file.file_major_version
+                            && file.file_minor_version == new_file.file_minor_version
+                        {
+                            // assign the new file path to the fragment
+                            file.path = new_file.path.clone();
+                        }
+                        columns_covered.extend(file.fields.iter());
+                    }
+                    // SPECIAL CASE: if the column(s) being replaced are not covered by the fragment,
+                    // then an all-NULL column is being replaced with real data --
+                    // just add the new data file to the fragment
+                    if columns_covered.is_disjoint(&new_file.fields.iter().collect()) {
+                        new_frag.add_file(
+                            new_file.path.clone(),
+                            new_file.fields.clone(),
+                            new_file.column_indices.clone(),
+                            &LanceFileVersion::try_from_major_minor(
+                                new_file.file_major_version,
+                                new_file.file_minor_version,
+                            )
+                            .expect("Expected valid file version"),
+                        );
+                    }
+
+                    // Nothing changed in the current fragment, which is not expected -- error out
+                    if &new_frag == frag {
+                        // TODO: better error message to explain what's wrong
+                        return Err(Error::invalid_input(
+                            "Expected to modify the fragment but no changes were made",
+                            location!(),
+                        ));
+                    }
+                    final_fragments.push(new_frag);
+                }
+
+                let fragments_changed =
+                    final_fragments.iter().map(|f| f.id).collect::<HashSet<_>>();
+
+                // 4. push fragments that didn't change back to final_fragments
+                let unmodified_fragments = maybe_existing_fragments?
+                    .iter()
+                    .filter(|f| !fragments_changed.contains(&f.id))
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                final_fragments.extend(unmodified_fragments);
+            }
         };

         // If a fragment was reserved then it may not belong at the end of the fragments list.
@@ -1164,6 +1311,21 @@ impl TryFrom<pb::Transaction> for Transaction {
                     field_metadata,
                 }
             }
+            Some(pb::transaction::Operation::DataReplacement(
+                pb::transaction::DataReplacement {
+                    old_fragments,
+                    new_datafiles,
+                },
+            )) => Operation::DataReplacement {
+                old_fragments: old_fragments
+                    .into_iter()
+                    .map(Fragment::try_from)
+                    .collect::<Result<Vec<_>>>()?,
+                new_datafiles: new_datafiles
+                    .into_iter()
+                    .map(DataFile::try_from)
+                    .collect::<Result<Vec<_>>>()?,
+            },
             None => {
                 return Err(Error::Internal {
                     message: "Transaction message did not contain an operation".to_string(),
@@ -1380,6 +1542,13 @@ impl From<&Transaction> for pb::Transaction {
                 })
                 .unwrap_or(Default::default()),
             }),
+            Operation::DataReplacement {
+                old_fragments,
+                new_datafiles,
+            } => pb::transaction::Operation::DataReplacement(pb::transaction::DataReplacement {
+                old_fragments: old_fragments.iter().map(pb::DataFragment::from).collect(),
+                new_datafiles: new_datafiles.iter().map(pb::DataFile::from).collect(),
+            }),
         };

         let blob_operation = value.blobs_op.as_ref().map(|op| match op {