From 32fad4233f353814496c84e15ba64326730b7ae7 Mon Sep 17 00:00:00 2001 From: Brenden Matthews Date: Sun, 5 Oct 2014 09:49:24 -0700 Subject: [PATCH 01/15] [SPARK-3597][Mesos] Implement `killTask`. The MesosSchedulerBackend did not previously implement `killTask`, resulting in an exception. Author: Brenden Matthews Closes #2453 from brndnmtthws/implement-killtask and squashes the following commits: 23ddcdc [Brenden Matthews] [SPARK-3597][Mesos] Implement `killTask`. --- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index b11786368e661..e0f2fd622f54c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -372,6 +372,13 @@ private[spark] class MesosSchedulerBackend( recordSlaveLost(d, slaveId, ExecutorExited(status)) } + override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { + driver.killTask( + TaskID.newBuilder() + .setValue(taskId.toString).build() + ) + } + // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) From a7c73130f1b6b0b8b19a7b0a0de5c713b673cd7b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 5 Oct 2014 09:55:17 -0700 Subject: [PATCH 02/15] SPARK-1656: Fix potential resource leaks JIRA: https://issues.apache.org/jira/browse/SPARK-1656 Author: zsxwing Closes #577 from zsxwing/SPARK-1656 and squashes the following commits: c431095 [zsxwing] Add a comment and fix the code style 2de96e5 [zsxwing] Make sure file will be deleted if exception happens 28b90dc [zsxwing] Update to follow the code style 4521d6e [zsxwing] Merge branch 'master' into SPARK-1656 afc3383 [zsxwing] Update to follow the code style 071fdd1 [zsxwing] SPARK-1656: Fix potential resource leaks --- .../spark/broadcast/HttpBroadcast.scala | 25 +++++++++++-------- .../master/FileSystemPersistenceEngine.scala | 14 ++++++++--- .../org/apache/spark/storage/DiskStore.scala | 16 +++++++++++- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 942dc7d7eac87..4cd4f4f96fd16 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging { private def write(id: Long, value: Any) { val file = getFile(id) - val out: OutputStream = { - if (compress) { - compressionCodec.compressedOutputStream(new FileOutputStream(file)) - } else { - new BufferedOutputStream(new FileOutputStream(file), bufferSize) + val fileOutputStream = new FileOutputStream(file) + try { + val out: OutputStream = { + if (compress) { + compressionCodec.compressedOutputStream(fileOutputStream) + } else { + new BufferedOutputStream(fileOutputStream, bufferSize) + } } + val ser = SparkEnv.get.serializer.newInstance() + val serOut = ser.serializeStream(out) + serOut.writeObject(value) + serOut.close() + files += file + } finally { + fileOutputStream.close() } - val ser = SparkEnv.get.serializer.newInstance() - val serOut = ser.serializeStream(out) - serOut.writeObject(value) - serOut.close() - files += file } private def read[T: ClassTag](id: Long): T = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index aa85aa060d9c1..08a99bbe68578 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine( val serialized = serializer.toBinary(value) val out = new FileOutputStream(file) - out.write(serialized) - out.close() + try { + out.write(serialized) + } finally { + out.close() + } } def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = { val fileData = new Array[Byte](file.length().asInstanceOf[Int]) val dis = new DataInputStream(new FileInputStream(file)) - dis.readFully(fileData) - dis.close() + try { + dis.readFully(fileData) + } finally { + dis.close() + } val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index e9304f6bb45d0..bac459e835a3f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) - blockManager.dataSerializeStream(blockId, outputStream, values) + try { + try { + blockManager.dataSerializeStream(blockId, outputStream, values) + } finally { + // Close outputStream here because it should be closed before file is deleted. + outputStream.close() + } + } catch { + case e: Throwable => + if (file.exists()) { + file.delete() + } + throw e + } + val length = file.length val timeTaken = System.currentTimeMillis - startTime From 1b97a941a09a2f63d442f435c1b444d857cd6956 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 5 Oct 2014 11:19:17 -0700 Subject: [PATCH 03/15] [SPARK-3007][SQL] Fixes dynamic partitioning support for lower Hadoop versions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a follow up of #2226 and #2616 to fix Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x). The root cause is the semantics difference of `FileSystem.globStatus()` between different versions of Hadoop, as illustrated by the following test code: ```scala object GlobExperiments extends App { val conf = new Configuration() val fs = FileSystem.getLocal(conf) fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status => println(status.getPath) } } ``` Target directory structure: ``` /tmp/wh ├── dir0 │   ├── dir1 │   │   └── level2 │   └── level1 └── level0 ``` Hadoop 2.4.1 result: ``` file:/tmp/wh/dir0/dir1/level2 ``` Hadoop 1.0.4 resuet: ``` file:/tmp/wh/dir0/dir1/level2 file:/tmp/wh/dir0/level1 file:/tmp/wh/level0 ``` In #2226 and #2616, we call `FileOutputCommitter.commitJob()` at the end of the job, and the `_SUCCESS` mark file is written. When working with lower Hadoop versions, due to the `globStatus()` semantics issue, `_SUCCESS` is included as a separate partition data file by `Hive.loadDynamicPartitions()`, and fails partition spec checking. The fix introduced in this PR is kind of a hack: when inserting data with dynamic partitioning, we intentionally avoid writing the `_SUCCESS` marker to workaround this issue. Hive doesn't suffer this issue because `FileSinkOperator` doesn't call `FileOutputCommitter.commitJob()`, instead, it calls `Utilities.mvFileToFinalPath()` to cleanup the output directory and then loads it into Hive warehouse by with `loadDynamicPartitions()`/`loadPartition()`/`loadTable()`. This approach is better because it handles failed job and speculative tasks properly. We should add this step to `InsertIntoHiveTable` in another PR. Author: Cheng Lian Closes #2663 from liancheng/dp-hadoop-1-fix and squashes the following commits: 0177dae [Cheng Lian] Fixes dynamic partitioning support for lower Hadoop versions --- .../spark/sql/hive/hiveWriterContainers.scala | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index ac5c7a8220296..6ccbc22a4acfb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -55,8 +55,8 @@ private[hive] class SparkHiveWriterContainer( private var taID: SerializableWritable[TaskAttemptID] = null @transient private var writer: FileSinkOperator.RecordWriter = null - @transient private lazy val committer = conf.value.getOutputCommitter - @transient private lazy val jobContext = newJobContext(conf.value, jID.value) + @transient protected lazy val committer = conf.value.getOutputCommitter + @transient protected lazy val jobContext = newJobContext(conf.value, jID.value) @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value) @transient private lazy val outputFormat = conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]] @@ -122,8 +122,6 @@ private[hive] class SparkHiveWriterContainer( } } - // ********* Private Functions ********* - private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { jobID = jobId splitID = splitId @@ -157,12 +155,18 @@ private[hive] object SparkHiveWriterContainer { } } +private[spark] object SparkHiveDynamicPartitionWriterContainer { + val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs" +} + private[spark] class SparkHiveDynamicPartitionWriterContainer( @transient jobConf: JobConf, fileSinkConf: FileSinkDesc, dynamicPartColNames: Array[String]) extends SparkHiveWriterContainer(jobConf, fileSinkConf) { + import SparkHiveDynamicPartitionWriterContainer._ + private val defaultPartName = jobConf.get( ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) @@ -179,6 +183,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( commit() } + override def commitJob(): Unit = { + // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4), + // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will + // include _SUCCESS file when glob'ing for dynamic partition data files. + // + // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does: + // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then + // load it with loadDynamicPartitions/loadPartition/loadTable. + val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) + jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false) + super.commitJob() + jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) + } + override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { val dynamicPartPath = dynamicPartColNames .zip(row.takeRight(dynamicPartColNames.length)) From e222221e24c122300bbde6d5ec4002a7c42b2e24 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 5 Oct 2014 13:22:40 -0700 Subject: [PATCH 04/15] HOTFIX: Fix unicode error in merge script. The merge script builds up a big command array and sometimes this contains both unicode and ascii strings. This doesn't work if you try to join them into a single string. Longer term a solution is to go and make sure the source of all strings is unicode. This patch provides a simpler solution... just print the array rather than joining. I actually prefer printing an array here anyways since joining on spaces is lossy in the case of arguments that themselves contain spaces. Author: Patrick Wendell Closes #2645 from pwendell/merge-script and squashes the following commits: 167b792 [Patrick Wendell] HOTFIX: Fix unicode error in merge script. --- dev/merge_spark_pr.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index a8e92e36fe0d8..02ac20984add9 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -73,11 +73,10 @@ def fail(msg): def run_cmd(cmd): + print cmd if isinstance(cmd, list): - print " ".join(cmd) return subprocess.check_output(cmd) else: - print cmd return subprocess.check_output(cmd.split(" ")) From 79b2108de30bf91c8e58bb36405d334aeb2a00ad Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 Oct 2014 17:44:38 -0700 Subject: [PATCH 05/15] [Minor] Trivial fix to make codes more readable It should just use `maxResults` there. Author: Liang-Chi Hsieh Closes #2654 from viirya/trivial_fix and squashes the following commits: 1362289 [Liang-Chi Hsieh] Trivial fix to make codes more readable. --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 8bcc098bbb620..fad3b39f81413 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -268,7 +268,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { */ protected[sql] def runSqlHive(sql: String): Seq[String] = { val maxResults = 100000 - val results = runHive(sql, 100000) + val results = runHive(sql, maxResults) // It is very confusing when you only get back some of the results... if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED") results From 58f5361caaa2f898e38ae4b3794167881e20a818 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 5 Oct 2014 17:47:20 -0700 Subject: [PATCH 06/15] [SPARK-3792][SQL] Enable JavaHiveQLSuite Do not use TestSQLContext in JavaHiveQLSuite, that may lead to two SparkContexts in one jvm and enable JavaHiveQLSuite Author: scwf Closes #2652 from scwf/fix-JavaHiveQLSuite and squashes the following commits: be35c91 [scwf] enable JavaHiveQLSuite --- .../sql/hive/api/java/JavaHiveQLSuite.scala | 27 +++++++------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala index 9644b707eb1a0..46b11b582b26d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala @@ -25,34 +25,30 @@ import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.api.java.JavaSchemaRDD import org.apache.spark.sql.execution.ExplainCommand import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.test.TestSQLContext // Implicits import scala.collection.JavaConversions._ class JavaHiveQLSuite extends FunSuite { - lazy val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext) + lazy val javaCtx = new JavaSparkContext(TestHive.sparkContext) // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM lazy val javaHiveCtx = new JavaHiveContext(javaCtx) { override val sqlContext = TestHive } - ignore("SELECT * FROM src") { + test("SELECT * FROM src") { assert( javaHiveCtx.sql("SELECT * FROM src").collect().map(_.getInt(0)) === TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq) } - private val explainCommandClassName = - classOf[ExplainCommand].getSimpleName.stripSuffix("$") - def isExplanation(result: JavaSchemaRDD) = { val explanation = result.collect().map(_.getString(0)) - explanation.size > 1 && explanation.head.startsWith(explainCommandClassName) + explanation.size > 1 && explanation.head.startsWith("== Physical Plan ==") } - ignore("Query Hive native command execution result") { + test("Query Hive native command execution result") { val tableName = "test_native_commands" assertResult(0) { @@ -63,23 +59,18 @@ class JavaHiveQLSuite extends FunSuite { javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)").count() } - javaHiveCtx.sql("SHOW TABLES").registerTempTable("show_tables") - assert( javaHiveCtx - .sql("SELECT result FROM show_tables") + .sql("SHOW TABLES") .collect() .map(_.getString(0)) .contains(tableName)) - assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) { - javaHiveCtx.sql(s"DESCRIBE $tableName").registerTempTable("describe_table") - - + assertResult(Array(Array("key", "int"), Array("value", "string"))) { javaHiveCtx - .sql("SELECT result FROM describe_table") + .sql(s"describe $tableName") .collect() - .map(_.getString(0).split("\t").map(_.trim)) + .map(row => Array(row.get(0).asInstanceOf[String], row.get(1).asInstanceOf[String])) .toArray } @@ -89,7 +80,7 @@ class JavaHiveQLSuite extends FunSuite { TestHive.reset() } - ignore("Exactly once semantics for DDL and command statements") { + test("Exactly once semantics for DDL and command statements") { val tableName = "test_exactly_once" val q0 = javaHiveCtx.sql(s"CREATE TABLE $tableName(key INT, value STRING)") From 34b97a067d1b370fbed8ecafab2f48501a35d783 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 5 Oct 2014 17:51:59 -0700 Subject: [PATCH 07/15] [SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching Although lazy caching for in-memory table seems consistent with the `RDD.cache()` API, it's relatively confusing for users who mainly work with SQL and not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen just to ensure predictable performance. This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching. Also, took the chance to make some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are now merged and renamed to `CacheTableCommand` since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement. Author: Cheng Lian Closes #2513 from liancheng/eager-caching and squashes the following commits: fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching --- .../apache/spark/sql/catalyst/SqlParser.scala | 45 +++--- .../spark/sql/catalyst/analysis/Catalog.scala | 2 +- .../sql/catalyst/plans/logical/commands.scala | 15 +- .../org/apache/spark/sql/CacheManager.scala | 9 +- .../columnar/InMemoryColumnarTableScan.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 8 +- .../apache/spark/sql/execution/commands.scala | 47 +++--- .../apache/spark/sql/CachedTableSuite.scala | 145 +++++++++++++----- .../spark/sql/hive/ExtendedHiveQlParser.scala | 66 ++++---- .../org/apache/spark/sql/hive/TestHive.scala | 6 +- .../spark/sql/hive/CachedTableSuite.scala | 78 ++++++++-- 11 files changed, 265 insertions(+), 158 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 26336332c05a2..854b5b461bdc8 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -67,11 +67,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) + protected val ABS = Keyword("ABS") protected val ALL = Keyword("ALL") protected val AND = Keyword("AND") + protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AS = Keyword("AS") protected val ASC = Keyword("ASC") - protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AVG = Keyword("AVG") protected val BETWEEN = Keyword("BETWEEN") protected val BY = Keyword("BY") @@ -80,9 +81,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val COUNT = Keyword("COUNT") protected val DESC = Keyword("DESC") protected val DISTINCT = Keyword("DISTINCT") + protected val EXCEPT = Keyword("EXCEPT") protected val FALSE = Keyword("FALSE") protected val FIRST = Keyword("FIRST") - protected val LAST = Keyword("LAST") protected val FROM = Keyword("FROM") protected val FULL = Keyword("FULL") protected val GROUP = Keyword("GROUP") @@ -91,42 +92,42 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val IN = Keyword("IN") protected val INNER = Keyword("INNER") protected val INSERT = Keyword("INSERT") + protected val INTERSECT = Keyword("INTERSECT") protected val INTO = Keyword("INTO") protected val IS = Keyword("IS") protected val JOIN = Keyword("JOIN") + protected val LAST = Keyword("LAST") + protected val LAZY = Keyword("LAZY") protected val LEFT = Keyword("LEFT") + protected val LIKE = Keyword("LIKE") protected val LIMIT = Keyword("LIMIT") + protected val LOWER = Keyword("LOWER") protected val MAX = Keyword("MAX") protected val MIN = Keyword("MIN") protected val NOT = Keyword("NOT") protected val NULL = Keyword("NULL") protected val ON = Keyword("ON") protected val OR = Keyword("OR") - protected val OVERWRITE = Keyword("OVERWRITE") - protected val LIKE = Keyword("LIKE") - protected val RLIKE = Keyword("RLIKE") - protected val UPPER = Keyword("UPPER") - protected val LOWER = Keyword("LOWER") - protected val REGEXP = Keyword("REGEXP") protected val ORDER = Keyword("ORDER") protected val OUTER = Keyword("OUTER") + protected val OVERWRITE = Keyword("OVERWRITE") + protected val REGEXP = Keyword("REGEXP") protected val RIGHT = Keyword("RIGHT") + protected val RLIKE = Keyword("RLIKE") protected val SELECT = Keyword("SELECT") protected val SEMI = Keyword("SEMI") + protected val SQRT = Keyword("SQRT") protected val STRING = Keyword("STRING") + protected val SUBSTR = Keyword("SUBSTR") + protected val SUBSTRING = Keyword("SUBSTRING") protected val SUM = Keyword("SUM") protected val TABLE = Keyword("TABLE") protected val TIMESTAMP = Keyword("TIMESTAMP") protected val TRUE = Keyword("TRUE") protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") + protected val UPPER = Keyword("UPPER") protected val WHERE = Keyword("WHERE") - protected val INTERSECT = Keyword("INTERSECT") - protected val EXCEPT = Keyword("EXCEPT") - protected val SUBSTR = Keyword("SUBSTR") - protected val SUBSTRING = Keyword("SUBSTRING") - protected val SQRT = Keyword("SQRT") - protected val ABS = Keyword("ABS") // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -183,17 +184,15 @@ class SqlParser extends StandardTokenParsers with PackratParsers { } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ { - case tableName ~ None => - CacheCommand(tableName, true) - case tableName ~ Some(plan) => - CacheTableAsSelectCommand(tableName, plan) + CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> select) <~ opt(";") ^^ { + case isLazy ~ tableName ~ plan => + CacheTableCommand(tableName, plan, isLazy.isDefined) } - + protected lazy val unCache: Parser[LogicalPlan] = UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ { - case tableName => CacheCommand(tableName, false) - } + case tableName => UncacheTableCommand(tableName) + } protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",") @@ -283,7 +282,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } | termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } | termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } | - termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ { + termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ { case e ~ _ ~ el ~ _ ~ eu => And(GreaterThanOrEqual(e, el), LessThanOrEqual(e, eu)) } | termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 616f1e2ecb60f..2059a91ba0612 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -87,7 +87,7 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { tableName: String, alias: Option[String] = None): LogicalPlan = { val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - val table = tables.get(tblName).getOrElse(sys.error(s"Table Not Found: $tableName")) + val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName")) val tableWithQualifiers = Subquery(tblName, table) // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index 8366639fa0e8b..9a3848cfc6b62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -56,9 +56,15 @@ case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends } /** - * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command. + * Returned for the "CACHE TABLE tableName [AS SELECT ...]" command. */ -case class CacheCommand(tableName: String, doCache: Boolean) extends Command +case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean) + extends Command + +/** + * Returned for the "UNCACHE TABLE tableName" command. + */ +case class UncacheTableCommand(tableName: String) extends Command /** * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command. @@ -75,8 +81,3 @@ case class DescribeCommand( AttributeReference("data_type", StringType, nullable = false)(), AttributeReference("comment", StringType, nullable = false)()) } - -/** - * Returned for the "CACHE TABLE tableName AS SELECT .." command. - */ -case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index aebdbb68e49b8..3bf7382ac67a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -91,14 +91,10 @@ private[sql] trait CacheManager { } /** Removes the data for the given SchemaRDD from the cache */ - private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock { + private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock { val planToCache = query.queryExecution.optimizedPlan val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache)) - - if (dataIndex < 0) { - throw new IllegalArgumentException(s"Table $query is not cached.") - } - + require(dataIndex >= 0, s"Table $query is not cached.") cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) cachedData.remove(dataIndex) } @@ -135,5 +131,4 @@ private[sql] trait CacheManager { case _ => } } - } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index cec82a7f2df94..4f79173a26f88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation( override def newInstance() = { new InMemoryRelation( - output.map(_.newInstance), + output.map(_.newInstance()), useCompression, batchSize, storageLevel, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cf93d5ad7b503..5c16d0c624128 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { Seq(execution.SetCommand(key, value, plan.output)(context)) case logical.ExplainCommand(logicalPlan, extended) => Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) - case logical.CacheCommand(tableName, cache) => - Seq(execution.CacheCommand(tableName, cache)(context)) - case logical.CacheTableAsSelectCommand(tableName, plan) => - Seq(execution.CacheTableAsSelectCommand(tableName, plan)) + case logical.CacheTableCommand(tableName, optPlan, isLazy) => + Seq(execution.CacheTableCommand(tableName, optPlan, isLazy)) + case logical.UncacheTableCommand(tableName) => + Seq(execution.UncacheTableCommand(tableName)) case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index f88099ec0761e..d49633c24ad4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -138,49 +138,54 @@ case class ExplainCommand( * :: DeveloperApi :: */ @DeveloperApi -case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext) +case class CacheTableCommand( + tableName: String, + plan: Option[LogicalPlan], + isLazy: Boolean) extends LeafNode with Command { override protected lazy val sideEffectResult = { - if (doCache) { - context.cacheTable(tableName) - } else { - context.uncacheTable(tableName) + import sqlContext._ + + plan.foreach(_.registerTempTable(tableName)) + val schemaRDD = table(tableName) + schemaRDD.cache() + + if (!isLazy) { + // Performs eager caching + schemaRDD.count() } + Seq.empty[Row] } override def output: Seq[Attribute] = Seq.empty } + /** * :: DeveloperApi :: */ @DeveloperApi -case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command { - +case class UncacheTableCommand(tableName: String) extends LeafNode with Command { override protected lazy val sideEffectResult: Seq[Row] = { - Row("# Registered as a temporary table", null, null) +: - child.output.map(field => Row(field.name, field.dataType.toString, null)) + sqlContext.table(tableName).unpersist() + Seq.empty[Row] } + + override def output: Seq[Attribute] = Seq.empty } /** * :: DeveloperApi :: */ @DeveloperApi -case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan) +case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( + @transient context: SQLContext) extends LeafNode with Command { - - override protected[sql] lazy val sideEffectResult = { - import sqlContext._ - logicalPlan.registerTempTable(tableName) - cacheTable(tableName) - Seq.empty[Row] - } - override def output: Seq[Attribute] = Seq.empty - + override protected lazy val sideEffectResult: Seq[Row] = { + Row("# Registered as a temporary table", null, null) +: + child.output.map(field => Row(field.name, field.dataType.toString, null)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 957388e99bd85..1e624f97004f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -18,30 +18,39 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} -import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.storage.RDDBlockId case class BigData(s: String) class CachedTableSuite extends QueryTest { - import TestSQLContext._ TestData // Load test tables. - /** - * Throws a test failed exception when the number of cached tables differs from the expected - * number. - */ def assertCached(query: SchemaRDD, numCachedTables: Int = 1): Unit = { val planWithCaching = query.queryExecution.withCachedData val cachedData = planWithCaching collect { case cached: InMemoryRelation => cached } - if (cachedData.size != numCachedTables) { - fail( - s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + + assert( + cachedData.size == numCachedTables, + s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + planWithCaching) - } + } + + def rddIdOf(tableName: String): Int = { + val executedPlan = table(tableName).queryExecution.executedPlan + executedPlan.collect { + case InMemoryColumnarTableScan(_, _, relation) => + relation.cachedColumnBuffers.id + case _ => + fail(s"Table $tableName is not cached\n" + executedPlan) + }.head + } + + def isMaterialized(rddId: Int): Boolean = { + sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty } test("too big for memory") { @@ -52,10 +61,33 @@ class CachedTableSuite extends QueryTest { uncacheTable("bigData") } - test("calling .cache() should use inmemory columnar caching") { + test("calling .cache() should use in-memory columnar caching") { table("testData").cache() + assertCached(table("testData")) + } + + test("calling .unpersist() should drop in-memory columnar cache") { + table("testData").cache() + table("testData").count() + table("testData").unpersist(true) + assertCached(table("testData"), 0) + } + + test("isCached") { + cacheTable("testData") assertCached(table("testData")) + assert(table("testData").queryExecution.withCachedData match { + case _: InMemoryRelation => true + case _ => false + }) + + uncacheTable("testData") + assert(!isCached("testData")) + assert(table("testData").queryExecution.withCachedData match { + case _: InMemoryRelation => false + case _ => true + }) } test("SPARK-1669: cacheTable should be idempotent") { @@ -64,32 +96,27 @@ class CachedTableSuite extends QueryTest { cacheTable("testData") assertCached(table("testData")) - cacheTable("testData") - table("testData").queryExecution.analyzed match { - case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => - fail("cacheTable is not idempotent") + assertResult(1, "InMemoryRelation not found, testData should have been cached") { + table("testData").queryExecution.withCachedData.collect { + case r: InMemoryRelation => r + }.size + } - case _ => + cacheTable("testData") + assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") { + table("testData").queryExecution.withCachedData.collect { + case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r + }.size } } test("read from cached table and uncache") { cacheTable("testData") - - checkAnswer( - table("testData"), - testData.collect().toSeq - ) - + checkAnswer(table("testData"), testData.collect().toSeq) assertCached(table("testData")) uncacheTable("testData") - - checkAnswer( - table("testData"), - testData.collect().toSeq - ) - + checkAnswer(table("testData"), testData.collect().toSeq) assertCached(table("testData"), 0) } @@ -99,10 +126,12 @@ class CachedTableSuite extends QueryTest { } } - test("SELECT Star Cached Table") { + test("SELECT star from cached table") { sql("SELECT * FROM testData").registerTempTable("selectStar") cacheTable("selectStar") - sql("SELECT * FROM selectStar WHERE key = 1").collect() + checkAnswer( + sql("SELECT * FROM selectStar WHERE key = 1"), + Seq(Row(1, "1"))) uncacheTable("selectStar") } @@ -120,23 +149,57 @@ class CachedTableSuite extends QueryTest { sql("CACHE TABLE testData") assertCached(table("testData")) - assert(isCached("testData"), "Table 'testData' should be cached") + val rddId = rddIdOf("testData") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") sql("UNCACHE TABLE testData") - assertCached(table("testData"), 0) assert(!isCached("testData"), "Table 'testData' should not be cached") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } - - test("CACHE TABLE tableName AS SELECT Star Table") { + + test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") - sql("SELECT * FROM testCacheTable WHERE key = 1").collect() - assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } - - test("'CACHE TABLE tableName AS SELECT ..'") { - sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") - assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached") + + test("CACHE TABLE tableName AS SELECT ...") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } + + test("CACHE LAZY TABLE tableName") { + sql("CACHE LAZY TABLE testData") + assertCached(table("testData")) + + val rddId = rddIdOf("testData") + assert( + !isMaterialized(rddId), + "Lazily cached in-memory table shouldn't be materialized eagerly") + + sql("SELECT COUNT(*) FROM testData").collect() + assert( + isMaterialized(rddId), + "Lazily cached in-memory table should have been materialized") + + uncacheTable("testData") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala index e7e1cb980c2ae..c5844e92eaaa9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala @@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical /** - * A parser that recognizes all HiveQL constructs together with several Spark SQL specific + * A parser that recognizes all HiveQL constructs together with several Spark SQL specific * extensions like CACHE TABLE and UNCACHE TABLE. */ -private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers { - +private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers { + def apply(input: String): LogicalPlan = { // Special-case out set commands since the value fields can be // complex to handle without RegexParsers. Also this approach @@ -54,16 +54,17 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr protected case class Keyword(str: String) - protected val CACHE = Keyword("CACHE") - protected val SET = Keyword("SET") protected val ADD = Keyword("ADD") - protected val JAR = Keyword("JAR") - protected val TABLE = Keyword("TABLE") protected val AS = Keyword("AS") - protected val UNCACHE = Keyword("UNCACHE") - protected val FILE = Keyword("FILE") + protected val CACHE = Keyword("CACHE") protected val DFS = Keyword("DFS") + protected val FILE = Keyword("FILE") + protected val JAR = Keyword("JAR") + protected val LAZY = Keyword("LAZY") + protected val SET = Keyword("SET") protected val SOURCE = Keyword("SOURCE") + protected val TABLE = Keyword("TABLE") + protected val UNCACHE = Keyword("UNCACHE") protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) @@ -79,57 +80,56 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr override val lexical = new SqlLexical(reservedWords) - protected lazy val query: Parser[LogicalPlan] = + protected lazy val query: Parser[LogicalPlan] = cache | uncache | addJar | addFile | dfs | source | hiveQl protected lazy val hiveQl: Parser[LogicalPlan] = - remainingQuery ^^ { - case r => HiveQl.createPlan(r.trim()) + restInput ^^ { + case statement => HiveQl.createPlan(statement.trim()) } - /** It returns all remaining query */ - protected lazy val remainingQuery: Parser[String] = new Parser[String] { + // Returns the whole input string + protected lazy val wholeInput: Parser[String] = new Parser[String] { def apply(in: Input) = - Success( - in.source.subSequence(in.offset, in.source.length).toString, - in.drop(in.source.length())) + Success(in.source.toString, in.drop(in.source.length())) } - /** It returns all query */ - protected lazy val allQuery: Parser[String] = new Parser[String] { + // Returns the rest of the input string that are not parsed yet + protected lazy val restInput: Parser[String] = new Parser[String] { def apply(in: Input) = - Success(in.source.toString, in.drop(in.source.length())) + Success( + in.source.subSequence(in.offset, in.source.length).toString, + in.drop(in.source.length())) } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ { - case tableName ~ None => CacheCommand(tableName, true) - case tableName ~ Some(plan) => - CacheTableAsSelectCommand(tableName, plan) + CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> hiveQl) ^^ { + case isLazy ~ tableName ~ plan => + CacheTableCommand(tableName, plan, isLazy.isDefined) } protected lazy val uncache: Parser[LogicalPlan] = UNCACHE ~ TABLE ~> ident ^^ { - case tableName => CacheCommand(tableName, false) + case tableName => UncacheTableCommand(tableName) } protected lazy val addJar: Parser[LogicalPlan] = - ADD ~ JAR ~> remainingQuery ^^ { - case rq => AddJar(rq.trim()) + ADD ~ JAR ~> restInput ^^ { + case jar => AddJar(jar.trim()) } protected lazy val addFile: Parser[LogicalPlan] = - ADD ~ FILE ~> remainingQuery ^^ { - case rq => AddFile(rq.trim()) + ADD ~ FILE ~> restInput ^^ { + case file => AddFile(file.trim()) } protected lazy val dfs: Parser[LogicalPlan] = - DFS ~> allQuery ^^ { - case aq => NativeCommand(aq.trim()) + DFS ~> wholeInput ^^ { + case command => NativeCommand(command.trim()) } protected lazy val source: Parser[LogicalPlan] = - SOURCE ~> remainingQuery ^^ { - case rq => SourceCommand(rq.trim()) + SOURCE ~> restInput ^^ { + case file => SourceCommand(file.trim()) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index c0e69393cc2e3..a4354c1379c63 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.SQLConf @@ -67,7 +67,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath /** Sets up the system initially or after a RESET command */ - protected def configure() { + protected def configure(): Unit = { setConf("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastorePath;create=true") setConf("hive.metastore.warehouse.dir", warehousePath) @@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override lazy val analyzed = { val describedTables = logical match { case NativeCommand(describedTable(tbl)) => tbl :: Nil - case CacheCommand(tbl, _) => tbl :: Nil + case CacheTableCommand(tbl, _, _) => tbl :: Nil case _ => Nil } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 158cfb5bbee7c..2060e1f1a7a4b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{QueryTest, SchemaRDD} -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.{QueryTest, SchemaRDD} +import org.apache.spark.storage.RDDBlockId class CachedTableSuite extends QueryTest { - import TestHive._ - /** * Throws a test failed exception when the number of cached tables differs from the expected * number. @@ -34,11 +34,24 @@ class CachedTableSuite extends QueryTest { case cached: InMemoryRelation => cached } - if (cachedData.size != numCachedTables) { - fail( - s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + - planWithCaching) - } + assert( + cachedData.size == numCachedTables, + s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + + planWithCaching) + } + + def rddIdOf(tableName: String): Int = { + val executedPlan = table(tableName).queryExecution.executedPlan + executedPlan.collect { + case InMemoryColumnarTableScan(_, _, relation) => + relation.cachedColumnBuffers.id + case _ => + fail(s"Table $tableName is not cached\n" + executedPlan) + }.head + } + + def isMaterialized(rddId: Int): Boolean = { + sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty } test("cache table") { @@ -102,16 +115,47 @@ class CachedTableSuite extends QueryTest { assert(!TestHive.isCached("src"), "Table 'src' should not be cached") } - test("CACHE TABLE AS SELECT") { - assertCached(sql("SELECT * FROM src"), 0) - sql("CACHE TABLE test AS SELECT key FROM src") + test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { + sql("CACHE TABLE testCacheTable AS SELECT * FROM src") + assertCached(table("testCacheTable")) - checkAnswer( - sql("SELECT * FROM test"), - sql("SELECT key FROM src").collect().toSeq) + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - assertCached(sql("SELECT * FROM test")) + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } + + test("CACHE TABLE tableName AS SELECT ...") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } - assertCached(sql("SELECT * FROM test JOIN test"), 2) + test("CACHE LAZY TABLE tableName") { + sql("CACHE LAZY TABLE src") + assertCached(table("src")) + + val rddId = rddIdOf("src") + assert( + !isMaterialized(rddId), + "Lazily cached in-memory table shouldn't be materialized eagerly") + + sql("SELECT COUNT(*) FROM src").collect() + assert( + isMaterialized(rddId), + "Lazily cached in-memory table should have been materialized") + + uncacheTable("src") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } From 90897ea5f24b03c9f3455a62c7f68b3d3f0435ad Mon Sep 17 00:00:00 2001 From: Renat Yusupov Date: Sun, 5 Oct 2014 17:56:24 -0700 Subject: [PATCH 08/15] [SPARK-3776][SQL] Wrong conversion to Catalyst for Option[Product] Author: Renat Yusupov Closes #2641 from r3natko/feature/catalyst_option and squashes the following commits: 55d0c06 [Renat Yusupov] [SQL] SPARK-3776: Wrong conversion to Catalyst for Option[Product] --- .../spark/sql/catalyst/ScalaReflection.scala | 2 +- .../sql/catalyst/ScalaReflectionSuite.scala | 21 ++++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 88a8fa7c28e0f..b3ae8e6779700 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -33,7 +33,7 @@ object ScalaReflection { /** Converts Scala objects to catalyst rows / types */ def convertToCatalyst(a: Any): Any = a match { - case o: Option[_] => o.orNull + case o: Option[_] => o.map(convertToCatalyst).orNull case s: Seq[_] => s.map(convertToCatalyst) case m: Map[_, _] => m.map { case (k, v) => convertToCatalyst(k) -> convertToCatalyst(v) } case p: Product => new GenericRow(p.productIterator.map(convertToCatalyst).toArray) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index 428607d8c8253..488e373854bb3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -53,7 +53,8 @@ case class OptionalData( floatField: Option[Float], shortField: Option[Short], byteField: Option[Byte], - booleanField: Option[Boolean]) + booleanField: Option[Boolean], + structField: Option[PrimitiveData]) case class ComplexData( arrayField: Seq[Int], @@ -100,7 +101,7 @@ class ScalaReflectionSuite extends FunSuite { nullable = true)) } - test("optinal data") { + test("optional data") { val schema = schemaFor[OptionalData] assert(schema === Schema( StructType(Seq( @@ -110,7 +111,8 @@ class ScalaReflectionSuite extends FunSuite { StructField("floatField", FloatType, nullable = true), StructField("shortField", ShortType, nullable = true), StructField("byteField", ByteType, nullable = true), - StructField("booleanField", BooleanType, nullable = true))), + StructField("booleanField", BooleanType, nullable = true), + StructField("structField", schemaFor[PrimitiveData].dataType, nullable = true))), nullable = true)) } @@ -228,4 +230,17 @@ class ScalaReflectionSuite extends FunSuite { assert(ArrayType(IntegerType) === typeOfObject3(Seq(1, 2, 3))) assert(ArrayType(ArrayType(IntegerType)) === typeOfObject3(Seq(Seq(1,2,3)))) } + + test("convert PrimitiveData to catalyst") { + val data = PrimitiveData(1, 1, 1, 1, 1, 1, true) + val convertedData = Seq(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true) + assert(convertToCatalyst(data) === convertedData) + } + + test("convert Option[Product] to catalyst") { + val primitiveData = PrimitiveData(1, 1, 1, 1, 1, 1, true) + val data = OptionalData(Some(1), Some(1), Some(1), Some(1), Some(1), Some(1), Some(true), Some(primitiveData)) + val convertedData = Seq(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true, convertToCatalyst(primitiveData)) + assert(convertToCatalyst(data) === convertedData) + } } From 8d22dbb5ec7a0727afdfebbbc2c57ffdb384dd0b Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 5 Oct 2014 18:44:12 -0700 Subject: [PATCH 09/15] SPARK-3794 [CORE] Building spark core fails due to inadvertent dependency on Commons IO Remove references to Commons IO FileUtils and replace with pure Java version, which doesn't need to traverse the whole directory tree first. I think this method could be refined further if it would be alright to rename it and its args and break it down into two methods. I'm starting with a simple recursive rendition. Author: Sean Owen Closes #2662 from srowen/SPARK-3794 and squashes the following commits: 4cd172f [Sean Owen] Remove references to Commons IO FileUtils and replace with pure Java version, which doesn't need to traverse the whole directory tree first --- .../apache/spark/deploy/worker/Worker.scala | 1 - .../scala/org/apache/spark/util/Utils.scala | 20 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 3b13f43a1868c..9b52cb06fb6fa 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -29,7 +29,6 @@ import scala.language.postfixOps import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.commons.io.FileUtils import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a67124140f9da..3d307b3c16d3e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -35,8 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.commons.io.FileUtils -import org.apache.commons.io.filefilter.TrueFileFilter import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.log4j.PropertyConfigurator @@ -710,18 +708,20 @@ private[spark] object Utils extends Logging { * Determines if a directory contains any files newer than cutoff seconds. * * @param dir must be the path to a directory, or IllegalArgumentException is thrown - * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this. + * @param cutoff measured in seconds. Returns true if there are any files or directories in the + * given directory whose last modified time is later than this many seconds ago */ def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = { - val currentTimeMillis = System.currentTimeMillis if (!dir.isDirectory) { - throw new IllegalArgumentException (dir + " is not a directory!") - } else { - val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE) - val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000)) - val newFiles = files.filter { _.lastModified > cutoffTimeInMillis } - newFiles.nonEmpty + throw new IllegalArgumentException("$dir is not a directory!") } + val filesAndDirs = dir.listFiles() + val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000) + + filesAndDirs.exists(_.lastModified() > cutoffTimeInMillis) || + filesAndDirs.filter(_.isDirectory).exists( + subdir => doesDirectoryContainAnyNewFiles(subdir, cutoff) + ) } /** From fd7b15539669b14996a51610d6724ca0811f9d65 Mon Sep 17 00:00:00 2001 From: Nathan Kronenfeld Date: Sun, 5 Oct 2014 21:03:48 -0700 Subject: [PATCH 10/15] Rectify gereneric parameter names between SparkContext and AccumulablePa... AccumulableParam gave its generic parameters as 'R, T', whereas SparkContext labeled them 'T, R'. Trivial, but really confusing. I resolved this in favor of AccumulableParam, because it seemed to have some logic for its names. I also extended this minimal, but at least present, justification into the SparkContext comments. Author: Nathan Kronenfeld Closes #2637 from nkronenfeld/accumulators and squashes the following commits: 98d6b74 [Nathan Kronenfeld] Rectify gereneric parameter names between SparkContext and AccumulableParam --- .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 97109b9f41b60..396cdd1247e07 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -779,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging { /** * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values * with `+=`. Only the driver can access the accumuable's `value`. - * @tparam T accumulator type - * @tparam R type that can be added to the accumulator + * @tparam R accumulator result type + * @tparam T type that can be added to the accumulator */ - def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) = + def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) = new Accumulable(initialValue, param) /** * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can * access the accumuable's `value`. - * @tparam T accumulator type - * @tparam R type that can be added to the accumulator + * @tparam R accumulator result type + * @tparam T type that can be added to the accumulator */ - def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) = + def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) = new Accumulable(initialValue, param, Some(name)) /** From c9ae79fba25cd49ca70ca398bc75434202d26a97 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 5 Oct 2014 21:36:20 -0700 Subject: [PATCH 11/15] [SPARK-3765][Doc] Add test information to sbt build docs Add testing with sbt to doc ```building-spark.md``` Author: scwf Closes #2629 from scwf/sbt-doc and squashes the following commits: fd9cf29 [scwf] add testing with sbt to docs --- docs/building-spark.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/building-spark.md b/docs/building-spark.md index 901c157162fee..b2940ee4029e8 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -171,6 +171,21 @@ can be set to control the SBT build. For example: sbt/sbt -Pyarn -Phadoop-2.3 assembly +# Testing with SBT + +Some of the tests require Spark to be packaged first, so always run `sbt/sbt assembly` the first time. The following is an example of a correct (build, test) sequence: + + sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly + sbt/sbt -Pyarn -Phadoop-2.3 -Phive test + +To run only a specific test suite as follows: + + sbt/sbt -Pyarn -Phadoop-2.3 -Phive "test-only org.apache.spark.repl.ReplSuite" + +To run test suites of a specific sub project as follows: + + sbt/sbt -Pyarn -Phadoop-2.3 -Phive core/test + # Speeding up Compilation with Zinc [Zinc](https://github.com/typesafehub/zinc) is a long-running server version of SBT's incremental From 20ea54cc7a5176ebc63bfa9393a9bf84619bfc66 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 6 Oct 2014 14:05:45 -0700 Subject: [PATCH 12/15] [SPARK-2461] [PySpark] Add a toString method to GeneralizedLinearModel Add a toString method to GeneralizedLinearModel, also change `__str__` to `__repr__` for some classes, to provide better message in repr. This PR is based on #1388, thanks to sryza! closes #1388 Author: Sandy Ryza Author: Davies Liu Closes #2625 from davies/string and squashes the following commits: 3544aad [Davies Liu] fix LinearModel 0bcd642 [Davies Liu] Merge branch 'sandy-spark-2461' of github.com:sryza/spark 1ce5c2d [Sandy Ryza] __repr__ back to __str__ in a couple places aa9e962 [Sandy Ryza] Switch __str__ to __repr__ a0c5041 [Sandy Ryza] Add labels back in 1aa17f5 [Sandy Ryza] Match existing conventions fac1bc4 [Sandy Ryza] Fix PEP8 error f7b58ed [Sandy Ryza] SPARK-2461. Add a toString method to GeneralizedLinearModel --- .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 2 ++ python/pyspark/mllib/regression.py | 3 +++ python/pyspark/serializers.py | 6 +++--- python/pyspark/sql.py | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index d0fe4179685ca..00dfc86c9e0bd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -75,6 +75,8 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double def predict(testData: Vector): Double = { predictPoint(testData, weights, intercept) } + + override def toString() = "(weights=%s, intercept=%s)".format(weights, intercept) } /** diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index cbdbc09858013..8fe8c6db2ad9c 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -66,6 +66,9 @@ def weights(self): def intercept(self): return self._intercept + def __repr__(self): + return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept) + class LinearRegressionModelBase(LinearModel): diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 2672da36c1f50..099fa54cf2bd7 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -211,7 +211,7 @@ def __eq__(self, other): return (isinstance(other, BatchedSerializer) and other.serializer == self.serializer) - def __str__(self): + def __repr__(self): return "BatchedSerializer<%s>" % str(self.serializer) @@ -279,7 +279,7 @@ def __eq__(self, other): return (isinstance(other, CartesianDeserializer) and self.key_ser == other.key_ser and self.val_ser == other.val_ser) - def __str__(self): + def __repr__(self): return "CartesianDeserializer<%s, %s>" % \ (str(self.key_ser), str(self.val_ser)) @@ -306,7 +306,7 @@ def __eq__(self, other): return (isinstance(other, PairDeserializer) and self.key_ser == other.key_ser and self.val_ser == other.val_ser) - def __str__(self): + def __repr__(self): return "PairDeserializer<%s, %s>" % (str(self.key_ser), str(self.val_ser)) diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 974b5e287bc00..114644ab8b79d 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -201,7 +201,7 @@ def __init__(self, elementType, containsNull=True): self.elementType = elementType self.containsNull = containsNull - def __str__(self): + def __repr__(self): return "ArrayType(%s,%s)" % (self.elementType, str(self.containsNull).lower()) From 4f01265f7d62e070ba42c251255e385644c1b16c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 6 Oct 2014 14:07:53 -0700 Subject: [PATCH 13/15] [SPARK-3786] [PySpark] speedup tests This patch try to speed up tests of PySpark, re-use the SparkContext in tests.py and mllib/tests.py to reduce the overhead of create SparkContext, remove some test cases, which did not make sense. It also improve the performance of some cases, such as MergerTests and SortTests. before this patch: real 21m27.320s user 4m42.967s sys 0m17.343s after this patch: real 9m47.541s user 2m12.947s sys 0m14.543s It almost cut the time by half. Author: Davies Liu Closes #2646 from davies/tests and squashes the following commits: c54de60 [Davies Liu] revert change about memory limit 6a2a4b0 [Davies Liu] refactor of tests, speedup 100% --- python/pyspark/mllib/tests.py | 2 +- python/pyspark/shuffle.py | 5 +- python/pyspark/tests.py | 92 ++++++++++++++++------------------- python/run-tests | 74 ++++++++++++++-------------- 4 files changed, 82 insertions(+), 91 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index f72e88ba6e2ba..5c20e100e144f 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -32,7 +32,7 @@ from pyspark.serializers import PickleSerializer from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, _convert_to_vector from pyspark.mllib.regression import LabeledPoint -from pyspark.tests import PySparkTestCase +from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase _have_scipy = False diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index ce597cbe91e15..d57a802e4734a 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -396,7 +396,6 @@ def _external_items(self): for v in self.data.iteritems(): yield v self.data.clear() - gc.collect() # remove the merged partition for j in range(self.spills): @@ -428,7 +427,7 @@ def _recursive_merged_items(self, start): subdirs = [os.path.join(d, "parts", str(i)) for d in self.localdirs] m = ExternalMerger(self.agg, self.memory_limit, self.serializer, - subdirs, self.scale * self.partitions) + subdirs, self.scale * self.partitions, self.partitions) m.pdata = [{} for _ in range(self.partitions)] limit = self._next_limit() @@ -486,7 +485,7 @@ def sorted(self, iterator, key=None, reverse=False): goes above the limit. """ global MemoryBytesSpilled, DiskBytesSpilled - batch = 10 + batch = 100 chunks, current_chunk = [], [] iterator = iter(iterator) while True: diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 6fb6bc998c752..7f05d48ade2b3 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -67,10 +67,10 @@ SPARK_HOME = os.environ["SPARK_HOME"] -class TestMerger(unittest.TestCase): +class MergerTests(unittest.TestCase): def setUp(self): - self.N = 1 << 16 + self.N = 1 << 14 self.l = [i for i in xrange(self.N)] self.data = zip(self.l, self.l) self.agg = Aggregator(lambda x: [x], @@ -115,7 +115,7 @@ def test_medium_dataset(self): sum(xrange(self.N)) * 3) def test_huge_dataset(self): - m = ExternalMerger(self.agg, 10) + m = ExternalMerger(self.agg, 10, partitions=3) m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)), @@ -123,7 +123,7 @@ def test_huge_dataset(self): m._cleanup() -class TestSorter(unittest.TestCase): +class SorterTests(unittest.TestCase): def test_in_memory_sort(self): l = range(1024) random.shuffle(l) @@ -244,16 +244,25 @@ def tearDown(self): sys.path = self._old_sys_path -class TestCheckpoint(PySparkTestCase): +class ReusedPySparkTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.sc = SparkContext('local[4]', cls.__name__, batchSize=2) + + @classmethod + def tearDownClass(cls): + cls.sc.stop() + + +class CheckpointTests(ReusedPySparkTestCase): def setUp(self): - PySparkTestCase.setUp(self) self.checkpointDir = tempfile.NamedTemporaryFile(delete=False) os.unlink(self.checkpointDir.name) self.sc.setCheckpointDir(self.checkpointDir.name) def tearDown(self): - PySparkTestCase.tearDown(self) shutil.rmtree(self.checkpointDir.name) def test_basic_checkpointing(self): @@ -288,7 +297,7 @@ def test_checkpoint_and_restore(self): self.assertEquals([1, 2, 3, 4], recovered.collect()) -class TestAddFile(PySparkTestCase): +class AddFileTests(PySparkTestCase): def test_add_py_file(self): # To ensure that we're actually testing addPyFile's effects, check that @@ -354,7 +363,7 @@ def func(x): self.assertEqual(["My Server"], self.sc.parallelize(range(1)).map(func).collect()) -class TestRDDFunctions(PySparkTestCase): +class RDDTests(ReusedPySparkTestCase): def test_id(self): rdd = self.sc.parallelize(range(10)) @@ -365,12 +374,6 @@ def test_id(self): self.assertEqual(id + 1, id2) self.assertEqual(id2, rdd2.id()) - def test_failed_sparkcontext_creation(self): - # Regression test for SPARK-1550 - self.sc.stop() - self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name")) - self.sc = SparkContext("local") - def test_save_as_textfile_with_unicode(self): # Regression test for SPARK-970 x = u"\u00A1Hola, mundo!" @@ -636,7 +639,7 @@ def test_distinct(self): self.assertEquals(result.count(), 3) -class TestProfiler(PySparkTestCase): +class ProfilerTests(PySparkTestCase): def setUp(self): self._old_sys_path = list(sys.path) @@ -666,10 +669,9 @@ def heavy_foo(x): self.assertTrue("rdd_%d.pstats" % id in os.listdir(d)) -class TestSQL(PySparkTestCase): +class SQLTests(ReusedPySparkTestCase): def setUp(self): - PySparkTestCase.setUp(self) self.sqlCtx = SQLContext(self.sc) def test_udf(self): @@ -754,27 +756,19 @@ def test_serialize_nested_array_and_map(self): self.assertEqual("2", row.d) -class TestIO(PySparkTestCase): - - def test_stdout_redirection(self): - import subprocess - - def func(x): - subprocess.check_call('ls', shell=True) - self.sc.parallelize([1]).foreach(func) +class InputFormatTests(ReusedPySparkTestCase): + @classmethod + def setUpClass(cls): + ReusedPySparkTestCase.setUpClass() + cls.tempdir = tempfile.NamedTemporaryFile(delete=False) + os.unlink(cls.tempdir.name) + cls.sc._jvm.WriteInputFormatTestDataGenerator.generateData(cls.tempdir.name, cls.sc._jsc) -class TestInputFormat(PySparkTestCase): - - def setUp(self): - PySparkTestCase.setUp(self) - self.tempdir = tempfile.NamedTemporaryFile(delete=False) - os.unlink(self.tempdir.name) - self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) - - def tearDown(self): - PySparkTestCase.tearDown(self) - shutil.rmtree(self.tempdir.name) + @classmethod + def tearDownClass(cls): + ReusedPySparkTestCase.tearDownClass() + shutil.rmtree(cls.tempdir.name) def test_sequencefiles(self): basepath = self.tempdir.name @@ -954,15 +948,13 @@ def test_converters(self): self.assertEqual(maps, em) -class TestOutputFormat(PySparkTestCase): +class OutputFormatTests(ReusedPySparkTestCase): def setUp(self): - PySparkTestCase.setUp(self) self.tempdir = tempfile.NamedTemporaryFile(delete=False) os.unlink(self.tempdir.name) def tearDown(self): - PySparkTestCase.tearDown(self) shutil.rmtree(self.tempdir.name, ignore_errors=True) def test_sequencefiles(self): @@ -1243,8 +1235,7 @@ def test_malformed_RDD(self): basepath + "/malformed/sequence")) -class TestDaemon(unittest.TestCase): - +class DaemonTests(unittest.TestCase): def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM sock = socket(AF_INET, SOCK_STREAM) @@ -1290,7 +1281,7 @@ def test_termination_sigterm(self): self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM)) -class TestWorker(PySparkTestCase): +class WorkerTests(PySparkTestCase): def test_cancel_task(self): temp = tempfile.NamedTemporaryFile(delete=True) @@ -1342,11 +1333,6 @@ def run(): rdd = self.sc.parallelize(range(100), 1) self.assertEqual(100, rdd.map(str).count()) - def test_fd_leak(self): - N = 1100 # fd limit is 1024 by default - rdd = self.sc.parallelize(range(N), N) - self.assertEquals(N, rdd.count()) - def test_after_exception(self): def raise_exception(_): raise Exception() @@ -1379,7 +1365,7 @@ def test_accumulator_when_reuse_worker(self): self.assertEqual(sum(range(100)), acc1.value) -class TestSparkSubmit(unittest.TestCase): +class SparkSubmitTests(unittest.TestCase): def setUp(self): self.programDir = tempfile.mkdtemp() @@ -1492,6 +1478,8 @@ def test_single_script_on_cluster(self): |sc = SparkContext() |print sc.parallelize([1, 2, 3]).map(foo).collect() """) + # this will fail if you have different spark.executor.memory + # in conf/spark-defaults.conf proc = subprocess.Popen( [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script], stdout=subprocess.PIPE) @@ -1500,7 +1488,11 @@ def test_single_script_on_cluster(self): self.assertIn("[2, 4, 6]", out) -class ContextStopTests(unittest.TestCase): +class ContextTests(unittest.TestCase): + + def test_failed_sparkcontext_creation(self): + # Regression test for SPARK-1550 + self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name")) def test_stop(self): sc = SparkContext() diff --git a/python/run-tests b/python/run-tests index a7ec270c7da21..c713861eb77bb 100755 --- a/python/run-tests +++ b/python/run-tests @@ -34,7 +34,7 @@ rm -rf metastore warehouse function run_test() { echo "Running test: $1" - SPARK_TESTING=1 "$FWDIR"/bin/pyspark $1 2>&1 | tee -a unit-tests.log + SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 2>&1 | tee -a unit-tests.log FAILED=$((PIPESTATUS[0]||$FAILED)) @@ -48,6 +48,37 @@ function run_test() { fi } +function run_core_tests() { + echo "Run core tests ..." + run_test "pyspark/rdd.py" + run_test "pyspark/context.py" + run_test "pyspark/conf.py" + PYSPARK_DOC_TEST=1 run_test "pyspark/broadcast.py" + PYSPARK_DOC_TEST=1 run_test "pyspark/accumulators.py" + PYSPARK_DOC_TEST=1 run_test "pyspark/serializers.py" + run_test "pyspark/shuffle.py" + run_test "pyspark/tests.py" +} + +function run_sql_tests() { + echo "Run sql tests ..." + run_test "pyspark/sql.py" +} + +function run_mllib_tests() { + echo "Run mllib tests ..." + run_test "pyspark/mllib/classification.py" + run_test "pyspark/mllib/clustering.py" + run_test "pyspark/mllib/linalg.py" + run_test "pyspark/mllib/random.py" + run_test "pyspark/mllib/recommendation.py" + run_test "pyspark/mllib/regression.py" + run_test "pyspark/mllib/stat.py" + run_test "pyspark/mllib/tree.py" + run_test "pyspark/mllib/util.py" + run_test "pyspark/mllib/tests.py" +} + echo "Running PySpark tests. Output is in python/unit-tests.log." export PYSPARK_PYTHON="python" @@ -60,29 +91,9 @@ fi echo "Testing with Python version:" $PYSPARK_PYTHON --version -run_test "pyspark/rdd.py" -run_test "pyspark/context.py" -run_test "pyspark/conf.py" -run_test "pyspark/sql.py" -# These tests are included in the module-level docs, and so must -# be handled on a higher level rather than within the python file. -export PYSPARK_DOC_TEST=1 -run_test "pyspark/broadcast.py" -run_test "pyspark/accumulators.py" -run_test "pyspark/serializers.py" -unset PYSPARK_DOC_TEST -run_test "pyspark/shuffle.py" -run_test "pyspark/tests.py" -run_test "pyspark/mllib/classification.py" -run_test "pyspark/mllib/clustering.py" -run_test "pyspark/mllib/linalg.py" -run_test "pyspark/mllib/random.py" -run_test "pyspark/mllib/recommendation.py" -run_test "pyspark/mllib/regression.py" -run_test "pyspark/mllib/stat.py" -run_test "pyspark/mllib/tests.py" -run_test "pyspark/mllib/tree.py" -run_test "pyspark/mllib/util.py" +run_core_tests +run_sql_tests +run_mllib_tests # Try to test with PyPy if [ $(which pypy) ]; then @@ -90,19 +101,8 @@ if [ $(which pypy) ]; then echo "Testing with PyPy version:" $PYSPARK_PYTHON --version - run_test "pyspark/rdd.py" - run_test "pyspark/context.py" - run_test "pyspark/conf.py" - run_test "pyspark/sql.py" - # These tests are included in the module-level docs, and so must - # be handled on a higher level rather than within the python file. - export PYSPARK_DOC_TEST=1 - run_test "pyspark/broadcast.py" - run_test "pyspark/accumulators.py" - run_test "pyspark/serializers.py" - unset PYSPARK_DOC_TEST - run_test "pyspark/shuffle.py" - run_test "pyspark/tests.py" + run_core_tests + run_sql_tests fi if [[ $FAILED == 0 ]]; then From 2300eb58ae79a86e65b3ff608a578f5d4c09892b Mon Sep 17 00:00:00 2001 From: cocoatomo Date: Mon, 6 Oct 2014 14:08:40 -0700 Subject: [PATCH 14/15] [SPARK-3773][PySpark][Doc] Sphinx build warning When building Sphinx documents for PySpark, we have 12 warnings. Their causes are almost docstrings in broken ReST format. To reproduce this issue, we should run following commands on the commit: 6e27cb630de69fa5acb510b4e2f6b980742b1957. ```bash $ cd ./python/docs $ make clean html ... /Users//MyRepos/Scala/spark/python/pyspark/__init__.py:docstring of pyspark.SparkContext.sequenceFile:4: ERROR: Unexpected indentation. /Users//MyRepos/Scala/spark/python/pyspark/__init__.py:docstring of pyspark.RDD.saveAsSequenceFile:4: ERROR: Unexpected indentation. /Users//MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:14: ERROR: Unexpected indentation. /Users//MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:16: WARNING: Definition list ends without a blank line; unexpected unindent. /Users//MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:17: WARNING: Block quote ends without a blank line; unexpected unindent. /Users//MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:14: ERROR: Unexpected indentation. /Users//MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:16: WARNING: Definition list ends without a blank line; unexpected unindent. /Users//MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:17: WARNING: Block quote ends without a blank line; unexpected unindent. /Users//MyRepos/Scala/spark/python/docs/pyspark.mllib.rst:50: WARNING: missing attribute mentioned in :members: or __all__: module pyspark.mllib.regression, attribute RidgeRegressionModelLinearRegressionWithSGD /Users//MyRepos/Scala/spark/python/pyspark/mllib/tree.py:docstring of pyspark.mllib.tree.DecisionTreeModel.predict:3: ERROR: Unexpected indentation. ... checking consistency... /Users//MyRepos/Scala/spark/python/docs/modules.rst:: WARNING: document isn't included in any toctree ... copying static files... WARNING: html_static_path entry u'/Users//MyRepos/Scala/spark/python/docs/_static' does not exist ... build succeeded, 12 warnings. ``` Author: cocoatomo Closes #2653 from cocoatomo/issues/3773-sphinx-build-warnings and squashes the following commits: 6f65661 [cocoatomo] [SPARK-3773][PySpark][Doc] Sphinx build warning --- python/docs/modules.rst | 7 ------- python/pyspark/context.py | 1 + python/pyspark/mllib/classification.py | 26 ++++++++++++++++---------- python/pyspark/mllib/regression.py | 15 +++++++++------ python/pyspark/mllib/tree.py | 1 + python/pyspark/rdd.py | 1 + 6 files changed, 28 insertions(+), 23 deletions(-) delete mode 100644 python/docs/modules.rst diff --git a/python/docs/modules.rst b/python/docs/modules.rst deleted file mode 100644 index 183564659fbcf..0000000000000 --- a/python/docs/modules.rst +++ /dev/null @@ -1,7 +0,0 @@ -. -= - -.. toctree:: - :maxdepth: 4 - - pyspark diff --git a/python/pyspark/context.py b/python/pyspark/context.py index e9418320ff781..a45d79d6424c7 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -410,6 +410,7 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows: + 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes 2. Serialization is attempted via Pyrolite pickling diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index ac142fb49a90c..a765b1c4f7d87 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -89,11 +89,14 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, @param regParam: The regularizer parameter (default: 1.0). @param regType: The type of regularizer used for training our model. - Allowed values: "l1" for using L1Updater, - "l2" for using - SquaredL2Updater, - "none" for no regularizer. - (default: "none") + + :Allowed values: + - "l1" for using L1Updater + - "l2" for using SquaredL2Updater + - "none" for no regularizer + + (default: "none") + @param intercept: Boolean parameter which indicates the use or not of the augmented representation for training data (i.e. whether bias features @@ -158,11 +161,14 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0, @param initialWeights: The initial weights (default: None). @param regType: The type of regularizer used for training our model. - Allowed values: "l1" for using L1Updater, - "l2" for using - SquaredL2Updater, - "none" for no regularizer. - (default: "none") + + :Allowed values: + - "l1" for using L1Updater + - "l2" for using SquaredL2Updater, + - "none" for no regularizer. + + (default: "none") + @param intercept: Boolean parameter which indicates the use or not of the augmented representation for training data (i.e. whether bias features diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index 8fe8c6db2ad9c..54f34a98337ca 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -22,7 +22,7 @@ from pyspark.mllib.linalg import SparseVector, _convert_to_vector from pyspark.serializers import PickleSerializer, AutoBatchedSerializer -__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel' +__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel', 'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD'] @@ -155,11 +155,14 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, @param regParam: The regularizer parameter (default: 1.0). @param regType: The type of regularizer used for training our model. - Allowed values: "l1" for using L1Updater, - "l2" for using - SquaredL2Updater, - "none" for no regularizer. - (default: "none") + + :Allowed values: + - "l1" for using L1Updater, + - "l2" for using SquaredL2Updater, + - "none" for no regularizer. + + (default: "none") + @param intercept: Boolean parameter which indicates the use or not of the augmented representation for training data (i.e. whether bias features diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index afdcdbdf3ae01..5d7abfb96b7fe 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -48,6 +48,7 @@ def __del__(self): def predict(self, x): """ Predict the label of one or more examples. + :param x: Data point (feature vector), or an RDD of data points (feature vectors). """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index dc6497772e502..e77669aad76b6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1208,6 +1208,7 @@ def saveAsSequenceFile(self, path, compressionCodecClass=None): Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file system, using the L{org.apache.hadoop.io.Writable} types that we convert from the RDD's key and value types. The mechanism is as follows: + 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. 2. Keys and values of this Java RDD are converted to Writables and written out. From 69c3f441a9b6e942d6c08afecd59a0349d61cc7b Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 6 Oct 2014 14:19:06 -0700 Subject: [PATCH 15/15] [SPARK-3479] [Build] Report failed test category This PR allows SparkQA (i.e. Jenkins) to report in its posts to GitHub what category of test failed, if one can be determined. The failure categories are: * general failure * RAT checks failed * Scala style checks failed * Python style checks failed * Build failed * Spark unit tests failed * PySpark unit tests failed * MiMa checks failed This PR also fixes the diffing logic used to determine if a patch introduces new classes. Author: Nicholas Chammas Closes #2606 from nchammas/report-failed-test-category and squashes the following commits: d67df03 [Nicholas Chammas] report what test category failed --- dev/run-tests | 32 ++++++++++++- dev/run-tests-codes.sh | 27 +++++++++++ dev/run-tests-jenkins | 102 ++++++++++++++++++++++++++++------------- 3 files changed, 126 insertions(+), 35 deletions(-) create mode 100644 dev/run-tests-codes.sh diff --git a/dev/run-tests b/dev/run-tests index c3d8f49cdd993..4be2baaf48cd1 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -24,6 +24,16 @@ cd "$FWDIR" # Remove work directory rm -rf ./work +source "$FWDIR/dev/run-tests-codes.sh" + +CURRENT_BLOCK=$BLOCK_GENERAL + +function handle_error () { + echo "[error] Got a return code of $? on line $1 of the run-tests script." + exit $CURRENT_BLOCK +} + + # Build against the right verison of Hadoop. { if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then @@ -91,26 +101,34 @@ if [ -n "$AMPLAB_JENKINS" ]; then fi fi -# Fail fast -set -e set -o pipefail +trap 'handle_error $LINENO' ERR echo "" echo "=========================================================================" echo "Running Apache RAT checks" echo "=========================================================================" + +CURRENT_BLOCK=$BLOCK_RAT + ./dev/check-license echo "" echo "=========================================================================" echo "Running Scala style checks" echo "=========================================================================" + +CURRENT_BLOCK=$BLOCK_SCALA_STYLE + ./dev/lint-scala echo "" echo "=========================================================================" echo "Running Python style checks" echo "=========================================================================" + +CURRENT_BLOCK=$BLOCK_PYTHON_STYLE + ./dev/lint-python echo "" @@ -118,6 +136,8 @@ echo "=========================================================================" echo "Building Spark" echo "=========================================================================" +CURRENT_BLOCK=$BLOCK_BUILD + { # We always build with Hive because the PySpark Spark SQL tests need it. BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" @@ -141,6 +161,8 @@ echo "=========================================================================" echo "Running Spark unit tests" echo "=========================================================================" +CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS + { # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. # This must be a single argument, as it is. @@ -175,10 +197,16 @@ echo "" echo "=========================================================================" echo "Running PySpark tests" echo "=========================================================================" + +CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS + ./python/run-tests echo "" echo "=========================================================================" echo "Detecting binary incompatibilites with MiMa" echo "=========================================================================" + +CURRENT_BLOCK=$BLOCK_MIMA + ./dev/mima diff --git a/dev/run-tests-codes.sh b/dev/run-tests-codes.sh new file mode 100644 index 0000000000000..1348e0609dda4 --- /dev/null +++ b/dev/run-tests-codes.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +readonly BLOCK_GENERAL=10 +readonly BLOCK_RAT=11 +readonly BLOCK_SCALA_STYLE=12 +readonly BLOCK_PYTHON_STYLE=13 +readonly BLOCK_BUILD=14 +readonly BLOCK_SPARK_UNIT_TESTS=15 +readonly BLOCK_PYSPARK_UNIT_TESTS=16 +readonly BLOCK_MIMA=17 diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins index 0b1e31b9413cf..451f3b771cc76 100755 --- a/dev/run-tests-jenkins +++ b/dev/run-tests-jenkins @@ -26,9 +26,23 @@ FWDIR="$(cd `dirname $0`/..; pwd)" cd "$FWDIR" +source "$FWDIR/dev/run-tests-codes.sh" + COMMENTS_URL="https://api.github.com/repos/apache/spark/issues/$ghprbPullId/comments" PULL_REQUEST_URL="https://github.com/apache/spark/pull/$ghprbPullId" +# Important Environment Variables +# --- +# $ghprbActualCommit +#+ This is the hash of the most recent commit in the PR. +#+ The merge-base of this and master is the commit from which the PR was branched. +# $sha1 +#+ If the patch merges cleanly, this is a reference to the merge commit hash +#+ (e.g. "origin/pr/2606/merge"). +#+ If the patch does not merge cleanly, it is equal to $ghprbActualCommit. +#+ The merge-base of this and master in the case of a clean merge is the most recent commit +#+ against master. + COMMIT_URL="https://github.com/apache/spark/commit/${ghprbActualCommit}" # GitHub doesn't auto-link short hashes when submitted via the API, unfortunately. :( SHORT_COMMIT_HASH="${ghprbActualCommit:0:7}" @@ -84,42 +98,46 @@ function post_message () { fi } + +# We diff master...$ghprbActualCommit because that gets us changes introduced in the PR +#+ and not anything else added to master since the PR was branched. + # check PR merge-ability and check for new public classes { if [ "$sha1" == "$ghprbActualCommit" ]; then - merge_note=" * This patch **does not** merge cleanly!" + merge_note=" * This patch **does not merge cleanly**." else merge_note=" * This patch merges cleanly." + fi + + source_files=$( + git diff master...$ghprbActualCommit --name-only `# diff patch against master from branch point` \ + | grep -v -e "\/test" `# ignore files in test directories` \ + | grep -e "\.py$" -e "\.java$" -e "\.scala$" `# include only code files` \ + | tr "\n" " " + ) + new_public_classes=$( + git diff master...$ghprbActualCommit ${source_files} `# diff patch against master from branch point` \ + | grep "^\+" `# filter in only added lines` \ + | sed -r -e "s/^\+//g" `# remove the leading +` \ + | grep -e "trait " -e "class " `# filter in lines with these key words` \ + | grep -e "{" -e "(" `# filter in lines with these key words, too` \ + | grep -v -e "\@\@" -e "private" `# exclude lines with these words` \ + | grep -v -e "^// " -e "^/\*" -e "^ \* " `# exclude comment lines` \ + | sed -r -e "s/\{.*//g" `# remove from the { onwards` \ + | sed -r -e "s/\}//g" `# just in case, remove }; they mess the JSON` \ + | sed -r -e "s/\"/\\\\\"/g" `# escape double quotes; they mess the JSON` \ + | sed -r -e "s/^(.*)$/\`\1\`/g" `# surround with backticks for style` \ + | sed -r -e "s/^/ \* /g" `# prepend ' *' to start of line` \ + | sed -r -e "s/$/\\\n/g" `# append newline to end of line` \ + | tr -d "\n" `# remove actual LF characters` + ) - source_files=$( - git diff master... --name-only `# diff patch against master from branch point` \ - | grep -v -e "\/test" `# ignore files in test directories` \ - | grep -e "\.py$" -e "\.java$" -e "\.scala$" `# include only code files` \ - | tr "\n" " " - ) - new_public_classes=$( - git diff master... ${source_files} `# diff patch against master from branch point` \ - | grep "^\+" `# filter in only added lines` \ - | sed -r -e "s/^\+//g" `# remove the leading +` \ - | grep -e "trait " -e "class " `# filter in lines with these key words` \ - | grep -e "{" -e "(" `# filter in lines with these key words, too` \ - | grep -v -e "\@\@" -e "private" `# exclude lines with these words` \ - | grep -v -e "^// " -e "^/\*" -e "^ \* " `# exclude comment lines` \ - | sed -r -e "s/\{.*//g" `# remove from the { onwards` \ - | sed -r -e "s/\}//g" `# just in case, remove }; they mess the JSON` \ - | sed -r -e "s/\"/\\\\\"/g" `# escape double quotes; they mess the JSON` \ - | sed -r -e "s/^(.*)$/\`\1\`/g" `# surround with backticks for style` \ - | sed -r -e "s/^/ \* /g" `# prepend ' *' to start of line` \ - | sed -r -e "s/$/\\\n/g" `# append newline to end of line` \ - | tr -d "\n" `# remove actual LF characters` - ) - - if [ "$new_public_classes" == "" ]; then - public_classes_note=" * This patch adds no public classes." - else - public_classes_note=" * This patch adds the following public classes _(experimental)_:" - public_classes_note="${public_classes_note}\n${new_public_classes}" - fi + if [ -z "$new_public_classes" ]; then + public_classes_note=" * This patch adds no public classes." + else + public_classes_note=" * This patch adds the following public classes _(experimental)_:" + public_classes_note="${public_classes_note}\n${new_public_classes}" fi } @@ -147,12 +165,30 @@ function post_message () { post_message "$fail_message" exit $test_result + elif [ "$test_result" -eq "0" ]; then + test_result_note=" * This patch **passes all tests**." else - if [ "$test_result" -eq "0" ]; then - test_result_note=" * This patch **passes** unit tests." + if [ "$test_result" -eq "$BLOCK_GENERAL" ]; then + failing_test="some tests" + elif [ "$test_result" -eq "$BLOCK_RAT" ]; then + failing_test="RAT tests" + elif [ "$test_result" -eq "$BLOCK_SCALA_STYLE" ]; then + failing_test="Scala style tests" + elif [ "$test_result" -eq "$BLOCK_PYTHON_STYLE" ]; then + failing_test="Python style tests" + elif [ "$test_result" -eq "$BLOCK_BUILD" ]; then + failing_test="to build" + elif [ "$test_result" -eq "$BLOCK_SPARK_UNIT_TESTS" ]; then + failing_test="Spark unit tests" + elif [ "$test_result" -eq "$BLOCK_PYSPARK_UNIT_TESTS" ]; then + failing_test="PySpark unit tests" + elif [ "$test_result" -eq "$BLOCK_MIMA" ]; then + failing_test="MiMa tests" else - test_result_note=" * This patch **fails** unit tests." + failing_test="some tests" fi + + test_result_note=" * This patch **fails $failing_test**." fi }