From 1cf70183423b938ec064925b20fd4a5b9e355991 Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Sat, 2 Apr 2016 19:17:25 -0700
Subject: [PATCH] =?UTF-8?q?[SPARK-14056]=20Appends=20s3=20specific=20confi?=
 =?UTF-8?q?gurations=20and=20spark.hadoop=20con=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

Appends s3 specific configurations and spark.hadoop configurations to hive configuration.

## How was this patch tested?

Tested by running a job on cluster.

…figurations to hive configuration.

Author: Sital Kedia

Closes #11876 from sitalkedia/hiveConf.
---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 19 +++++++++++++------
 .../apache/spark/sql/hive/TableReader.scala   |  4 ++--
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 06b7b388ca543..4e8e36363599c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -74,13 +74,12 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  /**
-   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
-   * subsystems.
-   */
-  def newConfiguration(conf: SparkConf): Configuration = {
-    val hadoopConf = new Configuration()
+  /**
+   * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
+   * configuration.
+   */
+  def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
 
     // Note: this null check is around more than just access to the "conf" object to maintain
     // the behavior of the old implementation of this code, for backwards compatibility.
     if (conf != null) {
@@ -106,7 +105,15 @@ class SparkHadoopUtil extends Logging {
       val bufferSize = conf.get("spark.buffer.size", "65536")
       hadoopConf.set("io.file.buffer.size", bufferSize)
     }
+  }
 
+  /**
+   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
+   * subsystems.
+   */
+  def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
     hadoopConf
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 80b24dc9899ff..54afe9c2a3550 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,8 +75,7 @@ class HadoopTableReader(
     math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
   }
 
-  // TODO: set aws s3 credentials.
-
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveExtraConf)
   private val _broadcastedHiveConf =
     sc.sparkContext.broadcast(new SerializableConfiguration(hiveExtraConf))
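
For readers unfamiliar with the `spark.hadoop.*` convention this patch relies on, the sketch below illustrates the kind of propagation `appendS3AndSparkHadoopConfigurations` performs: keys prefixed with `spark.hadoop.` in a `SparkConf` are copied into a Hadoop `Configuration` (here standing in for the `hiveExtraConf` that TableReader later broadcasts), alongside the `spark.buffer.size` → `io.file.buffer.size` mapping visible in the SparkHadoopUtil hunk above. This is a minimal, self-contained approximation, not the actual `SparkHadoopUtil` implementation; the object name, helper method, and the example S3A keys and values are hypothetical.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

object SparkHadoopConfPropagationSketch {
  /** Copy spark.hadoop.* keys and the buffer-size setting from a SparkConf into a Hadoop conf. */
  def copySparkHadoopSettings(conf: SparkConf, hadoopConf: Configuration): Unit = {
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        // spark.hadoop.fs.s3a.access.key becomes fs.s3a.access.key in the Hadoop conf.
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    // Mirrors the io.file.buffer.size handling shown in the patch.
    hadoopConf.set("io.file.buffer.size", conf.get("spark.buffer.size", "65536"))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false) // skip system properties to keep the example hermetic
      .set("spark.hadoop.fs.s3a.access.key", "AKIAEXAMPLE") // hypothetical credentials
      .set("spark.hadoop.fs.s3a.secret.key", "exampleSecret")
      .set("spark.buffer.size", "131072")
    val hiveExtraConf = new Configuration()
    copySparkHadoopSettings(conf, hiveExtraConf)
    println(hiveExtraConf.get("fs.s3a.access.key"))   // AKIAEXAMPLE
    println(hiveExtraConf.get("io.file.buffer.size")) // 131072
  }
}
```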