From fdf5bba35d201fe0de3901b4d47262c485c76569 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 15 May 2015 16:20:49 +0800 Subject: [PATCH 1/9] [SPARK-7591] [SQL] Partitioning support API tweaks Please see [SPARK-7591] [1] for the details. /cc rxin marmbrus yhuai [1]: https://issues.apache.org/jira/browse/SPARK-7591 Author: Cheng Lian Closes #6150 from liancheng/spark-7591 and squashes the following commits: af422e7 [Cheng Lian] Addresses @rxin's comments 37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization 2fc680a [Cheng Lian] Fixes Scala style issue 189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments 522c24e [Cheng Lian] Adds OutputWriterFactory 047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2 --- .../org/apache/spark/sql/SQLContext.scala | 14 +- ...{fsBasedParquet.scala => newParquet.scala} | 71 ++++----- .../sql/sources/DataSourceStrategy.scala | 10 +- .../spark/sql/sources/PartitioningUtils.scala | 4 + .../apache/spark/sql/sources/commands.scala | 23 ++- .../org/apache/spark/sql/sources/ddl.scala | 8 +- .../apache/spark/sql/sources/interfaces.scala | 140 +++++++++--------- .../org/apache/spark/sql/sources/rules.scala | 2 +- .../sql/parquet/ParquetFilterSuite.scala | 2 +- .../sql/parquet/ParquetSchemaSuite.scala | 12 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +- .../spark/sql/hive/execution/commands.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 8 +- .../apache/spark/sql/hive/parquetSuites.scala | 20 +-- .../sql/sources/SimpleTextRelation.scala | 47 +++--- ...tes.scala => hadoopFsRelationSuites.scala} | 8 +- 17 files changed, 195 insertions(+), 194 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/parquet/{fsBasedParquet.scala => newParquet.scala} (92%) rename sql/hive/src/test/scala/org/apache/spark/sql/sources/{fsBasedRelationSuites.scala => hadoopFsRelationSuites.scala} (98%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index b33a700208014..9fb355eb81939 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.json._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } else if (conf.parquetUseDataSourceApi) { val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray baseRelationToDataFrame( - new FSBasedParquetRelation( + new ParquetRelation2( globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this)) } else { DataFrame(this, parquet.ParquetRelation( @@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext) def jdbc(url: String, table: String): DataFrame = { jdbc(url, table, JDBCRelation.columnPartition(null), new Properties()) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1002,7 +1002,7 @@ class 
SQLContext(@transient val sparkContext: SparkContext) def jdbc(url: String, table: String, properties: Properties): DataFrame = { jdbc(url, table, JDBCRelation.columnPartition(null), properties) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext) @Experimental def jdbc( url: String, - table: String, + table: String, columnName: String, lowerBound: Long, upperBound: Long, @@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val parts = JDBCRelation.columnPartition(partitioning) jdbc(url, table, parts, properties) } - + /** * :: Experimental :: * Construct a [[DataFrame]] representing the database table accessible via JDBC URL @@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } jdbc(url, table, parts, properties) } - + private def jdbc( url: String, table: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala similarity index 92% rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index c83a9c35dbddf..946062f6ea64e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -41,27 +41,23 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.{Row, SQLConf, SQLContext} import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} -private[sql] class DefaultSource extends FSBasedRelationProvider { +private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( sqlContext: SQLContext, paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation = { + parameters: Map[String, String]): HadoopFsRelation = { val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty)) - new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext) + new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext) } } // NOTE: This class is instantiated and used on executor side only, no need to be serializable. 
-private[sql] class ParquetOutputWriter extends OutputWriter { - private var recordWriter: RecordWriter[Void, Row] = _ - private var taskAttemptContext: TaskAttemptContext = _ - - override def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = { +private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, Row] = { val conf = context.getConfiguration val outputFormat = { // When appending new Parquet files to an existing Parquet file directory, to avoid @@ -77,7 +73,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter { if (fs.exists(outputPath)) { // Pattern used to match task ID in part file names, e.g.: // - // part-r-00001.gz.part + // part-r-00001.gz.parquet // ^~~~~ val partFilePattern = """part-.-(\d{1,}).*""".r @@ -86,9 +82,8 @@ private[sql] class ParquetOutputWriter extends OutputWriter { case name if name.startsWith("_") => 0 case name if name.startsWith(".") => 0 case name => sys.error( - s"""Trying to write Parquet files to directory $outputPath, - |but found items with illegal name "$name" - """.stripMargin.replace('\n', ' ').trim) + s"Trying to write Parquet files to directory $outputPath, " + + s"but found items with illegal name '$name'.") }.reduceOption(_ max _).getOrElse(0) } else { 0 @@ -111,37 +106,39 @@ private[sql] class ParquetOutputWriter extends OutputWriter { } } - recordWriter = outputFormat.getRecordWriter(context) - taskAttemptContext = context + outputFormat.getRecordWriter(context) } override def write(row: Row): Unit = recordWriter.write(null, row) - override def close(): Unit = recordWriter.close(taskAttemptContext) + override def close(): Unit = recordWriter.close(context) } -private[sql] class FSBasedParquetRelation( - paths: Array[String], +private[sql] class ParquetRelation2( + override val paths: Array[String], private val maybeDataSchema: Option[StructType], private val maybePartitionSpec: Option[PartitionSpec], parameters: Map[String, String])( val sqlContext: SQLContext) - extends FSBasedRelation(paths, maybePartitionSpec) + extends HadoopFsRelation(maybePartitionSpec) with Logging { // Should we merge schemas from all Parquet part-files? private val shouldMergeSchemas = - parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean + parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean private val maybeMetastoreSchema = parameters - .get(FSBasedParquetRelation.METASTORE_SCHEMA) + .get(ParquetRelation2.METASTORE_SCHEMA) .map(DataType.fromJson(_).asInstanceOf[StructType]) - private val metadataCache = new MetadataCache - metadataCache.refresh() + private lazy val metadataCache: MetadataCache = { + val meta = new MetadataCache + meta.refresh() + meta + } override def equals(other: scala.Any): Boolean = other match { - case that: FSBasedParquetRelation => + case that: ParquetRelation2 => val schemaEquality = if (shouldMergeSchemas) { this.shouldMergeSchemas == that.shouldMergeSchemas } else { @@ -175,8 +172,6 @@ private[sql] class FSBasedParquetRelation( } } - override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter] - override def dataSchema: StructType = metadataCache.dataSchema override private[sql] def refresh(): Unit = { @@ -187,9 +182,12 @@ private[sql] class FSBasedParquetRelation( // Parquet data source always uses Catalyst internal representations. 
override val needConversion: Boolean = false - override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum + override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum + + override def userDefinedPartitionColumns: Option[StructType] = + maybePartitionSpec.map(_.partitionColumns) - override def prepareForWrite(job: Job): Unit = { + override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = ContextUtil.getConfiguration(job) val committerClass = @@ -224,6 +222,13 @@ private[sql] class FSBasedParquetRelation( .getOrElse( sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name()) + + new OutputWriterFactory { + override def newInstance( + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + new ParquetOutputWriter(path, context) + } + } } override def buildScan( @@ -385,7 +390,7 @@ private[sql] class FSBasedParquetRelation( // case insensitivity issue and possible schema mismatch (probably caused by schema // evolution). maybeMetastoreSchema - .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0)) + .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0)) .getOrElse(dataSchema0) } } @@ -439,12 +444,12 @@ private[sql] class FSBasedParquetRelation( "No schema defined, " + s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.") - FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext) + ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext) } } } -private[sql] object FSBasedParquetRelation extends Logging { +private[sql] object ParquetRelation2 extends Logging { // Whether we should merge schemas collected from all Parquet part-files. private[sql] val MERGE_SCHEMA = "mergeSchema" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index ee099ab9593c7..e6324b20b3065 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { (a, _) => t.buildScan(a)) :: Nil // Scanning partitioned FSBasedRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty => val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray @@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { selectedPartitions) :: Nil // Scanning non-partitioned FSBasedRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) => + case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) => val inputPaths = t.paths.map(new Path(_)).flatMap { path => val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration) val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty => + l @ LogicalRelation(t: HadoopFsRelation), 
part, query, overwrite, false) if part.isEmpty => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append execution.ExecutedCommand( - InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil + InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil case _ => Nil } @@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { partitionColumns: StructType, partitions: Array[Partition]) = { val output = projections.map(_.toAttribute) - val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation] + val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] // Builds RDD[Row]s for each selected partition. val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index d30f7f65e21c0..d1f0cdab55f66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String) private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) private[sql] object PartitioningUtils { + // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't + // depend on Hive. + private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { require(columnNames.size == literals.size) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 7879328bbaaab..a09bb08de736a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource( } } -private[sql] case class InsertIntoFSBasedRelation( - @transient relation: FSBasedRelation, +private[sql] case class InsertIntoHadoopFsRelation( + @transient relation: HadoopFsRelation, @transient query: LogicalPlan, partitionColumns: Array[String], mode: SaveMode) @@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation( insert(new DefaultWriterContainer(relation, job), df) } else { val writerContainer = new DynamicPartitionWriterContainer( - relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__") + relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME) insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) } } @@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation( } private[sql] abstract class BaseWriterContainer( - @transient val relation: FSBasedRelation, + @transient val relation: HadoopFsRelation, @transient job: Job) extends SparkHadoopMapReduceUtil with Logging @@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer( protected val dataSchema = relation.dataSchema - protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass + protected var outputWriterFactory: OutputWriterFactory = _ private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ @@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer( setupIDs(0, 0, 0) setupConf() taskAttemptContext = 
newTaskAttemptContext(serializableConf.value, taskAttemptId) - relation.prepareForWrite(job) + outputWriterFactory = relation.prepareJobForWrite(job) outputFormatClass = job.getOutputFormatClass outputCommitter = newOutputCommitter(taskAttemptContext) outputCommitter.setupJob(jobContext) @@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer( } private[sql] class DefaultWriterContainer( - @transient relation: FSBasedRelation, + @transient relation: HadoopFsRelation, @transient job: Job) extends BaseWriterContainer(relation, job) { @transient private var writer: OutputWriter = _ override protected def initWriters(): Unit = { - writer = outputWriterClass.newInstance() taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath) - writer.init(getWorkPath, dataSchema, taskAttemptContext) + writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) } override def outputWriterForRow(row: Row): OutputWriter = writer @@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer( } private[sql] class DynamicPartitionWriterContainer( - @transient relation: FSBasedRelation, + @transient relation: HadoopFsRelation, @transient job: Job, partitionColumns: Array[String], defaultPartitionName: String) @@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer( outputWriters.getOrElseUpdate(partitionPath, { val path = new Path(getWorkPath, partitionPath) - val writer = outputWriterClass.newInstance() taskAttemptContext.getConfiguration.set( "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - writer.init(path.toString, dataSchema, taskAttemptContext) - writer + outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) }) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 595c5eb40e295..37a569db311ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource { case Some(schema: StructType) => clazz.newInstance() match { case dataSource: SchemaRelationProvider => dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => val maybePartitionsSchema = if (partitionColumns.isEmpty) { None } else { @@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource { case None => clazz.newInstance() match { case dataSource: RelationProvider => dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val paths = { val patternPath = new Path(caseInsensitiveOptions("path")) @@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource { val relation = clazz.newInstance() match { case dataSource: CreatableRelationProvider => dataSource.createRelation(sqlContext, mode, options, data) - case dataSource: FSBasedRelationProvider => + case dataSource: HadoopFsRelationProvider => // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. 
Output path must be a legal HDFS style file system path; @@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource { Some(partitionColumnsSchema(data.schema, partitionColumns)), caseInsensitiveOptions) sqlContext.executePlan( - InsertIntoFSBasedRelation( + InsertIntoHadoopFsRelation( r, data.logicalPlan, partitionColumns.toArray, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 6f315305c11d6..274ab4485217a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, _} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.types.{StructField, StructType} @@ -94,7 +94,7 @@ trait SchemaRelationProvider { * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a - * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined + * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined * schema, and an optional list of partition columns, this interface is used to pass in the * parameters specified by a user. * @@ -105,15 +105,15 @@ trait SchemaRelationProvider { * * A new instance of this class with be instantiated each time a DDL call is made. * - * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is + * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is * that users need to provide a schema and a (possibly empty) list of partition columns when * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]], - * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified + * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified * schemas, and accessing partitioned relations. * * @since 1.4.0 */ -trait FSBasedRelationProvider { +trait HadoopFsRelationProvider { /** * Returns a new base relation with the given parameters, a user defined schema, and a list of * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity @@ -124,7 +124,7 @@ trait FSBasedRelationProvider { paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation + parameters: Map[String, String]): HadoopFsRelation } /** @@ -280,33 +280,42 @@ trait CatalystScan { /** * ::Experimental:: - * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the - * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. - * An [[OutputWriter]] instance is created and initialized when a new output file is opened on - * executor side. This instance is used to persist rows to this single output file. + * A factory that produces [[OutputWriter]]s. 
A new [[OutputWriterFactory]] is created on driver + * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized + * to executor side to create actual [[OutputWriter]]s on the fly. * * @since 1.4.0 */ @Experimental -abstract class OutputWriter { +abstract class OutputWriterFactory extends Serializable { /** - * Initializes this [[OutputWriter]] before any rows are persisted. + * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side + * to instantiate new [[OutputWriter]]s. * * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that * this may not point to the final output file. For example, `FileOutputFormat` writes to * temporary directories and then merge written files back to the final destination. In * this case, `path` points to a temporary output file under the temporary directory. * @param dataSchema Schema of the rows to be written. Partition columns are not included in the - * schema if the corresponding relation is partitioned. + * schema if the relation being written is partitioned. * @param context The Hadoop MapReduce task context. * * @since 1.4.0 */ - def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = () + def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter +} +/** + * ::Experimental:: + * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the + * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on + * executor side. This instance is used to persist rows to this single output file. + * + * @since 1.4.0 + */ +@Experimental +abstract class OutputWriter { /** * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned * tables, dynamic partition columns are not included in rows to be written. @@ -333,74 +342,71 @@ abstract class OutputWriter { * filter using selected predicates before producing an RDD containing all matching tuples as * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file * systems, it's able to discover partitioning information from the paths of input directories, and - * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must - * override one of the three `buildScan` methods to implement the read path. + * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]] + * must override one of the three `buildScan` methods to implement the read path. * * For the write path, it provides the ability to write to both non-partitioned and partitioned * tables. Directory layout of the partitioned tables is compatible with Hive. * * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for * implementing metastore table conversion. - * @param paths Base paths of this relation. For partitioned relations, it should be the root - * directories of all partition directories. - * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional + * + * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional * [[PartitionSpec]], so that partition discovery can be skipped. 
* * @since 1.4.0 */ @Experimental -abstract class FSBasedRelation private[sql]( - val paths: Array[String], - maybePartitionSpec: Option[PartitionSpec]) +abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec]) extends BaseRelation { + def this() = this(None) + + private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + + private val codegenEnabled = sqlContext.conf.codegenEnabled + + private var _partitionSpec: PartitionSpec = _ + + final private[sql] def partitionSpec: PartitionSpec = { + if (_partitionSpec == null) { + _partitionSpec = maybePartitionSpec + .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable)) + .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition]))) + .getOrElse { + if (sqlContext.conf.partitionDiscoveryEnabled()) { + discoverPartitions() + } else { + PartitionSpec(StructType(Nil), Array.empty[Partition]) + } + } + } + _partitionSpec + } + /** - * Constructs an [[FSBasedRelation]]. - * - * @param paths Base paths of this relation. For partitioned relations, it should be either root - * directories of all partition directories. - * @param partitionColumns Partition columns of this relation. + * Base paths of this relation. For partitioned relations, it should be either root directories + * of all partition directories. * * @since 1.4.0 */ - def this(paths: Array[String], partitionColumns: StructType) = - this(paths, { - if (partitionColumns.isEmpty) None - else Some(PartitionSpec(partitionColumns, Array.empty[Partition])) - }) + def paths: Array[String] /** - * Constructs an [[FSBasedRelation]]. - * - * @param paths Base paths of this relation. For partitioned relations, it should be root - * directories of all partition directories. + * Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically + * discovered. Note that they should always be nullable. * * @since 1.4.0 */ - def this(paths: Array[String]) = this(paths, None) - - private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - - private val codegenEnabled = sqlContext.conf.codegenEnabled - - private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec => - spec.copy(partitionColumns = spec.partitionColumns.asNullable) - }.getOrElse { - if (sqlContext.conf.partitionDiscoveryEnabled()) { - discoverPartitions() - } else { - PartitionSpec(StructType(Nil), Array.empty[Partition]) - } - } - - private[sql] def partitionSpec: PartitionSpec = _partitionSpec + final def partitionColumns: StructType = + userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns) /** - * Partition columns. Note that they are always nullable. + * Optional user defined partition columns. 
* * @since 1.4.0 */ - def partitionColumns: StructType = partitionSpec.partitionColumns + def userDefinedPartitionColumns: Option[StructType] = None private[sql] def refresh(): Unit = { if (sqlContext.conf.partitionDiscoveryEnabled()) { @@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql]( }.map(_.getPath) if (leafDirs.nonEmpty) { - PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__") + PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME) } else { PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition]) } @@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql]( * @since 1.4.0 */ def buildScan(inputPaths: Array[String]): RDD[Row] = { - throw new RuntimeException( + throw new UnsupportedOperationException( "At least one buildScan() method should be overridden to read the relation.") } @@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql]( } /** - * Client side preparation for data writing can be put here. For example, user defined output - * committer can be configured here. + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can + * be put here. For example, user defined output committer can be configured here. * * Note that the only side effect expected here is mutating `job` via its setters. Especially, * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states @@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql]( * * @since 1.4.0 */ - def prepareForWrite(job: Job): Unit = () - - /** - * This method is responsible for producing a new [[OutputWriter]] for each newly opened output - * file on the executor side. - * - * @since 1.4.0 - */ - def outputWriterClass: Class[_ <: OutputWriter] + def prepareJobForWrite(job: Job): OutputWriterFactory } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index aad1d248d0a28..1eacdde7413f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK - case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK + case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => // The relation in l is not an InsertableRelation. 
failAnalysis(s"$l does not allow insertion.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index 3bbc5b05868af..5ad439584716f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { }.flatten.reduceOption(_ && _) val forParquetDataSource = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters + case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters }.flatten.reduceOption(_ && _) forParquetTableScan.orElse(forParquetDataSource) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index fc90e3edce7fe..c964b6d984557 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("lowerCase", StringType), StructField("UPPERCase", DoubleType, nullable = false)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("lowercase", StringType), StructField("uppercase", DoubleType, nullable = false))), @@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructType(Seq( StructField("UPPERCase", DoubleType, nullable = false)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false))), @@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Metastore schema contains additional non-nullable fields. assert(intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false), StructField("lowerCase", BinaryType, nullable = false))), @@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Conflicting non-nullable field names intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq(StructField("lower", StringType, nullable = false))), StructType(Seq(StructField("lowerCase", BinaryType)))) } @@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("firstField", StringType, nullable = true), StructField("secondField", StringType, nullable = true), StructField("thirdfield", StringType, nullable = true)))) { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), @@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Merge should fail if the Metastore contains any additional fields that are not // nullable. 
assert(intercept[Throwable] { - FSBasedParquetRelation.mergeMetastoreParquetSchema( + ParquetRelation2.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b0e82c8d033b2..2aa80b47a97e2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources} @@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // serialize the Metastore schema to JSON and pass it as a data source option because of the // evil case insensitivity issue, which is reconciled within `ParquetRelation2`. val parquetOptions = Map( - FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json, - FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString) + ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json, + ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString) val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) @@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss - case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) => + case logical@LogicalRelation(parquetRelation: ParquetRelation2) => // If we have the same paths, same schema, and same partition spec, // we will use the cached Parquet Relation. 
val useCached = @@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec)) val parquetRelation = cached.getOrElse { val created = LogicalRelation( - new FSBasedParquetRelation( + new ParquetRelation2( paths.toArray, None, Some(partitionSpec), parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created @@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, None) val parquetRelation = cached.getOrElse { val created = LogicalRelation( - new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive)) + new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 8e405e080489f..6609763343752 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect( sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match { - case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) => + case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) => if (l.relation != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index da5d203d9d343..1bf1c1be3e3d3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: FSBasedParquetRelation) => // OK + case LogicalRelation(p: ParquetRelation2) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[FSBasedParquetRelation].getCanonicalName}") + s"${classOf[ParquetRelation2].getCanonicalName}") } // Clenup and reset confs. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5c7152e2140db..dfe73c62c42b9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation} -import org.apache.spark.sql.parquet.FSBasedParquetRelation +import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ @@ -175,17 +175,17 @@ class SQLQuerySuite extends QueryTest { def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) relation match { - case LogicalRelation(r: FSBasedParquetRelation) => + case LogicalRelation(r: ParquetRelation2) => if (!isDataSourceParquet) { fail( s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + - s"${FSBasedParquetRelation.getClass.getCanonicalName}.") + s"${ParquetRelation2.getClass.getCanonicalName}.") } case r: MetastoreRelation => if (isDataSourceParquet) { fail( - s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " + + s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " + s"${classOf[MetastoreRelation].getCanonicalName}.") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 41bcbe84b0ef2..b6be09e2f8837 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan} -import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation} +import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} +import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.types._ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode} import org.apache.spark.util.Utils @@ -291,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(_: FSBasedParquetRelation) => // OK + case LogicalRelation(_: ParquetRelation2) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[FSBasedParquetRelation].getCanonicalName}") + s"${classOf[ParquetRelation2].getCanonicalName}") } sql("DROP TABLE IF EXISTS test_parquet_ctas") @@ -315,9 +315,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, 
_)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[FSBasedParquetRelation].getCanonicalName} and " + + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + s"However, found a ${o.toString} ") } @@ -345,9 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[FSBasedParquetRelation].getCanonicalName} and " + + s"${classOf[ParquetRelation2].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + s"However, found a ${o.toString} ") } @@ -378,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { assertResult(2) { analyzed.collect { - case r @ LogicalRelation(_: FSBasedParquetRelation) => r + case r @ LogicalRelation(_: ParquetRelation2) => r }.size } @@ -390,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { // Converted test_parquet should be cached. catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match { case null => fail("Converted test_parquet should be cached in the cache.") - case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK + case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 8801aba2f64c3..29b21586f9c2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -24,7 +24,7 @@ import com.google.common.base.Objects import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} -import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} @@ -32,17 +32,16 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} /** - * A simple example [[FSBasedRelationProvider]]. + * A simple example [[HadoopFsRelationProvider]]. 
*/ -class SimpleTextSource extends FSBasedRelationProvider { +class SimpleTextSource extends HadoopFsRelationProvider { override def createRelation( sqlContext: SQLContext, paths: Array[String], schema: Option[StructType], partitionColumns: Option[StructType], - parameters: Map[String, String]): FSBasedRelation = { - val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField])) - new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext) + parameters: Map[String, String]): HadoopFsRelation = { + new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext) } } @@ -59,38 +58,30 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW } } -class SimpleTextOutputWriter extends OutputWriter { - private var recordWriter: RecordWriter[NullWritable, Text] = _ - private var taskAttemptContext: TaskAttemptContext = _ - - override def init( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): Unit = { - recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) - taskAttemptContext = context - } +class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { + private val recordWriter: RecordWriter[NullWritable, Text] = + new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) override def write(row: Row): Unit = { val serialized = row.toSeq.map(_.toString).mkString(",") recordWriter.write(null, new Text(serialized)) } - override def close(): Unit = recordWriter.close(taskAttemptContext) + override def close(): Unit = recordWriter.close(context) } /** - * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma + * A simple example [[HadoopFsRelation]], used for testing purposes. Data are stored as comma * separated string lines. When scanning data, schema must be explicitly provided via data source * option `"dataSchema"`. 
*/ class SimpleTextRelation( - paths: Array[String], + override val paths: Array[String], val maybeDataSchema: Option[StructType], - partitionsSchema: StructType, + override val userDefinedPartitionColumns: Option[StructType], parameters: Map[String, String])( @transient val sqlContext: SQLContext) - extends FSBasedRelation(paths, partitionsSchema) { + extends HadoopFsRelation { import sqlContext.sparkContext @@ -110,9 +101,6 @@ class SimpleTextRelation( override def hashCode(): Int = Objects.hashCode(paths, maybeDataSchema, dataSchema) - override def outputWriterClass: Class[_ <: OutputWriter] = - classOf[SimpleTextOutputWriter] - override def buildScan(inputPaths: Array[String]): RDD[Row] = { val fields = dataSchema.map(_.dataType) @@ -122,4 +110,13 @@ class SimpleTextRelation( }: _*) } } + + override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala similarity index 98% rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 394833f22907d..cf6afd25ae5a0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ // TODO Don't extend ParquetTest // This test suite extends ParquetTest for some convenient utility methods. These methods should be // moved to some more general places, maybe QueryTest. -class FSBasedRelationTest extends QueryTest with ParquetTest { +class HadoopFsRelationTest extends QueryTest with ParquetTest { override val sqlContext: SQLContext = TestHive import sqlContext._ @@ -487,7 +487,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest { } val actualPaths = df.queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: FSBasedRelation) => + case LogicalRelation(relation: HadoopFsRelation) => relation.paths.toSet }.getOrElse { fail("Expect an FSBasedRelation, but none could be found") @@ -499,7 +499,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest { } } -class SimpleTextRelationSuite extends FSBasedRelationTest { +class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName import sqlContext._ @@ -530,7 +530,7 @@ class SimpleTextRelationSuite extends FSBasedRelationTest { } } -class FSBasedParquetRelationSuite extends FSBasedRelationTest { +class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName import sqlContext._ From c64ff8036cc6bc7c87743f4c751d7fe91c2e366a Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 15 May 2015 11:37:34 +0100 Subject: [PATCH 2/9] [SPARK-7503] [YARN] Resources in .sparkStaging directory can't be cleaned up on error When we run applications on YARN with cluster mode, uploaded resources on .sparkStaging directory can't be cleaned up in case of failure of uploading local resources. You can see this issue by running following command. 
``` bin/spark-submit --master yarn --deploy-mode cluster --class ``` Author: Kousuke Saruta Closes #6026 from sarutak/delete-uploaded-resources-on-error and squashes the following commits: caef9f4 [Kousuke Saruta] Fixed style 882f921 [Kousuke Saruta] Wrapped Client#submitApplication with try/catch blocks in order to delete resources on error 1786ca4 [Kousuke Saruta] Merge branch 'master' of https://github.com/apache/spark into delete-uploaded-resources-on-error f61071b [Kousuke Saruta] Fixed cleanup problem --- .../org/apache/spark/deploy/yarn/Client.scala | 72 ++++++++++++------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d21a7393478ce..7e023f2d92578 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream} +import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, IOException} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction @@ -91,30 +91,52 @@ private[spark] class Client( * available in the alpha API. */ def submitApplication(): ApplicationId = { - // Setup the credentials before doing anything else, so we have don't have issues at any point. - setupCredentials() - yarnClient.init(yarnConf) - yarnClient.start() - - logInfo("Requesting a new application from cluster with %d NodeManagers" - .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) - - // Get a new application from our RM - val newApp = yarnClient.createApplication() - val newAppResponse = newApp.getNewApplicationResponse() - val appId = newAppResponse.getApplicationId() - - // Verify whether the cluster has enough resources for our AM - verifyClusterResources(newAppResponse) - - // Set up the appropriate contexts to launch our AM - val containerContext = createContainerLaunchContext(newAppResponse) - val appContext = createApplicationSubmissionContext(newApp, containerContext) - - // Finally, submit and monitor the application - logInfo(s"Submitting application ${appId.getId} to ResourceManager") - yarnClient.submitApplication(appContext) - appId + var appId: ApplicationId = null + try { + // Setup the credentials before doing anything else, + // so we have don't have issues at any point. 
+ setupCredentials() + yarnClient.init(yarnConf) + yarnClient.start() + + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) + + // Get a new application from our RM + val newApp = yarnClient.createApplication() + val newAppResponse = newApp.getNewApplicationResponse() + appId = newAppResponse.getApplicationId() + + // Verify whether the cluster has enough resources for our AM + verifyClusterResources(newAppResponse) + + // Set up the appropriate contexts to launch our AM + val containerContext = createContainerLaunchContext(newAppResponse) + val appContext = createApplicationSubmissionContext(newApp, containerContext) + + // Finally, submit and monitor the application + logInfo(s"Submitting application ${appId.getId} to ResourceManager") + yarnClient.submitApplication(appContext) + appId + } catch { + case e: Throwable => + if (appId != null) { + val appStagingDir = getAppStagingDir(appId) + try { + val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false) + val stagingDirPath = new Path(appStagingDir) + val fs = FileSystem.get(hadoopConf) + if (!preserveFiles && fs.exists(stagingDirPath)) { + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logWarning("Failed to cleanup staging dir " + appStagingDir, ioe) + } + } + throw e + } } /** From f96b85ab44b82736363764ea39ee62884007f4a3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 15 May 2015 10:03:29 -0700 Subject: [PATCH 3/9] [SPARK-7668] [MLLIB] Preserve isTransposed property for Matrix after calling map function JIRA: https://issues.apache.org/jira/browse/SPARK-7668 Author: Liang-Chi Hsieh Closes #6188 from viirya/fix_matrix_map and squashes the following commits: 2a7cc97 [Liang-Chi Hsieh] Preserve isTransposed property for Matrix after calling map function. --- .../main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala index 3fa5e068d16d4..a609674df6b8b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala @@ -273,7 +273,8 @@ class DenseMatrix( override def copy: DenseMatrix = new DenseMatrix(numRows, numCols, values.clone()) - private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f)) + private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f), + isTransposed) private[mllib] def update(f: Double => Double): DenseMatrix = { val len = values.length @@ -535,7 +536,7 @@ class SparseMatrix( } private[mllib] def map(f: Double => Double) = - new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f)) + new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f), isTransposed) private[mllib] def update(f: Double => Double): SparseMatrix = { val len = values.length From 8f4aaba0e4e3350ab152a476d08ff60e9495c6d2 Mon Sep 17 00:00:00 2001 From: FlytxtRnD Date: Fri, 15 May 2015 10:43:18 -0700 Subject: [PATCH 4/9] [SPARK-7651] [MLLIB] [PYSPARK] GMM predict, predictSoft should raise error on bad input In the Python API for Gaussian Mixture Model, predict() and predictSoft() methods should raise an error when the input argument is not an RDD. 
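A minimal PySpark sketch of the intended behavior (the data, parameters, and app name below are illustrative, not taken from the patch):

```
from pyspark import SparkContext
from pyspark.mllib.clustering import GaussianMixture
from pyspark.mllib.linalg import Vectors

# Illustrative sketch only: train a tiny model, then exercise the type check
# added by this patch.
sc = SparkContext(appName="gmm-predict-type-check")
data = sc.parallelize(
    [Vectors.dense([0.0]), Vectors.dense([0.1]),
     Vectors.dense([5.0]), Vectors.dense([5.1])])
model = GaussianMixture.train(data, k=2)

print(model.predict(data).collect())     # OK: the argument is an RDD of vectors

try:
    model.predict(Vectors.dense([0.0]))  # a single vector, not an RDD
except TypeError as e:
    # Fails fast with a message along the lines of
    # "x should be represented by an RDD, but got <...DenseVector>."
    print(e)
```

Before this change a non-RDD argument simply fell through and the call returned `None`; with the added check both `predict()` and `predictSoft()` fail fast with a descriptive `TypeError`.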
Author: FlytxtRnD Closes #6180 from FlytxtRnD/GmmPredictException and squashes the following commits: 4b6aa11 [FlytxtRnD] Raise error if the input to predict()/predictSoft() is not an RDD --- python/pyspark/mllib/clustering.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index a53333dae6a82..b55583f82223f 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -212,6 +212,9 @@ def predict(self, x): if isinstance(x, RDD): cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z))) return cluster_labels + else: + raise TypeError("x should be represented by an RDD, " + "but got %s." % type(x)) def predictSoft(self, x): """ @@ -225,6 +228,9 @@ def predictSoft(self, x): membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector), _convert_to_vector(self._weights), means, sigmas) return membership_matrix.map(lambda x: pyarray.array('d', x)) + else: + raise TypeError("x should be represented by an RDD, " + "but got %s." % type(x)) class GaussianMixture(object): From b1b9d5802e3d185f42711ab043a21c9d1eb4763f Mon Sep 17 00:00:00 2001 From: Oleksii Kostyliev Date: Fri, 15 May 2015 11:19:56 -0700 Subject: [PATCH 5/9] [SPARK-7233] [CORE] Detect REPL mode once

Description

Detect REPL mode once per JVM lifespan. Previously, the presence of the interpreter was checked every time a job was submitted; when many short-lived jobs were submitted, this caused heavy mutual blocking between the submission threads. For more details, please refer to https://issues.apache.org/jira/browse/SPARK-7233.
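Concretely, the check now lives behind a `lazy val` (`Utils.isInInterpreter` in the diff further below), so the reflective lookup of the REPL class runs at most once per JVM and later reads are cheap. Below is a minimal standalone sketch of the pattern, assuming nothing beyond the standard library; the names are illustrative, `Class.forName` stands in for Spark's `classForName` helper, and the fallback value on a missing class is simplified (the real patch keeps the historical `true`, tracked under SPARK-7527):

```scala
object ReplDetector {
  // A lazy val is initialized at most once per JVM (Scala synchronizes only the
  // first access), so concurrent job submissions pay for the reflective lookup once.
  lazy val isInInterpreter: Boolean = {
    try {
      // Probe for the REPL entry point via reflection; the class is only present
      // when running inside the Spark shell.
      val interpClass = Class.forName("spark.repl.Main")
      interpClass.getMethod("interp").invoke(null) != null
    } catch {
      // Simplified fallback: treat a missing REPL class as "not in the interpreter".
      // The actual patch keeps the historical `true` here; see SPARK-7527.
      case _: ClassNotFoundException => false
    }
  }
}
```

The caching is orthogonal to which value the check should return when the REPL class is absent; that question is what the notes below discuss.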

Notes

* I inverted the return value in case of catching an exception from `true` to `false`. It seems more logical to assume that if the REPL class is not found, we aren't in the interpreter mode. * Personally, I would call `classForName` with just a Spark classloader (`org.apache.spark.util.Utils#getSparkClassLoader`) but `org.apache.spark.util.Utils#getContextOrSparkClassLoader` is said to be preferable. * I struggled to come up with a concise, readable and clear unit test. Suggestions are welcome if you feel one is necessary. Author: Oleksii Kostyliev Author: Oleksii Kostyliev Closes #5835 from preeze/SPARK-7233 and squashes the following commits: 69bb9e4 [Oleksii Kostyliev] SPARK-7527: fixed explanatory comment to meet style-checker requirements 26dcc24 [Oleksii Kostyliev] SPARK-7527: fixed explanatory comment to meet style-checker requirements c6f9685 [Oleksii Kostyliev] Merge remote-tracking branch 'remotes/upstream/master' into SPARK-7233 b78a983 [Oleksii Kostyliev] SPARK-7527: revert the fix and let it be addressed separately at a later stage b64d441 [Oleksii Kostyliev] SPARK-7233: inline inInterpreter parameter into instantiateClass 86e2606 [Oleksii Kostyliev] SPARK-7233, SPARK-7527: Handle interpreter mode properly. c7ee69c [Oleksii Kostyliev] Merge remote-tracking branch 'upstream/master' into SPARK-7233 d6c07fc [Oleksii Kostyliev] SPARK-7233: properly handle the inverted meaning of isInInterpreter c319039 [Oleksii Kostyliev] SPARK-7233: move inInterpreter to Utils and make it lazy --- .../org/apache/spark/util/ClosureCleaner.scala | 16 +++------------- .../main/scala/org/apache/spark/util/Utils.scala | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 6fe32e469c732..6f2966bd4fd31 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -239,15 +239,6 @@ private[spark] object ClosureCleaner extends Logging { logDebug(s" + fields accessed by starting closure: " + accessedFields.size) accessedFields.foreach { f => logDebug(" " + f) } - val inInterpreter = { - try { - val interpClass = Class.forName("spark.repl.Main") - interpClass.getMethod("interp").invoke(null) != null - } catch { - case _: ClassNotFoundException => true - } - } - // List of outer (class, object) pairs, ordered from outermost to innermost // Note that all outer objects but the outermost one (first one in this list) must be closures var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse @@ -274,7 +265,7 @@ private[spark] object ClosureCleaner extends Logging { // required fields from the original object. We need the parent here because the Java // language specification requires the first constructor parameter of any closure to be // its enclosing object.
- val clone = instantiateClass(cls, parent, inInterpreter) + val clone = instantiateClass(cls, parent) for (fieldName <- accessedFields(cls)) { val field = cls.getDeclaredField(fieldName) field.setAccessible(true) @@ -327,9 +318,8 @@ private[spark] object ClosureCleaner extends Logging { private def instantiateClass( cls: Class[_], - enclosingObject: AnyRef, - inInterpreter: Boolean): AnyRef = { - if (!inInterpreter) { + enclosingObject: AnyRef): AnyRef = { + if (!Utils.isInInterpreter) { // This is a bona fide closure class, whose constructor has no effects // other than to set its fields, so use its constructor val cons = cls.getConstructors()(0) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 48843b4ae57c6..6a7d1fae3320e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1795,6 +1795,20 @@ private[spark] object Utils extends Logging { } } + lazy val isInInterpreter: Boolean = { + try { + val interpClass = classForName("spark.repl.Main") + interpClass.getMethod("interp").invoke(null) != null + } catch { + // Returning true seems to be a mistake. + // Currently changing it to false causes tests failures in Streaming. + // For a more detailed discussion, please, refer to + // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments. + // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527 + case _: ClassNotFoundException => true + } + } + /** * Return a well-formed URI for the file described by a user input string. * From 270d4b5181b95e3f1f131b1d65dde00a7e5b9d6e Mon Sep 17 00:00:00 2001 From: Tim Ellison Date: Fri, 15 May 2015 11:27:24 -0700 Subject: [PATCH 6/9] [CORE] Protect additional test vars from early GC Fix more places in which some test variables could be collected early by aggressive JVM optimization. Added a couple of comments to note where existing references are sufficient in the same test pattern. 
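The fix relies on Java reachability rules: a local variable that is never read again past a certain point may be treated as dead, so an aggressive JIT and GC can reclaim the object it references even while the variable is still in scope. Touching the variable after the pre-GC assertion (the `rdd.count()` calls added in the diff below) keeps the reference live until the test explicitly nulls it. A minimal standalone sketch of the idea, with illustrative names and no Spark dependencies:

```scala
object DefeatEarlyGcDemo {
  def main(args: Array[String]): Unit = {
    var payload: Array[Byte] = new Array[Byte](8 * 1024 * 1024)
    val tracker = new java.lang.ref.WeakReference[Array[Byte]](payload)

    System.gc()
    // Without this read, the JIT may already treat `payload` as dead, and the GC
    // above could reclaim the array even though the variable is still in scope.
    println(s"still reachable: ${payload.length} bytes")

    payload = null // genuinely drop the reference, like `rdd = null` in the suite
    System.gc()
    // Collection is not guaranteed, but after an explicit GC the weak ref is normally cleared.
    println(s"cleared after nulling: ${tracker.get() == null}")
  }
}
```

In the suite below, the role of the `println(payload.length)` read is played by the `rdd.count()` calls placed between the pre-GC assertion and the `rdd = null` line.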
Author: Tim Ellison Closes #6187 from tellison/DefeatEarlyGC and squashes the following commits: 27329d9 [Tim Ellison] [CORE] Protect additional test vars from early GC --- .../scala/org/apache/spark/ContextCleanerSuite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index cb30e1f4e63a1..0922a2c3599cc 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -165,6 +165,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { } // Test that GC causes RDD cleanup after dereferencing the RDD + // Note rdd is used after previous GC to avoid early collection by the JVM val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id)) rdd = null // Make RDD out of scope runGC() @@ -181,9 +182,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { intercept[Exception] { preGCTester.assertCleanup()(timeout(1000 millis)) } + rdd.count() // Defeat early collection by the JVM // Test that GC causes shuffle cleanup after dereferencing the RDD - rdd.count() // Defeat any early collection of rdd variable by the JVM val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0)) rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope runGC() @@ -201,6 +202,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { } // Test that GC causes broadcast cleanup after dereferencing the broadcast variable + // Note broadcast is used after previous GC to avoid early collection by the JVM val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id)) broadcast = null // Make broadcast variable out of scope runGC() @@ -226,7 +228,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { // the checkpoint is not cleaned by default (without the configuration set) var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil) - rdd = null // Make RDD out of scope + rdd = null // Make RDD out of scope, ok if collected earlier runGC() postGCTester.assertCleanup() assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get)) @@ -245,6 +247,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { // Confirm the checkpoint directory exists assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get)) + // Reference rdd to defeat any early collection by the JVM + rdd.count() + // Test that GC causes checkpoint data cleanup after dereferencing the RDD postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId)) rdd = null // Make RDD out of scope @@ -352,6 +357,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor intercept[Exception] { preGCTester.assertCleanup()(timeout(1000 millis)) } + rdd.count() // Defeat early collection by the JVM // Test that GC causes shuffle cleanup after dereferencing the RDD val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0)) From 8ab1450d3995b0c3ef64c5991b88c258e17bcb12 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 15 May 2015 11:30:19 -0700 Subject: [PATCH 7/9] [SPARK-5412] [DEPLOY] Cannot bind Master to a specific hostname as per the documentation Pass args to start-master.sh through to start-daemon.sh, as other scripts do, so that things like --host have effect on start-master.sh as per docs Author: Sean Owen Closes #6185 from srowen/SPARK-5412 and squashes the following commits: 
b3ce9da [Sean Owen] Pass args to start-master.sh through to start-daemon.sh, as other scripts do, so that things like --host have effect on start-master.sh as per docs --- sbin/start-master.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sbin/start-master.sh b/sbin/start-master.sh index 17fff58f4f768..a7f5d5702fd80 100755 --- a/sbin/start-master.sh +++ b/sbin/start-master.sh @@ -22,6 +22,8 @@ sbin="`dirname "$0"`" sbin="`cd "$sbin"; pwd`" +ORIGINAL_ARGS="$@" + START_TACHYON=false while (( "$#" )); do @@ -53,7 +55,9 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then SPARK_MASTER_WEBUI_PORT=8080 fi -"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT +"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \ + --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \ + $ORIGINAL_ARGS if [ "$START_TACHYON" == "true" ]; then "$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP From ad92af9dbbd0c4e1224cca26da166382ed4f15b9 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 15 May 2015 11:54:13 -0700 Subject: [PATCH 8/9] [SPARK-7664] [WEBUI] DAG visualization: Fix incorrect link paths of DAG. In JobPage, we can jump to a StagePage by clicking the corresponding box in the DAG viz, but the link path is incorrect. When we click a box as follows ... ![screenshot_from_2015-05-15 19 24 25](https://cloud.githubusercontent.com/assets/4736016/7651528/5f7ef824-fb3c-11e4-9518-8c9ade2dff7a.png) We jump to the index page. ![screenshot_from_2015-05-15 19 24 45](https://cloud.githubusercontent.com/assets/4736016/7651534/6d666274-fb3c-11e4-971c-c3f2dc2b1da2.png) Author: Kousuke Saruta Closes #6184 from sarutak/fix-link-path-of-dag-viz and squashes the following commits: faba3ba [Kousuke Saruta] Fix a incorrect link --- .../resources/org/apache/spark/ui/static/spark-dag-viz.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 8138eb0d4f390..ee48fd29a6432 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -186,8 +186,9 @@ function renderDagVizForJob(svgContainer) { var stageId = metadata.attr("stage-id"); var containerId = VizConstants.graphPrefix + stageId; // Link each graph to the corresponding stage page (TODO: handle stage attempts) - var stageLink = "/stages/stage/?id=" + - stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0&expandDagViz=true"; + var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0") + .find("a") + .attr("href") + "&expandDagViz=true"; var container = svgContainer .append("a") .attr("xlink:href", stageLink) From 8e3822a0794b8b18436bd63d6859d40139a77090 Mon Sep 17 00:00:00 2001 From: ehnalis Date: Fri, 15 May 2015 12:14:02 -0700 Subject: [PATCH 9/9] [SPARK-7504] [YARN] NullPointerException when initializing SparkContext in YARN-cluster mode Added a simple check to SparkContext. Also added two rational checks against null at the AM object. Author: ehnalis Closes #6083 from ehnalis/cluster and squashes the following commits: 926bd96 [ehnalis] Moved check to SparkContext. 7c89b6e [ehnalis] Remove false line.
ea2a5fe [ehnalis] [SPARK-7504] [YARN] NullPointerException when initializing SparkContext in YARN-cluster mode 4924e01 [ehnalis] [SPARK-7504] [YARN] NullPointerException when initializing SparkContext in YARN-cluster mode 39e4fa3 [ehnalis] SPARK-7504 [YARN] NullPointerException when initializing SparkContext in YARN-cluster mode 9f287c5 [ehnalis] [SPARK-7504] [YARN] NullPointerException when initializing SparkContext in YARN-cluster mode --- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b59f562d05ead..af276e7b8d40c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -371,6 +371,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli throw new SparkException("An application name must be set in your configuration") } + // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster + // yarn-standalone is deprecated, but still supported + if ((master == "yarn-cluster" || master == "yarn-standalone") && + !_conf.contains("spark.yarn.app.id")) { + throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " + + "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.") + } + if (_conf.getBoolean("spark.logConf", false)) { logInfo("Spark configuration:\n" + _conf.toDebugString) }
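With this guard in place, user code that constructs a SparkContext directly with a yarn-cluster master, instead of going through spark-submit so that the YARN ApplicationMaster can set spark.yarn.app.id, now fails fast with a descriptive SparkException rather than a NullPointerException later in initialization. A hedged sketch of what such misuse looks like from the application side (object and app names are illustrative, assuming Spark of this era on the classpath):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object YarnClusterFastFailDemo {
  def main(args: Array[String]): Unit = {
    // Constructing the context directly, outside spark-submit, so the AM has not
    // set spark.yarn.app.id for us.
    val conf = new SparkConf().setAppName("fast-fail-demo").setMaster("yarn-cluster")
    try {
      new SparkContext(conf)
    } catch {
      case e: SparkException =>
        // Expected with this patch: a clear message pointing the user at spark-submit.
        println("Fast failure: " + e.getMessage)
    }
  }
}
```

The supported path remains the one shown earlier in this series, e.g. `bin/spark-submit --master yarn --deploy-mode cluster ...`, under which spark.yarn.app.id is set before user code runs.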