diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 7a16ee8742dc0..5e0d5c15d7069 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -934,6 +934,12 @@ for details. Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. + + repartitionAndSortWithinPartitions(partitioner) + Repartition the RDD according to the given partitioner and, within each resulting partition, + sort records by their keys. This is more efficient than calling repartition and then sorting within + each partition because it can push the sorting down into the shuffle machinery. + ### Actions diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 24a68bb083334..85d446b9da0e7 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -146,7 +146,7 @@ describes the various methods for loading data into a SchemaRDD. Spark SQL supports two different methods for converting existing RDDs into SchemaRDDs. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This -reflection based approach leads to more concise code and works well when you already know the schema +reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application. The second method for creating SchemaRDDs is through a programmatic interface that allows you to @@ -566,7 +566,7 @@ for teenName in teenNames.collect(): ### Configuration -Configuration of Parquet can be done using the `setConf` method on SQLContext or by running +Configuration of Parquet can be done using the `setConf` method on SQLContext or by running `SET key=value` commands using SQL. @@ -575,8 +575,8 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or @@ -591,10 +591,20 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or + + + + + @@ -945,7 +955,7 @@ options. ## Migration Guide for Shark User -### Scheduling +### Scheduling To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session, users can set the `spark.sql.thriftserver.scheduler.pool` variable: @@ -992,7 +1002,7 @@ Several caching related features are not supported yet: ## Compatibility with Apache Hive Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark -SQL is based on Hive 0.12.0. +SQL is based on Hive 0.12.0 and 0.13.1. #### Deploying in Existing Hive Warehouses @@ -1031,6 +1041,7 @@ Spark SQL supports the vast majority of Hive features, such as: * Sampling * Explain * Partitioned tables +* View * All Hive DDL Functions, including: * `CREATE TABLE` * `CREATE TABLE AS SELECT` @@ -1046,6 +1057,7 @@ Spark SQL supports the vast majority of Hive features, such as: * `STRING` * `BINARY` * `TIMESTAMP` + * `DATE` * `ARRAY<>` * `MAP<>` * `STRUCT<>` @@ -1146,6 +1158,7 @@ evaluated by the SQL execution engine. A full list of the functions supported c * Datetime type - `TimestampType`: Represents values comprising values of fields year, month, day, hour, minute, and second. + - `DateType`: Represents values comprising values of fields year, month, day. * Complex types - `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of elements with the type of `elementType`. `containsNull` is used to indicate if @@ -1253,6 +1266,13 @@ import org.apache.spark.sql._ TimestampType + + + + + @@ -1379,6 +1399,13 @@ please use factory methods provided in DataType.TimestampType + + + + + @@ -1526,6 +1553,13 @@ from pyspark.sql import * TimestampType() + + + + + diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 227acc117502d..138923c4d7f2f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -29,9 +29,10 @@ object HiveFromSpark { val sc = new SparkContext(sparkConf) val path = s"${System.getenv("SPARK_HOME")}/examples/src/main/resources/kv1.txt" - // A local hive context creates an instance of the Hive Metastore in process, storing - // the warehouse data in the current directory. This location can be overridden by - // specifying a second parameter to the constructor. + // A hive context adds support for finding tables in the MetaStore and writing queries + // using HiveQL. Users who do not have an existing Hive deployment can still create a + // HiveContext. When not configured by the hive-site.xml, the context automatically + // creates metastore_db and warehouse in the current directory. val hiveContext = new HiveContext(sc) import hiveContext._ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala index b198ed9936d95..f1a1ca6616a21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala @@ -97,10 +97,10 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical { /** Generate all variations of upper and lower case of a given string */ def allCaseVersions(s: String, prefix: String = ""): Stream[String] = { - if (s == "") { + if (s.isEmpty) { Stream(prefix) } else { - allCaseVersions(s.tail, prefix + s.head.toLower) ++ + allCaseVersions(s.tail, prefix + s.head.toLower) #::: allCaseVersions(s.tail, prefix + s.head.toUpper) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index dc1d349f10f1b..a2bcd73b6074f 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -277,7 +277,8 @@ class SqlParser extends AbstractSparkSQLParser { | SUM ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => SumDistinct(exp) } | COUNT ~ "(" ~> "*" <~ ")" ^^ { case _ => Count(Literal(1)) } | COUNT ~ "(" ~> expression <~ ")" ^^ { case exp => Count(exp) } - | COUNT ~> "(" ~> DISTINCT ~> expression <~ ")" ^^ { case exp => CountDistinct(exp :: Nil) } + | COUNT ~> "(" ~> DISTINCT ~> repsep(expression, ",") <~ ")" ^^ + { case exps => CountDistinct(exps) } | APPROXIMATE ~ COUNT ~ "(" ~ DISTINCT ~> expression <~ ")" ^^ { case exp => ApproxCountDistinct(exp) } | APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ COUNT ~ "(" ~ DISTINCT ~ expression <~ ")" ^^ @@ -340,18 +341,13 @@ class SqlParser extends AbstractSparkSQLParser { | floatLit ^^ { f => Literal(f.toDouble) } ) - private val longMax = BigDecimal(s"${Long.MaxValue}") - private val longMin = BigDecimal(s"${Long.MinValue}") - private val intMax = BigDecimal(s"${Int.MaxValue}") - private val intMin = BigDecimal(s"${Int.MinValue}") - private def toNarrowestIntegerType(value: String) = { val bigIntValue = BigDecimal(value) bigIntValue match { - case v if v < longMin || v > longMax => v - case v if v < intMin || v > intMax => v.toLong - case v => v.toInt + case v if bigIntValue.isValidInt => v.toIntExact + case v if bigIntValue.isValidLong => v.toLongExact + case v => v } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 452baab8eb889..c6d4dabf83bc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -225,6 +225,8 @@ class SchemaRDD( * {{{ * schemaRDD.limit(10) * }}} + * + * @group Query */ def limit(limitNum: Int): SchemaRDD = new SchemaRDD(sqlContext, Limit(Literal(limitNum), logicalPlan)) @@ -355,6 +357,8 @@ class SchemaRDD( * Return the number of elements in the RDD. Unlike the base RDD implementation of count, this * implementation leverages the query optimizer to compute the count on the SchemaRDD, which * supports features such as filter pushdown. + * + * @group Query */ @Experimental override def count(): Long = aggregate(Count(Literal(1))).collect().head.getLong(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index bea12e6dd674e..9b89c3bfb3307 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -39,8 +39,8 @@ import scala.collection.JavaConversions._ /** * Allows creation of parquet based tables using the syntax - * `CREATE TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option required - * is `path`, which should be the location of a collection of, optionally partitioned, + * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option + * required is `path`, which should be the location of a collection of, optionally partitioned, * parquet files. */ class DefaultSource extends RelationProvider { @@ -49,7 +49,7 @@ class DefaultSource extends RelationProvider { sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val path = - parameters.getOrElse("path", sys.error("'path' must be specifed for parquet tables.")) + parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) ParquetRelation2(path)(sqlContext) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 9168ca2fc6fec..ca510cb0b07e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -67,7 +67,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val ddl: Parser[LogicalPlan] = createTable /** - * CREATE FOREIGN TEMPORARY TABLE avroTable + * CREATE TEMPORARY TABLE avroTable * USING org.apache.spark.sql.avro * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro") */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 84ee3051eb682..f83e647014193 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -992,4 +992,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { "nulldata2 on nulldata1.value <=> nulldata2.value"), (1 to 2).map(i => Seq(i))) } + + test("Multi-column COUNT(DISTINCT ...)") { + val data = TestData(1,"val_1") :: TestData(2,"val_2") :: Nil + val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) + rdd.registerTempTable("distinctData") + checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), 2) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index b9283f668a9b5..f4c42bbc5b03d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -379,7 +379,7 @@ private[hive] object HiveQl { protected def nameExpressions(exprs: Seq[Expression]): Seq[NamedExpression] = { exprs.zipWithIndex.map { case (ne: NamedExpression, _) => ne - case (e, i) => Alias(e, s"c_$i")() + case (e, i) => Alias(e, s"_c$i")() } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index b255a2ebb9778..fecf8faaf4cda 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -279,7 +279,7 @@ private[hive] case class HiveGenericUdtf( } override protected def makeOutput() = { - // Use column names when given, otherwise c_1, c_2, ... c_n. + // Use column names when given, otherwise _c1, _c2, ... _cn. if (aliasNames.size == outputDataTypes.size) { aliasNames.zip(outputDataTypes).map { case (attrName, attrDataType) => @@ -288,7 +288,7 @@ private[hive] case class HiveGenericUdtf( } else { outputDataTypes.zipWithIndex.map { case (attrDataType, i) => - AttributeReference(s"c_$i", attrDataType, nullable = true)() + AttributeReference(s"_c$i", attrDataType, nullable = true)() } } }
spark.sql.parquet.binaryAsString false - Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do - not differentiate between binary data and strings when writing out the Parquet schema. This + Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do + not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
spark.sql.parquet.compression.codec gzip - Sets the compression codec use when writing Parquet files. Acceptable values include: + Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdownfalse + Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known + bug in Paruet 1.6.0rc3 (PARQUET-136). + However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn + this feature on. +
spark.sql.hive.convertMetastoreParquet true
DateType java.sql.Date + DateType +
ArrayType scala.collection.Seq
DateType java.sql.Date + DataType.DateType +
ArrayType java.util.List
DateType datetime.date + DateType() +
ArrayType list, tuple, or array