[SPARK-19256][SQL] Hive bucketing support #19001
Conversation
package org.apache.hadoop.hive.ql.io;

import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
Hi, @tejasapatil
Is this the only actual Hive dependency? Without this, it seems that BucketizedSparkInputFormat and BucketizedSparkRecordReader can be promoted to sql/core.
What do we gain out of moving it to sql/core, given that they are quite specific to Hive? I don't see any use cases besides Hive benefiting from it, so I decided to keep it in sql/hive and keep sql/core cleaner.
I see. Thanks.
Test build #80879 has finished for PR 19001 at commit
Jenkins retest this please
Test build #80885 has finished for PR 19001 at commit
Test build #80900 has finished for PR 19001 at commit
The R failure looks irrelevant.
Retest this please.
Test build #80908 has finished for PR 19001 at commit
02d8711 to a30b6ce (force-push)
Test build #81005 has finished for PR 19001 at commit
ping @cloud-fan @gatorsmile
#19080 is improving the distribution semantics in the planner. Will wait for that to get in.
With the simplified distribution semantics, I think it's much easier to support hive bucketing. We only need to create a
For non-broadcast join, we have the potential to support it, after we make the hash function configurable for
a30b6ce to 7b8a072 (force-push)
Jenkins retest this please
Test build #86013 has finished for PR 19001 at commit
7b8a072 to 3c367a0 (force-push)
Test build #86074 has finished for PR 19001 at commit
Jenkins retest this please
Test build #86085 has finished for PR 19001 at commit
Jenkins retest this please
Test build #86097 has finished for PR 19001 at commit
cc @cloud-fan @gatorsmile @sameeragarwal for review
val orderingExpr = requiredOrdering
  .map(SortOrder(_, Ascending))
  .map(BindReferences.bindReference(_, outputSpec.outputColumns))
SortExec(
Removing SortExec here and adding it in the EnsureRequirements strategy will impact many other DataWritingCommands that depend on FileFormatWriter, like CreateDataSourceTableAsSelectCommand. To fix this, code changes are needed in those DataWritingCommand implementations to expose requiredDistribution and requiredOrdering.
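A minimal sketch of what that could look like, assuming a hypothetical trait with `requiredDistribution` / `requiredOrdering` members (the names mirror the discussion above and are not the actual API in this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}

// Hypothetical sketch: a write command declares its planner requirements instead of
// FileFormatWriter inserting a SortExec manually. All names below are illustrative.
trait DeclaresWriteRequirements {
  // Rows belonging to the same bucket should land in the same task.
  def requiredDistribution: Seq[Distribution]
  // Per-task ordering, e.g. partition columns ++ bucket id ++ sort columns.
  def requiredOrdering: Seq[Seq[SortOrder]]
}

// Example: a bucketed insert clusters on the bucket columns and sorts by the
// bucket columns followed by the table's sort columns.
case class BucketedInsertRequirements(
    bucketColumns: Seq[Expression],
    sortColumns: Seq[Expression]) extends DeclaresWriteRequirements {

  override def requiredDistribution: Seq[Distribution] =
    ClusteredDistribution(bucketColumns) :: Nil

  override def requiredOrdering: Seq[Seq[SortOrder]] =
    Seq((bucketColumns ++ sortColumns).map(SortOrder(_, Ascending)))
}
```

The planner could then add the exchange and sort uniformly for every command that declares such requirements, instead of each writer wiring it up itself.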
}

/**
 * How is `requiredOrdering` determined ?
Why does the definition of requiredOrdering here differ from that in InsertIntoHiveTable?
newJob.setInputFormat(inputFormat.getClass());

for (int i = 0; i < numBuckets; i++) {
  final FileStatus fileStatus = listStatus[i];
This logic depends on the files being listed in the right order; otherwise the RDD partitions to be joined cannot be zipped correctly. The logic here should be fixed to reorder the listed files.
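As an illustration of the point (not code from this PR), the listing could be normalized by the bucket id encoded in the file name before zipping, assuming the usual Hive naming scheme where the bucket id is the leading numeric prefix (e.g. `000000_0`):

```scala
import org.apache.hadoop.fs.FileStatus

// Illustrative sketch: sort the listed bucket files by their bucket id rather than
// relying on the filesystem's listing order, so that the i-th RDD partition really
// corresponds to bucket i on both sides of the zip.
def sortByBucketId(listStatus: Array[FileStatus]): Array[FileStatus] = {
  def bucketId(status: FileStatus): Int = {
    val digits = status.getPath.getName.takeWhile(_.isDigit)
    // Assumes Hive-style names like "000000_0"; a real implementation should validate this.
    if (digits.isEmpty) Int.MaxValue else digits.toInt
  }
  listStatus.sortBy(bucketId)
}
```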
overall looks good, but we should separate this PR into smaller ones.
right: SparkPlan) extends BinaryExecNode with CodegenSupport {
right: SparkPlan,
requiredNumPartitions: Option[Int] = None,
hashingFunctionClass: Class[_ <: HashExpression[Int]] = classOf[Murmur3Hash])
I think this can be done in a followup. For the first version we can just add a HiveHashPartitioning, which can satisfy ClusteredDistribution (save shuffle for aggregate) but not HashClusteredDistribution (can't save shuffle for join).
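A rough sketch of that suggestion, assuming the post-#19080 distribution API (this is not the implementation in this PR; the class shape and pattern match are illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution,
  HashClusteredDistribution, Partitioning, UnspecifiedDistribution}

// Illustrative: a partitioning produced by Hive-bucketed scans. It can satisfy
// ClusteredDistribution on the bucket columns (so aggregates avoid a shuffle), but not
// HashClusteredDistribution, because rows were placed with Hive's hash rather than
// Spark's Murmur3 hash, so co-partitioning for joins cannot be assumed.
case class HiveHashPartitioning(
    expressions: Seq[Expression],
    numPartitions: Int) extends Partitioning {

  override def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case ClusteredDistribution(clustering, _) =>
      expressions.forall(e => clustering.exists(e.semanticEquals))
    case _: HashClusteredDistribution => false
    case _ => false
  }
}
```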
@@ -43,7 +44,13 @@ trait RunnableCommand extends Command {
// `ExecutedCommand` during query planning.
lazy val metrics: Map[String, SQLMetric] = Map.empty

def run(sparkSession: SparkSession): Seq[Row]
def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
ExecutedCommandExec doesn't call it.
@@ -156,40 +144,14 @@ object FileFormatWriter extends Logging {
statsTrackers = statsTrackers
)

// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
Can we send an individual PR to do this? i.e. do the sorting via requiredOrdering instead of doing it manually.
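As a sketch of the idea (illustrative, not the actual EnsureRequirements code), the planner-side rule would only add a SortExec when the child's output ordering does not already satisfy what the writer requires:

```scala
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

// Illustrative helper: add a local sort only when the required ordering is not already
// a prefix of the child's output ordering.
def ensureOrdering(child: SparkPlan, required: Seq[SortOrder]): SparkPlan = {
  if (required.isEmpty || child.outputOrdering.startsWith(required)) {
    child
  } else {
    SortExec(required, global = false, child = child)
  }
}
```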
Hi all, any updates on this PR?
I will close this for now
Will the work on this continue in the future?
What changes were proposed in this pull request?
This PR implements both read and write side changes for supporting hive bucketing in Spark. I had initially created a PR for just the write side changes (#18954) for simplicity. If reviewers want to review reader and writer side changes separately, I am happy to wait for the writer side PR to get merged and then send a new PR for reader side changes.
Semantics for read:
- outputPartitioning while scanning a hive table would be the set of bucketing columns (whether it's partitioned or not, whether you are reading a single partition or multiple partitions).
- outputOrdering would be the sort columns (actually a prefix subset of the sort columns being read from the table).
- See HiveTableScanExec, where the outputPartitioning and outputOrdering are populated, for more nitty-gritty details.
Semantics for write:
Just to compare how sort ordering is expressed for Spark native bucketing:
Why is there a difference? With hive, since bucketed insertions would need a shuffle, sort ordering can be relaxed for both the non-partitioned and static-partition cases. Every RDD partition would get rows corresponding to a single bucket, so those can be written to the corresponding output file after sorting. In the case of dynamic partitions, the rows need to be routed to the appropriate partition, which makes it similar to Spark's constraints.
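For intuition only, here is what that shuffle-then-sort shape looks like when expressed with the public DataFrame API rather than the planner internals this PR touches. Note that repartition() places rows with Spark's Murmur3-based hash; a Hive-compatible insert has to place rows with Hive's hashing function, which is exactly what this PR makes pluggable.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Illustrative sketch: shuffle rows so that rows sharing the bucketing column land in the
// same partition, then sort within each partition before writing one file per partition.
object BucketedWriteRoutingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("bucket-routing").getOrCreate()
    import spark.implicits._

    val numBuckets = 4
    val df = Seq((1, "a"), (2, "b"), (3, "c"), (42, "d")).toDF("key", "value")

    val routed = df
      .repartition(numBuckets, col("key"))  // cluster rows by the bucketing column
      .sortWithinPartitions("key")          // per-partition sort, matching the table's sort columns

    // Show which rows ended up together in each write-side partition.
    routed.rdd.glom().collect().zipWithIndex.foreach { case (rows, i) =>
      println(s"partition $i -> ${rows.mkString(", ")}")
    }
    spark.stop()
  }
}
```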
Only Overwrite mode is allowed for hive bucketed tables, as any other mode will break the bucketing guarantees of the table. This is a difference w.r.t. how Spark bucketing works.
Summary of changes done:
- ClusteredDistribution and HashPartitioning are modified to store the hashing function used.
- RunnableCommands can now express the required distribution and ordering. This is used by ExecutedCommandExec, which runs these commands.
- The sort previously added inside FileFormatWriter, which felt out of place, is removed. Ideally, this kind of adding of physical nodes should be done within the planner, which is what happens with this PR.
- InsertIntoHiveTable enforces both distribution and sort ordering.
- InsertIntoHadoopFsRelationCommand enforces sort ordering ONLY (and not the distribution).
- HiveTableScanExec populates outputPartitioning and outputOrdering based on the table's metadata, configs and the query.
- HadoopTableReader is changed to use BucketizedSparkInputFormat for bucketed reads.
How was this patch tested?