
Commit

feat: allow replacement of entire datafile when the schema lines up correctly
chebbyChefNEQ committed Feb 1, 2025
1 parent c73d717 commit 4d50cac
Showing 3 changed files with 457 additions and 15 deletions.
9 changes: 8 additions & 1 deletion protos/transaction.proto
@@ -173,6 +173,12 @@ message Transaction {
}
}

// An operation that replaces the data in a region of the table with new data.
message DataReplacement {
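// Fragments whose data files are being (partially or fully) replaced.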
repeated DataFragment old_fragments = 1;
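// Replacement data files; the schema of each new file must line up with the file it replaces.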
repeated DataFile new_datafiles = 2;
}

// The operation of this transaction.
oneof operation {
Append append = 100;
@@ -186,11 +192,12 @@
Update update = 108;
Project project = 109;
UpdateConfig update_config = 110;
DataReplacement data_replacement = 111;
}

// An operation to apply to the blob dataset
oneof blob_operation {
Append blob_append = 200;
Overwrite blob_overwrite = 202;
}
}
}
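
For orientation, here is a minimal sketch of how the new operation is driven from the Rust side, condensed from the partial-replacement test below (`dataset` is assumed to be an already-open Dataset):

// Sketch only: swap in a freshly written file for dataset field id 1.
// `fields` lists the dataset field ids covered by the new file;
// `column_indices` says where each of those fields lives inside it.
let new_data_file = DataFile {
    path: "test.lance".to_string(),
    fields: vec![1],
    column_indices: vec![0],
    file_major_version: 2,
    file_minor_version: 0,
};
let op = Operation::DataReplacement {
    old_fragments: dataset.get_fragments().into_iter().map(|f| f.metadata).collect(),
    new_datafiles: vec![new_data_file],
};
// The operation is then committed through Dataset::commit, exactly as in the tests below.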
268 changes: 267 additions & 1 deletion rust/lance/src/dataset.rs
@@ -1744,12 +1744,13 @@ mod tests {
use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME};
use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY;
use lance_datagen::{array, gen, BatchCount, Dimension, RowCount};
use lance_file::v2::writer::FileWriter;
use lance_file::version::LanceFileVersion;
use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams};
use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType};
use lance_linalg::distance::MetricType;
use lance_table::feature_flags;
use lance_table::format::WriterVersion;
use lance_table::format::{DataFile, WriterVersion};
use lance_table::io::commit::RenameCommitHandler;
use lance_table::io::deletion::read_deletion_file;
use lance_testing::datagen::generate_random_array;
@@ -5148,4 +5149,269 @@ mod tests {
assert!(result.is_err());
assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
}

#[tokio::test]
async fn test_datafile_replacement() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
DataType::Int32,
true,
)]));
let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
let dataset = Arc::new(
Dataset::write(empty_reader, "memory://", None)
.await
.unwrap(),
);
dataset.validate().await.unwrap();

// An empty replacement should commit a new manifest and change nothing
let mut dataset = Dataset::commit(
WriteDestination::Dataset(dataset.clone()),
Operation::DataReplacement {
old_fragments: vec![],
new_datafiles: vec![],
},
Some(1),
None,
None,
Arc::new(Default::default()),
false,
)
.await
.unwrap();
dataset.validate().await.unwrap();

assert_eq!(dataset.version().version, 2);
assert_eq!(dataset.get_fragments().len(), 0);

// try the same thing on a non-empty dataset
let vals: Int32Array = vec![1, 2, 3].into();
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap();
dataset
.append(
RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
None,
)
.await
.unwrap();

let dataset = Dataset::commit(
WriteDestination::Dataset(Arc::new(dataset)),
Operation::DataReplacement {
old_fragments: vec![],
new_datafiles: vec![],
},
Some(3),
None,
None,
Arc::new(Default::default()),
false,
)
.await
.unwrap();
dataset.validate().await.unwrap();

assert_eq!(dataset.version().version, 4);
assert_eq!(dataset.get_fragments().len(), 1);

let batch = dataset.scan().try_into_batch().await.unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values(),
&[1, 2, 3]
);

// write a new datafile
let object_writer = dataset
.object_store
.create(&Path::try_from("data/test.lance").unwrap())
.await
.unwrap();
let mut writer = FileWriter::try_new(
object_writer,
schema.as_ref().try_into().unwrap(),
Default::default(),
)
.unwrap();

let vals: Int32Array = vec![4, 5, 6].into();
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap();
writer.write_batch(&batch).await.unwrap();
writer.finish().await.unwrap();

// find the datafile we want to replace
let frag = dataset.get_fragment(0).unwrap();
let data_file = frag.data_file_for_field(0).unwrap();
let mut new_data_file = data_file.clone();
new_data_file.path = "test.lance".to_string();
let fragments = dataset
.get_fragments()
.into_iter()
.map(|f| f.metadata)
.collect();

let dataset = Dataset::commit(
WriteDestination::Dataset(Arc::new(dataset)),
Operation::DataReplacement {
old_fragments: fragments,
new_datafiles: vec![new_data_file],
},
Some(5),
None,
None,
Arc::new(Default::default()),
false,
)
.await
.unwrap();

assert_eq!(dataset.version().version, 5);
assert_eq!(dataset.get_fragments().len(), 1);

let batch = dataset.scan().try_into_batch().await.unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values(),
&[4, 5, 6]
);
}

#[tokio::test]
async fn test_datafile_partial_replacement() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
DataType::Int32,
true,
)]));
let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
let mut dataset = Dataset::write(empty_reader, "memory://", None)
.await
.unwrap();
dataset.validate().await.unwrap();

let vals: Int32Array = vec![1, 2, 3].into();
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap();
dataset
.append(
RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
None,
)
.await
.unwrap();

let fragment = dataset.get_fragments().pop().unwrap().metadata;

let extended_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", DataType::Int32, true),
ArrowField::new("b", DataType::Int32, true),
]));

// add an all-null column via a Merge operation
let dataset = Dataset::commit(
WriteDestination::Dataset(Arc::new(dataset)),
Operation::Merge {
fragments: vec![fragment],
schema: extended_schema.as_ref().try_into().unwrap(),
},
Some(2),
None,
None,
Arc::new(Default::default()),
false,
)
.await
.unwrap();

let partial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"b",
DataType::Int32,
true,
)]));

// write a new datafile
let object_writer = dataset
.object_store
.create(&Path::try_from("data/test.lance").unwrap())
.await
.unwrap();
let mut writer = FileWriter::try_new(
object_writer,
partial_schema.as_ref().try_into().unwrap(),
Default::default(),
)
.unwrap();

let vals: Int32Array = vec![4, 5, 6].into();
let batch = RecordBatch::try_new(partial_schema.clone(), vec![Arc::new(vals)]).unwrap();
writer.write_batch(&batch).await.unwrap();
writer.finish().await.unwrap();

// construct the new datafile that will replace the all-null column "b"
let new_data_file = DataFile {
path: "test.lance".to_string(),
// the second column in the dataset
fields: vec![1],
// is located in the first column of this datafile
column_indices: vec![0],
file_major_version: 2,
file_minor_version: 0,
};

let fragments = dataset
.get_fragments()
.into_iter()
.map(|f| f.metadata)
.collect();

let dataset = Dataset::commit(
WriteDestination::Dataset(Arc::new(dataset)),
Operation::DataReplacement {
old_fragments: fragments,
new_datafiles: vec![new_data_file],
},
Some(3),
None,
None,
Arc::new(Default::default()),
false,
)
.await
.unwrap();

assert_eq!(dataset.version().version, 4);
assert_eq!(dataset.get_fragments().len(), 1);
assert_eq!(dataset.get_fragments()[0].metadata.files.len(), 2);

let batch = dataset.scan().try_into_batch().await.unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values(),
&[1, 2, 3]
);
assert_eq!(
batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values(),
&[4, 5, 6]
);
}
}