
wip: allow scanning data in out of order #874

Merged (9 commits into main, May 18, 2023)

Conversation

@wjones127 (Contributor) commented on May 16, 2023:

Closes #861.

TODO:

  • Fix various lifetime issues
  • Make sure ordered case still has buffering
  • Expose in pylance
  • Profile performance for both the ordered and unordered cases (deferred to a follow-up)

@wjones127 (Contributor, Author) commented:

Seeing a ~2x performance improvement in the existing benchmark:

$ git checkout main
$ cargo bench --bench scan -- --save-baseline main
Scan full dataset       time:   [4.2708 ms 4.2930 ms 4.3176 ms]
                        change: [-3.3599% -0.7548% +1.9459%] (p = 0.62 > 0.10)
                        No change in performance detected.
$ git checkout wjones127/861-buffer-unordered
$ cargo bench --bench scan -- --baseline main
Scan full dataset       time:   [2.1718 ms 2.1794 ms 2.1882 ms]
                        change: [-49.475% -48.707% -47.613%] (p = 0.00 < 0.10)
                        Performance has improved.

In a follow-up, I'd like to add a benchmark for scanning multiple files so we can compare the performance of ordered and unordered scans.

@@ -235,6 +235,7 @@ def to_batches(
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
ordered_scan: bool = True,
@wjones127 (author):

What do we think of this name? Is there another we'd prefer?

@eddyxu:

Sounds reasonable. 👍

Or should we name this batch_ordered? Will people read this flag as having ORDER BY-style sorting semantics?

@wjones127 (author):

I see: batch_ordered, to parallel batch_readahead. The only downside is that I intend ordered_scan to mean that both fragments and batches are scanned in order. For example, if the user passes fragments in a specific order, that order is respected as part of the scan.

Actually, I might add fragment_readahead, which will only apply to out-of-order scans. Arrow C++ has a similar option.

I'm thinking scan_in_order might be a less ambiguous name.

@eddyxu:

sgtm.
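
For context, the behavioral difference being named here maps onto the futures crate's buffered vs. buffer_unordered combinators. A minimal sketch (the scan_in_order flag and the toy stream are illustrative, not the PR's actual API):

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Hypothetical flag standing in for the option being named above.
    let scan_in_order = false;

    // One future per "batch" to read; stand-ins for real I/O.
    let work = stream::iter(0..6).map(|i| async move { i });

    let results: Vec<i32> = if scan_in_order {
        // Ordered: up to 3 reads in flight, results yielded in input order.
        work.buffered(3).collect::<Vec<_>>().await
    } else {
        // Unordered: same concurrency, results yielded as reads complete.
        work.buffer_unordered(3).collect::<Vec<_>>().await
    };
    println!("{results:?}");
}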


def ordered_scan(self, ordered_scan: bool = True) -> ScannerBuilder:
"""
Whether to scan the dataset in order. If set to False, the scanner may
read fragments concurrently and yield batches out of order.
@eddyxu:

Could you also mention that an out-of-order scan may yield better performance, so users know the benefit of this flag?

@@ -84,6 +84,9 @@ pub struct Scanner {
/// Scan the dataset with a meta column: "_rowid"
with_row_id: bool,

/// Whether to scan in deterministic order (default: true)
ordered: bool,
@eddyxu:

Should we use batch_ordered or ordered_scan here, in case people confuse it with SQL ORDER BY semantics?

.step_by(read_size)
.map(move |start| (batch_id, start..min(start + read_size, rows_in_batch)))
});
let batch_stream = stream::iter(read_params_iter).map(move |(batch_id, range)| {
@eddyxu:

this is smart!
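
As a standalone illustration of that pattern, a hedged sketch with invented values (rows_in_batch, read_size, and batch_id are made up): each batch is split into fixed-size read ranges, with the final range clamped to the batch length.

use std::cmp::min;

fn main() {
    // Illustrative values, not taken from the PR.
    let rows_in_batch = 10;
    let read_size = 4;
    let batch_id = 7;

    // One (batch_id, range) pair per read, as in the diff above.
    let ranges: Vec<_> = (0..rows_in_batch)
        .step_by(read_size)
        .map(|start| (batch_id, start..min(start + read_size, rows_in_batch)))
        .collect();

    assert_eq!(ranges, vec![(7, 0..4), (7, 4..8), (7, 8..10)]);
}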

@eddyxu commented on May 17, 2023:

> Seeing a ~2x performance improvement in the existing benchmark: […] In a follow-up, I'd like to add a benchmark for scanning multiple files so we can compare the performance of ordered and unordered scans.

Would be curious to see the performance difference between scanning a local file on a laptop and scanning from S3.

@wjones127 marked this pull request as ready for review on May 18, 2023, 00:28.
@eddyxu left a review:

Some questions, and some of the documentation needs clarification; not a blocker. The rest LGTM.

.await
.map_err(|e| DataFusionError::from(e))
}
});
@eddyxu:

Do you need a buffered/buffer_unordered here to control the degree of parallelism?

@wjones127 (author):

That's configured in the body of try_new(), where your other comment is. This method just defines the stream of futures.
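
To illustrate that separation, a minimal sketch (make_work and the numbers are invented): the function only builds a lazy stream of futures, and the caller, as try_new() does here, decides the parallelism.

use futures::stream::{self, Stream, StreamExt};
use std::future::Future;

// Builds a lazy stream of futures; no concurrency is decided here.
fn make_work() -> impl Stream<Item = impl Future<Output = u32>> {
    stream::iter(0..8u32).map(|i| async move { i * i })
}

#[tokio::main]
async fn main() {
    // The caller picks how many futures run at once.
    let results: Vec<u32> = make_work().buffered(3).collect::<Vec<_>>().await;
    println!("{results:?}");
}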

open_file(file_fragment, project_schema.clone(), with_row_id)
})
.map_ok(move |reader| {
scan_batches(reader, read_size).buffer_unordered(batch_readahead)
@eddyxu:

So this means it buffers batch_readahead * fragment_readahead batches in memory?

How many actual threads are created in this case? It is not clear to me.

We might want to add some documentation describing the expected number of threads and the number of batches in flight.

@wjones127 (author):

> So this means it buffers batch_readahead * fragment_readahead batches in memory?

I think you're right, but that's also not what we want. It took a while to figure out how to rearrange it, but my latest commit should enforce batch_readahead across fragments: no matter how many fragments are read concurrently, only batch_readahead batches will be buffered. (I put the buffer_unordered after the flatten.)

> How many actual threads are created in this case? It is not clear to me.

As I understand it, tokio will dynamically schedule them across its thread pool, which defaults to one thread per core. From the tokio docs:

> The multi-thread scheduler executes futures on a thread pool, using a work-stealing strategy. By default, it will start a worker thread for each CPU core available on the system.

> We might want to add some documentation describing the expected number of threads and the number of batches in flight.

Agreed. This code can be a little confusing. I've added some comments.
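
To make the rearrangement concrete, a hedged sketch with toy data (three "fragments" of four "batches" each; all names and numbers invented, and the futures crate's flatten_unordered stands in for the PR's actual stream plumbing): because buffer_unordered comes after the flatten, at most batch_readahead batch reads are in flight across all fragments combined.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let batch_readahead = 4;

    let batches: Vec<u32> = stream::iter(0..3u32) // three toy "fragments"
        .map(|frag| {
            // Per-fragment stream of batch futures (stand-ins for real reads).
            stream::iter(0..4u32).map(move |b| async move { frag * 10 + b })
        })
        .flatten_unordered(None) // merge all fragments' batch futures...
        .buffer_unordered(batch_readahead) // ...then cap in-flight reads globally
        .collect::<Vec<_>>()
        .await;

    println!("read {} batches", batches.len());
}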

@eddyxu (May 18, 2023):

> ...but I think my latest commit should make it so batch_readahead is enforced across fragments

I feel it's OK to limit batch_readahead to a single open fragment, which would be least surprising relative to the official pyarrow API:

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html

> As I understand it, tokio will dynamically schedule them across its thread pool, which defaults to one thread per core.

Oh yes, my previous comment was not clear. I was wondering whether buffer_unordered + try_flatten_unordered leads to the desired batch_readahead * fragment_readahead behavior; in particular, try_flatten_unordered controls the number of fragments opened.

@eddyxu:

It seems to work as desired after reading the docs. Let's merge. :)
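
For readers following along, a sketch of the full arrangement as discussed in this thread (knob values and toy data invented; buffered plus flatten_unordered from the futures crate approximate the open/flatten steps): fragment_readahead bounds concurrent fragment opens, and batch_readahead bounds in-flight batch reads overall.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let fragment_readahead = 2; // bound on concurrent fragment "opens"
    let batch_readahead = 4;    // bound on in-flight batch reads overall

    let batches: Vec<u32> = stream::iter(0..5u32) // five toy "fragments"
        .map(|frag| async move {
            // Stand-in for opening a fragment: resolves to its batch-future stream.
            stream::iter(0..3u32).map(move |b| async move { frag * 10 + b })
        })
        .buffered(fragment_readahead)
        .flatten_unordered(None)
        .buffer_unordered(batch_readahead)
        .collect::<Vec<_>>()
        .await;

    println!("read {} batches", batches.len());
}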

@wjones127 merged commit 8902a74 into main on May 18, 2023.
@wjones127 deleted the wjones127/861-buffer-unordered branch on May 18, 2023, 15:12.
Linked issue: [I/O] Allow a flag for Scanner::to_batches() to return batches in non-deterministic order (#861).