wip: allow scanning data out of order #874
Conversation
Seeing 2x performance in the existing benchmark:

```
$ git checkout main
$ cargo bench --bench scan -- --save-baseline main
Scan full dataset    time:   [4.2708 ms 4.2930 ms 4.3176 ms]
                     change: [-3.3599% -0.7548% +1.9459%] (p = 0.62 > 0.10)
                     No change in performance detected.

$ git checkout wjones127/861-buffer-unordered
$ cargo bench --bench scan -- --baseline main
Scan full dataset    time:   [2.1718 ms 2.1794 ms 2.1882 ms]
                     change: [-49.475% -48.707% -47.613%] (p = 0.00 < 0.10)
                     Performance has improved.
```

In a follow-up, I'd like to add a benchmark for scanning multiple files, so we can compare the performance of ordered and unordered scans.
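For reference, a minimal sketch of what a criterion benchmark like this could look like; `scan_dataset` is a hypothetical placeholder workload, not the actual bench in the repo:

```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Placeholder workload standing in for the real dataset scan.
fn scan_dataset() -> usize {
    (0..1_000_000usize).map(black_box).sum()
}

fn bench_scan(c: &mut Criterion) {
    c.bench_function("Scan full dataset", |b| b.iter(scan_dataset));
}

criterion_group!(benches, bench_scan);
criterion_main!(benches);
```

Running it with `--save-baseline main` on main and then `--baseline main` on the branch reproduces the comparison shown above.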
python/python/lance/dataset.py (Outdated)

```diff
@@ -235,6 +235,7 @@ def to_batches(
     offset: Optional[int] = None,
     nearest: Optional[dict] = None,
     batch_readahead: Optional[int] = None,
+    ordered_scan: bool = True,
```
What do we think of this name? Is there another we'd prefer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable. 👍

Or should we name this `batch_ordered`? Will people think this flag has an `ORDER BY` kind of sorting semantic?
I see: `batch_ordered` to parallel `batch_readahead`. The only downside is that I'm thinking `ordered_scan` will mean that both fragments and batches are scanned in order. For example, if the user passes fragments in a specific order, that order is respected as part of the scan.

Actually, I might add `fragment_readahead`, which will only apply to out-of-order scans. Arrow C++ has a similar option.

I'm thinking `scan_in_order` might be a less ambiguous name.
sgtm.
python/python/lance/dataset.py (Outdated)

```python
def ordered_scan(self, ordered_scan: bool = True) -> ScannerBuilder:
    """
    Whether to scan the dataset in order. If set to False, the scanner may
    read fragments concurrently and yield batches out of order.
```
Could you also mention that an "out of order" scan might yield better performance, so that users know what the benefit of this flag is?
```diff
@@ -84,6 +84,9 @@ pub struct Scanner {
     /// Scan the dataset with a meta column: "_rowid"
     with_row_id: bool,
+
+    /// Whether to scan in deterministic order (default: true)
+    ordered: bool,
```
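As a rough illustration of what such a flag typically toggles (a sketch with made-up names, not the actual lance code): an ordered scan buffers futures with `buffered`, which yields results in submission order, while an unordered scan uses `buffer_unordered`, which yields whichever read finishes first.

```rust
use futures::stream::{self, StreamExt};

// Sketch: same futures, two buffering strategies. `ordered` is the
// hypothetical flag; the workload is a stand-in for reading batches.
fn main() {
    let ordered = false;
    let futs = stream::iter(0..8).map(|i| async move { i });
    let results: Vec<i32> = futures::executor::block_on(async {
        if ordered {
            // Yields in the original order, even if later reads finish first.
            futs.buffered(4).collect().await
        } else {
            // Yields as soon as any of the (up to 4) in-flight reads completes.
            futs.buffer_unordered(4).collect().await
        }
    });
    println!("{results:?}");
}
```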
Should we use `batch_ordered` or `ordered_scan` here, in case people confuse it with the SQL `ORDER BY` semantic?
```rust
        .step_by(read_size)
        .map(move |start| (batch_id, start..min(start + read_size, rows_in_batch)))
});
let batch_stream = stream::iter(read_params_iter).map(move |(batch_id, range)| {
```
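To make the iterator logic above concrete, here is a self-contained sketch of how each batch gets split into `read_size`-row ranges (the values are made up; note the last range is clipped to the batch length):

```rust
use std::cmp::min;

fn main() {
    // Assumed example values: one 250-row batch read in 100-row chunks.
    let (batch_id, rows_in_batch, read_size) = (0usize, 250usize, 100usize);
    let read_params: Vec<_> = (0..rows_in_batch)
        .step_by(read_size)
        .map(|start| (batch_id, start..min(start + read_size, rows_in_batch)))
        .collect();
    // Prints: [(0, 0..100), (0, 100..200), (0, 200..250)]
    println!("{read_params:?}");
}
```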
This is smart!

I would be curious to see the performance difference between scanning a local file from a laptop and scanning from S3.
Some questions, and some places need clarification in the docs; not blockers. The rest LGTM.
```rust
        .await
        .map_err(|e| DataFusionError::from(e))
    }
});
```
Do you need a `buffered`/`buffer_unordered` here to control the degree of parallelism?
That's configured in the body of `try_new()`, where your other comment is. This method just defines the stream of futures.
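A minimal sketch of that separation (illustrative names, not the actual lance APIs): the function only builds a stream of not-yet-polled futures, and the caller picks the concurrency when it applies a buffering combinator.

```rust
use futures::stream::{self, Stream, StreamExt};

// Builds the work as a stream of futures, but polls nothing itself.
fn batch_futures() -> impl Stream<Item = impl std::future::Future<Output = u32>> {
    stream::iter(0..8u32).map(|i| async move { i * 2 })
}

fn main() {
    // The caller decides the parallelism: up to 4 futures in flight here.
    let results: Vec<u32> =
        futures::executor::block_on(batch_futures().buffer_unordered(4).collect());
    println!("{results:?}"); // completion order, so it may vary
}
```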
rust/src/io/exec/scan.rs (Outdated)

```rust
    open_file(file_fragment, project_schema.clone(), with_row_id)
})
.map_ok(move |reader| {
    scan_batches(reader, read_size).buffer_unordered(batch_readahead)
```
So here it means that it reads `batch_readahead * fragment_readahead` batches in memory?

How many actual threads are created in this case? It is not clear to me.

Might want to add some documentation describing the expected number of threads and number of batches in flight.
> So here it means that it reads batch_readahead * fragment_readahead in memory?

I think you are right, but that's also not what we want. It took a while to figure out how to rearrange it, but I think my latest commit makes it so `batch_readahead` is enforced across fragments: no matter how many fragments are read concurrently, only `batch_readahead` batches will be buffered. (I put the `buffer_unordered` after the `flatten`.)

> How many actual threads are created in this case? It is not clear to me.

As I understand it, tokio will dynamically schedule them across its thread pool, which defaults to one thread per core. From the tokio docs:

> The multi-thread scheduler executes futures on a thread pool, using a work-stealing strategy. By default, it will start a worker thread for each CPU core available on the system.

> Might want to add some document to describe the expectation of # of threads and # of batches in flight

Agreed. This code can be a little confusing. I've added some comments.
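A self-contained sketch of that arrangement, with stand-in names (not the actual lance code): flattening the per-fragment streams first and buffering afterwards means the `buffer_unordered` limit applies globally, rather than per fragment.

```rust
use futures::stream::{self, StreamExt};

fn main() {
    // Hypothetical stand-ins: 3 fragments, each yielding 4 batch-read futures.
    let fragments = stream::iter(0..3u32).map(|frag| {
        stream::iter(0..4u32).map(move |batch| async move { (frag, batch) })
    });
    let batches: Vec<(u32, u32)> = futures::executor::block_on(
        fragments
            // Interleave up to 2 fragments' futures into one stream...
            .flatten_unordered(2)
            // ...then keep at most 4 batch reads in flight *across all fragments*.
            .buffer_unordered(4)
            .collect(),
    );
    println!("{batches:?}");
}
```

With the buffering applied per fragment instead (inside the map, before the flatten), the worst case would be roughly `fragment_readahead * batch_readahead` batches in flight, which is the behavior questioned above.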
> but I think my latest commit should make it so batch_readahead is enforced across fragments,

I feel it is OK to limit `batch_readahead` to just one open fragment, which might be the least surprising behavior relative to the official pyarrow API: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html

> As I understand it, tokio will dynamically schedule them across its thread pool, which defaults to one thread per core.

Oh yes, my previous comment was not clear. I was wondering whether `buffer_unordered` + `try_flatten_unordered` would lead to the desired `batch_readahead * fragment_readahead`. Especially since `try_flatten_unordered` will control the number of fragments opened.
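For completeness, a hedged sketch of how the worker-thread count discussed above can be controlled; the builder calls are real tokio APIs, but the sizing and the scan placeholder are assumptions:

```rust
// Tokio's multi-thread runtime defaults to one worker per CPU core; the
// builder lets you override that (4 is an arbitrary example value).
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()?;
    rt.block_on(async {
        // The scan's buffered futures would be scheduled across these
        // 4 workers by tokio's work-stealing scheduler.
    });
    Ok(())
}
```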
After reading the docs, it seems to work as desired. Let's merge :)
Closes #861.
TODO: