
[SPARK-21649][SQL] Support writing data into hive bucket table. #18866

Closed · wants to merge 2 commits

Conversation

jinxing64

What changes were proposed in this pull request?

Support writing to Hive bucketed tables. Spark internally uses Murmur3Hash for partitioning; for compatibility, we can use Hive's hash function when writing to a bucketed table.
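To see the incompatibility concretely (a minimal sketch using catalyst expressions; the key value and bucket count are made up):

```scala
import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal, Murmur3Hash, Pmod}

val key = Literal(42)
val numBuckets = Literal(8)

// Bucket id as Spark computes it today (Murmur3) vs. as Hive expects it.
val sparkBucket = Pmod(new Murmur3Hash(Seq(key)), numBuckets).eval()
val hiveBucket  = Pmod(new HiveHash(Seq(key)), numBuckets).eval()
// The two generally differ, so rows written with Murmur3 land in the
// "wrong" buckets from Hive's point of view.
```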

How was this patch tested?

Unit test.

@jinxing64
Author

I added the unit test referring to https://github.com/apache/hive/blob/branch-1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java#L393.
Hive sorts bucket files by file name when doing a sort-merge bucket (SMB) join.

```scala
// before
def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
// after
def partitionIdExpression(useHiveHash: Boolean = false): Expression =
  if (useHiveHash) Pmod(new HiveHash(expressions), Literal(numPartitions))
  else Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
```
@viirya
Member

I see that HiveHash simulates Hive's hashing function as of Hive v1.2.1. Is there any compatibility issue with Hive versions before 1.2.1?

@jinxing64 jinxing64 Aug 7, 2017
Author

@viirya Thanks a lot for the comment!
I compared the code between v0.13 (https://github.com/apache/hive/blob/branch-0.13/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java#L496) and v1.2.1 (https://github.com/apache/hive/blob/branch-1/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java#L557). In my understanding there's no compatibility issue: v1.2.1 adds hashcode support for more types (INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME), and for the pre-existing types the two versions are compatible.
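As a quick sanity check (a sketch using Spark's HiveHash, which simulates Hive's ObjectInspectorUtils.hashCode): for primitive types the two Hive versions define the hash identically, e.g. an int hashes to itself.

```scala
import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal}

// Both Hive v0.13 and v1.2.1 define hashCode(i) == i for an INT value,
// and Spark's HiveHash reproduces that.
assert(new HiveHash(Seq(Literal(42))).eval() == 42)
```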

Member

Ok. Let's add a comment for the parameter useHiveHash.
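For instance (a sketch of possible wording, not the patch's actual doc; expressions and numPartitions are members of the enclosing HashPartitioning):

```scala
/**
 * Returns an expression that computes the partition id of an input row.
 *
 * @param useHiveHash whether to compute the id with Hive's hash function
 *                    (HiveHash) instead of Murmur3, so that bucket ids match
 *                    Hive's scheme when writing into a Hive bucketed table.
 */
def partitionIdExpression(useHiveHash: Boolean = false): Expression =
  if (useHiveHash) Pmod(new HiveHash(expressions), Literal(numPartitions))
  else Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
```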

@SparkQA

SparkQA commented Aug 7, 2017

Test build #80322 has finished for PR 18866 at commit 51d2c11.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

Jenkins, retest this please.

```diff
@@ -103,7 +103,7 @@ object FileFormatWriter extends Logging {
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
       partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
+      bucketSpecAndHash: Option[(BucketSpec, Boolean)],
```
Member

It is hard to know the meaning of this Option parameter. Let's add a @param for it?
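A sketch of what such a @param could say (my wording; it assumes the tuple's Boolean is the use-Hive-hash flag introduced in the diff above):

```scala
/**
 * @param bucketSpecAndHash for a bucketed output table, the bucketing spec
 *                          paired with a flag that is true when bucket ids
 *                          must be computed with Hive's hash function (for
 *                          writing Hive bucketed tables) rather than Murmur3;
 *                          None when the output is not bucketed.
 */
```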

```scala
.load(bucketFiles(bucket).getAbsolutePath)
.collect().map(_.getString(0).split("\t")(0).toInt)
.zip((bucket to (100, 4))).foreach { case (a, b) =>
  assert(a === b)
}
```
Member

This looks obscure. I had to calculate the HiveHash values for 0 until 100 to verify it. Maybe we should compute the actual Hive hash values here, instead of hard-coding bucket to (100, 4).
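Something along these lines would make the expectation explicit (a sketch using catalyst's HiveHash and Pmod expressions; hiveBucketId is a hypothetical test helper):

```scala
import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal, Pmod}

// Hypothetical helper: the bucket id the writer assigns to an int key,
// i.e. pmod(hiveHash(key), numBuckets).
def hiveBucketId(key: Int, numBuckets: Int): Int =
  Pmod(new HiveHash(Seq(Literal(key))), Literal(numBuckets))
    .eval().asInstanceOf[Int]

// Expected keys per bucket for test data 0 until 100 written into 4 buckets.
val expected = (0 until 100).groupBy(hiveBucketId(_, 4))
```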

Author

Yes, that would be better. I've updated it; please take another look :)

@SparkQA

SparkQA commented Aug 7, 2017

Test build #80341 has finished for PR 18866 at commit 51d2c11.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

@viirya
Please take another look when you have time. I've already updated :)

@SparkQA

SparkQA commented Aug 7, 2017

Test build #80348 has finished for PR 18866 at commit 8e4a9ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 7, 2017

Test build #80349 has finished for PR 18866 at commit 9765a48.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

cc @cloud-fan
Would you mind giving some comments? I can keep working on this :)

@cloud-fan
Contributor

The hash function is not the only issue. One important difference is that Hive shuffles before writing and makes sure each bucket ends up as exactly one file, while Spark doesn't shuffle, so each write task may emit its own file per bucket (with N tasks and B buckets, that can mean up to N × B files instead of B). For more details please refer to https://docs.google.com/document/d/1a8IDh23RAkrkg9YYAeO51F4aGO8-xAlupKwdshve2fc/edit#heading=h.ualze2k709kj

also cc @tejasapatil

@SparkQA

SparkQA commented Aug 13, 2017

Test build #80597 has finished for PR 18866 at commit 19f880b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None,
  • case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int,

@jinxing64 jinxing64 changed the title [SPARK-21649][SQL] Support writing data into hive bucket table. [WIP][SPARK-21649][SQL] Support writing data into hive bucket table. Aug 14, 2017
@SparkQA

SparkQA commented Aug 14, 2017

Test build #80637 has finished for PR 18866 at commit 6df2e78.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HadoopMRTaskCommitStatus(absPathFiles: Map[String, String], commitPaths: Seq[String])
  • case class ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None,
  • case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int,

@SparkQA

SparkQA commented Aug 17, 2017

Test build #80782 has finished for PR 18866 at commit bc2adbd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HadoopMRTaskCommitStatus(absPathFiles: Map[String, String], commitPaths: Seq[String])
  • case class ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None,
  • case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int,

@SparkQA

SparkQA commented Aug 18, 2017

Test build #80837 has finished for PR 18866 at commit 8c4f794.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class HadoopMRTaskCommitStatus(absPathFiles: Map[String, String], commitPaths: Seq[String])
  • case class ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None,
  • case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int,

@SparkQA

SparkQA commented Aug 19, 2017

Test build #80873 has finished for PR 18866 at commit cd2be58.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jinxing64
Author

jinxing64 commented Aug 20, 2017

In the current change:

  1. ClusteredDistribution becomes ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None, useHiveHash: Boolean = false) -- (a) the number of clusters can be specified; (b) useHiveHash indicates whether Hive hash should be used for partitioning (see the sketch after this list).
  2. InsertIntoHiveTable requires the distribution ClusteredDistribution(partitionAttributes ++ bucketColumns, Option(bucketSpec.numBuckets), useHiveHash = true), which makes sure each bucket has exactly one file.
  3. A bucket file is named like part-$jobId$ext; no partition id appears in the name and the bucket id is part of $ext, so the bucket id alone determines the ordering of bucket file names.
  4. An empty file is written for an empty bucket.
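A sketch of points 1 and 2 together (the ClusteredDistribution signature is taken from the test-bot output above; the requiredChildDistribution override is an illustration of the idea, not the patch verbatim):

```scala
case class ClusteredDistribution(
    clustering: Seq[Expression],
    clustersOpt: Option[Int] = None,
    useHiveHash: Boolean = false) extends Distribution

// In InsertIntoHiveTable's physical plan: require that all rows of a bucket
// land in one task, hashed the way Hive expects, so every bucket is written
// as exactly one file.
override def requiredChildDistribution: Seq[Distribution] =
  ClusteredDistribution(
    partitionAttributes ++ bucketColumns,
    Option(bucketSpec.numBuckets),
    useHiveHash = true) :: Nil
```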

@jinxing64 jinxing64 changed the title [WIP][SPARK-21649][SQL] Support writing data into hive bucket table. [SPARK-21649][SQL] Support writing data into hive bucket table. Aug 20, 2017
@jinxing64
Author

jinxing64 commented Aug 21, 2017

@cloud-fan @gatorsmile @jiangxb1987
Would you give some advice on this, so I know whether I'm heading in the right direction? I can keep working on it :)

@cloud-fan
Contributor

cloud-fan commented Aug 22, 2017

Does this work with append? Even if you shuffle the data before writing, we may still end up with multiple files for one bucket.

Is it possible to generalize this patch to the data source level? The current approach looks very hacky and is far from our expectation that Hive is just another data source.

@jinxing64
Author

@cloud-fan
Thanks for the reply. It looks like #19001 continues this work and is more comprehensive. I will close this PR for now.

@jinxing64 jinxing64 closed this Aug 23, 2017