Commit

Adds support for compacting the local files. Support for compacting sibling datasets can be added in a future PR.

westonpace committed Nov 5, 2024
1 parent 83439ef commit 5fe765f
Showing 4 changed files with 140 additions and 35 deletions.
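
As a rough, user-facing sketch of what these changes enable, the flow below writes many small appends to a dataset with a blob-storage column and then compacts it; only the regular data files are compacted, while the sibling blob files are left alone for now. This is a hedged illustration based on the new test_compaction test: the `demo_ds` path and `tiny_batch` helper are made up for the example, and exact file counts depend on the write parameters.

import lance
import pyarrow as pa

# Schema with one column in the non-default "blob" storage class,
# mirroring the schema built in test_balanced.py below.
schema = pa.schema(
    [
        pa.field(
            "blobs",
            pa.large_binary(),
            metadata={"lance-schema:storage-class": "blob"},
        ),
        pa.field("idx", pa.uint64()),
    ]
)


def tiny_batch(i):
    # One 1 MiB row per write, so every append creates its own small fragment.
    yield pa.record_batch(
        [
            pa.array([b"0" * 1024 * 1024], pa.large_binary()),
            pa.array([i], pa.uint64()),
        ],
        schema=schema,
    )


for i in range(10):
    lance.write_dataset(tiny_batch(i), "demo_ds", schema=schema, mode="append")

ds = lance.dataset("demo_ds")
# Compacts the local (regular) data files; the sibling blob files stay
# uncompacted until a future PR adds support for them.
ds.optimize.compact_files(max_bytes_per_file=16 * 1024 * 1024)
assert len(ds.get_fragments()) == 1
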
3 changes: 3 additions & 0 deletions python/python/lance/dataset.py
@@ -3040,6 +3040,9 @@ def compact_files(
max_rows_per_group: int, default 1024
Max number of rows per group. This does not affect which fragments
need compaction, but does affect how they are re-written if selected.
This setting only affects datasets using the legacy storage format.
The newer format does not require row groups.
max_bytes_per_file: Optional[int], default None
Max number of bytes in a single file. This does not affect which
fragments need compaction, but does affect how they are re-written if
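
For reference, a minimal usage sketch of the parameters documented above, assuming an existing dataset at a hypothetical `demo_ds` path (the values are illustrative, not recommendations):

import lance

ds = lance.dataset("demo_ds")
ds.optimize.compact_files(
    max_rows_per_group=1024,  # only meaningful for the legacy storage format
    max_bytes_per_file=16 * 1024 * 1024,  # caps the size of re-written files
)
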
131 changes: 106 additions & 25 deletions python/python/tests/test_balanced.py
@@ -12,39 +12,45 @@ def big_val():
return b"0" * 1024 * 1024


def make_table(offset, num_rows, big_val):
end = offset + num_rows
values = pa.array([big_val for _ in range(num_rows)], pa.large_binary())
idx = pa.array(range(offset, end), pa.uint64())
table = pa.record_batch(
[values, idx],
schema=pa.schema(
[
pa.field(
"blobs",
pa.large_binary(),
metadata={
"lance-schema:storage-class": "blob",
},
),
pa.field("idx", pa.uint64()),
]
),
)
return table


# 16 batches of 8 rows = 128 rows
def balanced_datagen(big_val):
for batch_idx in range(16):
start = batch_idx * 8
end = start + 8
values = pa.array([big_val for _ in range(start, end)], pa.large_binary())
idx = pa.array(range(start, end), pa.uint64())
table = pa.record_batch(
[values, idx],
schema=pa.schema(
[
pa.field(
"blobs",
pa.large_binary(),
metadata={
"lance-schema:storage-class": "blob",
},
),
pa.field("idx", pa.uint64()),
]
),
)
yield table
def balanced_datagen(big_val, rows_per_batch, num_batches, offset=0):
for batch_idx in range(num_batches):
start = offset + (batch_idx * rows_per_batch)
yield make_table(start, rows_per_batch, big_val)


@pytest.fixture
def balanced_dataset(tmp_path, big_val):
# 16 MiB per file, 128 total MiB, so we should have 8 blob files
#
# In addition, max_rows_per_file=64 means we should get 2 regular files
schema = next(iter(balanced_datagen(big_val))).schema
rows_per_batch = 8
num_batches = 16
schema = next(iter(balanced_datagen(big_val, 1, 1))).schema
return lance.write_dataset(
balanced_datagen(big_val),
balanced_datagen(big_val, rows_per_batch, num_batches),
tmp_path / "test_ds",
max_bytes_per_file=16 * 1024 * 1024,
max_rows_per_file=64,
@@ -64,8 +70,10 @@ def test_append_then_take(balanced_dataset, tmp_path, big_val):
)

# Now verify we can append some data
rows_per_batch = 8
num_batches = 16
ds = lance.write_dataset(
balanced_datagen(big_val),
balanced_datagen(big_val, rows_per_batch, num_batches),
tmp_path / "test_ds",
max_bytes_per_file=32 * 1024 * 1024,
schema=balanced_dataset.schema,
@@ -118,6 +126,79 @@ def test_delete(balanced_dataset):
assert len(balanced_dataset._take_rows(range(20, 80), columns=["blobs"])) == 20


def test_scan(balanced_dataset):
# Scan without any special arguments should only return non-blob columns
expected = pa.table({"idx": pa.array(range(128), pa.uint64())})
assert balanced_dataset.to_table() == expected
assert balanced_dataset.to_table(columns=["idx"]) == expected
# Can filter on regular columns
assert balanced_dataset.to_table(columns=["idx"], filter="idx < 1000") == expected

# Scanning a blob column should fail for now (support is planned, but until
# then it must fail loudly so users don't shoot themselves in the foot)
with pytest.raises(
ValueError, match="Not supported.*Scanning.*non-default storage"
):
balanced_dataset.to_table(columns=["idx", "blobs"])
with pytest.raises(
ValueError, match="Not supported.*Scanning.*non-default storage"
):
balanced_dataset.to_table(columns=["blobs"])

# Can't filter on blob columns either
with pytest.raises(
ValueError,
match="Not supported.*non-default storage columns cannot be used as filters",
):
balanced_dataset.to_table(columns=["idx"], filter="blobs IS NOT NULL")


def test_compaction(tmp_path, big_val):
# Make a bunch of small 1-row writes
schema = next(iter(balanced_datagen(big_val, 1, 1))).schema
for write_idx in range(40):
lance.write_dataset(
balanced_datagen(big_val, 1, 1, offset=write_idx),
tmp_path / "test_ds",
max_bytes_per_file=16 * 1024 * 1024,
max_rows_per_file=64,
schema=schema,
mode="append",
)
# Run compaction. Normal storage should compact to 1 file. Blob storage
# would ideally compact to 3 files (40 MiB at 16 MiB per file), but blob
# compaction is not supported yet (see TODO below)
ds = lance.dataset(tmp_path / "test_ds")
ds.optimize.compact_files(max_bytes_per_file=16 * 1024 * 1024)

assert len(ds.get_fragments()) == 1

# TODO: Add support for compacting the blob files. For now, we just leave them
# uncompacted
assert len(list((tmp_path / "test_ds" / "_blobs" / "data").iterdir())) == 40

# Make sure we can still scan / take

assert ds.to_table() == pa.table(
{
"idx": pa.array(range(40), pa.uint64()),
}
)
row_ids = ds.to_table(columns=[], with_row_id=True).column("_rowid")
assert row_ids.to_pylist() == list(range(40))

assert ds._take_rows(row_ids.to_pylist(), columns=["idx"]) == pa.table(
{
"idx": pa.array(range(40), pa.uint64()),
}
)
assert ds._take_rows(row_ids.to_pylist(), columns=["blobs"]) == pa.table(
{
"blobs": pa.array([big_val for _ in range(40)], pa.large_binary()),
}
)


# TODO: Once https://github.com/lancedb/lance/pull/3041 merges we will
# want to test partial appends. We need to make sure an append of
# non-blob data is supported. In order to do this we need to make
37 changes: 28 additions & 9 deletions rust/lance/src/dataset/scanner.rs
@@ -426,9 +426,16 @@ impl Scanner {
&mut self,
columns: &[(impl AsRef<str>, impl AsRef<str>)],
) -> Result<&mut Self> {
let physical_schema = self.scan_output_schema(true)?;
let base_schema = self.scan_output_schema(self.dataset.schema(), true)?;
self.projection_plan =
ProjectionPlan::try_new(&physical_schema, columns, /*load_blobs=*/ false)?;
ProjectionPlan::try_new(&base_schema, columns, /*load_blobs=*/ false)?;
if self.projection_plan.sibling_schema.is_some() {
return Err(Error::NotSupported {
source: "Scanning columns with non-default storage class is not yet supported"
.into(),
location: location!(),
});
}
Ok(self)
}

@@ -859,15 +866,17 @@ impl Scanner {
///
/// This includes columns that are added by the scan but don't exist in the dataset
/// schema (e.g. _distance, _rowid, _rowaddr)
pub(crate) fn scan_output_schema(&self, force_row_id: bool) -> Result<Arc<Schema>> {
pub(crate) fn scan_output_schema(
&self,
base_schema: &Schema,
force_row_id: bool,
) -> Result<Arc<Schema>> {
let extra_columns = self.get_extra_columns(force_row_id);

let schema = if !extra_columns.is_empty() {
self.projection_plan
.physical_schema
.merge(&ArrowSchema::new(extra_columns))?
base_schema.merge(&ArrowSchema::new(extra_columns))?
} else {
self.projection_plan.physical_schema.as_ref().clone()
base_schema.clone()
};

// drop metadata
@@ -888,7 +897,10 @@ impl Scanner {
// Append the extra columns
let mut output_expr = self.projection_plan.to_physical_exprs()?;

let physical_schema = ArrowSchema::from(self.scan_output_schema(false)?.as_ref());
let physical_schema = ArrowSchema::from(
self.scan_output_schema(&self.projection_plan.physical_schema, false)?
.as_ref(),
);

// distance goes before the row_id column
if self.nearest.is_some() && output_expr.iter().all(|(_, name)| name != DIST_COL) {
@@ -1043,6 +1055,12 @@ impl Scanner {
// which do not exist in the dataset schema but are added by the scan. We can ignore
// those as eager columns.
let filter_schema = self.dataset.schema().project_or_drop(&columns)?;
if filter_schema.fields.iter().any(|f| !f.is_default_storage()) {
return Err(Error::NotSupported {
source: "non-default storage columns cannot be used as filters".into(),
location: location!(),
});
}
let physical_schema = self.projection_plan.physical_schema.clone();
let remaining_schema = physical_schema.exclude(&filter_schema)?;

@@ -1367,7 +1385,8 @@ impl Scanner {
}

// Stage 5: take remaining columns required for projection
let physical_schema = self.scan_output_schema(false)?;
let physical_schema =
self.scan_output_schema(&self.projection_plan.physical_schema, false)?;
let remaining_schema = physical_schema.exclude(plan.schema().as_ref())?;
if !remaining_schema.fields.is_empty() {
plan = self.take(plan, &remaining_schema, self.batch_readahead)?;
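
On the Python side, the NotSupported errors added above surface as ValueError, as exercised by test_scan. A hedged sketch, assuming a dataset such as `demo_ds` from the earlier example (any dataset with a blob-storage column):

import lance

ds = lance.dataset("demo_ds")
ds.to_table(columns=["idx"])  # fine: only default-storage columns are scanned
try:
    ds.to_table(columns=["blobs"])  # scanning a blob column is not yet supported
except ValueError as err:
    print(err)
try:
    # blob columns cannot appear in filters either
    ds.to_table(columns=["idx"], filter="blobs IS NOT NULL")
except ValueError as err:
    print(err)
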
4 changes: 3 additions & 1 deletion rust/lance/src/dataset/write.rs
@@ -345,7 +345,9 @@ pub async fn write_fragments_internal(
(schema, params.storage_version_or_default())
};

let (data, blob_data) = data.extract_blob_stream(&schema);
let data_schema = schema.project_by_schema(data.schema().as_ref())?;

let (data, blob_data) = data.extract_blob_stream(&data_schema);

// Some params we borrow from the normal write, some we override
let blob_write_params = WriteParams {
