Support for reading multiple sorted files per bucket #731

Merged on Feb 19, 2021 (17 commits)

@rahij rahij commented Feb 17, 2021

This PR is optimized for a smaller diff from master, as opposed to a smaller diff from the upstream PR. Single-file partitions are now an actual no-op as well.

More details in #730

@rahij rahij requested review from rshkv and mattsills February 17, 2021 10:54

rshkv commented Feb 18, 2021

Ok. I suppose this is better. BaseFileScanIterator and FileSortedBucketScanIterator are still core code that's unreviewed upstream, which I'm really not a huge fan of. But at least FileScanRDD goes through the old code paths. Thanks for that.

I'll take a closer look at the iterators tomorrow. Think we'll want a second pair of eyes from @lwwmanning or @robert3005 as well.


rahij commented Feb 18, 2021

The methods in BaseFileScanIterator are also copy-pasted from FileScanRDD. The main new code is the methods in FileSortedBucketScanIterator.

// The readFunction may read some bytes before consuming the iterator, e.g.,
// vectorized Parquet reader. Here we use lazy val to delay the creation of
// iterator so that we will throw exception in `getNext`.
private lazy val internalIter = readFile(file)

I think you can just pass readFile(currentFile). Why do you need private val file = currentFile?

rahij (Author)

I have not modified any code copied from FileScanRDD

rahij (Author)

oh nvm this is new, I'll change - this was how it was in the upstream PR
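
To make the `lazy val` point in the code comment above concrete, here is a minimal sketch. `LazyIterSketch`, `FileRows`, and this `readFile` are hypothetical stand-ins, not the classes from the PR; the counter only exists to show when the read function actually runs.

```scala
object LazyIterSketch {
  // Counts how many times the hypothetical readFile below has run.
  var opened = 0

  // Hypothetical reader that does eager work (here: bumping a counter) when called.
  def readFile(name: String): Iterator[String] = {
    opened += 1
    Iterator(s"$name:row1", s"$name:row2")
  }

  // The constructor parameter is used directly; no extra private val is needed.
  class FileRows(file: String) {
    // Not evaluated at construction time; readFile runs on first access only.
    private lazy val internalIter = readFile(file)

    def hasNext: Boolean = internalIter.hasNext
    def next(): String = internalIter.next()
  }
}
```

Constructing `FileRows` does not call `readFile`; the first `hasNext` or `next()` does, so any exception from the reader would surface there rather than in the constructor.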

Comment on lines +226 to +228
// Set InputFileBlockHolder for the file block's information
currentFile = currentIteratorWithRow.getFile
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)

I understand that's to not break input_file_name() (which holds a thread-local of the current file). But I'm not sure that this will work if you read a batch of rows 1..N from different files and only evaluate input_file_name() afterwards. So for row 1 you might get the file name for N.

Not sure. If the tests don't cover it I guess we'll just have to trust.
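
The concern can be illustrated with a small sketch. Everything here is hypothetical: `InputFileHolderSketch` is a toy thread-local holder, not Spark's InputFileBlockHolder, and whether Spark actually buffers rows this way on this code path is not established in this thread.

```scala
object InputFileHolderSketch {
  // Hypothetical stand-in for a thread-local "current file" holder.
  private val current = new ThreadLocal[String] {
    override def initialValue(): String = ""
  }

  def set(path: String): Unit = current.set(path)
  def get: String = current.get()

  final case class Row(value: Int, file: String)

  // Reads the holder once per row, right after that row is produced.
  def perRow(rows: Seq[Row]): Seq[String] =
    rows.map { r => set(r.file); get }

  // Buffers the whole batch first and only then reads the holder for each row.
  def afterBatch(rows: Seq[Row]): Seq[String] = {
    val batch = rows.map { r => set(r.file); r }
    batch.map(_ => get)
  }
}
```

With rows coming from files "a", "b", "a" in that order, `perRow` reports each row's own file, while `afterBatch` reports the last file set for every row in the batch.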

rshkv commented Feb 18, 2021

Also, I wonder why we're setting currentFile (which could have side effects in theory, but not in this PR) instead of assigning to a local variable. I guess in case it's read, but I only see the value being read when reading a file. (Which doesn't happen after the first hasNext.)

rahij (Author)

It's also used in nextIterator in the BaseScanIterators? Wondering if I should just collapse both of them into a single iterator: the superclass was there from the upstream PR since it had 3 subclasses, but we only have one here. That might be easier to read.

Comment on lines +486 to +488
// but those files combined together are not globally sorted. With configuration
// "spark.sql.sources.bucketing.sortedScan.enabled" being enabled, sort ordering
// is preserved by reading those sorted files in sort-merge way.

rshkv commented Feb 18, 2021

Can you explain how we assert that?

Maybe it's some fundamental misunderstanding on my part, but I'd expect the query plans to change if you enable bucketing.sortedScan.enabled and the inputs are sorted. (Compared to when the inputs are sorted but the flag is disabled.) Yet you didn't need to update the testBucketing method.

rahij (Author)

The BucketedTableTestSpec has a field (expectedSort) to indicate whether there should be a sort in the query plan; we pass that as the negation of whether the flag is enabled. So the testBucketing method already has this code to check the query plan:

        assert(
          joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
          s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
        assert(
          joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
          s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")


Ah, cool
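
For readers without the test suite at hand, the shape of that check can be sketched on a toy plan tree. The node types and `planSide` below are hypothetical stand-ins, not Spark's SparkPlan or SortExec; the only assumption carried over from the thread is that a sort is expected exactly when the sorted-scan flag is off.

```scala
object PlanCheckSketch {
  // Toy plan nodes; hypothetical stand-ins for physical plan operators.
  sealed trait Plan {
    def children: Seq[Plan]

    // Pre-order search: returns the first node (this one or a descendant) matching p.
    def find(p: Plan => Boolean): Option[Plan] =
      if (p(this)) Some(this)
      else children.iterator.map(_.find(p)).collectFirst { case Some(n) => n }
  }

  final case class Scan(table: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Sort(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  def hasSort(plan: Plan): Boolean = plan.find(_.isInstanceOf[Sort]).isDefined

  // Hypothetical planner: adds a Sort only when the sorted-scan flag is off.
  def planSide(table: String, sortedScanEnabled: Boolean): Plan =
    if (sortedScanEnabled) Scan(table) else Sort(Scan(table))
}
```

A test in this style would build both join sides and assert `hasSort(side) == !sortedScanEnabled` for each, which mirrors passing expectedSort as the negation of the flag.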


rshkv commented Feb 18, 2021

Read through the iterator logic and it looks fine to me. The mechanism seems simple enough. (Note to self: In the first hasNext we call initializeHeapWithFirstRows, which creates iterators for all files in the bucket and puts them in a priority queue. Then on every next() we get the next-priority iterator, get its row, then put the iterator back in the queue. Until all iterators are exhausted. Ok.)
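
The mechanism in the note above is a standard k-way merge over a priority queue. A minimal sketch, assuming plain Scala iterators instead of file readers: `mergeSorted` is a hypothetical helper, not the PR's FileSortedBucketScanIterator, and it makes no promise about the relative order of equal elements.

```scala
import scala.collection.mutable

object SortedMergeSketch {
  // Hypothetical helper: merges already-sorted iterators into one sorted iterator.
  def mergeSorted[T](inputs: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] =
    new Iterator[T] {
      // PriorityQueue is a max-heap, so reverse the ordering to dequeue the smallest head first.
      private val heap = mutable.PriorityQueue.empty[(T, Iterator[T])](
        Ordering.by[(T, Iterator[T]), T](_._1)(ord.reverse))
      private var initialized = false

      // Seeds the heap with the first row of every non-empty input, on first use.
      private def init(): Unit = if (!initialized) {
        inputs.foreach(it => if (it.hasNext) heap.enqueue((it.next(), it)))
        initialized = true
      }

      override def hasNext: Boolean = { init(); heap.nonEmpty }

      override def next(): T = {
        init()
        if (heap.isEmpty) throw new NoSuchElementException("all inputs exhausted")
        val (row, it) = heap.dequeue()
        // Put the iterator back with its next row, unless it is exhausted.
        if (it.hasNext) heap.enqueue((it.next(), it))
        row
      }
    }
}
```

For example, `SortedMergeSketch.mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5), Iterator(3, 6))).toList` yields `List(1, 2, 3, 4, 5, 6, 7)`. Each `next()` costs O(log k) for k inputs, and only one buffered row per input is held at a time.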

@lwwmanning

lgtm 👍🏼

@rshkv rshkv changed the title from "[smaller diff] add support for reading multiple sorted files per bucket" to "Support for reading multiple sorted files per bucket" Feb 19, 2021
@rshkv rshkv merged commit 3195ed8 into master Feb 19, 2021
@rshkv rshkv deleted the rr/round2 branch February 19, 2021 10:30