Update spark-avro dependency
Author: Josh Rosen <[email protected]>
Author: Michael Armbrust <[email protected]>

Closes #75 from marmbrus/newAvro.
JoshRosen committed Sep 8, 2015
1 parent dad9a44 commit a9198c3
Showing 3 changed files with 26 additions and 8 deletions.
22 changes: 19 additions & 3 deletions project/SparkRedshiftBuild.scala
@@ -70,8 +70,12 @@ object SparkRedshiftBuild extends Build {
"com.amazonaws" % "aws-java-sdk-sts" % "1.9.40" % "test",
// We require spark-avro, but avro-mapred must be provided to match Hadoop version.
// In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
"com.databricks" %% "spark-avro" % "1.0.0",
"org.apache.avro" % "avro-mapred" % "1.7.6" % "provided" exclude("org.mortbay.jetty", "servlet-api"),
"com.databricks" %% "spark-avro" % "2.0.0",
if (testHadoopVersion.value.startsWith("1")) {
"org.apache.avro" % "avro-mapred" % "1.7.7" % "provided" classifier "hadoop1" exclude("org.mortbay.jetty", "servlet-api")
} else {
"org.apache.avro" % "avro-mapred" % "1.7.7" % "provided" classifier "hadoop2" exclude("org.mortbay.jetty", "servlet-api")
},
// A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
// For testing, we use an Amazon driver, which is available from
// http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
@@ -98,9 +102,21 @@ object SparkRedshiftBuild extends Build {
"org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"org.apache.spark" %% "spark-hive" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force()
),
// Although spark-avro declares its avro-mapred dependency as `provided`, its version of the
// dependency can still end up on the classpath during tests, which breaks the tests for
// Hadoop 1.x. To work around this, we filter out the incompatible JARs here:
(fullClasspath in Test) := (if (testHadoopVersion.value.startsWith("1")) {
(fullClasspath in Test).value.filterNot {
x => x.data.getName.contains("hadoop2") && x.data.getName.contains("avro")
}
} else {
(fullClasspath in Test).value.filterNot {
x => x.data.getName.contains("hadoop1") && x.data.getName.contains("avro")
}
}),
ScoverageSbtPlugin.ScoverageKeys.coverageHighlighting := {
if (scalaBinaryVersion.value == "2.10") false
else false
else true
},
logBuffered := false,
// Display full-length stacktraces from ScalaTest:
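For context, the dependency change above selects an avro-mapred artifact whose classifier matches the Hadoop line under test. The following is a minimal standalone sbt sketch of the same idea; the hadoopVersion setting key is a hypothetical stand-in for testHadoopVersion and is not part of this commit.

// Illustrative sketch only: choose the avro-mapred classifier by Hadoop major version.
val hadoopVersion = settingKey[String]("Hadoop version to test against") // hypothetical key

libraryDependencies ++= Seq(
  "com.databricks" %% "spark-avro" % "2.0.0",
  {
    val hadoopClassifier = if (hadoopVersion.value.startsWith("1")) "hadoop1" else "hadoop2"
    "org.apache.avro" % "avro-mapred" % "1.7.7" % "provided" classifier hadoopClassifier exclude("org.mortbay.jetty", "servlet-api")
  }
)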
src/main/scala/com/databricks/spark/redshift/DirectOutputCommitter.scala
@@ -16,8 +16,10 @@

package com.databricks.spark.redshift

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}

class DirectOutputCommitter extends OutputCommitter {
override def setupJob(jobContext: JobContext): Unit = { }
@@ -40,9 +42,9 @@ class DirectOutputCommitter {
* This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
*/
override def commitJob(context: JobContext): Unit = {
val conf = context.getJobConf
val conf = context.getConfiguration
if (shouldCreateSuccessFile(conf)) {
val outputPath = FileOutputFormat.getOutputPath(conf)
val outputPath = FileOutputFormat.getOutputPath(context)
if (outputPath != null) {
val fileSys = outputPath.getFileSystem(conf)
val filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
@@ -52,7 +54,7 @@ class DirectOutputCommitter {
}

/** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
private def shouldCreateSuccessFile(conf: JobConf): Boolean = {
private def shouldCreateSuccessFile(conf: Configuration): Boolean = {
conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
}
}
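Read together, the changed lines migrate DirectOutputCommitter from the old mapred types to the new mapreduce API: the Configuration now comes from the JobContext and the output path is resolved from the context rather than from a JobConf. A rough sketch of the resulting commit path, assembled for illustration and not the complete file, is:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}

object DirectCommitSketch {
  // Illustration of the updated commit logic: read the flag from the job's
  // Configuration and, if enabled, touch the _SUCCESS marker in the output directory.
  def commitJob(context: JobContext): Unit = {
    val conf: Configuration = context.getConfiguration
    if (conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
      val outputPath = FileOutputFormat.getOutputPath(context)
      if (outputPath != null) {
        val fileSys = outputPath.getFileSystem(conf)
        val successFile = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
        fileSys.create(successFile).close()  // zero-byte marker file
      }
    }
  }
}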
src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -96,7 +96,7 @@ class RedshiftSourceSuite
sc.hadoopConfiguration.set("fs.s3n.impl", classOf[S3NInMemoryFileSystem].getName)
// We need to use a DirectOutputCommitter to work around an issue which occurs with renames
// while using the mocked S3 filesystem.
sc.hadoopConfiguration.set("mapred.output.committer.class",
sc.hadoopConfiguration.set("spark.sql.sources.outputCommitterClass",
classOf[DirectOutputCommitter].getName)
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "test1")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "test2")
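As in the test setup above, the committer is installed through the Hadoop configuration. A brief usage sketch follows; the SparkContext construction and names are illustrative placeholders, not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.redshift.DirectOutputCommitter

// Illustrative setup: any SparkContext would do.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("committer-demo"))

// Point Spark SQL data source writes at the DirectOutputCommitter and,
// optionally, suppress the _SUCCESS marker that commitJob would otherwise write.
sc.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass",
  classOf[DirectOutputCommitter].getName)
sc.hadoopConfiguration.setBoolean(
  "mapreduce.fileoutputcommitter.marksuccessfuljobs", false)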
