Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for "take" operation to balanced storage #3079

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions python/python/tests/test_balanced.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import lance
import pyarrow as pa
import pytest


@pytest.fixture(scope="module")
def big_val():
    """A single 1 MiB value: the ASCII byte ``"0"`` repeated 1,048,576 times.

    Module-scoped since the value is immutable and shared by every test.
    """
    one_mib = 1024 * 1024
    return b"0" * one_mib


# 16 batches of 8 rows = 128 rows
def balanced_datagen(big_val):
    """Yield 16 record batches of 8 rows each (128 rows total).

    Every batch carries two columns:
      * ``blobs`` — large_binary tagged with the blob storage class via the
        ``lance-schema:storage-class`` field-metadata key
      * ``idx``   — monotonically increasing uint64 row index
    """
    # The schema is identical for every batch, so build it once up front.
    batch_schema = pa.schema(
        [
            pa.field(
                "blobs",
                pa.large_binary(),
                metadata={
                    "lance-schema:storage-class": "blob",
                },
            ),
            pa.field("idx", pa.uint64()),
        ]
    )
    for batch_idx in range(16):
        start = 8 * batch_idx
        values = pa.array([big_val] * 8, pa.large_binary())
        idx = pa.array(range(start, start + 8), pa.uint64())
        yield pa.record_batch([values, idx], schema=batch_schema)


@pytest.fixture
def balanced_dataset(tmp_path, big_val):
    """Write the 128-row balanced dataset and return the resulting dataset.

    16 MiB per file and 128 total MiB of blob data should yield 8 blob files;
    max_rows_per_file=64 over 128 rows should yield 2 regular data files.
    """
    # Peek at one batch to obtain the schema, then hand write_dataset a
    # fresh generator so it sees all 16 batches.
    first_batch = next(balanced_datagen(big_val))
    return lance.write_dataset(
        balanced_datagen(big_val),
        tmp_path / "test_ds",
        max_bytes_per_file=16 * 1024 * 1024,
        max_rows_per_file=64,
        schema=first_batch.schema,
    )


def test_append_then_take(balanced_dataset, tmp_path, big_val):
    """Verify blob file layout, default reads, append, and blob-column take."""
    blob_dir = tmp_path / "test_ds" / "_blobs" / "data"
    assert len(list(blob_dir.iterdir())) == 8

    # A read will only return non-blob columns
    expected_initial = pa.table({"idx": pa.array(range(128), pa.uint64())})
    assert balanced_dataset.to_table() == expected_initial

    # Now verify we can append some data
    ds = lance.write_dataset(
        balanced_datagen(big_val),
        tmp_path / "test_ds",
        max_bytes_per_file=32 * 1024 * 1024,
        schema=balanced_dataset.schema,
        mode="append",
    )

    # 128 more MiB at 32 MiB per file adds 4 blob files to the original 8.
    assert len(list(blob_dir.iterdir())) == 12

    doubled_idx = list(range(128)) + list(range(128))
    assert ds.to_table() == pa.table(
        {"idx": pa.array(doubled_idx, pa.uint64())}
    )

    # Verify we can take blob values
    row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid")
    take_tbl = ds._take_rows(row_ids.to_pylist(), columns=["idx", "blobs"])
    assert all(val.as_py() == big_val for val in take_tbl.column("blobs"))


def test_delete(balanced_dataset):
    """Verify takes behave correctly after deleting rows from the dataset."""
    # This will delete some of the first fragment (deletion vector) and
    # the entire second fragment
    balanced_dataset.delete("idx >= 40")

    row_ids = balanced_dataset.to_table(columns=[], with_row_id=True).column("_rowid")
    surviving = row_ids.to_pylist()
    assert len(surviving) == 40

    # Taking the surviving rows returns the 40 undeleted idx values.
    expected = pa.table({"idx": pa.array(list(range(40)), pa.uint64())})
    assert balanced_dataset._take_rows(surviving, columns=["idx"]) == expected
    assert len(balanced_dataset._take_rows(surviving, columns=["blobs"])) == 40

    # Row id 100 was deleted; takes spanning deleted rows only return the
    # rows that still exist (20 of the 60 ids in [20, 80)).
    for cols in (["idx"], ["blobs"]):
        assert len(balanced_dataset._take_rows([100], columns=cols)) == 0
        assert len(balanced_dataset._take_rows(range(20, 80), columns=cols)) == 20


# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
# want to test partial appends.  We need to make sure an append of
# non-blob data is supported.  In order to do this we need to make
# sure a blob tx is created that marks the row ids as used so that
# the two row id sequences stay in sync.
Comment on lines +121 to +125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Thanks for testing the interaction

#
# def test_one_sided_append(balanced_dataset, tmp_path):
# # Write new data, but only to the idx column
# ds = lance.write_dataset(
# pa.table({"idx": pa.array(range(128, 256), pa.uint64())}),
# tmp_path / "test_ds",
# max_bytes_per_file=32 * 1024 * 1024,
# mode="append",
# )

# print(ds.to_table())
65 changes: 0 additions & 65 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,68 +122,3 @@ def test_take_deleted_blob(tmp_path, dataset_with_blobs):
match="A take operation that includes row addresses must not target deleted",
):
dataset_with_blobs.take_blobs(row_ids, "blobs")


def test_blob_storage_class(tmp_path):
    # Writes 128 MiB of blob-storage-class data, then checks the blob file
    # layout, that default reads exclude blob columns, and that appends work.
    # NOTE(review): this PR deletes this test; the same coverage moves to
    # test_balanced.py.
    # 1 MiB per value
    big_val = "0" * 1024 * 1024

    # 16 batches of 8 rows = 128 rows
    def datagen():
        for batch_idx in range(16):
            start = batch_idx * 8
            end = start + 8
            values = pa.array([big_val for _ in range(start, end)], pa.large_binary())
            idx = pa.array(range(start, end), pa.uint64())
            table = pa.record_batch(
                [values, idx],
                schema=pa.schema(
                    [
                        pa.field(
                            "blobs",
                            pa.large_binary(),
                            # This metadata key marks the column as blob-class
                            metadata={
                                "lance-schema:storage-class": "blob",
                            },
                        ),
                        pa.field("idx", pa.uint64()),
                    ]
                ),
            )
            yield table

    # Peek at one batch for the schema; write_dataset gets a fresh generator.
    schema = next(iter(datagen())).schema
    ds = lance.write_dataset(
        datagen(),
        tmp_path / "test_ds",
        max_bytes_per_file=16 * 1024 * 1024,
        schema=schema,
    )

    # 16 MiB per file, 128 total MiB, so we should have 8 files
    blob_dir = tmp_path / "test_ds" / "_blobs" / "data"
    assert len(list(blob_dir.iterdir())) == 8

    # A read will only return non-blob columns
    assert ds.to_table() == pa.table(
        {
            "idx": pa.array(range(128), pa.uint64()),
        }
    )

    # Now verify we can append some data
    ds = lance.write_dataset(
        datagen(),
        tmp_path / "test_ds",
        max_bytes_per_file=32 * 1024 * 1024,
        schema=schema,
        mode="append",
    )

    # Appending another 128 MiB at 32 MiB per file adds 4 more blob files.
    assert len(list(blob_dir.iterdir())) == 12

    assert ds.to_table() == pa.table(
        {
            "idx": pa.array(list(range(128)) + list(range(128)), pa.uint64()),
        }
    )
28 changes: 28 additions & 0 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,34 @@ impl Schema {
}
}

/// Splits the schema into two schemas, one with default storage class fields and the other with blob storage class fields.
/// If there are no blob storage class fields, the second schema will be `None`.
/// The order of fields is preserved.
pub fn partition_by_storage_class(&self) -> (Self, Option<Self>) {
Comment on lines +158 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can only top-level fields be StorageClass::Blob?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is simplest. I should probably add that restriction. We can always add support for sub-fields in the future.

let mut local_fields = Vec::with_capacity(self.fields.len());
let mut sibling_fields = Vec::with_capacity(self.fields.len());
for field in self.fields.iter() {
match field.storage_class() {
StorageClass::Default => local_fields.push(field.clone()),
StorageClass::Blob => sibling_fields.push(field.clone()),
}
}
(
Self {
fields: local_fields,
metadata: self.metadata.clone(),
},
if sibling_fields.is_empty() {
None
} else {
Some(Self {
fields: sibling_fields,
metadata: self.metadata.clone(),
})
},
)
}

pub fn has_dictionary_types(&self) -> bool {
self.fields.iter().any(|f| f.has_dictionary_types())
}
Expand Down
20 changes: 13 additions & 7 deletions rust/lance-core/src/utils/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ impl RowAddress {
// A row id that will never be used
pub const TOMBSTONE_ROW: u64 = 0xffffffffffffffff;

pub fn new_from_id(row_id: u64) -> Self {
Self(row_id)
pub fn new_from_u64(row_addr: u64) -> Self {
Self(row_addr)
}

pub fn new_from_parts(fragment_id: u32, row_id: u32) -> Self {
Self(((fragment_id as u64) << 32) | row_id as u64)
pub fn new_from_parts(fragment_id: u32, row_offset: u32) -> Self {
Self(((fragment_id as u64) << 32) | row_offset as u64)
}

pub fn first_row(fragment_id: u32) -> Self {
Self::new_from_parts(fragment_id, 0)
}

pub fn fragment_range(fragment_id: u32) -> Range<u64> {
pub fn address_range(fragment_id: u32) -> Range<u64> {
u64::from(Self::first_row(fragment_id))..u64::from(Self::first_row(fragment_id + 1))
}

pub fn fragment_id(&self) -> u32 {
(self.0 >> 32) as u32
}

pub fn row_id(&self) -> u32 {
pub fn row_offset(&self) -> u32 {
self.0 as u32
}
}
Expand All @@ -44,6 +44,12 @@ impl From<RowAddress> for u64 {
}
}

impl From<u64> for RowAddress {
    /// Interprets a packed `u64` (upper 32 bits: fragment id, lower 32 bits:
    /// row offset) as a [`RowAddress`].
    ///
    /// The parameter is named `row_addr` (not `row_id`) to match the
    /// row-id/row-address distinction introduced alongside `new_from_u64`.
    fn from(row_addr: u64) -> Self {
        Self(row_addr)
    }
}

impl std::fmt::Debug for RowAddress {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self) // use Display
Expand All @@ -52,6 +58,6 @@ impl std::fmt::Debug for RowAddress {

impl std::fmt::Display for RowAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "({}, {})", self.fragment_id(), self.row_id())
write!(f, "({}, {})", self.fragment_id(), self.row_offset())
}
}
7 changes: 6 additions & 1 deletion rust/lance-core/src/utils/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ impl<'a, T: Clone> Stream for SharedStream<'a, T> {
if let Some(polling_side) = inner_state.polling.as_ref() {
if *polling_side != self.side {
// Another task is already polling the inner stream, so we don't need to do anything
debug_assert!(inner_state.waker.is_none());

// Per rust docs:
// Note that on multiple calls to poll, only the Waker from the Context
// passed to the most recent call should be scheduled to receive a wakeup.
//
// So it is safe to replace a potentially stale waker here.
inner_state.waker = Some(cx.waker().clone());
return std::task::Poll::Pending;
}
Expand Down
29 changes: 25 additions & 4 deletions rust/lance-datafusion/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ use crate::{
planner::Planner,
};

#[derive(Debug)]
pub struct ProjectionPlan {
/// The physical schema (before dynamic projection) that must be loaded from the dataset
pub physical_schema: Arc<Schema>,
pub physical_df_schema: Arc<DFSchema>,

/// The schema of the sibling fields that must be loaded
pub sibling_schema: Option<Arc<Schema>>,

/// The expressions for all the columns to be in the output
/// Note: this doesn't include _distance, and _rowid
pub requested_output_expr: Option<Vec<(Expr, String)>>,
Expand Down Expand Up @@ -98,7 +102,9 @@ impl ProjectionPlan {
output.insert(output_name.as_ref().to_string(), expr);
}

let mut physical_schema = Arc::new(base_schema.project(&physical_cols)?);
let physical_schema = Arc::new(base_schema.project(&physical_cols)?);
let (physical_schema, sibling_schema) = physical_schema.partition_by_storage_class();
let mut physical_schema = Arc::new(physical_schema);
if !load_blobs {
physical_schema = Self::unload_blobs(&physical_schema);
}
Expand All @@ -112,22 +118,37 @@ impl ProjectionPlan {
let physical_df_schema = Arc::new(DFSchema::try_from(physical_arrow_schema).unwrap());
Ok(Self {
physical_schema,
sibling_schema: sibling_schema.map(Arc::new),
physical_df_schema,
requested_output_expr,
})
}

pub fn new_empty(base_schema: Arc<Schema>, load_blobs: bool) -> Self {
let base_schema = if !load_blobs {
let (physical_schema, sibling_schema) = base_schema.partition_by_storage_class();
Self::inner_new(
Arc::new(physical_schema),
load_blobs,
sibling_schema.map(Arc::new),
)
}

pub fn inner_new(
base_schema: Arc<Schema>,
load_blobs: bool,
sibling_schema: Option<Arc<Schema>>,
) -> Self {
let physical_schema = if !load_blobs {
Self::unload_blobs(&base_schema)
} else {
base_schema
};

let physical_arrow_schema = ArrowSchema::from(base_schema.as_ref());
let physical_arrow_schema = ArrowSchema::from(physical_schema.as_ref());
let physical_df_schema = Arc::new(DFSchema::try_from(physical_arrow_schema).unwrap());
Self {
physical_schema: base_schema,
physical_schema,
sibling_schema,
physical_df_schema,
requested_output_expr: None,
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl Index for FlatIndex {
.ids()
.as_primitive::<UInt64Type>()
.iter()
.map(|row_id| RowAddress::new_from_id(row_id.unwrap()).fragment_id())
.map(|row_id| RowAddress::from(row_id.unwrap()).fragment_id())
.collect::<Vec<_>>();
frag_ids.sort();
frag_ids.dedup();
Expand Down
3 changes: 3 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub struct Manifest {
pub writer_version: Option<WriterVersion>,

/// Fragments, the pieces to build the dataset.
///
/// This list is stored in order, sorted by fragment id. However, the fragment id
/// sequence may have gaps.
pub fragments: Arc<Vec<Fragment>>,

/// The file position of the version aux data.
Expand Down
Loading
Loading