Support for reading multiple sorted files per bucket #731
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala
Ok, I suppose this is better. I'll take a closer look at the iterators tomorrow. I think we'll want a second pair of eyes from @lwwmanning or @robert3005 as well.
The methods in …
```scala
// The readFunction may read some bytes before consuming the iterator, e.g., the
// vectorized Parquet reader. Here we use a lazy val to delay the creation of the
// iterator so that we throw the exception in `getNext`.
private lazy val internalIter = readFile(file)
```
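For context, here is a minimal, self-contained sketch of the lazy-initialization pattern that comment describes (the names `readFile` and `FileIterator` here are illustrative stand-ins, not the PR's actual types): opening the file is deferred until the iterator is first consumed, so a failure in the reader surfaces during consumption rather than at construction.

```scala
// Illustrative sketch only: shows why `lazy val` delays the reader's side
// effects (and any exception it throws) until the iterator is consumed.
object LazyReaderSketch {
  // Stand-in for the real readFunction; opening a file is a side effect.
  def readFile(path: String): Iterator[String] = {
    println(s"opening $path")
    Iterator("row1", "row2")
  }

  class FileIterator(path: String) extends Iterator[String] {
    // With a plain `val`, readFile would run (and could throw) right here,
    // in the constructor. `lazy val` defers it to the first hasNext/next.
    private lazy val internalIter = readFile(path)
    override def hasNext: Boolean = internalIter.hasNext
    override def next(): String = internalIter.next()
  }

  def main(args: Array[String]): Unit = {
    val it = new FileIterator("part-00000.parquet") // nothing opened yet
    println("iterator constructed")
    println(it.next()) // "opening ..." is printed here, on first consumption
  }
}
```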
I think you can just pass `readFile(currentFile)`. Why do you need `private val file = currentFile`?
I have not modified any code copied from `FileScanRDD`.
Oh, never mind, this is new; I'll change it - this was how it was in the upstream PR.
```scala
// Set InputFileBlockHolder for the file block's information
currentFile = currentIteratorWithRow.getFile
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
```
I understand that's to avoid breaking `input_file_name()` (which holds a thread-local of the current file). But I'm not sure this will work if you read a batch of rows 1..N from different files and only evaluate `input_file_name()` afterwards: for row 1 you might get the file name for row N.

Not sure. If the tests don't cover it, I guess we'll just have to trust it.
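To make the concern concrete, here is a tiny sketch (a bare `ThreadLocal` standing in for `InputFileBlockHolder`; this is not Spark's code) of how late evaluation against a thread-local goes wrong once rows from several files are buffered before the function is evaluated:

```scala
// Illustrative sketch only: a thread-local "current file" plus buffering rows
// across file boundaries means late evaluation sees the last file that was
// set, not the file each row actually came from.
object ThreadLocalHazardSketch {
  private val currentFile = new ThreadLocal[String]

  def main(args: Array[String]): Unit = {
    // Materialize a "batch" spanning two files, updating the thread-local as
    // each row is produced -- analogous to InputFileBlockHolder.set(...).
    val batch = Seq("a.parquet" -> "row1", "b.parquet" -> "row2").map {
      case (file, row) =>
        currentFile.set(file)
        row
    }
    // Evaluate the file name only after the whole batch is buffered: both
    // rows now report "b.parquet", mirroring the input_file_name() concern.
    batch.foreach(row => println(s"$row -> ${currentFile.get}"))
  }
}
```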
Also, I wonder why we're setting `currentFile` (which could have side effects in theory, though not in this PR) instead of assigning to a local variable. I guess in case it's read elsewhere, but I only see the value being read when reading a file (which doesn't happen after the first `hasNext`).
It's also used in `nextIterator` in the `BaseScanIterators`? I'm wondering if I should just collapse both of them into a single iterator - the superclass came from the upstream PR, where it had three subclasses, but we only have one here. It might be easier to read.
```scala
// but those files combined together are not globally sorted. With the configuration
// "spark.sql.sources.bucketing.sortedScan.enabled" enabled, sort ordering
// is preserved by reading those sorted files in a sort-merge way.
```
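As a reference for what the "sort-merge way" means here, a minimal k-way merge over per-file sorted iterators could look like the sketch below. This is illustrative only; the PR's actual `FileSortedMergeScanRDD` handles much more (e.g. `InternalRow` ordering and file metadata).

```scala
import scala.collection.mutable

object SortedMergeSketch {
  // Merge several individually sorted iterators into one globally sorted
  // stream using a min-heap keyed on each iterator's current head element.
  def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] =
    new Iterator[T] {
      // PriorityQueue is a max-heap by default; reverse the key ordering
      // so the smallest head is popped first.
      private val heap = mutable.PriorityQueue.empty[(T, Iterator[T])](
        Ordering.by[(T, Iterator[T]), T](_._1)(ord.reverse))
      iters.foreach(it => if (it.hasNext) heap.enqueue((it.next(), it)))

      override def hasNext: Boolean = heap.nonEmpty
      override def next(): T = {
        val (head, source) = heap.dequeue()
        if (source.hasNext) heap.enqueue((source.next(), source)) // refill from the same file
        head
      }
    }

  def main(args: Array[String]): Unit = {
    // Two "files", each sorted on its own, merged into one sorted stream.
    val merged = mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 3, 9)))
    println(merged.mkString(", ")) // 1, 2, 3, 4, 7, 9
  }
}
```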
Can you explain how we assert that? Maybe I have some fundamental misunderstanding, but I'd expect the query plans to change if you enable `bucketing.sortedScan.enabled` and the inputs are sorted (compared to when the inputs are sorted but the flag is disabled). Yet you didn't need to update the `testBucketing` method.
The `BucketedTableTestSpec` has a field (`expectedSort`) to indicate whether there should be a sort in the query plan - we pass that as the negation of whether the flag is enabled. So the `testBucketing` method already has this code to check the query plan:

```scala
assert(
  joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
  s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
assert(
  joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
  s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
```
Ah, cool
Read through the iterator logic and it looks fine to me. The mechanism seems simple enough. (Note to self: in the first …)
lgtm 👍🏼
The PR is optimized for a smaller diff from master, as opposed to a smaller diff from the upstream PR. Single-file partitions are now an actual no-op as well.
More details in #730.