feat: add file compaction #1095
Conversation
rust/src/dataset/scanner.rs
Outdated
@@ -667,6 +667,12 @@ impl Stream for DatasetRecordBatchStream {
}
}

impl From<DatasetRecordBatchStream> for SendableRecordBatchStream {
TBH, I'm not entirely clear why we have our own wrapper for SendableRecordBatchStream. Seems like it would be much less of a headache to just use the DataFusion type and make sure we provide a conversion trait for the errors.
Let's just use DataFusion if feasible.
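A minimal sketch of what that could look like, assuming we convert Lance's error into `DataFusionError::External` and wrap the inner stream with DataFusion's `RecordBatchStreamAdapter` (the helper name `into_sendable` is illustrative, not part of this PR):

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt};

/// Wrap any stream of `Result<RecordBatch, E>` into DataFusion's
/// `SendableRecordBatchStream` by mapping the error type.
fn into_sendable<E>(
    schema: SchemaRef,
    inner: impl Stream<Item = Result<RecordBatch, E>> + Send + 'static,
) -> SendableRecordBatchStream
where
    E: std::error::Error + Send + Sync + 'static,
{
    let mapped = inner.map(|res| res.map_err(|e| DataFusionError::External(Box::new(e))));
    Box::pin(RecordBatchStreamAdapter::new(schema, mapped))
}
```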
Force-pushed from a215a34 to e86d2d7
rust/src/dataset/optimize.rs
Outdated
/// This method tries to preserve the insertion order of rows in the dataset.
///
/// If no compaction is needed, this method will not make a new version of the table.
pub async fn compact_files(
I'm concerned that this is global-level planning + action (compaction). It might not be feasible for a large dataset. Should we split the plan and the action into two phases? A rough sketch of a possible split is below.
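To make the question concrete, here is a hypothetical sketch of such a split, reusing the `CompactionOptions` struct from this PR (none of the other names are the actual API): planning only reads metadata, each task is an independent rewrite that could run elsewhere, and a final commit combines the results.

```rust
// Hypothetical multi-step API; names and signatures are illustrative only.

/// A self-contained unit of work: rewrite one group of fragments.
pub struct CompactionTask {
    /// Ids of the fragments this task will rewrite.
    pub fragments: Vec<u64>,
}

/// Result of executing one task: the fragments that replace the inputs.
pub struct RewriteResult {
    pub old_fragments: Vec<u64>,
    pub new_fragments: Vec<u64>,
}

/// Step 1: inspect metadata only and decide what to rewrite.
pub fn plan_compaction(options: &CompactionOptions) -> Vec<CompactionTask> {
    // ... group small / heavily-deleted fragments into tasks ...
    unimplemented!()
}

/// Step 2: execute one task. This is the expensive, data-moving part and
/// could run on a separate worker.
pub async fn execute_task(task: CompactionTask) -> RewriteResult {
    unimplemented!()
}

/// Step 3: gather all results and commit a single new version.
pub async fn commit_compaction(results: Vec<RewriteResult>) {
    unimplemented!()
}
```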
rust/src/dataset/optimize.rs
Outdated
.buffer_unordered(options.num_concurrent_jobs);

// Prepare this so we can assign ids to the new fragments.
let mut current_fragment_id = dataset
Do you need to lock the dataset to prevent other writers from obtaining these fragment ids?
If there is a concurrent writer, one of them will fail. Eventually, we'll implement retries, at which point the losing writer will need to recompute the fragment ids starting at the new max.
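A rough sketch of that retry loop, just to illustrate the idea; `fragments()`, `assign_new_fragment_ids`, `try_commit`, `Error::CommitConflict`, and `checkout_latest` are all hypothetical names here, not the existing API:

```rust
// Illustrative optimistic-concurrency loop: if the commit loses the race,
// refresh to the latest version and recompute fragment ids from the new max.
async fn commit_with_retry(dataset: &mut Dataset) -> Result<(), Error> {
    loop {
        let max_id = dataset.fragments().iter().map(|f| f.id).max().unwrap_or(0);
        let new_fragments = assign_new_fragment_ids(max_id + 1);
        match try_commit(dataset, &new_fragments).await {
            Ok(()) => return Ok(()),
            // Another writer won the race; reload and retry with fresh ids.
            Err(Error::CommitConflict) => dataset.checkout_latest().await?,
            Err(e) => return Err(e),
        }
    }
}
```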
rust/src/dataset/optimize.rs
Outdated
// TODO: replace this with from_previous
let mut manifest = Manifest::new(dataset.schema(), Arc::new(final_fragments));

manifest.version = dataset
Can we abstract this away into a new commit() operation?
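For example, a helper along these lines (hypothetical signature; the body is only sketched) so callers don't build the manifest by hand:

```rust
// Hypothetical commit() helper: derive the next manifest from the current
// dataset, bump the version, and write it atomically.
async fn commit(dataset: &Dataset, final_fragments: Vec<Fragment>) -> Result<Dataset, Error> {
    let mut manifest = Manifest::new(dataset.schema(), Arc::new(final_fragments));
    manifest.version = dataset.version().version + 1;
    // ... write the manifest, handle the conflict case, return the new handle ...
    unimplemented!()
}
```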
rust/src/dataset/optimize.rs
Outdated
}

impl CompactionPlan {
    fn with_capacity(n: usize) -> Self {
What does capacity mean here (as a user of CompactionPlan)?
Do we want to make a progressive plan, i.e. only compact up to a certain number of fragments each time?
This is private right now. with_capacity is there mostly for performance reasons, since we know we'll have the full fragment list stored there.
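In other words, it only pre-allocates because the final number of fragments is known up front. A sketch (field names are illustrative, based on the doc comment later in this file):

```rust
// Sketch: `with_capacity` just pre-allocates the fragment list so it never
// reallocates while the plan is being built; `n` is the total fragment count.
struct CompactionPlan {
    fragments: Vec<Fragment>,
    to_compact: Vec<std::ops::Range<usize>>,
    to_keep: Vec<std::ops::Range<usize>>,
}

impl CompactionPlan {
    fn with_capacity(n: usize) -> Self {
        Self {
            fragments: Vec::with_capacity(n),
            to_compact: Vec::new(),
            to_keep: Vec::new(),
        }
    }
}
```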
python/python/lance/dataset.py
Outdated
    materialize_deletions_threshold=materialize_deletions_threshold,
    num_concurrent_jobs=num_concurrent_jobs,
)
return _compact_files(self._dataset._ds, opts)
Can we make this two phases (plan + (potentially distributed) execution)? I can see many cases where we need to be able to run this in a distributed fashion. A rough sketch of the driver flow is below.
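Reusing the hypothetical plan / execute / commit split sketched earlier in this thread (illustrative only; in practice each task would be serialized and shipped to a worker rather than awaited locally):

```rust
// Illustrative driver flow: plan locally, execute tasks (ideally on workers),
// then commit everything as one new version.
async fn distributed_compaction(options: &CompactionOptions) {
    // 1. Plan on a single node; this only reads metadata.
    let tasks = plan_compaction(options);

    // 2. Execute each task. Here they are just awaited concurrently, but the
    //    same tasks could be serialized and run on Spark/Ray workers.
    let results: Vec<RewriteResult> =
        futures::future::join_all(tasks.into_iter().map(execute_task)).await;

    // 3. Commit all rewrite results in a single new dataset version.
    commit_compaction(results).await;
}
```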
python/src/dataset.rs
Outdated
@@ -562,3 +562,83 @@ pub(crate) fn get_write_params(options: &PyDict) -> PyResult<Option<WriteParams>
};
Ok(params)
}

pub mod optimize {
Just separate this into another file under /dataset/?
rust/src/dataset/optimize.rs
Outdated
/// Options to be passed to [compact_files].
#[derive(Debug, Clone)]
pub struct CompactionOptions {
    /// Target number of rows per file. Defaults to 1 million.
One thing I've wanted to do for a long time is use column stats to find a better encoding when doing compaction. For example, the Procella paper describes a two-phase write that determines the optimal perf/cost trade-off of encodings and rewrites the data in the background.
Yeah I agree we should use stats to determine encoding, but IMO we should do that at the page level. So as we write pages, we first collect stats, then pass that to the encoder. It chooses the encoding based on the stats (all values the same => Constant encoding, distinct count small => dictionary encoding, etc.). That's part of the motivation for variable encodings.
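A tiny sketch of that page-level decision (encoding names and thresholds are illustrative, not the actual format):

```rust
// Illustrative stats-driven encoding choice made per page at write time.
enum PageEncoding {
    Plain,
    Constant,
    Dictionary,
}

struct PageStats {
    num_values: usize,
    distinct_count: usize,
}

fn choose_encoding(stats: &PageStats) -> PageEncoding {
    if stats.distinct_count <= 1 {
        // All values identical => store a single value.
        PageEncoding::Constant
    } else if stats.distinct_count * 10 < stats.num_values {
        // Few distinct values relative to the page size => dictionary-encode.
        PageEncoding::Dictionary
    } else {
        PageEncoding::Plain
    }
}
```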
That's a good point. With that in mind, it means that we need to put page encoding metadata with each page?
Also are there chances that we want to merge/split pages?
> Also are there chances that we want to merge/split pages?

This should do that automatically, since it reads the data in as Arrow, and then streams that into the writer.

> With that in mind, it means that we need to put page encoding metadata with each page?

Yup. That's part of the variable encodings design.
rust/src/dataset/optimize.rs
Outdated
/// to compact is represented as a range of indices in the `fragments` list. We
/// also track the ranges of fragments that should be kept as-is, to make it
/// easier to build the final list of fragments.
struct CompactionPlan {
If we consider "rewrite for better encoding", this is not just a compaction plan, right? It is more like an Optimize(Storage)Plan.
Force-pushed from e86d2d7 to a443248
Force-pushed from aaef7bb to a93a5eb
Note: need to refactor for this: #1127 (comment)
Force-pushed from a93a5eb to 789a31d
Force-pushed from 789a31d to 50e8068
Nothing but minor thoughts. This looks great. I wonder if we might have other "plan then execute" distributed tasks in the future (e.g. building an index) and this serves as a good template.
When files are rewritten, the original row ids are invalidated. This means the
affected files are no longer part of any ANN index if they were before. Because
of this, it's recommended to rewrite files before re-building indices.
Suggested change:
- of this, it's recommended to rewrite files before re-building indices.
+ of this, it's recommended to rebuild indices after rewriting files.
I'm not sure if you are trying to say "after you rewrite, you need to make sure to rebuild indices" or "if you're going to build indices, you should probably rewrite first so you don't lose the index data later", but I think the former is more essential to communicate (though both could also be said if you want).
I guess I say it this way because I don't think you have to rebuild the index after compaction. If the part that was compacted was data that wasn't indexed in the first place (because it was recently appended), you don't necessarily have to rebuild the index. But it is always a waste to build indices and then do compaction, if you are planning on doing both.
materialize_deletions_threshold: float, default 0.1
    The fraction of original rows that are soft deleted in a fragment
    before the fragment is a candidate for compaction.
Hmm, I would think a simple limit (e.g. 100 soft-deleted rows) would be fine here. Why go with a proportion?
I can't say I have a great justification without any benchmarks. I've heard engineers on similar projects say 10% was the threshold at which they saw deterioration in scan performance, so I guess there's a little cargo-culting here :)
I could set up a benchmark parameterized by scale and proportion deleted to see how scan performance is affected, and then see whether a fixed value or a proportional value allows for the most broadly applicable default.
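For reference, the proportional check is essentially this (a sketch; parameter names follow the Python option above):

```rust
// Sketch: a fragment is a candidate for rewriting when the *fraction* of
// soft-deleted rows crosses the threshold, rather than an absolute count.
fn should_materialize_deletions(
    deleted_rows: u64,
    physical_rows: u64,
    materialize_deletions_threshold: f64, // e.g. 0.1 == 10%
) -> bool {
    physical_rows > 0
        && (deleted_rows as f64) / (physical_rows as f64) >= materialize_deletions_threshold
}
```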
Force-pushed from da29586 to 73ed44d
wip: start outlining compaction impl
wip: implement compaction
complete impl
docs: add good rust docs
feat: expose compact_files in python
fix: handle deletions better
docs: rewrite for final api
pr feedback
wip: refactor for tasks
feat: add distributed compaction API
test distributed
wip: add python distributed bindings
Python api and docs
migrate to transaction api format
Apply suggestions from code review
Co-authored-by: Weston Pace <[email protected]>
make more flexible
Force-pushed from 73ed44d to 6b14c40
Closes #934