feat: add file compaction #1095

Merged: 4 commits merged into main from wjones127/table-maintenance on Sep 11, 2023
Conversation

wjones127 (Contributor) commented on Jul 26, 2023:

Closes #934

rust/src/dataset/write.rs (outdated; thread resolved)
@@ -667,6 +667,12 @@ impl Stream for DatasetRecordBatchStream {
}
}

impl From<DatasetRecordBatchStream> for SendableRecordBatchStream {
wjones127 (author):

TBH, I'm not entirely clear why we have our own wrapper for SendableRecordBatchStream. Seems like it would be much less of a headache to just use the DataFusion type and make sure we provide a conversion trait for the errors.
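
For context, the "conversion trait for the errors" idea boils down to a From impl between the two error types so that ? can cross the boundary and the DataFusion stream type can be used directly. A minimal, self-contained sketch using placeholder types (not the real lance or DataFusion error types):

#[derive(Debug)]
struct LanceError(String);

#[derive(Debug)]
struct DataFusionError(String);

impl From<DataFusionError> for LanceError {
    fn from(e: DataFusionError) -> Self {
        LanceError(format!("datafusion: {}", e.0))
    }
}

fn read_batch() -> Result<(), DataFusionError> {
    Ok(())
}

fn scan() -> Result<(), LanceError> {
    // `?` converts the DataFusionError into a LanceError via the From impl.
    read_batch()?;
    Ok(())
}

fn main() {
    scan().expect("scan should succeed");
}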

Reviewer:

Let's just use DataFusion if feasible.

@wjones127 force-pushed the wjones127/table-maintenance branch 2 times, most recently from a215a34 to e86d2d7 (July 27, 2023 20:42)
@wjones127 marked this pull request as ready for review on July 27, 2023 21:10
@wjones127 requested review from eddyxu and gsilvestrin on July 27, 2023 21:10
/// This method tries to preserve the insertion order of rows in the dataset.
///
/// If no compaction is needed, this method will not make a new version of the table.
pub async fn compact_files(
Reviewer:

It's concerning that this is global-level planning + action (compaction). It might not be feasible for a large dataset. Should we split the plan and the action into two phases?
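
The later commits on this PR do add a distributed compaction API (see the commit list below). As a rough illustration of the plan-then-execute split being asked about here, a self-contained toy sketch; none of these names are the actual Lance API:

#[derive(Debug, Clone)]
struct Fragment {
    id: u64,
    num_rows: usize,
}

#[derive(Debug)]
struct CompactionTask {
    // Contiguous run of undersized fragments to rewrite together.
    fragments: Vec<Fragment>,
}

// Planning phase: walk the fragment list in order and group contiguous runs of
// undersized fragments into tasks. Large fragments are left alone, which also
// keeps row order intact. Each task could then be executed on a separate
// worker, with a final commit step gathering the results into one new version.
fn plan(fragments: &[Fragment], target_rows: usize) -> Vec<CompactionTask> {
    let mut tasks = Vec::new();
    let mut current: Vec<Fragment> = Vec::new();
    let flush = |current: &mut Vec<Fragment>, tasks: &mut Vec<CompactionTask>| {
        if current.len() > 1 {
            // Only worth rewriting if the run has more than one fragment.
            tasks.push(CompactionTask {
                fragments: std::mem::take(current),
            });
        } else {
            current.clear();
        }
    };
    for frag in fragments {
        if frag.num_rows >= target_rows {
            // Already big enough: keep as-is, which ends the current run.
            flush(&mut current, &mut tasks);
        } else {
            current.push(frag.clone());
        }
    }
    flush(&mut current, &mut tasks);
    tasks
}

fn main() {
    let fragments: Vec<Fragment> = (0..6u64)
        .map(|id| Fragment {
            id,
            num_rows: if id == 3 { 2_000_000 } else { 300_000 },
        })
        .collect();
    for task in plan(&fragments, 1_000_000) {
        println!("{:?}", task);
    }
}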

.buffer_unordered(options.num_concurrent_jobs);

// Prepare this so we can assign ids to the new fragments.
let mut current_fragment_id = dataset
Reviewer:

Do you need to lock the dataset to prevent other writers from obtaining these fragment ids?

wjones127 (author):

If there is a concurrent writer, one of them will fail. Eventually, we'll implement retries, at which point the losing writer will need to recompute the fragment ids starting at the new max.
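
A toy illustration of the retry behavior described above; the rebase_fragment_ids helper is hypothetical, not part of this PR:

#[derive(Debug)]
struct NewFragment {
    id: u64,
}

fn rebase_fragment_ids(new_fragments: &mut [NewFragment], latest_max_id: u64) {
    for (offset, frag) in new_fragments.iter_mut().enumerate() {
        frag.id = latest_max_id + 1 + offset as u64;
    }
}

fn main() {
    // This writer planned ids 10 and 11, but a concurrent writer committed
    // first and the dataset's max fragment id is now 20, so re-assign.
    let mut frags = vec![NewFragment { id: 10 }, NewFragment { id: 11 }];
    rebase_fragment_ids(&mut frags, 20);
    assert_eq!(frags[0].id, 21);
    assert_eq!(frags[1].id, 22);
}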

// TODO: replace this with from_previous
let mut manifest = Manifest::new(dataset.schema(), Arc::new(final_fragments));

manifest.version = dataset
Reviewer:

Can we abstract this away into a new commit() operation?

}

impl CompactionPlan {
fn with_capacity(n: usize) -> Self {
Reviewer:

What does capacity mean here (as a user of CompactionPlan)?

Do we want to make a progressive plan, i.e. only compact up to a certain number of fragments each time?

wjones127 (author):

This is private right now. with_capacity is there mostly for performance reasons, since we know we'll have the full fragment list stored there.
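
In other words, with_capacity just pre-allocates the backing Vec. A trivial sketch with a placeholder field (the real struct tracks fragment ranges, discussed further below):

struct CompactionPlan {
    // Placeholder element type; the real plan stores fragment index ranges.
    fragments: Vec<usize>,
}

impl CompactionPlan {
    fn with_capacity(n: usize) -> Self {
        // We already know how many entries will be pushed, so pre-allocating
        // avoids repeated reallocation while the plan is built.
        Self {
            fragments: Vec::with_capacity(n),
        }
    }
}

fn main() {
    let mut plan = CompactionPlan::with_capacity(100);
    plan.fragments.extend(0..100);
    assert!(plan.fragments.capacity() >= 100);
}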

materialize_deletions_threshold=materialize_deletions_threshold,
num_concurrent_jobs=num_concurrent_jobs,
)
return _compact_files(self._dataset._ds, opts)
Reviewer:

Can we make this two phases (plan + (potentially distributed) execution)? I can see many cases where we need to be able to run this in a distributed fashion.

@@ -562,3 +562,83 @@ pub(crate) fn get_write_params(options: &PyDict) -> PyResult<Option<WriteParams>
};
Ok(params)
}

pub mod optimize {
Reviewer:

Just separate this into another file under /dataset/?

/// Options to be passed to [compact_files].
#[derive(Debug, Clone)]
pub struct CompactionOptions {
/// Target number of rows per file. Defaults to 1 million.
Reviewer:

There's one thing I've wanted to do for a long time: use column stats to find a better encoding when doing compaction. For example, the Procella paper describes a two-phase write that determines the optimal perf/cost trade-off of encodings and rewrites the data in the background.

wjones127 (author):

Yeah I agree we should use stats to determine encoding, but IMO we should do that at the page level. So as we write pages, we first collect stats, then pass that to the encoder. It chooses the encoding based on the stats (all values the same => Constant encoding, distinct count small => dictionary encoding, etc.). That's part of the motivation for variable encodings.
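
A toy sketch of the stats-driven selection described here; the enum, the stats struct, and the 10x distinct-count cutoff are illustrative, not Lance's encoder:

#[derive(Debug, PartialEq)]
enum Encoding {
    Constant,
    Dictionary,
    Plain,
}

struct PageStats {
    num_values: usize,
    distinct_count: usize,
}

fn choose_encoding(stats: &PageStats) -> Encoding {
    if stats.distinct_count <= 1 {
        // All values in the page are identical.
        Encoding::Constant
    } else if stats.distinct_count * 10 <= stats.num_values {
        // Few distinct values relative to the page size.
        Encoding::Dictionary
    } else {
        Encoding::Plain
    }
}

fn main() {
    let page = PageStats {
        num_values: 4096,
        distinct_count: 12,
    };
    assert_eq!(choose_encoding(&page), Encoding::Dictionary);
}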

Reviewer:

That's a good point. With that in mind, does it mean we need to put page encoding metadata with each page?
Also, are there cases where we would want to merge/split pages?

wjones127 (author):

> Also, are there cases where we would want to merge/split pages?

This should do that automatically, since it reads the data in as Arrow and then streams that into the writer.

> With that in mind, does it mean we need to put page encoding metadata with each page?

Yup. That's part of the variable encodings design.
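
Returning to the CompactionOptions snippet above, here is a hedged reconstruction assembled only from options that appear elsewhere in this PR (target_rows_per_file, materialize_deletions_threshold, num_concurrent_jobs); the exact field types and the num_concurrent_jobs default are assumptions, not the merged API:

#[derive(Debug, Clone)]
pub struct CompactionOptions {
    /// Target number of rows per rewritten file (1 million by default).
    pub target_rows_per_file: usize,
    /// Fraction of soft-deleted rows at which a fragment becomes a
    /// candidate for rewriting (0.1 by default).
    pub materialize_deletions_threshold: f32,
    /// Number of rewrite jobs to run concurrently.
    pub num_concurrent_jobs: usize,
}

impl Default for CompactionOptions {
    fn default() -> Self {
        Self {
            target_rows_per_file: 1_000_000,
            materialize_deletions_threshold: 0.1,
            // Illustrative only; the PR does not state the real default.
            num_concurrent_jobs: 4,
        }
    }
}

fn main() {
    println!("{:?}", CompactionOptions::default());
}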

rust/src/dataset/optimize.rs (outdated; thread resolved)
/// to compact is represented as a range of indices in the `fragments` list. We
/// also track the ranges of fragments that should be kept as-is, to make it
/// easier to build the final list of fragments.
struct CompactionPlan {
Reviewer:

If we consider "rewrite for better encoding", this is not just a compaction plan, right? It is more like an Optimize(Storage)Plan.
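
A compact sketch of the range-based representation the doc comment above describes; the entry names are illustrative, not the actual private types:

use std::ops::Range;

// What to do with a contiguous run of fragments, identified by index ranges
// into the original fragment list.
#[derive(Debug)]
enum PlanEntry {
    // Rewrite these fragments together.
    Compact(Range<usize>),
    // Carry these fragments over unchanged, preserving row order.
    Keep(Range<usize>),
}

#[derive(Debug)]
struct CompactionPlan {
    entries: Vec<PlanEntry>,
}

fn main() {
    // Keep fragments 0..3 as-is, rewrite 3..8 together, keep the remainder.
    let plan = CompactionPlan {
        entries: vec![
            PlanEntry::Keep(0..3),
            PlanEntry::Compact(3..8),
            PlanEntry::Keep(8..10),
        ],
    };
    println!("{:?}", plan);
}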

@wjones127 force-pushed the wjones127/table-maintenance branch from e86d2d7 to a443248 (August 3, 2023 04:09)
@wjones127 marked this pull request as draft on August 4, 2023 23:49
@wjones127 force-pushed the wjones127/table-maintenance branch 2 times, most recently from aaef7bb to a93a5eb (August 15, 2023 00:01)
@eddyxu requested review from westonpace and chebbyChefNEQ and removed the request for gsilvestrin (August 15, 2023 00:56)
wjones127 (author):

Note: need to refactor for this: #1127 (comment)

@wjones127 force-pushed the wjones127/table-maintenance branch from a93a5eb to 789a31d (August 28, 2023 19:30)
@wjones127 marked this pull request as ready for review on August 28, 2023 20:08
@wjones127 force-pushed the wjones127/table-maintenance branch from 789a31d to 50e8068 (September 5, 2023 20:13)
westonpace (Contributor) left a review:

Nothing but minor thoughts. This looks great. I wonder if we might have other "plan then execute" distributed tasks in the future (e.g. building an index) and this serves as a good template.

docs/read_and_write.rst (thread resolved)

When files are rewritten, the original row ids are invalidated. This means the
affected files are no longer part of any ANN index if they were before. Because
of this, it's recommended to rewrite files before re-building indices.
Reviewer:

Suggested change:
- of this, it's recommended to rewrite files before re-building indices.
+ of this, it's recommended to rebuild indices after rewriting files.

I'm not sure if you are trying to say "after you rewrite you need to make sure and rebuild indices" or "if you're going to build indices, you should probably rewrite first so you don't lose the index data later" but I think the former is more essential to communicate (though both could also be said if you want)

wjones127 (author):

I guess I say it this way because I don't think you have to rebuild the index after compaction. If the part that was compacted was data that wasn't indexed in the first place (because it was recently appended), you don't necessarily have to rebuild the index. But it is always a waste to build indices and then do compaction, if you are planning on doing both.

python/python/lance/dataset.py (outdated; thread resolved)
Comment on lines +1003 to +1143
materialize_deletions_threshold: float, default 0.1
The fraction of original rows that are soft deleted in a fragment
before the fragment is a candidate for compaction.
Reviewer:

Hmm, I would think a simple limit (e.g. 100 soft-deleted rows) would be fine here. Why go with a proportion?

wjones127 (author):

I can't say I have a great justification without any benchmarks. I've heard engineers on similar projects say 10% was the threshold at which they saw scan performance deteriorate, so I guess there's a little cargo-culting here :)

I could set up a benchmark parametrized by scale and proportion deleted to see how scan performance is affected, and then see whether a fixed value or a proportional value makes for the most broadly applicable default.
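
For reference, the proportional check under discussion is just a ratio test. A self-contained sketch with the default threshold of 0.1 (the function name is hypothetical):

fn is_rewrite_candidate(physical_rows: usize, soft_deleted_rows: usize, threshold: f64) -> bool {
    physical_rows > 0 && (soft_deleted_rows as f64) / (physical_rows as f64) >= threshold
}

fn main() {
    // With the default threshold of 0.1, 150 deletions out of 1,000 rows makes
    // the fragment a candidate, while 50 out of 1,000 does not.
    assert!(is_rewrite_candidate(1_000, 150, 0.1));
    assert!(!is_rewrite_candidate(1_000, 50, 0.1));
}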

python/python/lance/dataset.py (thread resolved)
rust/src/dataset/optimize.rs (5 outdated threads, resolved)
@wjones127 force-pushed the wjones127/table-maintenance branch 2 times, most recently from da29586 to 73ed44d (September 11, 2023 19:36)
wip: start outlining compaction impl

wip: implement compaction

complete impl

docs: add good rust docs

feat: expose compact_files in python

fix: handle deletions better

docs: rewrite for final api

pr feedback

wip: refactor for tasks

feat: add distributed compaction API

test distributed

wip: add python distributed bindings

Python api and docs

migrate to transaction api

format

Apply suggestions from code review

Co-authored-by: Weston Pace <[email protected]>

make more flexible
@wjones127 force-pushed the wjones127/table-maintenance branch from 73ed44d to 6b14c40 (September 11, 2023 20:28)
@wjones127 merged commit 816d85c into main on Sep 11, 2023
@wjones127 deleted the wjones127/table-maintenance branch on September 11, 2023 23:10
Development

Successfully merging this pull request may close these issues.

Fragment::optimize() / compact() to regenerate the fragment
3 participants