diff --git a/assembly/pom.xml b/assembly/pom.xml index de7b75258e3c5..4146168fc804b 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index bd51b112e26fa..93db0d5efda5f 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 55bfe0b841ea4..b2b788a4bc13b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6eaf6794764c7..24d1a8f9eceae 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -49,6 +49,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ +import org.apache.spark.SPARK_VERSION import org.apache.spark.ui.SparkUI import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils} @@ -825,7 +826,7 @@ class SparkContext(config: SparkConf) extends Logging { } /** The version of Spark on which this application is running. */ - def version = SparkContext.SPARK_VERSION + def version = SPARK_VERSION /** * Return a map from the slave to the max memory available for caching and the remaining @@ -1297,8 +1298,6 @@ class SparkContext(config: SparkConf) extends Logging { */ object SparkContext extends Logging { - private[spark] val SPARK_VERSION = "1.2.0-SNAPSHOT" - private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 2973d002cc428..20a7444cfc5ee 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -217,7 +217,7 @@ object SparkEnv extends Logging { val shortShuffleMgrNames = Map( "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") - val shuffleMgrName = conf.get("spark.shuffle.manager", "hash") + val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 5cdbc306e56a0..e2fc9c649925e 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -44,4 +44,5 @@ package org.apache package object spark { // For package docs only + val SPARK_VERSION = "1.2.0-SNAPSHOT" } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1cf55e86f6c81..a9b905b0d1a63 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag]( * @return an array of top elements */ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { - mapPartitions { items => - // Priority keeps the largest elements, so let's reverse the ordering. - val queue = new BoundedPriorityQueue[T](num)(ord.reverse) - queue ++= util.collection.Utils.takeOrdered(items, num)(ord) - Iterator.single(queue) - }.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord) + if (num == 0) { + Array.empty + } else { + mapPartitions { items => + // Priority keeps the largest elements, so let's reverse the ordering. + val queue = new BoundedPriorityQueue[T](num)(ord.reverse) + queue ++= util.collection.Utils.takeOrdered(items, num)(ord) + Iterator.single(queue) + }.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2ccc27324ac8c..6fcf9e31543ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -241,9 +241,9 @@ class DAGScheduler( callSite: CallSite) : Stage = { + val parentStages = getParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() - val stage = - new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) + val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 4b99f630440ad..64b32ae0edaac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -29,6 +29,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec +import org.apache.spark.SPARK_VERSION import org.apache.spark.util.{FileLogger, JsonProtocol, Utils} /** @@ -86,7 +87,7 @@ private[spark] class EventLoggingListener( sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC) logger.newFile(COMPRESSION_CODEC_PREFIX + codec) } - logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION) + logger.newFile(SPARK_VERSION_PREFIX + SPARK_VERSION) logger.newFile(LOG_PREFIX + logger.fileIndex) } diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala new file mode 100644 index 0000000000000..2acc02a54fa3d --- /dev/null +++ b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.scalatest.BeforeAndAfterAll + +class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { + + // This test suite should run all tests in ShuffleSuite with hash-based shuffle. + + override def beforeAll() { + System.setProperty("spark.shuffle.manager", "hash") + } + + override def afterAll() { + System.clearProperty("spark.shuffle.manager") + } +} diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index b13ddf96bc77c..15aa4d83800fa 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.MutablePair -class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext { +abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext { val conf = new SparkConf(loadDefaults = false) diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala index 5c02c00586ef4..639e56c488db4 100644 --- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala @@ -24,8 +24,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { // This test suite should run all tests in ShuffleSuite with sort-based shuffle. override def beforeAll() { - System.setProperty("spark.shuffle.manager", - "org.apache.spark.shuffle.sort.SortShuffleManager") + System.setProperty("spark.shuffle.manager", "sort") } override def afterAll() { diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 926d4fecb5b91..499dcda3dae8f 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -521,6 +521,13 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sortedLowerK === Array(1, 2, 3, 4, 5)) } + test("takeOrdered with limit 0") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val rdd = sc.makeRDD(nums, 2) + val sortedLowerK = rdd.takeOrdered(0) + assert(sortedLowerK.size === 0) + } + test("takeOrdered with custom ordering") { val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) implicit val ord = implicitly[Ordering[Int]].reverse diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0bb91febde9d7..aa73469b6acd8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark._ +import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} @@ -97,10 +98,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 val sparkListener = new SparkListener() { - val successfulStages = new HashSet[Int]() - val failedStages = new ArrayBuffer[Int]() + val successfulStages = new HashSet[Int] + val failedStages = new ArrayBuffer[Int] + val stageByOrderOfExecution = new ArrayBuffer[Int] override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { val stageInfo = stageCompleted.stageInfo + stageByOrderOfExecution += stageInfo.stageId if (stageInfo.failureReason.isEmpty) { successfulStages += stageInfo.stageId } else { @@ -231,6 +234,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F runEvent(JobCancelled(jobId)) } + test("[SPARK-3353] parent stage should have lower stage id") { + sparkListener.stageByOrderOfExecution.clear() + sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() + assert(sparkListener.stageByOrderOfExecution.length === 2) + assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) + } + test("zero split job") { var numResults = 0 val fakeListener = new JobListener() { @@ -457,7 +467,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F null, null)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - assert(sparkListener.failedStages.contains(0)) + assert(sparkListener.failedStages.contains(1)) // The second ResultTask fails, with a fetch failure for the output from the second mapper. runEvent(CompletionEvent( @@ -515,8 +525,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F // Listener bus should get told about the map stage failing, but not the reduce stage // (since the reduce stage hasn't been started yet). assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - assert(sparkListener.failedStages.contains(1)) - assert(sparkListener.failedStages.size === 1) + assert(sparkListener.failedStages.toSet === Set(0)) assertDataStructuresEmpty } @@ -563,14 +572,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F val stageFailureMessage = "Exception failure in map stage" failed(taskSets(0), stageFailureMessage) - assert(cancelledStages.contains(1)) + assert(cancelledStages.toSet === Set(0, 2)) // Make sure the listeners got told about both failed stages. assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sparkListener.successfulStages.isEmpty) - assert(sparkListener.failedStages.contains(1)) - assert(sparkListener.failedStages.contains(3)) - assert(sparkListener.failedStages.size === 2) + assert(sparkListener.failedStages.toSet === Set(0, 2)) assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index fead883793430..e5315bc93e217 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec +import org.apache.spark.SPARK_VERSION import org.apache.spark.util.{JsonProtocol, Utils} import java.io.File @@ -196,7 +197,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) { assert(info.logPaths.size > 0) - assert(info.sparkVersion === SparkContext.SPARK_VERSION) + assert(info.sparkVersion === SPARK_VERSION) assert(info.compressionCodec.isDefined === compressionCodec.isDefined) info.compressionCodec.foreach { codec => assert(compressionCodec.isDefined) @@ -381,7 +382,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) { val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile) assert(file.isDefined) - assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION) + assert(EventLoggingListener.parseSparkVersion(file.get) === SPARK_VERSION) } private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 3b0b8e2f68c97..ab35e8edc4ebf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -180,7 +180,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers rdd3.count() assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.stageInfos.size should be {2} // Shuffle map stage + result stage - val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get + val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get stageInfo3.rddInfos.size should be {1} // ShuffledRDD stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true} stageInfo3.rddInfos.exists(_.name == "Trois") should be {true} diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index ac3931e3d0a73..511d76c9144cc 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -42,6 +42,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { conf.set("spark.serializer.objectStreamReset", "1") conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") conf.set("spark.shuffle.spill.compress", codec.isDefined.toString) + conf.set("spark.shuffle.compress", codec.isDefined.toString) codec.foreach { c => conf.set("spark.io.compression.codec", c) } // Ensure that we actually have multiple batches per spill file conf.set("spark.shuffle.spill.batchSize", "10") diff --git a/dev/check-license b/dev/check-license index 625ec161bc571..558e038afc01a 100755 --- a/dev/check-license +++ b/dev/check-license @@ -32,9 +32,9 @@ acquire_rat_jar () { printf "Attempting to fetch rat\n" JAR_DL=${JAR}.part if hash curl 2>/dev/null; then - (curl --progress-bar ${URL1} > "$JAR_DL" || curl --progress-bar ${URL2} > "$JAR_DL") && mv "$JAR_DL" "$JAR" + (curl --silent ${URL1} > "$JAR_DL" || curl --silent ${URL2} > "$JAR_DL") && mv "$JAR_DL" "$JAR" elif hash wget 2>/dev/null; then - (wget --progress=bar ${URL1} -O "$JAR_DL" || wget --progress=bar ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR" + (wget --quiet ${URL1} -O "$JAR_DL" || wget --quiet ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR" else printf "You do not have curl or wget installed, please install rat manually.\n" exit -1 diff --git a/dev/lint-python b/dev/lint-python index a1e890faa8fa6..79bf70f0b8b13 100755 --- a/dev/lint-python +++ b/dev/lint-python @@ -30,6 +30,7 @@ cd $SPARK_ROOT_DIR #+ - Download this from a more reliable source. (GitHub raw can be flaky, apparently. (?)) PEP8_SCRIPT_PATH="$SPARK_ROOT_DIR/dev/pep8.py" PEP8_SCRIPT_REMOTE_PATH="https://raw.githubusercontent.com/jcrocholl/pep8/1.5.7/pep8.py" +PEP8_PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/" curl --silent -o "$PEP8_SCRIPT_PATH" "$PEP8_SCRIPT_REMOTE_PATH" curl_status=$? @@ -44,7 +45,7 @@ fi #+ first, but we do so so that the check status can #+ be output before the report, like with the #+ scalastyle and RAT checks. -python $PEP8_SCRIPT_PATH ./python/pyspark > "$PEP8_REPORT_PATH" +python $PEP8_SCRIPT_PATH $PEP8_PATHS_TO_CHECK > "$PEP8_REPORT_PATH" pep8_status=${PIPESTATUS[0]} #$? if [ $pep8_status -ne 0 ]; then @@ -54,7 +55,7 @@ else echo "PEP 8 checks passed." fi -rm -f "$PEP8_REPORT_PATH" +rm "$PEP8_REPORT_PATH" rm "$PEP8_SCRIPT_PATH" exit $pep8_status diff --git a/docs/configuration.md b/docs/configuration.md index 65a422caabb7e..36178efb97103 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -293,12 +293,11 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.manager - HASH + sort - Implementation to use for shuffling data. A hash-based shuffle manager is the default, but - starting in Spark 1.1 there is an experimental sort-based shuffle manager that is more - memory-efficient in environments with small executors, such as YARN. To use that, change - this value to SORT. + Implementation to use for shuffling data. There are two implementations available: + sort and hash. Sort-based shuffle is more memory-efficient and is + the default option starting in 1.2. diff --git a/docs/img/streaming-arch.png b/docs/img/streaming-arch.png index bc57b460fdf8b..ac35f1d34cf3d 100644 Binary files a/docs/img/streaming-arch.png and b/docs/img/streaming-arch.png differ diff --git a/docs/img/streaming-figures.pptx b/docs/img/streaming-figures.pptx index 1b18c2ee0ea3e..d1cc25e379f46 100644 Binary files a/docs/img/streaming-figures.pptx and b/docs/img/streaming-figures.pptx differ diff --git a/docs/img/streaming-kinesis-arch.png b/docs/img/streaming-kinesis-arch.png new file mode 100644 index 0000000000000..bea5fa88df985 Binary files /dev/null and b/docs/img/streaming-kinesis-arch.png differ diff --git a/docs/index.md b/docs/index.md index 4ac0982ae54f1..7fe6b43d32af7 100644 --- a/docs/index.md +++ b/docs/index.md @@ -103,6 +103,8 @@ options for deployment: * [Security](security.html): Spark security support * [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware * [3rd Party Hadoop Distributions](hadoop-third-party-distributions.html): using common Hadoop distributions +* Integration with other storage systems: + * [OpenStack Swift](storage-openstack-swift.html) * [Building Spark with Maven](building-with-maven.html): build Spark using the Maven system * [Contributing to Spark](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 8f7fb5431cfb6..1814fef465cac 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -68,6 +68,16 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSchemaRDD {% endhighlight %} +In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict +super set of the functionality provided by the basic SQLContext. Additional features include +the ability to write queries using the more complete HiveQL parser, access to HiveUDFs, and the +ability to read data from Hive tables. To use a HiveContext, you do not need to have an +existing hive setup, and all of the data sources available to a SQLContext are still available. +HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default +Spark build. If these dependencies are not a problem for your application then using HiveContext +is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to +feature parity with a HiveContext. +
@@ -81,6 +91,16 @@ JavaSparkContext sc = ...; // An existing JavaSparkContext. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); {% endhighlight %} +In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict +super set of the functionality provided by the basic SQLContext. Additional features include +the ability to write queries using the more complete HiveQL parser, access to HiveUDFs, and the +ability to read data from Hive tables. To use a HiveContext, you do not need to have an +existing hive setup, and all of the data sources available to a SQLContext are still available. +HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default +Spark build. If these dependencies are not a problem for your application then using HiveContext +is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to +feature parity with a HiveContext. +
@@ -94,36 +114,52 @@ from pyspark.sql import SQLContext sqlContext = SQLContext(sc) {% endhighlight %} -
+In addition to the basic SQLContext, you can also create a HiveContext, which provides a strict +super set of the functionality provided by the basic SQLContext. Additional features include +the ability to write queries using the more complete HiveQL parser, access to HiveUDFs, and the +ability to read data from Hive tables. To use a HiveContext, you do not need to have an +existing hive setup, and all of the data sources available to a SQLContext are still available. +HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default +Spark build. If these dependencies are not a problem for your application then using HiveContext +is recommended for the 1.2 release of Spark. Future releases will focus on bringing SQLContext up to +feature parity with a HiveContext. -# Data Sources - -
-
-Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface. -Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources.
-
-Spark SQL supports operating on a variety of data sources through the `JavaSchemaRDD` interface. -Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. -
+The specific variant of SQL that is used to parse queries can also be selected using the +`spark.sql.dialect` option. This parameter can be changed using either the `setConf` method on +a SQLContext or by using a `SET key=value` command in SQL. For a SQLContext, the only dialect +available is "sql" which uses a simple SQL parser provided by Spark SQL. In a HiveContext, the +default is "hiveql", though "sql" is also available. Since the HiveQL parser is much more complete, + this is recommended for most use cases. + +# Data Sources -
Spark SQL supports operating on a variety of data sources through the `SchemaRDD` interface. -Once a dataset has been loaded, it can be registered as a table and even joined with data from other sources. -
-
+A SchemaRDD can be operated on as normal RDDs and can also be registered as a temporary table. +Registering a SchemaRDD as a table allows you to run SQL queries over its data. This section +describes the various methods for loading data into a SchemaRDD. ## RDDs +Spark SQL supports two different methods for converting existing RDDs into SchemaRDDs. The first +method uses reflection to infer the schema of an RDD that contains specific types of objects. This +reflection based approach leads to more concise code and works well went the schema is known ahead +of time, while you are writing your Spark application. + +The second method for creating SchemaRDDs is through a programmatic interface that allows you to +construct a schema and then apply it to and existing RDD. While this method is more verbose, it allows +you to construct SchemaRDDs when the columns and their types are not known until runtime. + +### Inferring the Schema Using Reflection
-One type of table that is supported by Spark SQL is an RDD of Scala case classes. The case class +The Scala interaface for Spark SQL supports automatically converting an RDD containing case classes +to a SchemaRDD. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be @@ -156,8 +192,9 @@ teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
-One type of table that is supported by Spark SQL is an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly). The BeanInfo -defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain +Spark SQL supports automatically converting an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly) +into a Schema RDD. The BeanInfo, obtained using reflection, defines the schema of the table. +Currently, Spark SQL does not support JavaBeans that contain nested or contain complex types such as Lists or Arrays. You can create a JavaBean by creating a class that implements Serializable and has getters and setters for all of its fields. @@ -192,7 +229,7 @@ for the JavaBean. {% highlight java %} // sc is an existing JavaSparkContext. -JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc) +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); // Load a text file and convert each line to a JavaBean. JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").map( @@ -229,24 +266,24 @@ List teenagerNames = teenagers.map(new Function() {
-One type of table that is supported by Spark SQL is an RDD of dictionaries. The keys of the -dictionary define the columns names of the table, and the types are inferred by looking at the first -row. Any RDD of dictionaries can converted to a SchemaRDD and then registered as a table. Tables -can be used in subsequent SQL statements. +Spark SQL can convert an RDD of Row objects to a SchemaRDD, inferring the datatypes . Rows are constructed by passing a list of +key/value pairs as kwargs to the Row class. The keys of this list define the columns names of the table, +and the types are inferred by looking at the first row. Since we currently only look at the first +row, it is important that there is no missing data in the first row of the RDD. In future version we +plan to more completely infer the schema by looking at more data, similar to the inference that is +performed on JSON files. {% highlight python %} # sc is an existing SparkContext. -from pyspark.sql import SQLContext +from pyspark.sql import SQLContext, Row sqlContext = SQLContext(sc) # Load a text file and convert each line to a dictionary. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) -people = parts.map(lambda p: {"name": p[0], "age": int(p[1])}) +people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the SchemaRDD as a table. -# In future versions of PySpark we would like to add support for registering RDDs with other -# datatypes as tables schemaPeople = sqlContext.inferSchema(people) schemaPeople.registerTempTable("people") @@ -263,15 +300,191 @@ for teenName in teenNames.collect():
-**Note that Spark SQL currently uses a very basic SQL parser.** -Users that want a more complete dialect of SQL should look at the HiveQL support provided by -`HiveContext`. +### Programmatically Specifying the Schema + +
+ +
+ +In cases that case classes cannot be defined ahead of time (for example, +the structure of records is encoded in a string or a text dataset will be parsed +and fields will be projected differently for different users), +a `SchemaRDD` can be created programmatically with three steps. + +1. Create an RDD of `Row`s from the original RDD; +2. Create the schema represented by a `StructType` matching the structure of +`Row`s in the RDD created in the step 1. +3. Apply the schema to the RDD of `Row`s via `applySchema` method provided +by `SQLContext`. + +For example: +{% highlight scala %} +// sc is an existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) + +// Create an RDD +val people = sc.textFile("examples/src/main/resources/people.txt") + +// The schema is encoded in a string +val schemaString = "name age" + +// Import Spark SQL data types and Row. +import org.apache.spark.sql._ + +// Generate the schema based on the string of schema +val schema = + StructType( + schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) + +// Convert records of the RDD (people) to Rows. +val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) + +// Apply the schema to the RDD. +val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema) + +// Register the SchemaRDD as a table. +peopleSchemaRDD.registerTempTable("people") + +// SQL statements can be run by using the sql methods provided by sqlContext. +val results = sqlContext.sql("SELECT name FROM people") + +// The results of SQL queries are SchemaRDDs and support all the normal RDD operations. +// The columns of a row in the result can be accessed by ordinal. +results.map(t => "Name: " + t(0)).collect().foreach(println) +{% endhighlight %} + + +
+ +
+ +In cases that JavaBean classes cannot be defined ahead of time (for example, +the structure of records is encoded in a string or a text dataset will be parsed and +fields will be projected differently for different users), +a `SchemaRDD` can be created programmatically with three steps. + +1. Create an RDD of `Row`s from the original RDD; +2. Create the schema represented by a `StructType` matching the structure of +`Row`s in the RDD created in the step 1. +3. Apply the schema to the RDD of `Row`s via `applySchema` method provided +by `JavaSQLContext`. + +For example: +{% highlight java %} +// Import factory methods provided by DataType. +import org.apache.spark.sql.api.java.DataType +// Import StructType and StructField +import org.apache.spark.sql.api.java.StructType +import org.apache.spark.sql.api.java.StructField +// Import Row. +import org.apache.spark.sql.api.java.Row + +// sc is an existing JavaSparkContext. +JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc); + +// Load a text file and convert each line to a JavaBean. +JavaRDD people = sc.textFile("examples/src/main/resources/people.txt"); + +// The schema is encoded in a string +String schemaString = "name age"; + +// Generate the schema based on the string of schema +List fields = new ArrayList(); +for (String fieldName: schemaString.split(" ")) { + fields.add(DataType.createStructField(fieldName, DataType.StringType, true)); +} +StructType schema = DataType.createStructType(fields); + +// Convert records of the RDD (people) to Rows. +JavaRDD rowRDD = people.map( + new Function() { + public Row call(String record) throws Exception { + String[] fields = record.split(","); + return Row.create(fields[0], fields[1].trim()); + } + }); + +// Apply the schema to the RDD. +JavaSchemaRDD peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema); + +// Register the SchemaRDD as a table. +peopleSchemaRDD.registerTempTable("people"); + +// SQL can be run over RDDs that have been registered as tables. +JavaSchemaRDD results = sqlContext.sql("SELECT name FROM people"); + +// The results of SQL queries are SchemaRDDs and support all the normal RDD operations. +// The columns of a row in the result can be accessed by ordinal. +List names = results.map(new Function() { + public String call(Row row) { + return "Name: " + row.getString(0); + } +}).collect(); + +{% endhighlight %} + +
+ +
+ +For some cases (for example, the structure of records is encoded in a string or +a text dataset will be parsed and fields will be projected differently for +different users), it is desired to create `SchemaRDD` with a programmatically way. +It can be done with three steps. + +1. Create an RDD of tuples or lists from the original RDD; +2. Create the schema represented by a `StructType` matching the structure of +tuples or lists in the RDD created in the step 1. +3. Apply the schema to the RDD via `applySchema` method provided by `SQLContext`. + +For example: +{% highlight python %} +# Import SQLContext and data types +from pyspark.sql import * + +# sc is an existing SparkContext. +sqlContext = SQLContext(sc) + +# Load a text file and convert each line to a tuple. +lines = sc.textFile("examples/src/main/resources/people.txt") +parts = lines.map(lambda l: l.split(",")) +people = parts.map(lambda p: (p[0], p[1].strip())) + +# The schema is encoded in a string. +schemaString = "name age" + +fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] +schema = StructType(fields) + +# Apply the schema to the RDD. +schemaPeople = sqlContext.applySchema(people, schema) + +# Register the SchemaRDD as a table. +schemaPeople.registerTempTable("people") + +# SQL can be run over SchemaRDDs that have been registered as a table. +results = sqlContext.sql("SELECT name FROM people") + +# The results of SQL queries are RDDs and support all the normal RDD operations. +names = results.map(lambda p: "Name: " + p.name) +for name in names.collect(): + print name +{% endhighlight %} + + +
+ +
## Parquet Files [Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema -of the original data. Using the data from the above example: +of the original data. + +### Loading Data Programmatically + +Using the data from the above example:
@@ -349,7 +562,40 @@ for teenName in teenNames.collect():
-
+
+ +### Configuration + +Configuration of parquet can be done using the `setConf` method on SQLContext or by running +`SET key=value` commands using SQL. + + + + + + + + + + + + + + + + + + +
Property NameDefaultMeaning
spark.sql.parquet.binaryAsStringfalse + Some other parquet producing systems, in particular Impala and older versions of Spark SQL, do + not differentiate between binary data and strings when writing out the parquet schema. This + flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems. +
spark.sql.parquet.cacheMetadatafalse + Turns on caching of parquet schema metadata. Can speed up querying +
spark.sql.parquet.compression.codecsnappy + Sets the compression codec use when writing parquet files. Acceptable values include: + uncompressed, snappy, gzip, lzo. +
## JSON Datasets
@@ -493,13 +739,13 @@ directory. {% highlight scala %} // sc is an existing SparkContext. -val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) +val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) -hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL -hiveContext.sql("FROM src SELECT key, value").collect().foreach(println) +sqlContext.sql("FROM src SELECT key, value").collect().foreach(println) {% endhighlight %}
@@ -513,13 +759,13 @@ expressed in HiveQL. {% highlight java %} // sc is an existing JavaSparkContext. -JavaHiveContext hiveContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc); +JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.HiveContext(sc); -hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); -hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); +sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); +sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. -Row[] results = hiveContext.sql("FROM src SELECT key, value").collect(); +Row[] results = sqlContext.sql("FROM src SELECT key, value").collect(); {% endhighlight %} @@ -535,44 +781,97 @@ expressed in HiveQL. {% highlight python %} # sc is an existing SparkContext. from pyspark.sql import HiveContext -hiveContext = HiveContext(sc) +sqlContext = HiveContext(sc) -hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") -hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") +sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. -results = hiveContext.sql("FROM src SELECT key, value").collect() +results = sqlContext.sql("FROM src SELECT key, value").collect() {% endhighlight %}
-# Writing Language-Integrated Relational Queries +# Performance Tuning -**Language-Integrated queries are currently only supported in Scala.** - -Spark SQL also supports a domain specific language for writing queries. Once again, -using the data from the above examples: +For some workloads it is possible to improve performance by either caching data in memory, or by +turning on some experimental options. -{% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) -// Importing the SQL context gives access to all the public SQL functions and implicit conversions. -import sqlContext._ -val people: RDD[Person] = ... // An RDD of case class objects, from the first example. +## Caching Data In Memory -// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' -val teenagers = people.where('age >= 10).where('age <= 19).select('name) -teenagers.map(t => "Name: " + t(0)).collect().foreach(println) -{% endhighlight %} +Spark SQL can cache tables using an in-memory columnar format by calling `cacheTable("tableName")`. +Then Spark SQL will scan only required columns and will automatically tune compression to minimize +memory usage and GC pressure. You can call `uncacheTable("tableName")` to remove the table from memory. -The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers -prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are -evaluated by the SQL execution engine. A full list of the functions supported can be found in the -[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). +Note that if you just call `cache` rather than `cacheTable`, tables will _not_ be cached in +in-memory columnar format. So we strongly recommend using `cacheTable` whenever you want to +cache tables. - +Configuration of in-memory caching can be done using the `setConf` method on SQLContext or by running +`SET key=value` commands using SQL. + + + + + + + + + + + + + + +
Property NameDefaultMeaning
spark.sql.inMemoryColumnarStorage.compressedfalse + When set to true Spark SQL will automatically select a compression codec for each column based + on statistics of the data. +
spark.sql.inMemoryColumnarStorage.batchSize1000 + Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization + and compression, but risk OOMs when caching data. +
+ +## Other Configuration + +The following options can also be used to tune the performance of query execution. It is possible +that these options will be deprecated in future release as more optimizations are performed automatically. + + + + + + + + + + + + + + + + + + +
Property NameDefaultMeaning
spark.sql.autoBroadcastJoinThresholdfalse + Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when + performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently + statistics are only supported for Hive Metastore tables where the command + `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. +
spark.sql.codegenfalse + When true, code will be dynamically generated at runtime for expression evaluation in a specific + query. For some queries with complicated expression this option can lead to significant speed-ups. + However, for simple queries this can actually slow down query execution. +
spark.sql.shuffle.partitions200 + Configures the number of partitions to use when shuffling data for joins or aggregations. +
+ +# Other SQL Interfaces + +Spark SQL also supports interfaces for running SQL queries directly without the need to write any +code. ## Running the Thrift JDBC server @@ -602,14 +901,28 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. You may also use the beeline script comes with Hive. +## Running the Spark SQL CLI + +The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute +queries input from command line. Note: the Spark SQL CLI cannot talk to the Thrift JDBC server. + +To start the Spark SQL CLI, run the following in the Spark directory: + + ./bin/spark-sql + +Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. +You may run `./bin/spark-sql --help` for a complete list of all available +options. + +# Compatibility with Other Systems + +## Migration Guide for Shark Users To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session, users can set the `spark.sql.thriftserver.scheduler.pool` variable: SET spark.sql.thriftserver.scheduler.pool=accounting; -### Migration Guide for Shark Users - -#### Reducer number +### Reducer number In Shark, default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark SQL deprecates this property by a new property `spark.sql.shuffle.partitions`, whose default value @@ -625,7 +938,7 @@ You may also put this property in `hive-site.xml` to override the default value. For now, the `mapred.reduce.tasks` property is still recognized, and is converted to `spark.sql.shuffle.partitions` automatically. -#### Caching +### Caching The `shark.cache` table property no longer exists, and tables whose name end with `_cached` are no longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to @@ -634,9 +947,9 @@ let user control table caching explicitly: CACHE TABLE logs_last_month; UNCACHE TABLE logs_last_month; -**NOTE:** `CACHE TABLE tbl` is lazy, it only marks table `tbl` as "need to by cached if necessary", -but doesn't actually cache it until a query that touches `tbl` is executed. To force the table to be -cached, you may simply count the table immediately after executing `CACHE TABLE`: +**NOTE:** `CACHE TABLE tbl` is lazy, similar to `.cache` on an RDD. This command only marks `tbl` to ensure that +partitions are cached when calculated but doesn't actually cache it until a query that touches `tbl` is executed. +To force the table to be cached, you may simply count the table immediately after executing `CACHE TABLE`: CACHE TABLE logs_last_month; SELECT COUNT(1) FROM logs_last_month; @@ -647,15 +960,18 @@ Several caching related features are not supported yet: * RDD reloading * In-memory cache write through policy -### Compatibility with Apache Hive +## Compatibility with Apache Hive + +Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Spark +SQL is based on Hive 0.12.0. #### Deploying in Existing Hive Warehouses -Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive +The Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive installations. You do not need to modify your existing Hive Metastore or change the data placement or partitioning of your tables. -#### Supported Hive Features +### Supported Hive Features Spark SQL supports the vast majority of Hive features, such as: @@ -705,13 +1021,14 @@ Spark SQL supports the vast majority of Hive features, such as: * `MAP<>` * `STRUCT<>` -#### Unsupported Hive Functionality +### Unsupported Hive Functionality Below is a list of Hive features that we don't support yet. Most of these features are rarely used in Hive deployments. **Major Hive Features** +* Spark SQL does not currently support inserting to tables using dynamic partitioning. * Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn't support buckets yet. @@ -721,11 +1038,11 @@ in Hive deployments. have the same input format. * Non-equi outer join: For the uncommon use case of using outer joins with non-equi join conditions (e.g. condition "`key < 10`"), Spark SQL will output wrong result for the `NULL` tuple. -* `UNIONTYPE` +* `UNION` type and `DATE` type * Unique join * Single query multi insert * Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at - the moment. + the moment and only supports populating the sizeInBytes field of the hive metastore. **Hive Input/Output Formats** @@ -735,7 +1052,7 @@ in Hive deployments. **Hive Optimizations** A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are -not necessary due to Spark SQL's in-memory computational model. Others are slotted for future +less important due to Spark SQL's in-memory computational model. Others are slotted for future releases of Spark SQL. * Block level bitmap indexes and virtual columns (used to build indexes) @@ -743,8 +1060,7 @@ releases of Spark SQL. Hive automatically converts the join into a map join. We are adding this auto conversion in the next release. * Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you - need to control the degree of parallelism post-shuffle using "`SET spark.sql.shuffle.partitions=[num_tasks];`". We are going to add auto-setting of parallelism in the - next release. + need to control the degree of parallelism post-shuffle using "`SET spark.sql.shuffle.partitions=[num_tasks];`". * Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result. * Skew data flag: Spark SQL does not follow the skew data flags in Hive. @@ -753,25 +1069,471 @@ releases of Spark SQL. Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that. -## Running the Spark SQL CLI +# Writing Language-Integrated Relational Queries -The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute -queries input from command line. Note: the Spark SQL CLI cannot talk to the Thrift JDBC server. +**Language-Integrated queries are experimental and currently only supported in Scala.** -To start the Spark SQL CLI, run the following in the Spark directory: +Spark SQL also supports a domain specific language for writing queries. Once again, +using the data from the above examples: - ./bin/spark-sql +{% highlight scala %} +// sc is an existing SparkContext. +val sqlContext = new org.apache.spark.sql.SQLContext(sc) +// Importing the SQL context gives access to all the public SQL functions and implicit conversions. +import sqlContext._ +val people: RDD[Person] = ... // An RDD of case class objects, from the first example. -Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. -You may run `./bin/spark-sql --help` for a complete list of all available -options. +// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19' +val teenagers = people.where('age >= 10).where('age <= 19).select('name) +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) +{% endhighlight %} -# Cached tables +The DSL uses Scala symbols to represent columns in the underlying table, which are identifiers +prefixed with a tick (`'`). Implicit conversions turn these symbols into expressions that are +evaluated by the SQL execution engine. A full list of the functions supported can be found in the +[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD). -Spark SQL can cache tables using an in-memory columnar format by calling `cacheTable("tableName")`. -Then Spark SQL will scan only required columns and will automatically tune compression to minimize -memory usage and GC pressure. You can call `uncacheTable("tableName")` to remove the table from memory. + + +# Spark SQL DataType Reference + +* Numeric types + - `ByteType`: Represents 1-byte signed integer numbers. + The range of numbers is from `-128` to `127`. + - `ShortType`: Represents 2-byte signed integer numbers. + The range of numbers is from `-32768` to `32767`. + - `IntegerType`: Represents 4-byte signed integer numbers. + The range of numbers is from `-2147483648` to `2147483647`. + - `LongType`: Represents 8-byte signed integer numbers. + The range of numbers is from `-9223372036854775808` to `9223372036854775807`. + - `FloatType`: Represents 4-byte single-precision floating point numbers. + - `DoubleType`: Represents 8-byte double-precision floating point numbers. + - `DecimalType`: +* String type + - `StringType`: Represents character string values. +* Binary type + - `BinaryType`: Represents byte sequence values. +* Boolean type + - `BooleanType`: Represents boolean values. +* Datetime type + - `TimestampType`: Represents values comprising values of fields year, month, day, + hour, minute, and second. +* Complex types + - `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of + elements with the type of `elementType`. `containsNull` is used to indicate if + elements in a `ArrayType` value can have `null` values. + - `MapType(keyType, valueType, valueContainsNull)`: + Represents values comprising a set of key-value pairs. The data type of keys are + described by `keyType` and the data type of values are described by `valueType`. + For a `MapType` value, keys are not allowed to have `null` values. `valueContainsNull` + is used to indicate if values of a `MapType` value can have `null` values. + - `StructType(fields)`: Represents values with the structure described by + a sequence of `StructField`s (`fields`). + * `StructField(name, dataType, nullable)`: Represents a field in a `StructType`. + The name of a field is indicated by `name`. The data type of a field is indicated + by `dataType`. `nullable` is used to indicate if values of this fields can have + `null` values. + +
+
+ +All data types of Spark SQL are located in the package `org.apache.spark.sql`. +You can access them by doing +{% highlight scala %} +import org.apache.spark.sql._ +{% endhighlight %} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Data typeValue type in ScalaAPI to access or create a data type
ByteType Byte + ByteType +
ShortType Short + ShortType +
IntegerType Int + IntegerType +
LongType Long + LongType +
FloatType Float + FloatType +
DoubleType Double + DoubleType +
DecimalType scala.math.sql.BigDecimal + DecimalType +
StringType String + StringType +
BinaryType Array[Byte] + BinaryType +
BooleanType Boolean + BooleanType +
TimestampType java.sql.Timestamp + TimestampType +
ArrayType scala.collection.Seq + ArrayType(elementType, [containsNull])
+ Note: The default value of containsNull is false. +
MapType scala.collection.Map + MapType(keyType, valueType, [valueContainsNull])
+ Note: The default value of valueContainsNull is true. +
StructType org.apache.spark.sql.Row + StructType(fields)
+ Note: fields is a Seq of StructFields. Also, two fields with the same + name are not allowed. +
StructField The value type in Scala of the data type of this field + (For example, Int for a StructField with the data type IntegerType) + StructField(name, dataType, nullable) +
+ +
+ +
+ +All data types of Spark SQL are located in the package of +`org.apache.spark.sql.api.java`. To access or create a data type, +please use factory methods provided in +`org.apache.spark.sql.api.java.DataType`. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Data typeValue type in JavaAPI to access or create a data type
ByteType byte or Byte + DataType.ByteType +
ShortType short or Short + DataType.ShortType +
IntegerType int or Integer + DataType.IntegerType +
LongType long or Long + DataType.LongType +
FloatType float or Float + DataType.FloatType +
DoubleType double or Double + DataType.DoubleType +
DecimalType java.math.BigDecimal + DataType.DecimalType +
StringType String + DataType.StringType +
BinaryType byte[] + DataType.BinaryType +
BooleanType boolean or Boolean + DataType.BooleanType +
TimestampType java.sql.Timestamp + DataType.TimestampType +
ArrayType java.util.List + DataType.createArrayType(elementType)
+ Note: The value of containsNull will be false
+ DataType.createArrayType(elementType, containsNull). +
MapType java.util.Map + DataType.createMapType(keyType, valueType)
+ Note: The value of valueContainsNull will be true.
+ DataType.createMapType(keyType, valueType, valueContainsNull)
+
StructType org.apache.spark.sql.api.java + DataType.createStructType(fields)
+ Note: fields is a List or an array of StructFields. + Also, two fields with the same name are not allowed. +
StructField The value type in Java of the data type of this field + (For example, int for a StructField with the data type IntegerType) + DataType.createStructField(name, dataType, nullable) +
+ +
+ +
+ +All data types of Spark SQL are located in the package of `pyspark.sql`. +You can access them by doing +{% highlight python %} +from pyspark.sql import * +{% endhighlight %} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Data typeValue type in PythonAPI to access or create a data type
ByteType + int or long
+ Note: Numbers will be converted to 1-byte signed integer numbers at runtime. + Please make sure that numbers are within the range of -128 to 127. +
+ ByteType() +
ShortType + int or long
+ Note: Numbers will be converted to 2-byte signed integer numbers at runtime. + Please make sure that numbers are within the range of -32768 to 32767. +
+ ShortType() +
IntegerType int or long + IntegerType() +
LongType + long
+ Note: Numbers will be converted to 8-byte signed integer numbers at runtime. + Please make sure that numbers are within the range of + -9223372036854775808 to 9223372036854775807. + Otherwise, please convert data to decimal.Decimal and use DecimalType. +
+ LongType() +
FloatType + float
+ Note: Numbers will be converted to 4-byte single-precision floating + point numbers at runtime. +
+ FloatType() +
DoubleType float + DoubleType() +
DecimalType decimal.Decimal + DecimalType() +
StringType string + StringType() +
BinaryType bytearray + BinaryType() +
BooleanType bool + BooleanType() +
TimestampType datetime.datetime + TimestampType() +
ArrayType list, tuple, or array + ArrayType(elementType, [containsNull])
+ Note: The default value of containsNull is False. +
MapType dict + MapType(keyType, valueType, [valueContainsNull])
+ Note: The default value of valueContainsNull is True. +
StructType list or tuple + StructType(fields)
+ Note: fields is a Seq of StructFields. Also, two fields with the same + name are not allowed. +
StructField The value type in Python of the data type of this field + (For example, Int for a StructField with the data type IntegerType) + StructField(name, dataType, nullable) +
+ +
+ +
-Note that if you just call `cache` rather than `cacheTable`, tables will _not_ be cached in -in-memory columnar format. So we strongly recommend using `cacheTable` whenever you want to -cache tables. diff --git a/docs/storage-openstack-swift.md b/docs/storage-openstack-swift.md new file mode 100644 index 0000000000000..c39ef1ce59e1c --- /dev/null +++ b/docs/storage-openstack-swift.md @@ -0,0 +1,152 @@ +--- +layout: global +title: Accessing OpenStack Swift from Spark +--- + +Spark's support for Hadoop InputFormat allows it to process data in OpenStack Swift using the +same URI formats as in Hadoop. You can specify a path in Swift as input through a +URI of the form swift://container.PROVIDER/path. You will also need to set your +Swift security credentials, through core-site.xml or via +SparkContext.hadoopConfiguration. +Current Swift driver requires Swift to use Keystone authentication method. + +# Configuring Swift for Better Data Locality + +Although not mandatory, it is recommended to configure the proxy server of Swift with +list_endpoints to have better data locality. More information is +[available here](https://github.com/openstack/swift/blob/master/swift/common/middleware/list_endpoints.py). + + +# Dependencies + +The Spark application should include hadoop-openstack dependency. +For example, for Maven support, add the following to the pom.xml file: + +{% highlight xml %} + + ... + + org.apache.hadoop + hadoop-openstack + 2.3.0 + + ... + +{% endhighlight %} + + +# Configuration Parameters + +Create core-site.xml and place it inside Spark's conf directory. +There are two main categories of parameters that should to be configured: declaration of the +Swift driver and the parameters that are required by Keystone. + +Configuration of Hadoop to use Swift File system achieved via + + + + + + + +
Property NameValue
fs.swift.implorg.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem
+ +Additional parameters required by Keystone (v2.0) and should be provided to the Swift driver. Those +parameters will be used to perform authentication in Keystone to access Swift. The following table +contains a list of Keystone mandatory parameters. PROVIDER can be any name. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Property NameMeaningRequired
fs.swift.service.PROVIDER.auth.urlKeystone Authentication URLMandatory
fs.swift.service.PROVIDER.auth.endpoint.prefixKeystone endpoints prefixOptional
fs.swift.service.PROVIDER.tenantTenantMandatory
fs.swift.service.PROVIDER.usernameUsernameMandatory
fs.swift.service.PROVIDER.passwordPasswordMandatory
fs.swift.service.PROVIDER.http.portHTTP portMandatory
fs.swift.service.PROVIDER.regionKeystone regionMandatory
fs.swift.service.PROVIDER.publicIndicates if all URLs are publicMandatory
+ +For example, assume PROVIDER=SparkTest and Keystone contains user tester with password testing +defined for tenant test. Then core-site.xml should include: + +{% highlight xml %} + + + fs.swift.impl + org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem + + + fs.swift.service.SparkTest.auth.url + http://127.0.0.1:5000/v2.0/tokens + + + fs.swift.service.SparkTest.auth.endpoint.prefix + endpoints + + fs.swift.service.SparkTest.http.port + 8080 + + + fs.swift.service.SparkTest.region + RegionOne + + + fs.swift.service.SparkTest.public + true + + + fs.swift.service.SparkTest.tenant + test + + + fs.swift.service.SparkTest.username + tester + + + fs.swift.service.SparkTest.password + testing + + +{% endhighlight %} + +Notice that +fs.swift.service.PROVIDER.tenant, +fs.swift.service.PROVIDER.username, +fs.swift.service.PROVIDER.password contains sensitive information and keeping them in +core-site.xml is not always a good approach. +We suggest to keep those parameters in core-site.xml for testing purposes when running Spark +via spark-shell. +For job submissions they should be provided via sparkContext.hadoopConfiguration. diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md index 079d4c5550537..c6090d9ec30c7 100644 --- a/docs/streaming-kinesis-integration.md +++ b/docs/streaming-kinesis-integration.md @@ -3,8 +3,8 @@ layout: global title: Spark Streaming + Kinesis Integration --- [Amazon Kinesis](http://aws.amazon.com/kinesis/) is a fully managed service for real-time processing of streaming data at massive scale. -The Kinesis input DStream and receiver uses the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). -The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concept of Workers, Checkpoints, and Shard Leases. +The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). +The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concepts of Workers, Checkpoints, and Shard Leases. Here we explain how to configure Spark Streaming to receive data from Kinesis. #### Configuring Kinesis @@ -15,7 +15,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m #### Configuring Spark Streaming Application -1. **Linking:** In your SBT/Maven projrect definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). +1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} @@ -23,10 +23,11 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.** -2. **Programming:** In the streaming application code, import `KinesisUtils` and create input DStream as follows. +2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream as follows:
+ import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis._ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream @@ -34,11 +35,13 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]) See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the next subsection for instructions to run the example. + and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
- import org.apache.spark.streaming.flume.*; + import org.apache.spark.streaming.Duration; + import org.apache.spark.streaming.kinesis.*; + import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]); @@ -49,36 +52,73 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
- `[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region). + - `streamingContext`: StreamingContext containg an application name used by Kinesis to tie this Kinesis application to the Kinesis stream - `[checkpoint interval]`: The interval at which the Kinesis client library is going to save its position in the stream. For starters, set it to the same as the batch interval of the streaming application. + - `[Kinesis stream name]`: The Kinesis stream that this streaming application receives from + - The application name used in the streaming context becomes the Kinesis application name + - The application name must be unique for a given account and region. + - The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization. + - Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table. - `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see later section and Amazon Kinesis API documentation for more details). - *Points to remember:* + - `[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region). - - The name used in the context of the streaming application must be unique for a given account and region. Changing the app name or stream name could lead to Kinesis errors as only a single logical application can process a single stream. - - A single Kinesis input DStream can receive many Kinesis shards by spinning up multiple KinesisRecordProcessor threads. Note that there is no correlation between number of shards in Kinesis and the number of partitions in the generated RDDs that is used for processing the data. - - You never need more KinesisReceivers than the number of shards in your stream as each will spin up at least one KinesisRecordProcessor thread. - - Horizontal scaling is achieved by autoscaling additional Kinesis input DStreams (separate processes) up to the number of current shards for a given stream, of course. + - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application. -3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + - `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details). - - A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL. It is used to store KCL's checkpoint information. - - If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch. +3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + + *Points to remember at runtime:* + + - Kinesis data processing is ordered per partition and occurs at-least once per message. + + - Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamodDB. + + - A single Kinesis stream shard is processed by one input DStream at a time. + +

+ Spark Streaming Kinesis Architecture + +

+ + - A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads. + + - Multiple input DStreams running in separate processes/instances can read from a Kinesis stream. + + - You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard. + + - Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point. + + - The Kinesis input DStream will balance the load between all DStreams - even across processes/instances. + + - The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load. + + - As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible. + + - Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details. + + - There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are 2 independent partitioning schemes. #### Running the Example To run the example, + - Download Spark source and follow the [instructions](building-with-maven.html) to build Spark with profile *-Pkinesis-asl*. - mvn -Pkinesis-asl -DskipTests clean package + mvn -Pkinesis-asl -DskipTests clean package + -- Set up Kinesis stream (see earlier section). Note the name of the Kinesis stream, and the endpoint URL corresponding to the region the stream is based on. +- Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created. - Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials. - In the Spark root directory, run the example as +
@@ -92,19 +132,19 @@ To run the example,
- This will wait for data to be received from Kinesis. + This will wait for data to be received from the Kinesis stream. -- To generate random string data, in another terminal, run the associated Kinesis data producer. +- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer. bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10 - This will push random words to the Kinesis stream, which should then be received and processed by the running example. + This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example. #### Kinesis Checkpointing -The Kinesis receiver checkpoints the position of the stream that has been read periodically, so that the system can recover from failures and continue processing where it had left off. Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy. - -- If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable. +- Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the DStream left off. -- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored). In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data. +- Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy. -- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency. +- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable. +- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored). +- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency. diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 3d4bce49666ed..41f170580f452 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -233,7 +233,7 @@ $ ./bin/run-example streaming.NetworkWordCount localhost 9999
{% highlight bash %} -$ ./bin/run-example JavaNetworkWordCount localhost 9999 +$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999 {% endhighlight %}
@@ -262,7 +262,7 @@ hello world {% highlight bash %} # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount -$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 +$ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms @@ -285,12 +285,22 @@ need to know to write your streaming applications. ## Linking -To write your own Spark Streaming program, you will have to add the following dependency to your - SBT or Maven project: +Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project. + +
+
- groupId = org.apache.spark - artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION}} + + org.apache.spark + spark-streaming_{{site.SCALA_BINARY_VERSION}} + {{site.SPARK_VERSION}} + +
+
+ + libraryDependencies += "org.apache.spark" % "spark-streaming_{{site.SCALA_BINARY_VERSION}}" % "{{site.SPARK_VERSION}}" +
+
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core @@ -302,7 +312,7 @@ some of the common ones are as follows. SourceArtifact Kafka spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} Flume spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} - Kinesis
spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} + Kinesis
spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Apache Software License] Twitter spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} ZeroMQ spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} MQTT spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} @@ -373,7 +383,7 @@ or a special __"local[\*]"__ string to run in local mode. In practice, when runn you will not want to hardcode `master` in the program, but rather [launch the application with `spark-submit`](submitting-applications.html) and receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming -in-process. Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`. +in-process. Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`. The batch interval must be set based on the latency requirements of your application and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size) @@ -447,11 +457,12 @@ Spark Streaming has two categories of streaming sources. - *Basic sources*: Sources directly available in the StreamingContext API. Example: file systems, socket connections, and Akka actors. - *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section. -Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. A receiver is run within a Spark worker/executor as a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Hence, it is important to remember that Spark Streaming application needs to be allocated enough cores to process the received data, as well as, to run the receiver(s). Therefore, few important points to remember are: +Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. So every input DStream receives a single stream of data. Note that in a streaming application, you can create multiple input DStreams to receive multiple streams of data in parallel. This is discussed later in the [Performance Tuning](#level-of-parallelism-in-data-receiving) section. + +A receiver is run within a Spark worker/executor as a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Hence, it is important to remember that Spark Streaming application needs to be allocated enough cores to process the received data, as well as, to run the receiver(s). Therefore, few important points to remember are: ##### Points to remember: {:.no_toc} - - If the number of cores allocated to the application is less than or equal to the number of input DStreams / receivers, then the system will receive data, but not be able to process them. - When running locally, if you master URL is set to "local", then there is only one core to run tasks. That is insufficient for programs with even one input DStream (file streams are okay) as the receiver will occupy that core and there will be no core left to process the data. @@ -1089,9 +1100,34 @@ parallelizing the data receiving. Note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data. Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s). -For example, a single Kafka input stream receiving two topics of data can be split into two +For example, a single Kafka input DStream receiving two topics of data can be split into two Kafka input streams, each receiving only one topic. This would run two receivers on two workers, -thus allowing data to be received in parallel, and increasing overall throughput. +thus allowing data to be received in parallel, and increasing overall throughput. These multiple +DStream can be unioned together to create a single DStream. Then the transformations that was +being applied on the single input DStream can applied on the unified stream. This is done as follows. + +
+
+{% highlight scala %} +val numStreams = 5 +val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } +val unifiedStream = streamingContext.union(kafkaStreams) +unifiedStream.print() +{% endhighlight %} +
+
+{% highlight java %} +int numStreams = 5; +List> kafkaStreams = new ArrayList>(numStreams); +for (int i = 0; i < numStreams; i++) { + kafkaStreams.add(KafkaUtils.createStream(...)); +} +JavaPairDStream unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size())); +unifiedStream.print(); +{% endhighlight %} +
+
+ Another parameter that should be considered is the receiver's blocking interval. For most receivers, the received data is coalesced together into large blocks of data before storing inside Spark's memory. @@ -1107,7 +1143,7 @@ before further processing. ### Level of Parallelism in Data Processing {:.no_toc} -Cluster resources maybe under-utilized if the number of parallel tasks used in any stage of the +Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like `reduceByKey` and `reduceByKeyAndWindow`, the default number of parallel tasks is decided by the [config property] (configuration.html#spark-properties) `spark.default.parallelism`. You can pass the level of diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 1670faca4a480..bfd07593b92ed 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -38,9 +38,12 @@ from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType from boto import ec2 +DEFAULT_SPARK_VERSION = "1.0.0" + # A URL prefix from which to fetch AMI information AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list" + class UsageError(Exception): pass @@ -56,10 +59,10 @@ def parse_args(): help="Show this help message and exit") parser.add_option( "-s", "--slaves", type="int", default=1, - help="Number of slaves to launch (default: 1)") + help="Number of slaves to launch (default: %default)") parser.add_option( "-w", "--wait", type="int", default=120, - help="Seconds to wait for nodes to start (default: 120)") + help="Seconds to wait for nodes to start (default: %default)") parser.add_option( "-k", "--key-pair", help="Key pair to use on instances") @@ -68,7 +71,7 @@ def parse_args(): help="SSH private key file to use for logging into instances") parser.add_option( "-t", "--instance-type", default="m1.large", - help="Type of instance to launch (default: m1.large). " + + help="Type of instance to launch (default: %default). " + "WARNING: must be 64-bit; small instances won't work") parser.add_option( "-m", "--master-instance-type", default="", @@ -83,15 +86,15 @@ def parse_args(): "between zones applies)") parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use") parser.add_option( - "-v", "--spark-version", default="1.0.0", - help="Version of Spark to use: 'X.Y.Z' or a specific git hash") + "-v", "--spark-version", default=DEFAULT_SPARK_VERSION, + help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)") parser.add_option( "--spark-git-repo", default="https://github.com/apache/spark", help="Github repo from which to checkout supplied commit hash") parser.add_option( "--hadoop-major-version", default="1", - help="Major version of Hadoop (default: 1)") + help="Major version of Hadoop (default: %default)") parser.add_option( "-D", metavar="[ADDRESS:]PORT", dest="proxy_port", help="Use SSH dynamic port forwarding to create a SOCKS proxy at " + @@ -115,21 +118,21 @@ def parse_args(): "Only support up to 8 EBS volumes.") parser.add_option( "--swap", metavar="SWAP", type="int", default=1024, - help="Swap space to set up per node, in MB (default: 1024)") + help="Swap space to set up per node, in MB (default: %default)") parser.add_option( "--spot-price", metavar="PRICE", type="float", help="If specified, launch slaves as spot instances with the given " + "maximum price (in dollars)") parser.add_option( "--ganglia", action="store_true", default=True, - help="Setup Ganglia monitoring on cluster (default: on). NOTE: " + + help="Setup Ganglia monitoring on cluster (default: %default). NOTE: " + "the Ganglia page will be publicly accessible") parser.add_option( "--no-ganglia", action="store_false", dest="ganglia", help="Disable Ganglia monitoring for the cluster") parser.add_option( "-u", "--user", default="root", - help="The SSH user you want to connect as (default: root)") + help="The SSH user you want to connect as (default: %default)") parser.add_option( "--delete-groups", action="store_true", default=False, help="When destroying a cluster, delete the security groups that were created.") @@ -138,7 +141,7 @@ def parse_args(): help="Launch fresh slaves, but use an existing stopped master if possible") parser.add_option( "--worker-instances", type="int", default=1, - help="Number of instances per worker: variable SPARK_WORKER_INSTANCES (default: 1)") + help="Number of instances per worker: variable SPARK_WORKER_INSTANCES (default: %default)") parser.add_option( "--master-opts", type="string", default="", help="Extra options to give to master through SPARK_MASTER_OPTS variable " + @@ -151,7 +154,7 @@ def parse_args(): help="Use this prefix for the security group rather than the cluster name.") parser.add_option( "--authorized-address", type="string", default="0.0.0.0/0", - help="Address to authorize on created security groups (default: 0.0.0.0/0)") + help="Address to authorize on created security groups (default: %default)") parser.add_option( "--additional-security-group", type="string", default="", help="Additional security group to place the machines in") @@ -342,7 +345,6 @@ def launch_cluster(conn, opts, cluster_name): if opts.ami is None: opts.ami = get_spark_ami(opts) - additional_groups = [] if opts.additional_security_group: additional_groups = [sg @@ -363,7 +365,7 @@ def launch_cluster(conn, opts, cluster_name): for i in range(opts.ebs_vol_num): device = EBSBlockDeviceType() device.size = opts.ebs_vol_size - device.volume_type=opts.ebs_vol_type + device.volume_type = opts.ebs_vol_type device.delete_on_termination = True block_map["/dev/sd" + chr(ord('s') + i)] = device @@ -495,6 +497,7 @@ def launch_cluster(conn, opts, cluster_name): # Return all the instances return (master_nodes, slave_nodes) + def tag_instance(instance, name): for i in range(0, 5): try: @@ -507,9 +510,12 @@ def tag_instance(instance, name): # Get the EC2 instances in an existing cluster if available. # Returns a tuple of lists of EC2 instance objects for the masters and slaves + + def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): print "Searching for existing cluster " + cluster_name + "..." - # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster. + # Search all the spot instance requests, and copy any tags from the spot + # instance request to the cluster. spot_instance_requests = conn.get_all_spot_instance_requests() for req in spot_instance_requests: if req.state != u'active': @@ -520,7 +526,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): for res in reservations: active = [i for i in res.instances if is_active(i)] for instance in active: - if (instance.tags.get(u'Name') == None): + if (instance.tags.get(u'Name') is None): tag_instance(instance, name) # Now proceed to detect master and slaves instances. reservations = conn.get_all_instances() @@ -540,13 +546,16 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): return (master_nodes, slave_nodes) else: if master_nodes == [] and slave_nodes != []: - print >> sys.stderr, "ERROR: Could not find master in with name " + cluster_name + "-master" + print >> sys.stderr, "ERROR: Could not find master in with name " + \ + cluster_name + "-master" else: print >> sys.stderr, "ERROR: Could not find any existing cluster" sys.exit(1) # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. + + def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): master = master_nodes[0].public_dns_name if deploy_ssh_key: @@ -890,7 +899,8 @@ def real_main(): if opts.security_group_prefix is None: group_names = [cluster_name + "-master", cluster_name + "-slaves"] else: - group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"] + group_names = [opts.security_group_prefix + "-master", + opts.security_group_prefix + "-slaves"] attempt = 1 while attempt <= 3: diff --git a/examples/pom.xml b/examples/pom.xml index 9b12cb0c29c9f..3f46c40464d3b 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py index e902ae29753c0..cfda8d8327aa3 100644 --- a/examples/src/main/python/avro_inputformat.py +++ b/examples/src/main/python/avro_inputformat.py @@ -23,7 +23,8 @@ Read data file users.avro in local Spark distro: $ cd $SPARK_HOME -$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \ +$ ./bin/spark-submit --driver-class-path /path/to/example/jar \ +> ./examples/src/main/python/avro_inputformat.py \ > examples/src/main/resources/users.avro {u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]} {u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []} @@ -40,7 +41,8 @@ ] } -$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \ +$ ./bin/spark-submit --driver-class-path /path/to/example/jar \ +> ./examples/src/main/python/avro_inputformat.py \ > examples/src/main/resources/users.avro examples/src/main/resources/user.avsc {u'favorite_color': None, u'name': u'Alyssa'} {u'favorite_color': u'red', u'name': u'Ben'} @@ -51,8 +53,10 @@ Usage: avro_inputformat [reader_schema_file] Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py [reader_schema_file] - Assumes you have Avro data stored in . Reader schema can be optionally specified in [reader_schema_file]. + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/avro_inputformat.py [reader_schema_file] + Assumes you have Avro data stored in . Reader schema can be optionally specified + in [reader_schema_file]. """ exit(-1) @@ -62,9 +66,10 @@ conf = None if len(sys.argv) == 3: schema_rdd = sc.textFile(sys.argv[2], 1).collect() - conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)} + conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, schema_rdd)} - avro_rdd = sc.newAPIHadoopFile(path, + avro_rdd = sc.newAPIHadoopFile( + path, "org.apache.avro.mapreduce.AvroKeyInputFormat", "org.apache.avro.mapred.AvroKey", "org.apache.hadoop.io.NullWritable", diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py index e4a897f61e39d..05f34b74df45a 100644 --- a/examples/src/main/python/cassandra_inputformat.py +++ b/examples/src/main/python/cassandra_inputformat.py @@ -51,7 +51,8 @@ Usage: cassandra_inputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/cassandra_inputformat.py Assumes you have some data in Cassandra already, running on , in and """ exit(-1) @@ -61,12 +62,12 @@ cf = sys.argv[3] sc = SparkContext(appName="CassandraInputFormat") - conf = {"cassandra.input.thrift.address":host, - "cassandra.input.thrift.port":"9160", - "cassandra.input.keyspace":keyspace, - "cassandra.input.columnfamily":cf, - "cassandra.input.partitioner.class":"Murmur3Partitioner", - "cassandra.input.page.row.size":"3"} + conf = {"cassandra.input.thrift.address": host, + "cassandra.input.thrift.port": "9160", + "cassandra.input.keyspace": keyspace, + "cassandra.input.columnfamily": cf, + "cassandra.input.partitioner.class": "Murmur3Partitioner", + "cassandra.input.page.row.size": "3"} cass_rdd = sc.newAPIHadoopRDD( "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", "java.util.Map", diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py index 836c35b5c6794..d144539e58b8f 100644 --- a/examples/src/main/python/cassandra_outputformat.py +++ b/examples/src/main/python/cassandra_outputformat.py @@ -50,7 +50,8 @@ Usage: cassandra_outputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/cassandra_outputformat.py Assumes you have created the following table in Cassandra already, running on , in . @@ -67,16 +68,16 @@ cf = sys.argv[3] sc = SparkContext(appName="CassandraOutputFormat") - conf = {"cassandra.output.thrift.address":host, - "cassandra.output.thrift.port":"9160", - "cassandra.output.keyspace":keyspace, - "cassandra.output.partitioner.class":"Murmur3Partitioner", - "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?", - "mapreduce.output.basename":cf, - "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat", - "mapreduce.job.output.key.class":"java.util.Map", - "mapreduce.job.output.value.class":"java.util.List"} - key = {"user_id" : int(sys.argv[4])} + conf = {"cassandra.output.thrift.address": host, + "cassandra.output.thrift.port": "9160", + "cassandra.output.keyspace": keyspace, + "cassandra.output.partitioner.class": "Murmur3Partitioner", + "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?", + "mapreduce.output.basename": cf, + "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat", + "mapreduce.job.output.key.class": "java.util.Map", + "mapreduce.job.output.value.class": "java.util.List"} + key = {"user_id": int(sys.argv[4])} sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset( conf=conf, keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter", diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index befacee0dea56..3b16010f1cb97 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -51,7 +51,8 @@ Usage: hbase_inputformat Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+ ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/hbase_inputformat.py
Assumes you have some data in HBase already, running on , in
""" exit(-1) @@ -61,12 +62,15 @@ sc = SparkContext(appName="HBaseInputFormat") conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" + valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" + hbase_rdd = sc.newAPIHadoopRDD( "org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", - keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter", - valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter", + keyConverter=keyConv, + valueConverter=valueConv, conf=conf) output = hbase_rdd.collect() for (k, v) in output: diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py index 49bbc5aebdb0b..abb425b1f886a 100644 --- a/examples/src/main/python/hbase_outputformat.py +++ b/examples/src/main/python/hbase_outputformat.py @@ -44,8 +44,10 @@ Usage: hbase_outputformat
Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py - Assumes you have created
with column family in HBase running on already + ./bin/spark-submit --driver-class-path /path/to/example/jar \ + /path/to/examples/hbase_outputformat.py + Assumes you have created
with column family in HBase + running on already """ exit(-1) @@ -55,13 +57,15 @@ conf = {"hbase.zookeeper.quorum": host, "hbase.mapred.outputtable": table, - "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", - "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable", - "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"} + "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", + "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} + keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" + valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( conf=conf, - keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter", - valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter") + keyConverter=keyConv, + valueConverter=valueConv) sc.stop() diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py index 6b16a56e44af7..4218eca822a99 100755 --- a/examples/src/main/python/mllib/correlations.py +++ b/examples/src/main/python/mllib/correlations.py @@ -28,7 +28,7 @@ if __name__ == "__main__": - if len(sys.argv) not in [1,2]: + if len(sys.argv) not in [1, 2]: print >> sys.stderr, "Usage: correlations ()" exit(-1) sc = SparkContext(appName="PythonCorrelations") diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py index 6e4a4a0cb6be0..61ea4e06ecf3a 100755 --- a/examples/src/main/python/mllib/decision_tree_runner.py +++ b/examples/src/main/python/mllib/decision_tree_runner.py @@ -21,7 +21,9 @@ This example requires NumPy (http://www.numpy.org/). """ -import numpy, os, sys +import numpy +import os +import sys from operator import add @@ -127,7 +129,7 @@ def usage(): (reindexedData, origToNewLabels) = reindexClassLabels(points) # Train a classifier. - categoricalFeaturesInfo={} # no categorical features + categoricalFeaturesInfo = {} # no categorical features model = DecisionTree.trainClassifier(reindexedData, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo) # Print learned tree and stats. diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py index b388d8d83fb86..1e8892741e714 100755 --- a/examples/src/main/python/mllib/random_rdd_generation.py +++ b/examples/src/main/python/mllib/random_rdd_generation.py @@ -32,8 +32,8 @@ sc = SparkContext(appName="PythonRandomRDDGeneration") - numExamples = 10000 # number of examples to generate - fraction = 0.1 # fraction of data to sample + numExamples = 10000 # number of examples to generate + fraction = 0.1 # fraction of data to sample # Example: RandomRDDs.normalRDD normalRDD = RandomRDDs.normalRDD(sc, numExamples) @@ -45,7 +45,7 @@ print # Example: RandomRDDs.normalVectorRDD - normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2) print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count() print ' First 5 samples:' for sample in normalVectorRDD.take(5): diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py index ec64a5978c672..92af3af5ebd1e 100755 --- a/examples/src/main/python/mllib/sampled_rdds.py +++ b/examples/src/main/python/mllib/sampled_rdds.py @@ -36,7 +36,7 @@ sc = SparkContext(appName="PythonSampledRDDs") - fraction = 0.1 # fraction of data to sample + fraction = 0.1 # fraction of data to sample examples = MLUtils.loadLibSVMFile(sc, datapath) numExamples = examples.count() @@ -49,9 +49,9 @@ expectedSampleSize = int(numExamples * fraction) print 'Sampling RDD using fraction %g. Expected sample size = %d.' \ % (fraction, expectedSampleSize) - sampledRDD = examples.sample(withReplacement = True, fraction = fraction) + sampledRDD = examples.sample(withReplacement=True, fraction=fraction) print ' RDD.sample(): sample has %d examples' % sampledRDD.count() - sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize) + sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize) print ' RDD.takeSample(): sample has %d examples' % len(sampledArray) print @@ -66,7 +66,7 @@ fractions = {} for k in keyCountsA.keys(): fractions[k] = fraction - sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions) + sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions) keyCountsB = sampledByKeyRDD.countByKey() sizeB = sum(keyCountsB.values()) print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \ diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index fc37459dc74aa..ee9036adfa281 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -35,7 +35,7 @@ def f(_): y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 < 1 else 0 - count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add) + count = sc.parallelize(xrange(1, n + 1), slices).map(f).reduce(add) print "Pi is roughly %f" % (4.0 * count / n) sc.stop() diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index b345276b08ba3..ac291bd4fde20 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index f71f6b6c4f931..7d31e32283d88 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index 4e2275ab238f7..2067c473f0e3f 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index dc48a08c93de2..371f1f1e9d39a 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index b93ad016f84f0..1d7dd49d15c22 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index 22c1fff23d9a2..7e48968feb3bc 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index 5308bb4e440ea..8658ecf5abfab 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index a54b34235dfb4..560244ad93369 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index a5b162a0482e4..71a078d58a8d8 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 6dd52fc618b1e..3f49b1d63b6e1 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/make-distribution.sh b/make-distribution.sh index f030d3f430581..14aed4a4b655b 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -50,7 +50,8 @@ while (( "$#" )); do case $1 in --hadoop) echo "Error: '--hadoop' is no longer supported:" - echo "Error: use Maven options -Phadoop.version and -Pyarn.version" + echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead." + echo "Error: Related profiles include hadoop-0.23, hdaoop-2.2, hadoop-2.3 and hadoop-2.4." exit_with_usage ;; --with-yarn) diff --git a/mllib/pom.xml b/mllib/pom.xml index c7a1e2ae75c84..a5eeef88e9d62 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index a5eaea80afd71..d05190512f742 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT pom Spark Project Parent POM http://spark.apache.org/ @@ -221,6 +221,18 @@ false + + + spark-staging-1030 + Spark 1.1.0 Staging (1030) + https://repository.apache.org/content/repositories/orgapachespark-1030/ + + true + + + false + + diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala index 034ba6a7bf50f..0f5d71afcf616 100644 --- a/project/MimaBuild.scala +++ b/project/MimaBuild.scala @@ -85,7 +85,7 @@ object MimaBuild { def mimaSettings(sparkHome: File, projectRef: ProjectRef) = { val organization = "org.apache.spark" - val previousSparkVersion = "1.0.0" + val previousSparkVersion = "1.1.0" val fullId = "spark-" + projectRef.project + "_2.10" mimaDefaultSettings ++ Seq(previousArtifact := Some(organization % fullId % previousSparkVersion), diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 855d5cc8cf3fd..46b78bd5c7061 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -33,6 +33,18 @@ import com.typesafe.tools.mima.core._ object MimaExcludes { def excludes(version: String) = version match { + case v if v.startsWith("1.2") => + Seq( + MimaBuild.excludeSparkPackage("deploy"), + MimaBuild.excludeSparkPackage("graphx") + ) ++ + // This is @DeveloperAPI, but Mima still gives false-positives: + MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++ + Seq( + // This is @Experimental, but Mima still gives false-positives: + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachAsync") + ) case v if v.startsWith("1.1") => Seq( MimaBuild.excludeSparkPackage("deploy"), diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a26c2c90cb321..45f6d2973ea90 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -184,7 +184,7 @@ object OldDeps { def versionArtifact(id: String): Option[sbt.ModuleID] = { val fullId = id + "_2.10" - Some("org.apache.spark" % fullId % "1.0.0") + Some("org.apache.spark" % fullId % "1.1.0") } def oldDepsSettings() = Defaults.defaultSettings ++ Seq( diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 68062483dedaa..80e51d1a583a0 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -657,7 +657,6 @@ def save_partial(self, obj): def save_file(self, obj): """Save a file""" import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute - from ..transport.adapter import SerializingAdapter if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): raise pickle.PicklingError("Cannot pickle files that do not map to an actual file") @@ -691,13 +690,10 @@ def save_file(self, obj): tmpfile.close() if tst != '': raise pickle.PicklingError("Cannot pickle file %s as it does not appear to map to a physical, real file" % name) - elif fsize > SerializingAdapter.max_transmit_data: - raise pickle.PicklingError("Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" % - (name,SerializingAdapter.max_transmit_data)) else: try: tmpfile = file(name) - contents = tmpfile.read(SerializingAdapter.max_transmit_data) + contents = tmpfile.read() tmpfile.close() except IOError: raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 04f13523b431d..266090e3ae8f3 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -212,11 +212,16 @@ def cache(self): self.persist(StorageLevel.MEMORY_ONLY_SER) return self - def persist(self, storageLevel): + def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER): """ Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. + If no storage level is specified defaults to (C{MEMORY_ONLY_SER}). + + >>> rdd = sc.parallelize(["b", "a", "c"]) + >>> rdd.persist().is_cached + True """ self.is_cached = True javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) @@ -2070,6 +2075,7 @@ def pipeline_func(split, iterator): self.ctx = prev.ctx self.prev = prev self._jrdd_val = None + self._id = None self._jrdd_deserializer = self.ctx.serializer self._bypass_serializer = False self._partitionFunc = prev._partitionFunc if self.preservesPartitioning else None @@ -2100,6 +2106,11 @@ def _jrdd(self): self._jrdd_val = python_rdd.asJavaRDD() return self._jrdd_val + def id(self): + if self._id is None: + self._id = self._jrdd.id() + return self._id + def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index fde3c29e5e790..89cf76920e353 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -49,9 +49,9 @@ ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /__ / .__/\_,_/_/ /_/\_\ version 1.0.0-SNAPSHOT + /__ / .__/\_,_/_/ /_/\_\ version %s /_/ -""") +""" % sc.version) print("Using Python version %s (%s, %s)" % ( platform.python_version(), platform.python_build()[0], diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index e7f573cf6da44..004d4937cbe1c 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -29,6 +29,7 @@ from pyspark.rdd import RDD, PipelinedRDD from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer +from pyspark.storagelevel import StorageLevel from itertools import chain, ifilter, imap @@ -1524,7 +1525,7 @@ def __init__(self, jschema_rdd, sql_ctx): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc self._jschema_rdd = jschema_rdd - + self._id = None self.is_cached = False self.is_checkpointed = False self.ctx = self.sql_ctx._sc @@ -1542,9 +1543,10 @@ def _jrdd(self): self._lazy_jrdd = self._jschema_rdd.javaToPython() return self._lazy_jrdd - @property - def _id(self): - return self._jrdd.id() + def id(self): + if self._id is None: + self._id = self._jrdd.id() + return self._id def saveAsParquetFile(self, path): """Save the contents as a Parquet file, preserving the schema. @@ -1665,7 +1667,7 @@ def cache(self): self._jschema_rdd.cache() return self - def persist(self, storageLevel): + def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER): self.is_cached = True javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) self._jschema_rdd.persist(javaStorageLevel) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3e74799e82845..9fbeb36f4f1dd 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -169,6 +169,17 @@ def test_namedtuple(self): self.assertEquals(p1, p2) +# Regression test for SPARK-3415 +class CloudPickleTest(unittest.TestCase): + def test_pickling_file_handles(self): + from pyspark.cloudpickle import dumps + from StringIO import StringIO + from pickle import load + out1 = sys.stderr + out2 = load(StringIO(dumps(out1))) + self.assertEquals(out1, out2) + + class PySparkTestCase(unittest.TestCase): def setUp(self): @@ -281,6 +292,15 @@ def func(): class TestRDDFunctions(PySparkTestCase): + def test_id(self): + rdd = self.sc.parallelize(range(10)) + id = rdd.id() + self.assertEqual(id, rdd.id()) + rdd2 = rdd.map(str).filter(bool) + id2 = rdd2.id() + self.assertEqual(id + 1, id2) + self.assertEqual(id2, rdd2.id()) + def test_failed_sparkcontext_creation(self): # Regression test for SPARK-1550 self.sc.stop() diff --git a/repl/pom.xml b/repl/pom.xml index 68f4504450778..fcc5f90d870e8 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala index 910b31d209e13..7667a9c11979e 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala @@ -14,6 +14,8 @@ import scala.reflect.internal.util.Position import scala.util.control.Exception.ignoring import scala.tools.nsc.util.stackTraceString +import org.apache.spark.SPARK_VERSION + /** * Machinery for the asynchronous initialization of the repl. */ @@ -26,9 +28,9 @@ trait SparkILoopInit { ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ - /___/ .__/\_,_/_/ /_/\_\ version 1.0.0-SNAPSHOT + /___/ .__/\_,_/_/ /_/\_\ version %s /_/ -""") +""".format(SPARK_VERSION)) import Properties._ val welcomeMsg = "Using Scala %s (%s, Java %s)".format( versionString, javaVmName, javaVersion) diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash index c91fecf024ad4..fecc3d38a5fbd 100755 --- a/sbt/sbt-launch-lib.bash +++ b/sbt/sbt-launch-lib.bash @@ -51,9 +51,9 @@ acquire_sbt_jar () { printf "Attempting to fetch sbt\n" JAR_DL=${JAR}.part if hash curl 2>/dev/null; then - (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR} + (curl --silent ${URL1} > ${JAR_DL} || curl --silent ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR} elif hash wget 2>/dev/null; then - (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR} + (wget --quiet ${URL1} -O ${JAR_DL} || wget --quiet ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR} else printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n" exit -1 diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 830711a46a35b..0d756f873e486 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index c8016e41256d5..bd110218d34f7 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 4137ac7663739..f6f4cf3b80d41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -53,7 +53,7 @@ private[spark] object SQLConf { * * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ -trait SQLConf { +private[sql] trait SQLConf { import SQLConf._ /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala index 0ea1105f082a4..595b4aa36eae3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala @@ -30,7 +30,7 @@ import scala.reflect.runtime.universe.{TypeTag, typeTag} /** * Functions for registering scala lambda functions as UDFs in a SQLContext. */ -protected[sql] trait UDFRegistration { +private[sql] trait UDFRegistration { self: SQLContext => private[spark] def registerPython( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index dc668e7dc934c..6eab2f23c18e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{LeafNode, SparkPlan} -object InMemoryRelation { +private[sql] object InMemoryRelation { def apply(useCompression: Boolean, batchSize: Int, child: SparkPlan): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, child)() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 4802e40595807..927f40063e47e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una override def outputPartitioning = newPartitioning - def output = child.output + override def output = child.output /** We must copy rows when sort based shuffle is on */ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - def execute() = attachTree(this , "execute") { + override def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - @transient val hashExpressions = - newProjection(expressions, child.output) - + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => + val hashExpressions = newProjection(expressions, child.output) iter.map(r => (hashExpressions(r), r.copy())) - } else { - @transient val hashExpressions = - newMutableProjection(expressions, child.output)() - + } + } else { + child.execute().mapPartitions { iter => + val hashExpressions = newMutableProjection(expressions, child.output)() val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) } @@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._2) case RangePartitioning(sortingExpressions, numPartitions) => - // TODO: RangePartitioner should take an Ordering. - implicit val ordering = new RowOrdering(sortingExpressions, child.output) - - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - iter.map(row => (row.copy(), null)) - } else { + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} + } else { + child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Row, Null](null, null) iter.map(row => mutablePair.update(row, null)) } } + + // TODO: RangePartitioner should take an Ordering. + implicit val ordering = new RowOrdering(sortingExpressions, child.output) + val part = new RangePartitioner(numPartitions, rdd, ascending = true) val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) @@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._1) case SinglePartition => - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - iter.map(r => (null, r.copy())) - } else { + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } + } else { + child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Null, Row]() iter.map(r => mutablePair.update(null, r)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 4abda21ffec96..47bff0c730b8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution import scala.collection.mutable.ArrayBuffer import scala.reflect.runtime.universe.TypeTag +import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.rdd.{RDD, ShuffledRDD} -import org.apache.spark.sql.SQLContext +import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ @@ -96,6 +96,9 @@ case class Limit(limit: Int, child: SparkPlan) // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: // partition local limit -> exchange into one partition -> partition local limit again + /** We must copy rows when sort based shuffle is on */ + private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + override def output = child.output /** @@ -143,9 +146,15 @@ case class Limit(limit: Int, child: SparkPlan) } override def execute() = { - val rdd = child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Boolean, Row]() - iter.take(limit).map(row => mutablePair.update(false, row)) + val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => + iter.take(limit).map(row => (false, row.copy())) + } + } else { + child.execute().mapPartitions { iter => + val mutablePair = new MutablePair[Boolean, Row]() + iter.take(limit).map(row => mutablePair.update(false, row)) + } } val part = new HashPartitioner(1) val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 9fd6aed402838..2fc7e1cf23ab7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -382,7 +382,7 @@ private[parquet] class CatalystPrimitiveConverter( parent.updateLong(fieldIndex, value) } -object CatalystArrayConverter { +private[parquet] object CatalystArrayConverter { val INITIAL_ARRAY_SIZE = 20 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala index fe28e0d7269e0..7c83f1cad7d71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkSqlSerializer -object ParquetFilters { +private[sql] object ParquetFilters { val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" // set this to false if pushdown should be disabled val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown" diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index c6f60c18804a4..124fc107cb8aa 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index f12b5a69a09f7..bd3f68d92d8c7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -39,7 +39,9 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils /** * Executes queries using Spark SQL, and maintains a list of handles to active queries. */ -class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager with Logging { +private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) + extends OperationManager with Logging { + val handleToOperation = ReflectionUtils .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation") diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 30ff277e67c88..45a4c6dc98da0 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index ce35520a28609..12f900c91eb98 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index 97abb6b2b63e0..f36674476770c 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml index 51744ece0412d..7dadbba58fd82 100644 --- a/yarn/alpha/pom.xml +++ b/yarn/alpha/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 3faaf053634d6..7fcd7ee0d4547 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml index b6c8456d06684..fd934b7726181 100644 --- a/yarn/stable/pom.xml +++ b/yarn/stable/pom.xml @@ -20,7 +20,7 @@ org.apache.spark yarn-parent_2.10 - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml