Skip to content

Commit

Permalink
feat: allow replacement of entire datafile when the schema lines up c…
Browse files Browse the repository at this point in the history
…orrectly
  • Loading branch information
chebbyChefNEQ committed Feb 3, 2025
1 parent c73d717 commit bea9bd0
Show file tree
Hide file tree
Showing 5 changed files with 688 additions and 16 deletions.
9 changes: 8 additions & 1 deletion protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ message Transaction {
}
}

// An operation that replaces the data in a region of the table with new data.
message DataReplacement {
repeated uint64 old_fragment_ids = 1;
repeated DataFile new_datafiles = 2;
}

// The operation of this transaction.
oneof operation {
Append append = 100;
Expand All @@ -186,11 +192,12 @@ message Transaction {
Update update = 108;
Project project = 109;
UpdateConfig update_config = 110;
DataReplacement data_replacement = 111;
}

// An operation to apply to the blob dataset
oneof blob_operation {
Append blob_append = 200;
Overwrite blob_overwrite = 202;
}
}
}
11 changes: 10 additions & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
)
from .dependencies import numpy as np
from .dependencies import pandas as pd
from .fragment import FragmentMetadata, LanceFragment
from .fragment import DataFile, FragmentMetadata, LanceFragment
from .lance import (
CleanupStats,
Compaction,
Expand Down Expand Up @@ -2935,6 +2935,15 @@ class CreateIndex(BaseOperation):
dataset_version: int
fragment_ids: Set[int]

@dataclass
class DataReplacement(BaseOperation):
"""
Operation that replaces existing datafiles in the dataset.
"""

old_fragment_ids: List[int]
new_datafiles: List[DataFile]


class ScannerBuilder:
def __init__(self, ds: LanceDataset):
Expand Down
24 changes: 24 additions & 0 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ impl FromPyObject<'_> for PyLance<Operation> {
};
Ok(Self(op))
}
"DataReplacement" => {
let old_fragment_ids = ob.getattr("old_fragment_ids")?.extract::<Vec<u64>>()?;
let new_datafiles = extract_vec(&ob.getattr("new_datafiles")?)?;

let op = Operation::DataReplacement {
old_fragment_ids,
new_datafiles,
};

Ok(Self(op))
}
unsupported => Err(PyValueError::new_err(format!(
"Unsupported operation: {unsupported}",
))),
Expand Down Expand Up @@ -172,6 +183,19 @@ impl ToPyObject for PyLance<&Operation> {
.unwrap()
.to_object(py)
}
Operation::DataReplacement {
old_fragment_ids,
new_datafiles,
} => {
let old_fragment_ids = old_fragment_ids.to_object(py);
let new_datafiles = export_vec(py, new_datafiles.as_slice());
let cls = namespace
.getattr("DataReplacement")
.expect("Failed to get DataReplacement class");
cls.call1((old_fragment_ids, new_datafiles))
.unwrap()
.to_object(py)
}
_ => todo!(),
}
}
Expand Down
Loading

0 comments on commit bea9bd0

Please sign in to comment.