diff --git a/python/python/tests/test_balanced.py b/python/python/tests/test_balanced.py
index 4e9ecd338d..e2713c38c9 100644
--- a/python/python/tests/test_balanced.py
+++ b/python/python/tests/test_balanced.py
@@ -199,6 +199,80 @@ def test_compaction(tmp_path, big_val):
     )
 
 
+def test_schema(balanced_dataset):
+    # Schema should contain blob columns
+    assert balanced_dataset.schema == pa.schema(
+        [
+            pa.field(
+                "blobs",
+                pa.large_binary(),
+                metadata={
+                    "lance-schema:storage-class": "blob",
+                },
+            ),
+            pa.field("idx", pa.uint64()),
+        ]
+    )
+
+
+def test_sample(balanced_dataset):
+    assert balanced_dataset.sample(10, columns=["idx"]).num_rows == 10
+    # Not the most obvious error but hopefully not long lived
+    with pytest.raises(
+        OSError, match="Not supported.*mapping from row addresses to row ids"
+    ):
+        assert balanced_dataset.sample(10).num_rows == 10
+    with pytest.raises(
+        OSError, match="Not supported.*mapping from row addresses to row ids"
+    ):
+        assert balanced_dataset.sample(10, columns=["blobs"]).num_rows == 10
+
+
+def test_add_columns(tmp_path, balanced_dataset):
+    # Adding columns should be fine as long as we don't try to use the blob
+    # column in any way
+
+    balanced_dataset.add_columns(
+        {
+            "idx2": "idx * 2",
+        }
+    )
+
+    assert balanced_dataset.to_table() == pa.table(
+        {
+            "idx": pa.array(range(128), pa.uint64()),
+            "idx2": pa.array(range(0, 256, 2), pa.uint64()),
+        }
+    )
+
+    with pytest.raises(
+        OSError, match="Not supported.*adding columns.*scanning non-default storage"
+    ):
+        balanced_dataset.add_columns({"blobs2": "blobs"})
+
+
+def test_unsupported(balanced_dataset, big_val):
+    # The following operations are not yet supported and we need to make
+    # sure they fail with a useful error message
+
+    # Updates & merge-insert are not supported. They add new rows and we
+    # will need to make sure the sibling datasets are kept in sync.
+
+    with pytest.raises(
+        ValueError, match="Not supported.*Updating.*non-default storage"
+    ):
+        balanced_dataset.update({"idx": "0"})
+
+    with pytest.raises(
+        # This error could be nicer but it's fine for now
+        OSError,
+        match="Not supported.*Scanning.*non-default storage",
+    ):
+        balanced_dataset.merge_insert("idx").when_not_matched_insert_all().execute(
+            make_table(0, 1, big_val)
+        )
+
+
 # TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
 # want to test partial appends. We need to make sure an append of
 # non-blob data is supported. In order to do this we need to make
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 7668156888..d470f2f7ee 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1199,12 +1199,7 @@ impl Dataset {
         row_indices: &[u64],
         projection: impl Into<ProjectionRequest>,
     ) -> Result<RecordBatch> {
-        take::take(
-            self,
-            row_indices,
-            &projection.into().into_projection_plan(self.schema())?,
-        )
-        .await
+        take::take(self, row_indices, projection.into()).await
     }
 
     /// Take Rows by the internal ROW ids.
@@ -1605,10 +1600,6 @@ impl Dataset {
             .collect())
     }
 
-    // Leaving this here so it is more obvious to future readers that we can do this and
-    // someone doesn't go off and create a new function to do this. Delete this comment
-    // if you use this method.
-    #[allow(unused)]
     pub(crate) async fn filter_deleted_addresses(&self, addrs: &[u64]) -> Result<Vec<u64>> {
         self.filter_addr_or_ids(addrs, addrs).await
     }
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index d80967a4ce..9ecabca749 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -1186,6 +1186,14 @@ impl FileFragment {
             }
             schema = schema.project(&projection)?;
         }
+
+        if schema.fields.iter().any(|f| !f.is_default_storage()) {
+            return Err(Error::NotSupported {
+                source: "adding columns whose value depends on scanning non-default storage".into(),
+                location: location!(),
+            });
+        }
+
         // If there is no projection, we at least need to read the row addresses
         with_row_addr |= schema.fields.is_empty();
 
diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs
index f3ff60dd3f..56fb12792e 100644
--- a/rust/lance/src/dataset/take.rs
+++ b/rust/lance/src/dataset/take.rs
@@ -8,9 +8,8 @@ use crate::dataset::rowids::get_row_id_index;
 use crate::{Error, Result};
 use arrow::{array::as_struct_array, compute::concat_batches, datatypes::UInt64Type};
 use arrow_array::cast::AsArray;
-use arrow_array::{Array, RecordBatch, StructArray, UInt64Array};
+use arrow_array::{RecordBatch, StructArray, UInt64Array};
 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
-use arrow_select::interleave::interleave;
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use futures::{Future, Stream, StreamExt, TryStreamExt};
@@ -26,142 +25,70 @@ use super::{fragment::FileFragment, scanner::DatasetRecordBatchStream, Dataset};
 
 pub async fn take(
     dataset: &Dataset,
-    row_indices: &[u64],
-    projection: &ProjectionPlan,
+    offsets: &[u64],
+    projection: ProjectionRequest,
 ) -> Result<RecordBatch> {
-    if row_indices.is_empty() {
+    let projection = projection.into_projection_plan(dataset.schema())?;
+
+    if offsets.is_empty() {
         return Ok(RecordBatch::new_empty(Arc::new(
             projection.output_schema()?,
         )));
     }
 
-    let mut sorted_indices: Vec<usize> = (0..row_indices.len()).collect();
-    sorted_indices.sort_by_key(|&i| row_indices[i]);
-
-    let fragments = dataset.get_fragments().into_iter().map(Arc::new);
-
-    // We will split into sub-requests for each fragment.
-    let mut sub_requests: Vec<(Arc<FileFragment>, Range<usize>)> = Vec::new();
-    // We will remap the row indices to the original row indices, using a pair
-    // of (request position, position in request)
-    let mut remap_index: Vec<(usize, usize)> = vec![(0, 0); row_indices.len()];
-    let mut local_ids_buffer: Vec<u32> = Vec::with_capacity(row_indices.len());
-
-    let mut fragments_iter = fragments.into_iter();
-    let mut current_fragment = fragments_iter.next().ok_or_else(|| Error::InvalidInput {
-        source: "Called take on an empty dataset.".to_string().into(),
-        location: location!(),
-    })?;
-    let mut current_fragment_len = current_fragment.count_rows().await?;
-    let mut curr_fragment_offset: u64 = 0;
-    let mut current_fragment_end = current_fragment_len as u64;
-    let mut start = 0;
-    let mut end = 0;
-    // We want to keep track of the previous row_index to detect duplicates
-    // index takes. To start, we pick a value that is guaranteed to be different
-    // from the first row_index.
-    let mut previous_row_index: u64 = row_indices[sorted_indices[0]] + 1;
-    let mut previous_sorted_index: usize = 0;
-
-    for index in sorted_indices {
-        // Get the index
-        let row_index = row_indices[index];
-
-        if previous_row_index == row_index {
-            // If we have a duplicate index request we add a remap_index
-            // entry that points to the original index request.
-            remap_index[index] = remap_index[previous_sorted_index];
-            continue;
-        } else {
-            previous_sorted_index = index;
-            previous_row_index = row_index;
-        }
-
-        // If the row index is beyond the current fragment, iterate
-        // until we find the fragment that contains it.
-        while row_index >= current_fragment_end {
-            // If we have a non-empty sub-request, add it to the list
-            if end - start > 0 {
-                // If we have a non-empty sub-request, add it to the list
-                sub_requests.push((current_fragment, start..end));
-            }
+    // First, convert the dataset offsets into row addresses
+    let fragments = dataset.get_fragments();
 
-            start = end;
+    let mut perm = permutation::sort(offsets);
+    let sorted_offsets = perm.apply_slice(offsets);
 
-            current_fragment = fragments_iter.next().ok_or_else(|| Error::InvalidInput {
-                source: format!(
-                    "Row index {} is beyond the range of the dataset.",
-                    row_index
-                )
-                .into(),
-                location: location!(),
-            })?;
-            curr_fragment_offset += current_fragment_len as u64;
-            current_fragment_len = current_fragment.count_rows().await?;
-            current_fragment_end = curr_fragment_offset + current_fragment_len as u64;
+    let mut frag_iter = fragments.iter();
+    let mut cur_frag = frag_iter.next();
+    let mut cur_frag_rows = if let Some(cur_frag) = cur_frag {
+        cur_frag.count_rows().await? as u64
+    } else {
+        0
+    };
+    let mut frag_offset = 0;
+
+    let mut addrs = Vec::with_capacity(sorted_offsets.len());
+    for sorted_offset in sorted_offsets.into_iter() {
+        while cur_frag.is_some() && sorted_offset >= frag_offset + cur_frag_rows {
+            frag_offset += cur_frag_rows;
+            cur_frag = frag_iter.next();
+            cur_frag_rows = if let Some(cur_frag) = cur_frag {
+                cur_frag.count_rows().await? as u64
+            } else {
+                0
+            };
         }
-
-        // Note that we cast to u32 *after* subtracting the offset,
-        // since it is possible for the global index to be larger than
-        // u32::MAX.
-        let local_index = (row_index - curr_fragment_offset) as u32;
-        local_ids_buffer.push(local_index);
-
-        remap_index[index] = (sub_requests.len(), end - start);
-
-        end += 1;
+        let Some(cur_frag) = cur_frag else {
+            addrs.push(RowAddress::TOMBSTONE_ROW);
+            continue;
+        };
+        let row_addr =
+            RowAddress::new_from_parts(cur_frag.id() as u32, (sorted_offset - frag_offset) as u32);
+        addrs.push(u64::from(row_addr));
     }
 
-    // flush last batch
-    if end - start > 0 {
-        sub_requests.push((current_fragment, start..end));
-    }
+    // Restore the original order
+    perm.apply_inv_slice_in_place(&mut addrs);
 
-    let local_ids_buffer = Arc::new(local_ids_buffer);
+    let builder = TakeBuilder::try_new_from_addresses(
+        Arc::new(dataset.clone()),
+        addrs,
+        Arc::new(projection),
+    )?;
 
-    let take_tasks = sub_requests
-        .into_iter()
-        .map(|(fragment, indices_range)| {
-            let local_ids_buffer = local_ids_buffer.clone();
-            let physical_schema = projection.physical_schema.clone();
-            async move {
-                let local_ids = &local_ids_buffer[indices_range];
-                fragment.take(local_ids, &physical_schema).await
-            }
-        })
-        .collect::<Vec<_>>();
-    let take_stream = futures::stream::iter(take_tasks)
-        .buffered(dataset.object_store.io_parallelism())
-        .map_err(|err| DataFusionError::External(err.into()))
-        .boxed();
-    let take_stream = Box::pin(RecordBatchStreamAdapter::new(
-        projection.arrow_schema_ref(),
-        take_stream,
-    ));
-    let take_stream = projection.project_stream(take_stream)?;
-    let batches = take_stream.try_collect::<Vec<_>>().await?;
-
-    let struct_arrs: Vec<StructArray> = batches.into_iter().map(StructArray::from).collect();
-    let refs: Vec<_> = struct_arrs.iter().map(|x| x as &dyn Array).collect();
-    let reordered = interleave(&refs, &remap_index)?;
-    Ok(as_struct_array(&reordered).into())
+    take_rows(builder).await
 }
 
 /// Take rows by the internal ROW ids.
 async fn do_take_rows(
-    builder: TakeBuilder,
+    mut builder: TakeBuilder,
     projection: Arc<ProjectionPlan>,
 ) -> Result<RecordBatch> {
-    let row_addrs = if let Some(row_id_index) = get_row_id_index(&builder.dataset).await? {
-        let addresses = builder
-            .row_ids
-            .iter()
-            .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
-            .collect::<Vec<u64>>();
-        addresses
-    } else {
-        builder.row_ids
-    };
+    let row_addrs = builder.get_row_addrs().await?.clone();
 
     if row_addrs.is_empty() {
         // It is possible that `row_id_index` returns None when a fragment has been wholly deleted
@@ -382,7 +309,7 @@ async fn zip_takes(
 }
 async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
-    if builder.row_ids.is_empty() {
+    if builder.is_empty() {
         return Ok(RecordBatch::new_empty(Arc::new(
             builder.projection.output_schema()?,
         )));
     }
@@ -394,7 +321,7 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
     // If we have sibling columns then we load those in parallel to the local
    // columns and zip the results together.
     let sibling_take = if let Some(sibling_schema) = projection.sibling_schema.as_ref() {
-        let filtered_row_ids = builder.dataset.filter_deleted_ids(&builder.row_ids).await?;
+        let filtered_row_ids = builder.get_filtered_ids().await?;
         if filtered_row_ids.is_empty() {
             return Ok(RecordBatch::new_empty(Arc::new(
                 builder.projection.output_schema()?,
@@ -411,7 +338,8 @@ async fn take_rows(builder: TakeBuilder) -> Result<RecordBatch> {
         // The sibling take only takes valid row ids and sibling columns
         let mut builder = builder.clone();
         builder.dataset = sibling_ds;
-        builder.row_ids = filtered_row_ids;
+        builder.row_ids = Some(filtered_row_ids);
+        builder.row_addrs = None;
         let blobs_projection = Arc::new(ProjectionPlan::inner_new(
             sibling_schema.clone(),
             false,
@@ -509,7 +437,8 @@ fn check_row_addrs(row_ids: &[u64]) -> RowAddressStats {
 #[derive(Clone, Debug)]
 pub struct TakeBuilder {
     dataset: Arc<Dataset>,
-    row_ids: Vec<u64>,
+    row_ids: Option<Vec<u64>>,
+    row_addrs: Option<Vec<u64>>,
     projection: Arc<ProjectionPlan>,
     with_row_address: bool,
 }
@@ -522,13 +451,29 @@ impl TakeBuilder {
         projection: ProjectionRequest,
     ) -> Result<Self> {
         Ok(Self {
-            row_ids,
+            row_ids: Some(row_ids),
+            row_addrs: None,
             projection: Arc::new(projection.into_projection_plan(dataset.schema())?),
             dataset,
             with_row_address: false,
         })
     }
 
+    /// Create a new `TakeBuilder` for taking by address
+    pub fn try_new_from_addresses(
+        dataset: Arc<Dataset>,
+        addresses: Vec<u64>,
+        projection: Arc<ProjectionPlan>,
+    ) -> Result<Self> {
+        Ok(Self {
+            row_ids: None,
+            row_addrs: Some(addresses),
+            projection,
+            dataset,
+            with_row_address: false,
+        })
+    }
+
     /// Adds row addresses to the output
     pub fn with_row_address(mut self, with_row_address: bool) -> Self {
         self.with_row_address = with_row_address;
@@ -539,6 +484,52 @@ impl TakeBuilder {
     pub async fn execute(self) -> Result<RecordBatch> {
         take_rows(self).await
     }
+
+    pub fn is_empty(&self) -> bool {
+        match (self.row_ids.as_ref(), self.row_addrs.as_ref()) {
+            (Some(ids), _) => ids.is_empty(),
+            (_, Some(addrs)) => addrs.is_empty(),
+            _ => unreachable!(),
+        }
+    }
+
+    async fn get_filtered_ids(&self) -> Result<Vec<u64>> {
+        match (self.row_ids.as_ref(), self.row_addrs.as_ref()) {
+            (Some(ids), _) => self.dataset.filter_deleted_ids(ids).await,
+            (_, Some(addrs)) => {
+                let _filtered_addresses = self.dataset.filter_deleted_addresses(addrs).await?;
+                // TODO: Create an inverse mapping from addresses to ids
+                // This path is currently encountered in the "take by dataset offsets" case.
+                // Another solution could be to translate dataset offsets into row ids instead
+                // of translating them into row addresses.
+                Err(Error::NotSupported {
+                    source: "mapping from row addresses to row ids".into(),
+                    location: location!(),
+                })
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    async fn get_row_addrs(&mut self) -> Result<&Vec<u64>> {
+        if self.row_addrs.is_none() {
+            let row_ids = self
+                .row_ids
+                .as_ref()
+                .expect("row_ids must be set if row_addrs is not");
+            let addrs = if let Some(row_id_index) = get_row_id_index(&self.dataset).await? {
+                let addresses = row_ids
+                    .iter()
+                    .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
+                    .collect::<Vec<u64>>();
+                addresses
+            } else {
+                row_ids.clone()
+            };
+            self.row_addrs = Some(addrs);
+        }
+        Ok(self.row_addrs.as_ref().unwrap())
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs
index 43ae216792..a2a37cdfcc 100644
--- a/rust/lance/src/dataset/write/update.rs
+++ b/rust/lance/src/dataset/write/update.rs
@@ -160,6 +160,19 @@ impl UpdateBuilder {
     // pub fn with_write_params(mut self, params: WriteParams) -> Self { ... }
 
     pub fn build(self) -> Result<UpdateJob> {
+        if self
+            .dataset
+            .schema()
+            .fields
+            .iter()
+            .any(|f| !f.is_default_storage())
+        {
+            return Err(Error::NotSupported {
+                source: "Updating datasets containing non-default storage columns".into(),
+                location: location!(),
+            });
+        }
+
         let mut updates = HashMap::new();
 
         let planner = Planner::new(Arc::new(self.dataset.schema().into()));