diff --git a/.gitignore b/.gitignore
index 07524bc429e92..8ecf536e79a5f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,7 +60,6 @@ dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
-ec2/lib/
rat-results.txt
scalastyle.txt
scalastyle-output.xml
diff --git a/LICENSE b/LICENSE
index a2f75b817ab37..9c944ac610afe 100644
--- a/LICENSE
+++ b/LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
- (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+ (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index beacc39500aaa..34be7f0ebd752 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -130,6 +130,7 @@ exportMethods("%in%",
"count",
"countDistinct",
"crc32",
+ "hash",
"cume_dist",
"date_add",
"date_format",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index df36bc869acb4..9bb7876b384ce 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -340,6 +340,26 @@ setMethod("crc32",
column(jc)
})
+#' hash
+#'
+#' Calculates the hash code of given columns, and returns the result as an int column.
+#'
+#' @rdname hash
+#' @name hash
+#' @family misc_funcs
+#' @export
+#' @examples \dontrun{hash(df$c)}
+setMethod("hash",
+ signature(x = "Column"),
+ function(x, ...) {
+ jcols <- lapply(list(x, ...), function (x) {
+ stopifnot(class(x) == "Column")
+ x@jc
+ })
+ jc <- callJStatic("org.apache.spark.sql.functions", "hash", jcols)
+ column(jc)
+ })
+
#' dayofmonth
#'
#' Extracts the day of the month as an integer from a given date/timestamp/string.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index ba6861709754d..5ba68e3a4f378 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -736,6 +736,10 @@ setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct")
#' @export
setGeneric("crc32", function(x) { standardGeneric("crc32") })
+#' @rdname hash
+#' @export
+setGeneric("hash", function(x, ...) { standardGeneric("hash") })
+
#' @rdname cume_dist
#' @export
setGeneric("cume_dist", function(x) { standardGeneric("cume_dist") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index eaf60beda3473..97625b94a0e23 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -922,7 +922,7 @@ test_that("column functions", {
c <- column("a")
c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
- c3 <- cosh(c) + count(c) + crc32(c) + exp(c)
+ c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
c6 <- log(c) + (c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)
diff --git a/bin/pyspark b/bin/pyspark
index 5eaa17d3c2016..2ac4a8be250d6 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON
# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a97d884f0bf39..51d6d15f66c69 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)
set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index 34ecb19654f1a..3bec5debc2968 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,7 +350,7 @@
 <groupId>net.sf.py4j</groupId>
 <artifactId>py4j</artifactId>
- <version>0.9</version>
+ <version>0.9.1</version>
 </dependency>
 <dependency>
 <groupId>org.apache.spark</groupId>
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
new file mode 100644
index 0000000000000..ca7babc3f01c7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import java.io.Serializable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>Like {@code java.util.Optional} in Java 8, {@code scala.Option} in Scala, and
+ * {@code com.google.common.base.Optional} in Google Guava, this class represents a
+ * value of a given type that may or may not exist. It is used in methods that wish
+ * to optionally return a value, in preference to returning {@code null}.</p>
+ *
+ * <p>In fact, the class here is a reimplementation of the essential API of both
+ * {@code java.util.Optional} and {@code com.google.common.base.Optional}. From
+ * {@code java.util.Optional}, it implements:</p>
+ *
+ * <ul>
+ *   <li>{@link #empty()}</li>
+ *   <li>{@link #of(Object)}</li>
+ *   <li>{@link #ofNullable(Object)}</li>
+ *   <li>{@link #get()}</li>
+ *   <li>{@link #orElse(Object)}</li>
+ *   <li>{@link #isPresent()}</li>
+ * </ul>
+ *
+ * <p>From {@code com.google.common.base.Optional} it implements:</p>
+ *
+ * <ul>
+ *   <li>{@link #absent()}</li>
+ *   <li>{@link #of(Object)}</li>
+ *   <li>{@link #fromNullable(Object)}</li>
+ *   <li>{@link #get()}</li>
+ *   <li>{@link #or(Object)}</li>
+ *   <li>{@link #orNull()}</li>
+ *   <li>{@link #isPresent()}</li>
+ * </ul>
+ *
+ * <p>{@code java.util.Optional} itself is not used at this time because the
+ * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * has in the past caused serious library version conflicts with Guava that can't
+ * be resolved by shading. Hence this work-alike clone.</p>
+ *
+ * @param <T> type of value held inside
+ */
+public final class Optional<T> implements Serializable {
+
+ private static final Optional<?> EMPTY = new Optional<>();
+
+ private final T value;
+
+ private Optional() {
+ this.value = null;
+ }
+
+ private Optional(T value) {
+ Preconditions.checkNotNull(value);
+ this.value = value;
+ }
+
+ // java.util.Optional API (subset)
+
+ /**
+ * @return an empty {@code Optional}
+ */
+ public static <T> Optional<T> empty() {
+ @SuppressWarnings("unchecked")
+ Optional<T> t = (Optional<T>) EMPTY;
+ return t;
+ }
+
+ /**
+ * @param value non-null value to wrap
+ * @return {@code Optional} wrapping this value
+ * @throws NullPointerException if value is null
+ */
+ public static <T> Optional<T> of(T value) {
+ return new Optional<>(value);
+ }
+
+ /**
+ * @param value value to wrap, which may be null
+ * @return {@code Optional} wrapping this value, which may be empty
+ */
+ public static <T> Optional<T> ofNullable(T value) {
+ if (value == null) {
+ return empty();
+ } else {
+ return of(value);
+ }
+ }
+
+ /**
+ * @return the value wrapped by this {@code Optional}
+ * @throws NullPointerException if this is empty (contains no value)
+ */
+ public T get() {
+ Preconditions.checkNotNull(value);
+ return value;
+ }
+
+ /**
+ * @param other value to return if this is empty
+ * @return this {@code Optional}'s value if present, or else the given value
+ */
+ public T orElse(T other) {
+ return value != null ? value : other;
+ }
+
+ /**
+ * @return true iff this {@code Optional} contains a value (non-empty)
+ */
+ public boolean isPresent() {
+ return value != null;
+ }
+
+ // Guava API (subset)
+ // of(), get() and isPresent() are identically present in the Guava API
+
+ /**
+ * @return an empty {@code Optional}
+ */
+ public static <T> Optional<T> absent() {
+ return empty();
+ }
+
+ /**
+ * @param value value to wrap, which may be null
+ * @return {@code Optional} wrapping this value, which may be empty
+ */
+ public static <T> Optional<T> fromNullable(T value) {
+ return ofNullable(value);
+ }
+
+ /**
+ * @param other value to return if this is empty
+ * @return this {@code Optional}'s value if present, or else the given value
+ */
+ public T or(T other) {
+ return value != null ? value : other;
+ }
+
+ /**
+ * @return this {@code Optional}'s value if present, or else null
+ */
+ public T orNull() {
+ return value;
+ }
+
+ // Common methods
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof Optional)) {
+ return false;
+ }
+ Optional<?> other = (Optional<?>) obj;
+ return value == null ? other.value == null : value.equals(other.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value == null ? 0 : value.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return value == null ? "Optional.empty" : String.format("Optional[%s]", value);
+ }
+
+}
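To illustrate the API parity described in the class Javadoc above, here is a minimal, illustrative sketch of exercising the new work-alike from Scala; the assertions only restate the documented semantics and are not part of the patch.

```scala
import org.apache.spark.api.java.Optional

// java.util.Optional-style calls
val present = Optional.of(42)
assert(present.isPresent && present.get == 42)
assert(Optional.empty[Int]().orElse(7) == 7)

// Guava-style aliases delegate to the same behavior
assert(Optional.absent[String]().orNull() == null)
assert(Optional.fromNullable[String](null).or("fallback") == "fallback")
assert(Optional.ofNullable[String](null) == Optional.empty[String]())
```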
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 59af1052ebd05..fb04472ee73fd 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
@@ -655,7 +654,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
- import scala.collection.JavaConverters._
def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 242438237f987..0f8d13cf5cc2f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -24,7 +24,6 @@ import java.util.{Comparator, Iterator => JIterator, List => JList}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark._
@@ -122,7 +121,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
- import scala.collection.JavaConverters._
def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -132,7 +130,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
- import scala.collection.JavaConverters._
def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -142,7 +139,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- import scala.collection.JavaConverters._
def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 9990b22e14a25..01433ca2efc14 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b2a4d053fa650..f820401da2fc3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -22,13 +22,12 @@ import java.util.Map.Entry
import scala.collection.mutable
-import com.google.common.base.Optional
-
private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
- option match {
- case Some(value) => Optional.of(value)
- case None => Optional.absent()
+ if (option.isDefined) {
+ Optional.of(option.get)
+ } else {
+ Optional.empty[T]
}
// Workaround for SPARK-3926 / SI-8911
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 2d97cd9a9a208..bda872746c8b8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
- pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 94719a4572ef6..7de9df1e489fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -77,7 +77,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
This implementation is non-blocking, asynchronously handling the
results of each job and triggering the next job using callbacks on futures.
*/
- def continue(partsScanned: Long)(implicit jobSubmitter: JobSubmitter) : Future[Seq[T]] =
+ def continue(partsScanned: Int)(implicit jobSubmitter: JobSubmitter): Future[Seq[T]] =
if (results.size >= num || partsScanned >= totalParts) {
Future.successful(results.toSeq)
} else {
@@ -99,7 +99,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
}
val left = num - results.size
- val p = partsScanned.toInt until math.min(partsScanned + numPartsToTry, totalParts).toInt
+ val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val buf = new Array[Array[T]](p.size)
self.context.setCallSite(callSite)
@@ -109,13 +109,13 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
p,
(index: Int, data: Array[T]) => buf(index) = data,
Unit)
- job.flatMap {_ =>
+ job.flatMap { _ =>
buf.foreach(results ++= _.take(num - results.size))
continue(partsScanned + p.size)
}
}
- new ComplexFutureAction[Seq[T]](continue(0L)(_))
+ new ComplexFutureAction[Seq[T]](continue(0)(_))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 18e8cddbc40db..57108dcedcf0c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -50,7 +50,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])
- extends RDD[Pair[T, U]](sc, Nil)
+ extends RDD[(T, U)](sc, Nil)
with Serializable {
val numPartitionsInRdd2 = rdd2.partitions.length
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 146609ae3911a..7a1197830443f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
@@ -93,7 +94,13 @@ class NewHadoopRDD[K, V](
// issues, this cloning is disabled by default.
NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Cloning Hadoop Configuration")
- new Configuration(conf)
+ // The Configuration passed in is actually a JobConf and possibly contains credentials.
+ // To keep those credentials properly we have to create a new JobConf not a Configuration.
+ if (conf.isInstanceOf[JobConf]) {
+ new JobConf(conf)
+ } else {
+ new Configuration(conf)
+ }
}
} else {
conf
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e25657cc109be..9dad7944144d8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -95,7 +95,7 @@ abstract class RDD[T: ClassTag](
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
- this(oneParent.context , List(new OneToOneDependency(oneParent)))
+ this(oneParent.context, List(new OneToOneDependency(oneParent)))
private[spark] def conf = sc.conf
// =======================================================================
@@ -970,6 +970,13 @@ abstract class RDD[T: ClassTag](
* apply the fold to each element sequentially in some defined ordering. For functions
* that are not commutative, the result may differ from that of a fold applied to a
* non-distributed collection.
+ *
+ * @param zeroValue the initial value for the accumulated result of each partition for the `op`
+ * operator, and also the initial value for the combine results from different
+ * partitions for the `op` operator - this will typically be the neutral
+ * element (e.g. `Nil` for list concatenation or `0` for summation)
+ * @param op an operator used to both accumulate results within a partition and combine results
+ * from different partitions
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
@@ -988,6 +995,13 @@ abstract class RDD[T: ClassTag](
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
+ *
+ * @param zeroValue the initial value for the accumulated result of each partition for the
+ * `seqOp` operator, and also the initial value for the combine results from
+ * different partitions for the `combOp` operator - this will typically be the
+ * neutral element (e.g. `Nil` for list concatenation or `0` for summation)
+ * @param seqOp an operator used to accumulate results within a partition
+ * @param combOp an associative operator used to combine results from different partitions
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
@@ -1190,7 +1204,7 @@ abstract class RDD[T: ClassTag](
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
- var partsScanned = 0L
+ var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
@@ -1209,7 +1223,7 @@ abstract class RDD[T: ClassTag](
}
val left = num - buf.size
- val p = partsScanned.toInt until math.min(partsScanned + numPartsToTry, totalParts).toInt
+ val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
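To make the new `zeroValue` documentation concrete, a minimal sketch (assuming an existing SparkContext `sc`) of how the neutral element seeds both the per-partition step and the cross-partition combine:

```scala
// Illustrative only; `sc` is assumed to be an existing SparkContext.
val rdd = sc.parallelize(1 to 4, numSlices = 2)

// fold: 0 is the neutral element for summation
val sum = rdd.fold(0)(_ + _)                      // 10

// aggregate: (0, 0) seeds a (sum, count) pair in each partition and in the final combine
val (total, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),           // seqOp: within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2))           // combOp: across partitions
```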
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5feb1dc2e5b74..9cd52d6c2bef5 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -115,7 +115,7 @@ class StageData private[spark](
val status: StageStatus,
val stageId: Int,
val attemptId: Int,
- val numActiveTasks: Int ,
+ val numActiveTasks: Int,
val numCompleteTasks: Int,
val numFailedTasks: Int,
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 14b6ba4af489a..58c8560a3d049 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -29,7 +29,6 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
extends Logging {
@volatile private var outputStream: FileOutputStream = null
@volatile private var markedForStop = false // has the appender been asked to stopped
- @volatile private var stopped = false // has the appender stopped
// Thread that reads the input stream and writes to file
private val writingThread = new Thread("File appending thread for " + file) {
@@ -47,11 +46,7 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
* or because of any error in appending
*/
def awaitTermination() {
- synchronized {
- if (!stopped) {
- wait()
- }
- }
+ writingThread.join()
}
/** Stop the appender */
@@ -77,10 +72,6 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
logError(s"Error writing stream to file $file", e)
} finally {
closeFile()
- synchronized {
- stopped = true
- notifyAll()
- }
}
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 47382e4231563..44d5cac7c2de5 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -21,7 +21,17 @@
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.*;
import scala.Tuple2;
@@ -35,7 +45,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.base.Throwables;
-import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
@@ -49,7 +58,12 @@
import org.junit.Before;
import org.junit.Test;
-import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.partial.BoundedDouble;
@@ -1785,32 +1799,6 @@ public void testAsyncActionErrorWrapping() throws Exception {
Assert.assertTrue(future.isDone());
}
-
- /**
- * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
- * since that's the only artifact where Guava classes have been relocated.
- */
- @Test
- public void testGuavaOptional() {
- // Stop the context created in setUp() and start a local-cluster one, to force usage of the
- // assembly.
- sc.stop();
- JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,1024]", "JavaAPISuite");
- try {
- JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
- JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
- new Function<Integer, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(Integer i) {
- return Optional.fromNullable(i);
- }
- });
- });
- rdd2.collect();
- } finally {
- localCluster.stop();
- }
- }
-
static class Class1 {}
static class Class2 {}
diff --git a/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
new file mode 100644
index 0000000000000..4b97c18198c1a
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link Optional}.
+ */
+public class OptionalSuite {
+
+ @Test
+ public void testEmpty() {
+ Assert.assertFalse(Optional.empty().isPresent());
+ Assert.assertNull(Optional.empty().orNull());
+ Assert.assertEquals("foo", Optional.empty().or("foo"));
+ Assert.assertEquals("foo", Optional.empty().orElse("foo"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testEmptyGet() {
+ Optional.empty().get();
+ }
+
+ @Test
+ public void testAbsent() {
+ Assert.assertFalse(Optional.absent().isPresent());
+ Assert.assertNull(Optional.absent().orNull());
+ Assert.assertEquals("foo", Optional.absent().or("foo"));
+ Assert.assertEquals("foo", Optional.absent().orElse("foo"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testAbsentGet() {
+ Optional.absent().get();
+ }
+
+ @Test
+ public void testOf() {
+ Assert.assertTrue(Optional.of(1).isPresent());
+ Assert.assertNotNull(Optional.of(1).orNull());
+ Assert.assertEquals(Integer.valueOf(1), Optional.of(1).get());
+ Assert.assertEquals(Integer.valueOf(1), Optional.of(1).or(2));
+ Assert.assertEquals(Integer.valueOf(1), Optional.of(1).orElse(2));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testOfWithNull() {
+ Optional.of(null);
+ }
+
+ @Test
+ public void testOfNullable() {
+ Assert.assertTrue(Optional.ofNullable(1).isPresent());
+ Assert.assertNotNull(Optional.ofNullable(1).orNull());
+ Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).get());
+ Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).or(2));
+ Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).orElse(2));
+ Assert.assertFalse(Optional.ofNullable(null).isPresent());
+ Assert.assertNull(Optional.ofNullable(null).orNull());
+ Assert.assertEquals(Integer.valueOf(2), Optional.ofNullable(null).or(2));
+ Assert.assertEquals(Integer.valueOf(2), Optional.ofNullable(null).orElse(2));
+ }
+
+ @Test
+ public void testFromNullable() {
+ Assert.assertTrue(Optional.fromNullable(1).isPresent());
+ Assert.assertNotNull(Optional.fromNullable(1).orNull());
+ Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).get());
+ Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).or(2));
+ Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).orElse(2));
+ Assert.assertFalse(Optional.fromNullable(null).isPresent());
+ Assert.assertNull(Optional.fromNullable(null).orNull());
+ Assert.assertEquals(Integer.valueOf(2), Optional.fromNullable(null).or(2));
+ Assert.assertEquals(Integer.valueOf(2), Optional.fromNullable(null).orElse(2));
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala b/core/src/test/scala/org/apache/spark/Smuggle.scala
index 01694a6e6f741..9f0a1b4c25dd1 100644
--- a/core/src/test/scala/org/apache/spark/Smuggle.scala
+++ b/core/src/test/scala/org/apache/spark/Smuggle.scala
@@ -21,6 +21,7 @@ import java.util.UUID
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable
+import scala.language.implicitConversions
/**
* Utility wrapper to "smuggle" objects into tasks while bypassing serialization.
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 4e72b89bfcc40..76451788d2406 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -178,7 +178,7 @@ class DoubleRDDSuite extends SparkFunSuite with SharedSparkContext {
test("WorksWithOutOfRangeWithInfiniteBuckets") {
// Verify that out of range works with two buckets
val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
- val buckets = Array(-1.0/0.0 , 0.0, 1.0/0.0)
+ val buckets = Array(-1.0/0.0, 0.0, 1.0/0.0)
val histogramResults = rdd.histogram(buckets)
val expectedHistogramResults = Array(1, 1)
assert(histogramResults === expectedHistogramResults)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 24acbed4d7258..ef2ed445005d3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -482,6 +482,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
assert(nums.take(501) === (1 to 501).toArray)
assert(nums.take(999) === (1 to 999).toArray)
assert(nums.take(1000) === (1 to 999).toArray)
+
+ nums = sc.parallelize(1 to 2, 2)
+ assert(nums.take(2147483638).size === 2)
+ assert(nums.takeAsync(2147483638).get.size === 2)
}
test("top with predefined ordering") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 504e5780f3d8a..e111e2e9f6163 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -76,7 +76,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
test("check spark-class location correctly") {
val conf = new SparkConf
- conf.set("spark.mesos.executor.home" , "/mesos-home")
+ conf.set("spark.mesos.executor.home", "/mesos-home")
val listenerBus = mock[LiveListenerBus]
listenerBus.post(
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
similarity index 87%
rename from core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
rename to core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index f200ff36c7dd5..d21ce73f4021e 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -19,18 +19,18 @@ package org.apache.spark.shuffle.sort
import java.io.{File, FileInputStream, FileOutputStream}
+import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-import org.mockito.{Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfterEach
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage._
import org.apache.spark.util.Utils
-import org.apache.spark.{SparkConf, SparkFunSuite}
class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach {
@@ -64,12 +64,15 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
}
test("commit shuffle files multiple times") {
- val lengths = Array[Long](10, 0, 20)
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+ val lengths = Array[Long](10, 0, 20)
val dataTmp = File.createTempFile("shuffle", null, tempDir)
val out = new FileOutputStream(dataTmp)
- out.write(new Array[Byte](30))
- out.close()
+ Utils.tryWithSafeFinally {
+ out.write(new Array[Byte](30))
+ } {
+ out.close()
+ }
resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
val dataFile = resolver.getDataFile(1, 2)
@@ -77,12 +80,15 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
assert(dataFile.length() === 30)
assert(!dataTmp.exists())
+ val lengths2 = new Array[Long](3)
val dataTmp2 = File.createTempFile("shuffle", null, tempDir)
val out2 = new FileOutputStream(dataTmp2)
- val lengths2 = new Array[Long](3)
- out2.write(Array[Byte](1))
- out2.write(new Array[Byte](29))
- out2.close()
+ Utils.tryWithSafeFinally {
+ out2.write(Array[Byte](1))
+ out2.write(new Array[Byte](29))
+ } {
+ out2.close()
+ }
resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
assert(lengths2.toSeq === lengths.toSeq)
assert(dataFile.exists())
@@ -90,20 +96,27 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
assert(!dataTmp2.exists())
// The dataFile should be the previous one
- val in = new FileInputStream(dataFile)
val firstByte = new Array[Byte](1)
- in.read(firstByte)
+ val in = new FileInputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ in.read(firstByte)
+ } {
+ in.close()
+ }
assert(firstByte(0) === 0)
// remove data file
dataFile.delete()
+ val lengths3 = Array[Long](10, 10, 15)
val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
val out3 = new FileOutputStream(dataTmp3)
- val lengths3 = Array[Long](10, 10, 15)
- out3.write(Array[Byte](2))
- out3.write(new Array[Byte](34))
- out3.close()
+ Utils.tryWithSafeFinally {
+ out3.write(Array[Byte](2))
+ out3.write(new Array[Byte](34))
+ } {
+ out3.close()
+ }
resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
assert(lengths3.toSeq != lengths.toSeq)
assert(dataFile.exists())
@@ -111,9 +124,13 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
assert(!dataTmp2.exists())
// The dataFile should be the previous one
- val in2 = new FileInputStream(dataFile)
val firstByte2 = new Array[Byte](1)
- in2.read(firstByte2)
+ val in2 = new FileInputStream(dataFile)
+ Utils.tryWithSafeFinally {
+ in2.read(firstByte2)
+ } {
+ in2.close()
+ }
assert(firstByte2(0) === 2)
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 21db3b1c9ffbd..67210e5d4c50e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -505,38 +505,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
test("in-memory LRU storage") {
- store = makeBlockManager(12000)
- val a1 = new Array[Byte](4000)
- val a2 = new Array[Byte](4000)
- val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
- assert(store.getSingle("a1") === None, "a1 was in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- // At this point a2 was gotten last, so LRU will getSingle rid of a3
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3") === None, "a3 was in store")
+ testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY)
}
test("in-memory LRU storage with serialization") {
+ testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
+ }
+
+ private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a1", a1, storageLevel)
+ store.putSingle("a2", a2, storageLevel)
+ store.putSingle("a3", a3, storageLevel)
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will getSingle rid of a3
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+ store.putSingle("a1", a1, storageLevel)
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
@@ -618,62 +607,35 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
test("disk and memory storage") {
- store = makeBlockManager(12000)
- val a1 = new Array[Byte](4000)
- val a2 = new Array[Byte](4000)
- val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
- store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
- store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
- assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
- assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle)
}
test("disk and memory storage with getLocalBytes") {
- store = makeBlockManager(12000)
- val a1 = new Array[Byte](4000)
- val a2 = new Array[Byte](4000)
- val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
- store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
- store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
- assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
- assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
- assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
- assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes)
}
test("disk and memory storage with serialization") {
- store = makeBlockManager(12000)
- val a1 = new Array[Byte](4000)
- val a2 = new Array[Byte](4000)
- val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
- store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
- store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
- assert(store.getSingle("a2").isDefined, "a2 was not in store")
- assert(store.getSingle("a3").isDefined, "a3 was not in store")
- assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getSingle("a1").isDefined, "a1 was not in store")
- assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle)
}
test("disk and memory storage with serialization and getLocalBytes") {
+ testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes)
+ }
+
+ def testDiskAndMemoryStorage(
+ storageLevel: StorageLevel,
+ accessMethod: BlockManager => BlockId => Option[_]): Unit = {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
- store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
- store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
- store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
- assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
- assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
- assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
+ store.putSingle("a1", a1, storageLevel)
+ store.putSingle("a2", a2, storageLevel)
+ store.putSingle("a3", a3, storageLevel)
+ assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
+ assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
+ assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
+ assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
}
diff --git a/dev/create-release/release-tag.sh b/dev/create-release/release-tag.sh
index b0a3374becc6a..d404939d1caee 100755
--- a/dev/create-release/release-tag.sh
+++ b/dev/create-release/release-tag.sh
@@ -64,9 +64,6 @@ git commit -a -m "Preparing Spark release $RELEASE_TAG"
echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH"
git tag $RELEASE_TAG
-# TODO: It would be nice to do some verifications here
-# i.e. check whether ec2 scripts have the new version
-
# Create next version
$MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence logs
git commit -a -m "Preparing development version $NEXT_VERSION"
diff --git a/dev/create-release/releaseutils.py b/dev/create-release/releaseutils.py
index 7f152b7f53559..5d0ac16b3b0a1 100755
--- a/dev/create-release/releaseutils.py
+++ b/dev/create-release/releaseutils.py
@@ -159,7 +159,6 @@ def get_commits(tag):
"build": CORE_COMPONENT,
"deploy": CORE_COMPONENT,
"documentation": CORE_COMPONENT,
- "ec2": "EC2",
"examples": CORE_COMPONENT,
"graphx": "GraphX",
"input/output": CORE_COMPONENT,
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index e4373f79f7922..53034a25d46ab 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -84,13 +84,13 @@ hadoop-yarn-server-web-proxy-2.2.0.jar
httpclient-4.3.2.jar
httpcore-4.3.2.jar
ivy-2.4.0.jar
-jackson-annotations-2.4.4.jar
-jackson-core-2.4.4.jar
+jackson-annotations-2.5.3.jar
+jackson-core-2.5.3.jar
jackson-core-asl-1.9.13.jar
-jackson-databind-2.4.4.jar
+jackson-databind-2.5.3.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
-jackson-module-scala_2.10-2.4.4.jar
+jackson-module-scala_2.10-2.5.3.jar
jackson-xc-1.9.13.jar
janino-2.7.8.jar
jansi-1.4.jar
@@ -160,7 +160,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 7478181406d07..a23e260641aeb 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -79,13 +79,13 @@ hadoop-yarn-server-web-proxy-2.3.0.jar
httpclient-4.3.2.jar
httpcore-4.3.2.jar
ivy-2.4.0.jar
-jackson-annotations-2.4.4.jar
-jackson-core-2.4.4.jar
+jackson-annotations-2.5.3.jar
+jackson-core-2.5.3.jar
jackson-core-asl-1.9.13.jar
-jackson-databind-2.4.4.jar
+jackson-databind-2.5.3.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
-jackson-module-scala_2.10-2.4.4.jar
+jackson-module-scala_2.10-2.5.3.jar
jackson-xc-1.9.13.jar
janino-2.7.8.jar
jansi-1.4.jar
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index faffb8bf398a5..6bedbed1e3355 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -79,13 +79,13 @@ hadoop-yarn-server-web-proxy-2.4.0.jar
httpclient-4.3.2.jar
httpcore-4.3.2.jar
ivy-2.4.0.jar
-jackson-annotations-2.4.4.jar
-jackson-core-2.4.4.jar
+jackson-annotations-2.5.3.jar
+jackson-core-2.5.3.jar
jackson-core-asl-1.9.13.jar
-jackson-databind-2.4.4.jar
+jackson-databind-2.5.3.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
-jackson-module-scala_2.10-2.4.4.jar
+jackson-module-scala_2.10-2.5.3.jar
jackson-xc-1.9.13.jar
janino-2.7.8.jar
jansi-1.4.jar
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index e703c7acd3876..7bfad57b4a4a6 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -85,13 +85,13 @@ htrace-core-3.0.4.jar
httpclient-4.3.2.jar
httpcore-4.3.2.jar
ivy-2.4.0.jar
-jackson-annotations-2.4.4.jar
-jackson-core-2.4.4.jar
+jackson-annotations-2.5.3.jar
+jackson-core-2.5.3.jar
jackson-core-asl-1.9.13.jar
-jackson-databind-2.4.4.jar
+jackson-databind-2.5.3.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
-jackson-module-scala_2.10-2.4.4.jar
+jackson-module-scala_2.10-2.5.3.jar
jackson-xc-1.9.13.jar
janino-2.7.8.jar
jansi-1.4.jar
@@ -158,7 +158,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
diff --git a/dev/lint-python b/dev/lint-python
index 0b97213ae3dff..1765a07d2f22b 100755
--- a/dev/lint-python
+++ b/dev/lint-python
@@ -19,7 +19,7 @@
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname "$SCRIPT_DIR")"
-PATHS_TO_CHECK="./python/pyspark/ ./ec2/spark_ec2.py ./examples/src/main/python/ ./dev/sparktestsupport"
+PATHS_TO_CHECK="./python/pyspark/ ./examples/src/main/python/ ./dev/sparktestsupport"
PATHS_TO_CHECK="$PATHS_TO_CHECK ./dev/run-tests.py ./python/run-tests.py ./dev/run-tests-jenkins.py"
PEP8_REPORT_PATH="$SPARK_ROOT_DIR/dev/pep8-report.txt"
PYLINT_REPORT_PATH="$SPARK_ROOT_DIR/dev/pylint-report.txt"
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 47cd600bd18a4..93a8c15e3ec30 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -406,15 +406,6 @@ def contains_file(self, filename):
should_run_build_tests=True
)
-ec2 = Module(
- name="ec2",
- dependencies=[],
- source_file_regexes=[
- "ec2/",
- ]
-)
-
-
yarn = Module(
name="yarn",
dependencies=[],
@@ -435,7 +426,7 @@ def contains_file(self, filename):
# No other modules should directly depend on this module.
root = Module(
name="root",
- dependencies=[],
+ dependencies=[build], # Changes to build should trigger all tests.
source_file_regexes=[],
# In order to run all of the tests, enable every test profile:
build_profile_flags=list(set(
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 424ce6ad7663c..3cb5d2be2a91a 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -70,19 +70,10 @@ $MVN -q versions:set -DnewVersion=$TEMP_VERSION -DgenerateBackupPoms=false > /de
# Generate manifests for each Hadoop profile:
for HADOOP_PROFILE in "${HADOOP_PROFILES[@]}"; do
echo "Performing Maven install for $HADOOP_PROFILE"
- $MVN $HADOOP2_MODULE_PROFILES -P$HADOOP_PROFILE jar:jar install:install -q \
- -pl '!assembly' \
- -pl '!examples' \
- -pl '!external/flume-assembly' \
- -pl '!external/kafka-assembly' \
- -pl '!external/twitter' \
- -pl '!external/flume' \
- -pl '!external/mqtt' \
- -pl '!external/mqtt-assembly' \
- -pl '!external/zeromq' \
- -pl '!external/kafka' \
- -pl '!tags' \
- -DskipTests
+ $MVN $HADOOP2_MODULE_PROFILES -P$HADOOP_PROFILE jar:jar jar:test-jar install:install clean -q
+
+ echo "Performing Maven validate for $HADOOP_PROFILE"
+ $MVN $HADOOP2_MODULE_PROFILES -P$HADOOP_PROFILE validate -q
echo "Generating dependency manifest for $HADOOP_PROFILE"
mkdir -p dev/pr-deps
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 62d75eff71057..d493f62f0e578 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -98,8 +98,6 @@
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index faaf154d243f5..2810112f5294e 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -53,8 +53,6 @@ The system currently supports three cluster managers:
and service applications.
* [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 2.
-In addition, Spark's [EC2 launch scripts](ec2-scripts.html) make it easy to launch a standalone
-cluster on Amazon EC2.
# Submitting Applications
diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
deleted file mode 100644
index 7f60f82b966fe..0000000000000
--- a/docs/ec2-scripts.md
+++ /dev/null
@@ -1,192 +0,0 @@
----
-layout: global
-title: Running Spark on EC2
----
-
-The `spark-ec2` script, located in Spark's `ec2` directory, allows you
-to launch, manage and shut down Spark clusters on Amazon EC2. It automatically
-sets up Spark and HDFS on the cluster for you. This guide describes
-how to use `spark-ec2` to launch clusters, how to run jobs on them, and how
-to shut them down. It assumes you've already signed up for an EC2 account
-on the [Amazon Web Services site](http://aws.amazon.com/).
-
-`spark-ec2` is designed to manage multiple named clusters. You can
-launch a new cluster (telling the script its size and giving it a name),
-shutdown an existing cluster, or log into a cluster. Each cluster is
-identified by placing its machines into EC2 security groups whose names
-are derived from the name of the cluster. For example, a cluster named
-`test` will contain a master node in a security group called
-`test-master`, and a number of slave nodes in a security group called
-`test-slaves`. The `spark-ec2` script will create these security groups
-for you based on the cluster name you request. You can also use them to
-identify machines belonging to each cluster in the Amazon EC2 Console.
-
-
-# Before You Start
-
-- Create an Amazon EC2 key pair for yourself. This can be done by
- logging into your Amazon Web Services account through the [AWS
- console](http://aws.amazon.com/console/), clicking Key Pairs on the
- left sidebar, and creating and downloading a key. Make sure that you
- set the permissions for the private key file to `600` (i.e. only you
- can read and write it) so that `ssh` will work.
-- Whenever you want to use the `spark-ec2` script, set the environment
- variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to your
- Amazon EC2 access key ID and secret access key. These can be
- obtained from the [AWS homepage](http://aws.amazon.com/) by clicking
- Account \> Security Credentials \> Access Credentials.
-
-# Launching a Cluster
-
-- Go into the `ec2` directory in the release of Spark you downloaded.
-- Run
- `./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>`,
- where `<keypair>` is the name of your EC2 key pair (that you gave it
- when you created it), `<key-file>` is the private key file for your
- key pair, `<num-slaves>` is the number of slave nodes to launch (try
- 1 at first), and `<cluster-name>` is the name to give to your
- cluster.
-
- For example:
-
- ```bash
- export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
-export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
-./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a launch my-spark-cluster
- ```
-
-- After everything launches, check that the cluster scheduler is up and sees
- all the slaves by going to its web UI, which will be printed at the end of
- the script (typically `http://<master-hostname>:8080`).
-
-You can also run `./spark-ec2 --help` to see more usage options. The
-following options are worth pointing out:
-
-- `--instance-type=<instance-type>` can be used to specify an EC2
-instance type to use. For now, the script only supports 64-bit instance
-types, and the default type is `m1.large` (which has 2 cores and 7.5 GB
-RAM). Refer to the Amazon pages about [EC2 instance
-types](http://aws.amazon.com/ec2/instance-types) and [EC2
-pricing](http://aws.amazon.com/ec2/#pricing) for information about other
-instance types.
-- `--region=<ec2-region>` specifies an EC2 region in which to launch
-instances. The default region is `us-east-1`.
-- `--zone=<ec2-zone>` can be used to specify an EC2 availability zone
-to launch instances in. Sometimes, you will get an error because there
-is not enough capacity in one zone, and you should try to launch in
-another.
-- `--ebs-vol-size=<GB>` will attach an EBS volume with a given amount
- of space to each node so that you can have a persistent HDFS cluster
- on your nodes across cluster restarts (see below).
-- `--spot-price=<price>` will launch the worker nodes as
- [Spot Instances](http://aws.amazon.com/ec2/spot-instances/),
- bidding for the given maximum price (in dollars).
-- `--spark-version=<version>` will pre-load the cluster with the
- specified version of Spark. The `<version>` can be a version number
- (e.g. "0.7.3") or a specific git hash. By default, a recent
- version will be used.
-- `--spark-git-repo=<repository url>` will let you run a custom version of
- Spark that is built from the given git repository. By default, the
- [Apache Github mirror](https://github.com/apache/spark) will be used.
- When using a custom Spark version, `--spark-version` must be set to git
- commit hash, such as 317e114, instead of a version number.
-- If one of your launches fails due to e.g. not having the right
-permissions on your private key file, you can run `launch` with the
-`--resume` option to restart the setup process on an existing cluster.
-
-# Launching a Cluster in a VPC
-
-- Run
- `./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> --vpc-id=<vpc-id> --subnet-id=<subnet-id> launch <cluster-name>`,
- where `<keypair>` is the name of your EC2 key pair (that you gave it
- when you created it), `<key-file>` is the private key file for your
- key pair, `<num-slaves>` is the number of slave nodes to launch (try
- 1 at first), `<vpc-id>` is the name of your VPC, `<subnet-id>` is the
- name of your subnet, and `<cluster-name>` is the name to give to your
- cluster.
-
- For example:
-
- ```bash
- export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
-export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
-./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster
- ```
-
-# Running Applications
-
-- Go into the `ec2` directory in the release of Spark you downloaded.
-- Run `./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>` to
- SSH into the cluster, where `<keypair>` and `<key-file>` are as
- above. (This is just for convenience; you could also use
- the EC2 console.)
-- To deploy code or data within your cluster, you can log in and use the
- provided script `~/spark-ec2/copy-dir`, which,
- given a directory path, RSYNCs it to the same location on all the slaves.
-- If your application needs to access large datasets, the fastest way to do
- that is to load them from Amazon S3 or an Amazon EBS device into an
- instance of the Hadoop Distributed File System (HDFS) on your nodes.
- The `spark-ec2` script already sets up a HDFS instance for you. It's
- installed in `/root/ephemeral-hdfs`, and can be accessed using the
- `bin/hadoop` script in that directory. Note that the data in this
- HDFS goes away when you stop and restart a machine.
-- There is also a *persistent HDFS* instance in
- `/root/persistent-hdfs` that will keep data across cluster restarts.
- Typically each node has relatively little space for persistent data
- (about 3 GB), but you can use the `--ebs-vol-size` option to
- `spark-ec2` to attach a persistent EBS volume to each node for
- storing the persistent HDFS.
-- Finally, if you get errors while running your application, look at the slave's logs
- for that application inside of the scheduler work directory (/root/spark/work). You can
- also view the status of the cluster using the web UI: `http://<master-hostname>:8080`.
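
As a short sketch of the two deployment steps above (the application directory and input file below are hypothetical names), a session on the master might look like:

```bash
# Replicate a directory of application code to the same path on every slave.
~/spark-ec2/copy-dir /root/my-app

# Load an input file into the ephemeral HDFS instance that spark-ec2 set up.
/root/ephemeral-hdfs/bin/hadoop fs -mkdir /data
/root/ephemeral-hdfs/bin/hadoop fs -put /root/my-app/input.txt /data/
/root/ephemeral-hdfs/bin/hadoop fs -ls /data
```

Remember that anything written to the ephemeral HDFS is lost when the machines are stopped; use the persistent HDFS in `/root/persistent-hdfs` (backed by `--ebs-vol-size`) for data that must survive restarts.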
-
-# Configuration
-
-You can edit `/root/spark/conf/spark-env.sh` on each machine to set Spark configuration options, such
-as JVM options. This file needs to be copied to **every machine** to reflect the change. The easiest way to
-do this is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master,
-then run `~/spark-ec2/copy-dir /root/spark/conf` to RSYNC it to all the workers.
-
-The [configuration guide](configuration.html) describes the available configuration options.
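
A minimal sketch of that workflow, assuming you want to add one setting (`SPARK_WORKER_MEMORY` is just an illustrative option):

```bash
# On the master: append a setting to the Spark environment file.
echo "export SPARK_WORKER_MEMORY=4g" >> /root/spark/conf/spark-env.sh

# Propagate the whole conf directory to every worker.
~/spark-ec2/copy-dir /root/spark/conf
```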
-
-# Terminating a Cluster
-
-***Note that there is no way to recover data on EC2 nodes after shutting
-them down! Make sure you have copied everything important off the nodes
-before stopping them.***
-
-- Go into the `ec2` directory in the release of Spark you downloaded.
-- Run `./spark-ec2 destroy <cluster-name>`.
-
-# Pausing and Restarting Clusters
-
-The `spark-ec2` script also supports pausing a cluster. In this case,
-the VMs are stopped but not terminated, so they
-***lose all data on ephemeral disks*** but keep the data in their
-root partitions and their `persistent-hdfs`. Stopped machines will not
-cost you any EC2 cycles, but ***will*** continue to cost money for EBS
-storage.
-
-- To stop one of your clusters, go into the `ec2` directory and run
-`./spark-ec2 --region=<ec2-region> stop <cluster-name>`.
-- To restart it later, run
-`./spark-ec2 -i <key-file> --region=<ec2-region> start <cluster-name>`.
-- To ultimately destroy the cluster and stop consuming EBS space, run
-`./spark-ec2 --region=<ec2-region> destroy <cluster-name>` as described in the previous
-section.
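
Putting concrete values on the commands above (reusing the region, key file, and cluster name from the earlier examples):

```bash
# Stop the cluster (EBS-backed data is kept; ephemeral disks are wiped).
./spark-ec2 --region=us-west-1 stop my-spark-cluster

# Bring the same machines back up later.
./spark-ec2 -i awskey.pem --region=us-west-1 start my-spark-cluster

# When the cluster is no longer needed at all:
./spark-ec2 --region=us-west-1 destroy my-spark-cluster
```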
-
-# Limitations
-
-- Support for "cluster compute" nodes is limited -- there's no way to specify a
- locality group. However, you can launch slave nodes in your
- `<clusterName>-slaves` group manually and then use `spark-ec2 launch
- --resume` to start a cluster with them.
-
-If you have a patch or suggestion for one of these limitations, feel free to
-[contribute](contributing-to-spark.html) it!
-
-# Accessing Data in S3
-
-Spark's file interface allows it to process data in Amazon S3 using the same URI formats that are supported for Hadoop. You can specify a path in S3 as input through a URI of the form `s3n://<bucket>/path`. To provide AWS credentials for S3 access, launch the Spark cluster with the option `--copy-aws-credentials`. Full instructions on S3 access using the Hadoop input libraries can be found on the [Hadoop S3 page](http://wiki.apache.org/hadoop/AmazonS3).
-
-In addition to using a single input file, you can also use a directory of files as input by simply giving the path to the directory.
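
A rough sketch of end-to-end S3 access (the bucket name and paths are placeholders, and the tool locations assume the cluster layout described above):

```bash
# Launch with AWS credentials copied into the Hadoop configuration.
./spark-ec2 --key-pair=awskey --identity-file=awskey.pem \
  --copy-aws-credentials launch my-spark-cluster

# From the master, the Hadoop CLI (and Spark jobs) can then read s3n:// paths.
/root/ephemeral-hdfs/bin/hadoop fs -ls s3n://my-bucket/path/
```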
diff --git a/docs/index.md b/docs/index.md
index ae26f97c86c21..9dfc52a2bdc9b 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,7 +64,7 @@ To run Spark interactively in a R interpreter, use `bin/sparkR`:
./bin/sparkR --master local[2]
Example applications are also provided in R. For example,
-
+
./bin/spark-submit examples/src/main/r/dataframe.R
# Launching on a Cluster
@@ -73,7 +73,6 @@ The Spark [cluster mode overview](cluster-overview.html) explains the key concep
Spark can run both by itself, or over several existing cluster managers. It currently provides several
options for deployment:
-* [Amazon EC2](ec2-scripts.html): our EC2 scripts let you launch a cluster in about 5 minutes
* [Standalone Deploy Mode](spark-standalone.html): simplest way to deploy Spark on a private cluster
* [Apache Mesos](running-on-mesos.html)
* [Hadoop YARN](running-on-yarn.html)
@@ -103,7 +102,7 @@ options for deployment:
* [Cluster Overview](cluster-overview.html): overview of concepts and components when running on a cluster
* [Submitting Applications](submitting-applications.html): packaging and deploying applications
* Deployment modes:
- * [Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
+ * [Amazon EC2](https://github.com/amplab/spark-ec2): scripts that let you launch a cluster on EC2 in about 5 minutes
* [Standalone Deploy Mode](spark-standalone.html): launch a standalone cluster quickly without a third-party cluster manager
* [Mesos](running-on-mesos.html): deploy a private cluster using
[Apache Mesos](http://mesos.apache.org)
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 36327c6efeaf3..6c587b3f0d8db 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -91,7 +91,7 @@ pre-packaged distribution.
2. Add this jar to the classpath of all `NodeManager`s in your cluster.
3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
-`org.apache.spark.network.yarn.YarnShuffleService` and `spark.shuffle.service.enabled` to true.
+`org.apache.spark.network.yarn.YarnShuffleService`.
4. Restart all `NodeManager`s in your cluster.
All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 20b35612cab95..aac8f7560a4f8 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -590,7 +590,8 @@ val parsedData = data.map { line =>
// Building the model
val numIterations = 100
-val model = LinearRegressionWithSGD.train(parsedData, numIterations)
+val stepSize = 0.00000001
+val model = LinearRegressionWithSGD.train(parsedData, numIterations, stepSize)
// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
@@ -655,8 +656,9 @@ public class LinearRegression {
// Building the model
int numIterations = 100;
+ double stepSize = 0.00000001;
final LinearRegressionModel model =
- LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations);
+ LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);
// Evaluate model on training examples and compute training error
JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
@@ -706,7 +708,7 @@ data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)
# Build the model
-model = LinearRegressionWithSGD.train(parsedData)
+model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index b058833616433..bc89c781562bd 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2151,6 +2151,11 @@ options.
...
{% endhighlight %}
+ - From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds. This
+ change was made to match the behavior of Hive 1.2 for more consistent type casting to TimestampType
+ from numeric types. See [SPARK-11724](https://issues.apache.org/jira/browse/SPARK-11724) for
+ details.
+
## Upgrading From Spark SQL 1.4 to 1.5
- Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with
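
To make the cast-semantics note above concrete, here is a hedged check using the SQL CLI (exact output formatting depends on the session time zone):

```bash
# 1451606400 seconds after the Unix epoch is 2016-01-01 00:00:00 UTC.
# Under the Spark 1.6 behavior the long is treated as seconds (matching Hive 1.2);
# earlier releases interpreted the value as microseconds.
./bin/spark-sql -e "SELECT CAST(1451606400 AS TIMESTAMP)"
```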
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1edc0fe34706b..8fd075d02b78e 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -881,7 +881,6 @@ Scala code, take a look at the example
{% highlight java %}
-import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
diff --git a/ec2/README b/ec2/README
deleted file mode 100644
index 72434f24bf98d..0000000000000
--- a/ec2/README
+++ /dev/null
@@ -1,4 +0,0 @@
-This folder contains a script, spark-ec2, for launching Spark clusters on
-Amazon EC2. Usage instructions are available online at:
-
-http://spark.apache.org/docs/latest/ec2-scripts.html
diff --git a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
deleted file mode 100644
index 4f3e8da809f7f..0000000000000
--- a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# These variables are automatically filled in by the spark-ec2 script.
-export MASTERS="{{master_list}}"
-export SLAVES="{{slave_list}}"
-export HDFS_DATA_DIRS="{{hdfs_data_dirs}}"
-export MAPRED_LOCAL_DIRS="{{mapred_local_dirs}}"
-export SPARK_LOCAL_DIRS="{{spark_local_dirs}}"
-export MODULES="{{modules}}"
-export SPARK_VERSION="{{spark_version}}"
-export TACHYON_VERSION="{{tachyon_version}}"
-export HADOOP_MAJOR_VERSION="{{hadoop_major_version}}"
-export SWAP_MB="{{swap}}"
-export SPARK_WORKER_INSTANCES="{{spark_worker_instances}}"
-export SPARK_MASTER_OPTS="{{spark_master_opts}}"
-export AWS_ACCESS_KEY_ID="{{aws_access_key_id}}"
-export AWS_SECRET_ACCESS_KEY="{{aws_secret_access_key}}"
diff --git a/ec2/spark-ec2 b/ec2/spark-ec2
deleted file mode 100755
index 26e7d22655694..0000000000000
--- a/ec2/spark-ec2
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/sh
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Preserve the user's CWD so that relative paths are passed correctly to
-#+ the underlying Python script.
-SPARK_EC2_DIR="$(dirname "$0")"
-
-python -Wdefault "${SPARK_EC2_DIR}/spark_ec2.py" "$@"
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
deleted file mode 100755
index 19d5980560fef..0000000000000
--- a/ec2/spark_ec2.py
+++ /dev/null
@@ -1,1530 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import division, print_function, with_statement
-
-import codecs
-import hashlib
-import itertools
-import logging
-import os
-import os.path
-import pipes
-import random
-import shutil
-import string
-from stat import S_IRUSR
-import subprocess
-import sys
-import tarfile
-import tempfile
-import textwrap
-import time
-import warnings
-from datetime import datetime
-from optparse import OptionParser
-from sys import stderr
-
-if sys.version < "3":
- from urllib2 import urlopen, Request, HTTPError
-else:
- from urllib.request import urlopen, Request
- from urllib.error import HTTPError
- raw_input = input
- xrange = range
-
-SPARK_EC2_VERSION = "1.6.0"
-SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
-
-VALID_SPARK_VERSIONS = set([
- "0.7.3",
- "0.8.0",
- "0.8.1",
- "0.9.0",
- "0.9.1",
- "0.9.2",
- "1.0.0",
- "1.0.1",
- "1.0.2",
- "1.1.0",
- "1.1.1",
- "1.2.0",
- "1.2.1",
- "1.3.0",
- "1.3.1",
- "1.4.0",
- "1.4.1",
- "1.5.0",
- "1.5.1",
- "1.5.2",
- "1.6.0",
-])
-
-SPARK_TACHYON_MAP = {
- "1.0.0": "0.4.1",
- "1.0.1": "0.4.1",
- "1.0.2": "0.4.1",
- "1.1.0": "0.5.0",
- "1.1.1": "0.5.0",
- "1.2.0": "0.5.0",
- "1.2.1": "0.5.0",
- "1.3.0": "0.5.0",
- "1.3.1": "0.5.0",
- "1.4.0": "0.6.4",
- "1.4.1": "0.6.4",
- "1.5.0": "0.7.1",
- "1.5.1": "0.7.1",
- "1.5.2": "0.7.1",
- "1.6.0": "0.8.2",
-}
-
-DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
-DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
-
-# Default location to get the spark-ec2 scripts (and ami-list) from
-DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2"
-DEFAULT_SPARK_EC2_BRANCH = "branch-1.5"
-
-
-def setup_external_libs(libs):
- """
- Download external libraries from PyPI to SPARK_EC2_DIR/lib/ and prepend them to our PATH.
- """
- PYPI_URL_PREFIX = "https://pypi.python.org/packages/source"
- SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")
-
- if not os.path.exists(SPARK_EC2_LIB_DIR):
- print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
- path=SPARK_EC2_LIB_DIR
- ))
- print("This should be a one-time operation.")
- os.mkdir(SPARK_EC2_LIB_DIR)
-
- for lib in libs:
- versioned_lib_name = "{n}-{v}".format(n=lib["name"], v=lib["version"])
- lib_dir = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name)
-
- if not os.path.isdir(lib_dir):
- tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
- print(" - Downloading {lib}...".format(lib=lib["name"]))
- download_stream = urlopen(
- "{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
- prefix=PYPI_URL_PREFIX,
- first_letter=lib["name"][:1],
- lib_name=lib["name"],
- lib_version=lib["version"]
- )
- )
- with open(tgz_file_path, "wb") as tgz_file:
- tgz_file.write(download_stream.read())
- with open(tgz_file_path, "rb") as tar:
- if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
- print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
- sys.exit(1)
- tar = tarfile.open(tgz_file_path)
- tar.extractall(path=SPARK_EC2_LIB_DIR)
- tar.close()
- os.remove(tgz_file_path)
- print(" - Finished downloading {lib}.".format(lib=lib["name"]))
- sys.path.insert(1, lib_dir)
-
-
-# Only PyPI libraries are supported.
-external_libs = [
- {
- "name": "boto",
- "version": "2.34.0",
- "md5": "5556223d2d0cc4d06dd4829e671dcecd"
- }
-]
-
-setup_external_libs(external_libs)
-
-import boto
-from boto.ec2.blockdevicemapping import BlockDeviceMapping, BlockDeviceType, EBSBlockDeviceType
-from boto import ec2
-
-
-class UsageError(Exception):
- pass
-
-
-# Configure and parse our command-line arguments
-def parse_args():
- parser = OptionParser(
- prog="spark-ec2",
- version="%prog {v}".format(v=SPARK_EC2_VERSION),
- usage="%prog [options] \n\n"
- + " can be: launch, destroy, login, stop, start, get-master, reboot-slaves")
-
- parser.add_option(
- "-s", "--slaves", type="int", default=1,
- help="Number of slaves to launch (default: %default)")
- parser.add_option(
- "-w", "--wait", type="int",
- help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
- parser.add_option(
- "-k", "--key-pair",
- help="Key pair to use on instances")
- parser.add_option(
- "-i", "--identity-file",
- help="SSH private key file to use for logging into instances")
- parser.add_option(
- "-p", "--profile", default=None,
- help="If you have multiple profiles (AWS or boto config), you can configure " +
- "additional, named profiles by using this option (default: %default)")
- parser.add_option(
- "-t", "--instance-type", default="m1.large",
- help="Type of instance to launch (default: %default). " +
- "WARNING: must be 64-bit; small instances won't work")
- parser.add_option(
- "-m", "--master-instance-type", default="",
- help="Master instance type (leave empty for same as instance-type)")
- parser.add_option(
- "-r", "--region", default="us-east-1",
- help="EC2 region used to launch instances in, or to find them in (default: %default)")
- parser.add_option(
- "-z", "--zone", default="",
- help="Availability zone to launch instances in, or 'all' to spread " +
- "slaves across multiple (an additional $0.01/Gb for bandwidth" +
- "between zones applies) (default: a single zone chosen at random)")
- parser.add_option(
- "-a", "--ami",
- help="Amazon Machine Image ID to use")
- parser.add_option(
- "-v", "--spark-version", default=DEFAULT_SPARK_VERSION,
- help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
- parser.add_option(
- "--spark-git-repo",
- default=DEFAULT_SPARK_GITHUB_REPO,
- help="Github repo from which to checkout supplied commit hash (default: %default)")
- parser.add_option(
- "--spark-ec2-git-repo",
- default=DEFAULT_SPARK_EC2_GITHUB_REPO,
- help="Github repo from which to checkout spark-ec2 (default: %default)")
- parser.add_option(
- "--spark-ec2-git-branch",
- default=DEFAULT_SPARK_EC2_BRANCH,
- help="Github repo branch of spark-ec2 to use (default: %default)")
- parser.add_option(
- "--deploy-root-dir",
- default=None,
- help="A directory to copy into / on the first master. " +
- "Must be absolute. Note that a trailing slash is handled as per rsync: " +
- "If you omit it, the last directory of the --deploy-root-dir path will be created " +
- "in / before copying its contents. If you append the trailing slash, " +
- "the directory is not created and its contents are copied directly into /. " +
- "(default: %default).")
- parser.add_option(
- "--hadoop-major-version", default="1",
- help="Major version of Hadoop. Valid options are 1 (Hadoop 1.0.4), 2 (CDH 4.2.0), yarn " +
- "(Hadoop 2.4.0) (default: %default)")
- parser.add_option(
- "-D", metavar="[ADDRESS:]PORT", dest="proxy_port",
- help="Use SSH dynamic port forwarding to create a SOCKS proxy at " +
- "the given local address (for use with login)")
- parser.add_option(
- "--resume", action="store_true", default=False,
- help="Resume installation on a previously launched cluster " +
- "(for debugging)")
- parser.add_option(
- "--ebs-vol-size", metavar="SIZE", type="int", default=0,
- help="Size (in GB) of each EBS volume.")
- parser.add_option(
- "--ebs-vol-type", default="standard",
- help="EBS volume type (e.g. 'gp2', 'standard').")
- parser.add_option(
- "--ebs-vol-num", type="int", default=1,
- help="Number of EBS volumes to attach to each node as /vol[x]. " +
- "The volumes will be deleted when the instances terminate. " +
- "Only possible on EBS-backed AMIs. " +
- "EBS volumes are only attached if --ebs-vol-size > 0. " +
- "Only support up to 8 EBS volumes.")
- parser.add_option(
- "--placement-group", type="string", default=None,
- help="Which placement group to try and launch " +
- "instances into. Assumes placement group is already " +
- "created.")
- parser.add_option(
- "--swap", metavar="SWAP", type="int", default=1024,
- help="Swap space to set up per node, in MB (default: %default)")
- parser.add_option(
- "--spot-price", metavar="PRICE", type="float",
- help="If specified, launch slaves as spot instances with the given " +
- "maximum price (in dollars)")
- parser.add_option(
- "--ganglia", action="store_true", default=True,
- help="Setup Ganglia monitoring on cluster (default: %default). NOTE: " +
- "the Ganglia page will be publicly accessible")
- parser.add_option(
- "--no-ganglia", action="store_false", dest="ganglia",
- help="Disable Ganglia monitoring for the cluster")
- parser.add_option(
- "-u", "--user", default="root",
- help="The SSH user you want to connect as (default: %default)")
- parser.add_option(
- "--delete-groups", action="store_true", default=False,
- help="When destroying a cluster, delete the security groups that were created")
- parser.add_option(
- "--use-existing-master", action="store_true", default=False,
- help="Launch fresh slaves, but use an existing stopped master if possible")
- parser.add_option(
- "--worker-instances", type="int", default=1,
- help="Number of instances per worker: variable SPARK_WORKER_INSTANCES. Not used if YARN " +
- "is used as Hadoop major version (default: %default)")
- parser.add_option(
- "--master-opts", type="string", default="",
- help="Extra options to give to master through SPARK_MASTER_OPTS variable " +
- "(e.g -Dspark.worker.timeout=180)")
- parser.add_option(
- "--user-data", type="string", default="",
- help="Path to a user-data file (most AMIs interpret this as an initialization script)")
- parser.add_option(
- "--authorized-address", type="string", default="0.0.0.0/0",
- help="Address to authorize on created security groups (default: %default)")
- parser.add_option(
- "--additional-security-group", type="string", default="",
- help="Additional security group to place the machines in")
- parser.add_option(
- "--additional-tags", type="string", default="",
- help="Additional tags to set on the machines; tags are comma-separated, while name and " +
- "value are colon separated; ex: \"Task:MySparkProject,Env:production\"")
- parser.add_option(
- "--copy-aws-credentials", action="store_true", default=False,
- help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
- parser.add_option(
- "--subnet-id", default=None,
- help="VPC subnet to launch instances in")
- parser.add_option(
- "--vpc-id", default=None,
- help="VPC to launch instances in")
- parser.add_option(
- "--private-ips", action="store_true", default=False,
- help="Use private IPs for instances rather than public if VPC/subnet " +
- "requires that.")
- parser.add_option(
- "--instance-initiated-shutdown-behavior", default="stop",
- choices=["stop", "terminate"],
- help="Whether instances should terminate when shut down or just stop")
- parser.add_option(
- "--instance-profile-name", default=None,
- help="IAM profile name to launch instances under")
-
- (opts, args) = parser.parse_args()
- if len(args) != 2:
- parser.print_help()
- sys.exit(1)
- (action, cluster_name) = args
-
- # Boto config check
- # http://boto.cloudhackers.com/en/latest/boto_config_tut.html
- home_dir = os.getenv('HOME')
- if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
- if not os.path.isfile('/etc/boto.cfg'):
- # If there is no boto config, check aws credentials
- if not os.path.isfile(home_dir + '/.aws/credentials'):
- if os.getenv('AWS_ACCESS_KEY_ID') is None:
- print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
- file=stderr)
- sys.exit(1)
- if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
- print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
- file=stderr)
- sys.exit(1)
- return (opts, action, cluster_name)
-
-
-# Get the EC2 security group of the given name, creating it if it doesn't exist
-def get_or_make_group(conn, name, vpc_id):
- groups = conn.get_all_security_groups()
- group = [g for g in groups if g.name == name]
- if len(group) > 0:
- return group[0]
- else:
- print("Creating security group " + name)
- return conn.create_security_group(name, "Spark EC2 group", vpc_id)
-
-
-def get_validate_spark_version(version, repo):
- if "." in version:
- version = version.replace("v", "")
- if version not in VALID_SPARK_VERSIONS:
- print("Don't know about Spark version: {v}".format(v=version), file=stderr)
- sys.exit(1)
- return version
- else:
- github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
- request = Request(github_commit_url)
- request.get_method = lambda: 'HEAD'
- try:
- response = urlopen(request)
- except HTTPError as e:
- print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
- file=stderr)
- print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
- sys.exit(1)
- return version
-
-
-# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
-# Last Updated: 2015-06-19
-# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
-EC2_INSTANCE_TYPES = {
- "c1.medium": "pvm",
- "c1.xlarge": "pvm",
- "c3.large": "pvm",
- "c3.xlarge": "pvm",
- "c3.2xlarge": "pvm",
- "c3.4xlarge": "pvm",
- "c3.8xlarge": "pvm",
- "c4.large": "hvm",
- "c4.xlarge": "hvm",
- "c4.2xlarge": "hvm",
- "c4.4xlarge": "hvm",
- "c4.8xlarge": "hvm",
- "cc1.4xlarge": "hvm",
- "cc2.8xlarge": "hvm",
- "cg1.4xlarge": "hvm",
- "cr1.8xlarge": "hvm",
- "d2.xlarge": "hvm",
- "d2.2xlarge": "hvm",
- "d2.4xlarge": "hvm",
- "d2.8xlarge": "hvm",
- "g2.2xlarge": "hvm",
- "g2.8xlarge": "hvm",
- "hi1.4xlarge": "pvm",
- "hs1.8xlarge": "pvm",
- "i2.xlarge": "hvm",
- "i2.2xlarge": "hvm",
- "i2.4xlarge": "hvm",
- "i2.8xlarge": "hvm",
- "m1.small": "pvm",
- "m1.medium": "pvm",
- "m1.large": "pvm",
- "m1.xlarge": "pvm",
- "m2.xlarge": "pvm",
- "m2.2xlarge": "pvm",
- "m2.4xlarge": "pvm",
- "m3.medium": "hvm",
- "m3.large": "hvm",
- "m3.xlarge": "hvm",
- "m3.2xlarge": "hvm",
- "m4.large": "hvm",
- "m4.xlarge": "hvm",
- "m4.2xlarge": "hvm",
- "m4.4xlarge": "hvm",
- "m4.10xlarge": "hvm",
- "r3.large": "hvm",
- "r3.xlarge": "hvm",
- "r3.2xlarge": "hvm",
- "r3.4xlarge": "hvm",
- "r3.8xlarge": "hvm",
- "t1.micro": "pvm",
- "t2.micro": "hvm",
- "t2.small": "hvm",
- "t2.medium": "hvm",
- "t2.large": "hvm",
-}
-
-
-def get_tachyon_version(spark_version):
- return SPARK_TACHYON_MAP.get(spark_version, "")
-
-
-# Attempt to resolve an appropriate AMI given the architecture and region of the request.
-def get_spark_ami(opts):
- if opts.instance_type in EC2_INSTANCE_TYPES:
- instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
- else:
- instance_type = "pvm"
- print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)
-
- # URL prefix from which to fetch AMI information
- ami_prefix = "{r}/{b}/ami-list".format(
- r=opts.spark_ec2_git_repo.replace("https://github.com", "https://raw.github.com", 1),
- b=opts.spark_ec2_git_branch)
-
- ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
- reader = codecs.getreader("ascii")
- try:
- ami = reader(urlopen(ami_path)).read().strip()
- except:
- print("Could not resolve AMI at: " + ami_path, file=stderr)
- sys.exit(1)
-
- print("Spark AMI: " + ami)
- return ami
-
-
-# Launch a cluster of the given name, by setting up its security groups,
-# and then starting new instances in them.
-# Returns a tuple of EC2 reservation objects for the master and slaves
-# Fails if there already instances running in the cluster's groups.
-def launch_cluster(conn, opts, cluster_name):
- if opts.identity_file is None:
- print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
- sys.exit(1)
-
- if opts.key_pair is None:
- print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
- sys.exit(1)
-
- user_data_content = None
- if opts.user_data:
- with open(opts.user_data) as user_data_file:
- user_data_content = user_data_file.read()
-
- print("Setting up security groups...")
- master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
- slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
- authorized_address = opts.authorized_address
- if master_group.rules == []: # Group was just now created
- if opts.vpc_id is None:
- master_group.authorize(src_group=master_group)
- master_group.authorize(src_group=slave_group)
- else:
- master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=master_group)
- master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=master_group)
- master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=master_group)
- master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=slave_group)
- master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=slave_group)
- master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=slave_group)
- master_group.authorize('tcp', 22, 22, authorized_address)
- master_group.authorize('tcp', 8080, 8081, authorized_address)
- master_group.authorize('tcp', 18080, 18080, authorized_address)
- master_group.authorize('tcp', 19999, 19999, authorized_address)
- master_group.authorize('tcp', 50030, 50030, authorized_address)
- master_group.authorize('tcp', 50070, 50070, authorized_address)
- master_group.authorize('tcp', 60070, 60070, authorized_address)
- master_group.authorize('tcp', 4040, 4045, authorized_address)
- # Rstudio (GUI for R) needs port 8787 for web access
- master_group.authorize('tcp', 8787, 8787, authorized_address)
- # HDFS NFS gateway requires 111,2049,4242 for tcp & udp
- master_group.authorize('tcp', 111, 111, authorized_address)
- master_group.authorize('udp', 111, 111, authorized_address)
- master_group.authorize('tcp', 2049, 2049, authorized_address)
- master_group.authorize('udp', 2049, 2049, authorized_address)
- master_group.authorize('tcp', 4242, 4242, authorized_address)
- master_group.authorize('udp', 4242, 4242, authorized_address)
- # RM in YARN mode uses 8088
- master_group.authorize('tcp', 8088, 8088, authorized_address)
- if opts.ganglia:
- master_group.authorize('tcp', 5080, 5080, authorized_address)
- if slave_group.rules == []: # Group was just now created
- if opts.vpc_id is None:
- slave_group.authorize(src_group=master_group)
- slave_group.authorize(src_group=slave_group)
- else:
- slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=master_group)
- slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=master_group)
- slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=master_group)
- slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
- src_group=slave_group)
- slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
- src_group=slave_group)
- slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
- src_group=slave_group)
- slave_group.authorize('tcp', 22, 22, authorized_address)
- slave_group.authorize('tcp', 8080, 8081, authorized_address)
- slave_group.authorize('tcp', 50060, 50060, authorized_address)
- slave_group.authorize('tcp', 50075, 50075, authorized_address)
- slave_group.authorize('tcp', 60060, 60060, authorized_address)
- slave_group.authorize('tcp', 60075, 60075, authorized_address)
-
- # Check if instances are already running in our groups
- existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
- die_on_error=False)
- if existing_slaves or (existing_masters and not opts.use_existing_master):
- print("ERROR: There are already instances running in group %s or %s" %
- (master_group.name, slave_group.name), file=stderr)
- sys.exit(1)
-
- # Figure out Spark AMI
- if opts.ami is None:
- opts.ami = get_spark_ami(opts)
-
- # we use group ids to work around https://github.com/boto/boto/issues/350
- additional_group_ids = []
- if opts.additional_security_group:
- additional_group_ids = [sg.id
- for sg in conn.get_all_security_groups()
- if opts.additional_security_group in (sg.name, sg.id)]
- print("Launching instances...")
-
- try:
- image = conn.get_all_images(image_ids=[opts.ami])[0]
- except:
- print("Could not find AMI " + opts.ami, file=stderr)
- sys.exit(1)
-
- # Create block device mapping so that we can add EBS volumes if asked to.
- # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
- block_map = BlockDeviceMapping()
- if opts.ebs_vol_size > 0:
- for i in range(opts.ebs_vol_num):
- device = EBSBlockDeviceType()
- device.size = opts.ebs_vol_size
- device.volume_type = opts.ebs_vol_type
- device.delete_on_termination = True
- block_map["/dev/sd" + chr(ord('s') + i)] = device
-
- # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
- if opts.instance_type.startswith('m3.'):
- for i in range(get_num_disks(opts.instance_type)):
- dev = BlockDeviceType()
- dev.ephemeral_name = 'ephemeral%d' % i
- # The first ephemeral drive is /dev/sdb.
- name = '/dev/sd' + string.ascii_letters[i + 1]
- block_map[name] = dev
-
- # Launch slaves
- if opts.spot_price is not None:
- # Launch spot instances with the requested price
- print("Requesting %d slaves as spot instances with price $%.3f" %
- (opts.slaves, opts.spot_price))
- zones = get_zones(conn, opts)
- num_zones = len(zones)
- i = 0
- my_req_ids = []
- for zone in zones:
- num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
- slave_reqs = conn.request_spot_instances(
- price=opts.spot_price,
- image_id=opts.ami,
- launch_group="launch-group-%s" % cluster_name,
- placement=zone,
- count=num_slaves_this_zone,
- key_name=opts.key_pair,
- security_group_ids=[slave_group.id] + additional_group_ids,
- instance_type=opts.instance_type,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content,
- instance_profile_name=opts.instance_profile_name)
- my_req_ids += [req.id for req in slave_reqs]
- i += 1
-
- print("Waiting for spot instances to be granted...")
- try:
- while True:
- time.sleep(10)
- reqs = conn.get_all_spot_instance_requests()
- id_to_req = {}
- for r in reqs:
- id_to_req[r.id] = r
- active_instance_ids = []
- for i in my_req_ids:
- if i in id_to_req and id_to_req[i].state == "active":
- active_instance_ids.append(id_to_req[i].instance_id)
- if len(active_instance_ids) == opts.slaves:
- print("All %d slaves granted" % opts.slaves)
- reservations = conn.get_all_reservations(active_instance_ids)
- slave_nodes = []
- for r in reservations:
- slave_nodes += r.instances
- break
- else:
- print("%d of %d slaves granted, waiting longer" % (
- len(active_instance_ids), opts.slaves))
- except:
- print("Canceling spot instance requests")
- conn.cancel_spot_instance_requests(my_req_ids)
- # Log a warning if any of these requests actually launched instances:
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
- running = len(master_nodes) + len(slave_nodes)
- if running:
- print(("WARNING: %d instances are still running" % running), file=stderr)
- sys.exit(0)
- else:
- # Launch non-spot instances
- zones = get_zones(conn, opts)
- num_zones = len(zones)
- i = 0
- slave_nodes = []
- for zone in zones:
- num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
- if num_slaves_this_zone > 0:
- slave_res = image.run(
- key_name=opts.key_pair,
- security_group_ids=[slave_group.id] + additional_group_ids,
- instance_type=opts.instance_type,
- placement=zone,
- min_count=num_slaves_this_zone,
- max_count=num_slaves_this_zone,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content,
- instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
- instance_profile_name=opts.instance_profile_name)
- slave_nodes += slave_res.instances
- print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
- s=num_slaves_this_zone,
- plural_s=('' if num_slaves_this_zone == 1 else 's'),
- z=zone,
- r=slave_res.id))
- i += 1
-
- # Launch or resume masters
- if existing_masters:
- print("Starting master...")
- for inst in existing_masters:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- master_nodes = existing_masters
- else:
- master_type = opts.master_instance_type
- if master_type == "":
- master_type = opts.instance_type
- if opts.zone == 'all':
- opts.zone = random.choice(conn.get_all_zones()).name
- master_res = image.run(
- key_name=opts.key_pair,
- security_group_ids=[master_group.id] + additional_group_ids,
- instance_type=master_type,
- placement=opts.zone,
- min_count=1,
- max_count=1,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content,
- instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
- instance_profile_name=opts.instance_profile_name)
-
- master_nodes = master_res.instances
- print("Launched master in %s, regid = %s" % (zone, master_res.id))
-
- # This wait time corresponds to SPARK-4983
- print("Waiting for AWS to propagate instance metadata...")
- time.sleep(15)
-
- # Give the instances descriptive names and set additional tags
- additional_tags = {}
- if opts.additional_tags.strip():
- additional_tags = dict(
- map(str.strip, tag.split(':', 1)) for tag in opts.additional_tags.split(',')
- )
-
- for master in master_nodes:
- master.add_tags(
- dict(additional_tags, Name='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
- )
-
- for slave in slave_nodes:
- slave.add_tags(
- dict(additional_tags, Name='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
- )
-
- # Return all the instances
- return (master_nodes, slave_nodes)
-
-
-def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
- """
- Get the EC2 instances in an existing cluster if available.
- Returns a tuple of lists of EC2 instance objects for the masters and slaves.
- """
- print("Searching for existing cluster {c} in region {r}...".format(
- c=cluster_name, r=opts.region))
-
- def get_instances(group_names):
- """
- Get all non-terminated instances that belong to any of the provided security groups.
-
- EC2 reservation filters and instance states are documented here:
- http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
- """
- reservations = conn.get_all_reservations(
- filters={"instance.group-name": group_names})
- instances = itertools.chain.from_iterable(r.instances for r in reservations)
- return [i for i in instances if i.state not in ["shutting-down", "terminated"]]
-
- master_instances = get_instances([cluster_name + "-master"])
- slave_instances = get_instances([cluster_name + "-slaves"])
-
- if any((master_instances, slave_instances)):
- print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
- m=len(master_instances),
- plural_m=('' if len(master_instances) == 1 else 's'),
- s=len(slave_instances),
- plural_s=('' if len(slave_instances) == 1 else 's')))
-
- if not master_instances and die_on_error:
- print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
- c=cluster_name, r=opts.region), file=sys.stderr)
- sys.exit(1)
-
- return (master_instances, slave_instances)
-
-
-# Deploy configuration files and run setup scripts on a newly launched
-# or started EC2 cluster.
-def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
- master = get_dns_name(master_nodes[0], opts.private_ips)
- if deploy_ssh_key:
- print("Generating cluster's SSH key on master...")
- key_setup = """
- [ -f ~/.ssh/id_rsa ] ||
- (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
- cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
- """
- ssh(master, opts, key_setup)
- dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
- print("Transferring cluster's SSH key to slaves...")
- for slave in slave_nodes:
- slave_address = get_dns_name(slave, opts.private_ips)
- print(slave_address)
- ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
-
- modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
- 'mapreduce', 'spark-standalone', 'tachyon', 'rstudio']
-
- if opts.hadoop_major_version == "1":
- modules = list(filter(lambda x: x != "mapreduce", modules))
-
- if opts.ganglia:
- modules.append('ganglia')
-
- # Clear SPARK_WORKER_INSTANCES if running on YARN
- if opts.hadoop_major_version == "yarn":
- opts.worker_instances = ""
-
- # NOTE: We should clone the repository before running deploy_files to
- # prevent ec2-variables.sh from being overwritten
- print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
- r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
- ssh(
- host=master,
- opts=opts,
- command="rm -rf spark-ec2"
- + " && "
- + "git clone {r} -b {b} spark-ec2".format(r=opts.spark_ec2_git_repo,
- b=opts.spark_ec2_git_branch)
- )
-
- print("Deploying files to master...")
- deploy_files(
- conn=conn,
- root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
- opts=opts,
- master_nodes=master_nodes,
- slave_nodes=slave_nodes,
- modules=modules
- )
-
- if opts.deploy_root_dir is not None:
- print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
- deploy_user_files(
- root_dir=opts.deploy_root_dir,
- opts=opts,
- master_nodes=master_nodes
- )
-
- print("Running setup on master...")
- setup_spark_cluster(master, opts)
- print("Done!")
-
-
-def setup_spark_cluster(master, opts):
- ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
- ssh(master, opts, "spark-ec2/setup.sh")
- print("Spark standalone cluster started at http://%s:8080" % master)
-
- if opts.ganglia:
- print("Ganglia started at http://%s:5080/ganglia" % master)
-
-
-def is_ssh_available(host, opts, print_ssh_output=True):
- """
- Check if SSH is available on a host.
- """
- s = subprocess.Popen(
- ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
- '%s@%s' % (opts.user, host), stringify_command('true')],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order
- )
- cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout
-
- if s.returncode != 0 and print_ssh_output:
- # extra leading newline is for spacing in wait_for_cluster_state()
- print(textwrap.dedent("""\n
- Warning: SSH connection error. (This could be temporary.)
- Host: {h}
- SSH return code: {r}
- SSH output: {o}
- """).format(
- h=host,
- r=s.returncode,
- o=cmd_output.strip()
- ))
-
- return s.returncode == 0
-
-
-def is_cluster_ssh_available(cluster_instances, opts):
- """
- Check if SSH is available on all the instances in a cluster.
- """
- for i in cluster_instances:
- dns_name = get_dns_name(i, opts.private_ips)
- if not is_ssh_available(host=dns_name, opts=opts):
- return False
- else:
- return True
-
-
-def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
- """
- Wait for all the instances in the cluster to reach a designated state.
-
- cluster_instances: a list of boto.ec2.instance.Instance
- cluster_state: a string representing the desired state of all the instances in the cluster
- value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
- 'running', 'terminated', etc.
- (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
- """
- sys.stdout.write(
- "Waiting for cluster to enter '{s}' state.".format(s=cluster_state)
- )
- sys.stdout.flush()
-
- start_time = datetime.now()
- num_attempts = 0
-
- while True:
- time.sleep(5 * num_attempts) # seconds
-
- for i in cluster_instances:
- i.update()
-
- max_batch = 100
- statuses = []
- for j in xrange(0, len(cluster_instances), max_batch):
- batch = [i.id for i in cluster_instances[j:j + max_batch]]
- statuses.extend(conn.get_all_instance_status(instance_ids=batch))
-
- if cluster_state == 'ssh-ready':
- if all(i.state == 'running' for i in cluster_instances) and \
- all(s.system_status.status == 'ok' for s in statuses) and \
- all(s.instance_status.status == 'ok' for s in statuses) and \
- is_cluster_ssh_available(cluster_instances, opts):
- break
- else:
- if all(i.state == cluster_state for i in cluster_instances):
- break
-
- num_attempts += 1
-
- sys.stdout.write(".")
- sys.stdout.flush()
-
- sys.stdout.write("\n")
-
- end_time = datetime.now()
- print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
- s=cluster_state,
- t=(end_time - start_time).seconds
- ))
-
-
-# Get number of local disks available for a given EC2 instance type.
-def get_num_disks(instance_type):
- # Source: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html
- # Last Updated: 2015-06-19
- # For easy maintainability, please keep this manually-inputted dictionary sorted by key.
- disks_by_instance = {
- "c1.medium": 1,
- "c1.xlarge": 4,
- "c3.large": 2,
- "c3.xlarge": 2,
- "c3.2xlarge": 2,
- "c3.4xlarge": 2,
- "c3.8xlarge": 2,
- "c4.large": 0,
- "c4.xlarge": 0,
- "c4.2xlarge": 0,
- "c4.4xlarge": 0,
- "c4.8xlarge": 0,
- "cc1.4xlarge": 2,
- "cc2.8xlarge": 4,
- "cg1.4xlarge": 2,
- "cr1.8xlarge": 2,
- "d2.xlarge": 3,
- "d2.2xlarge": 6,
- "d2.4xlarge": 12,
- "d2.8xlarge": 24,
- "g2.2xlarge": 1,
- "g2.8xlarge": 2,
- "hi1.4xlarge": 2,
- "hs1.8xlarge": 24,
- "i2.xlarge": 1,
- "i2.2xlarge": 2,
- "i2.4xlarge": 4,
- "i2.8xlarge": 8,
- "m1.small": 1,
- "m1.medium": 1,
- "m1.large": 2,
- "m1.xlarge": 4,
- "m2.xlarge": 1,
- "m2.2xlarge": 1,
- "m2.4xlarge": 2,
- "m3.medium": 1,
- "m3.large": 1,
- "m3.xlarge": 2,
- "m3.2xlarge": 2,
- "m4.large": 0,
- "m4.xlarge": 0,
- "m4.2xlarge": 0,
- "m4.4xlarge": 0,
- "m4.10xlarge": 0,
- "r3.large": 1,
- "r3.xlarge": 1,
- "r3.2xlarge": 1,
- "r3.4xlarge": 1,
- "r3.8xlarge": 2,
- "t1.micro": 0,
- "t2.micro": 0,
- "t2.small": 0,
- "t2.medium": 0,
- "t2.large": 0,
- }
- if instance_type in disks_by_instance:
- return disks_by_instance[instance_type]
- else:
- print("WARNING: Don't know number of disks on instance type %s; assuming 1"
- % instance_type, file=stderr)
- return 1
-
-
-# Deploy the configuration file templates in a given local directory to
-# a cluster, filling in any template parameters with information about the
-# cluster (e.g. lists of masters and slaves). Files are only deployed to
-# the first master instance in the cluster, and we expect the setup
-# script to be run on that instance to copy them to other nodes.
-#
-# root_dir should be an absolute path to the directory with the files we want to deploy.
-def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
- active_master = get_dns_name(master_nodes[0], opts.private_ips)
-
- num_disks = get_num_disks(opts.instance_type)
- hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
- mapred_local_dirs = "/mnt/hadoop/mrlocal"
- spark_local_dirs = "/mnt/spark"
- if num_disks > 1:
- for i in range(2, num_disks + 1):
- hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
- mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
- spark_local_dirs += ",/mnt%d/spark" % i
-
- cluster_url = "%s:7077" % active_master
-
- if "." in opts.spark_version:
- # Pre-built Spark deploy
- spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
- tachyon_v = get_tachyon_version(spark_v)
- else:
- # Spark-only custom deploy
- spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
- tachyon_v = ""
- print("Deploying Spark via git hash; Tachyon won't be set up")
- modules = filter(lambda x: x != "tachyon", modules)
-
- master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
- slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes]
- worker_instances_str = "%d" % opts.worker_instances if opts.worker_instances else ""
- template_vars = {
- "master_list": '\n'.join(master_addresses),
- "active_master": active_master,
- "slave_list": '\n'.join(slave_addresses),
- "cluster_url": cluster_url,
- "hdfs_data_dirs": hdfs_data_dirs,
- "mapred_local_dirs": mapred_local_dirs,
- "spark_local_dirs": spark_local_dirs,
- "swap": str(opts.swap),
- "modules": '\n'.join(modules),
- "spark_version": spark_v,
- "tachyon_version": tachyon_v,
- "hadoop_major_version": opts.hadoop_major_version,
- "spark_worker_instances": worker_instances_str,
- "spark_master_opts": opts.master_opts
- }
-
- if opts.copy_aws_credentials:
- template_vars["aws_access_key_id"] = conn.aws_access_key_id
- template_vars["aws_secret_access_key"] = conn.aws_secret_access_key
- else:
- template_vars["aws_access_key_id"] = ""
- template_vars["aws_secret_access_key"] = ""
-
- # Create a temp directory in which we will place all the files to be
- # deployed after we substitue template parameters in them
- tmp_dir = tempfile.mkdtemp()
- for path, dirs, files in os.walk(root_dir):
- if path.find(".svn") == -1:
- dest_dir = os.path.join('/', path[len(root_dir):])
- local_dir = tmp_dir + dest_dir
- if not os.path.exists(local_dir):
- os.makedirs(local_dir)
- for filename in files:
- if filename[0] not in '#.~' and filename[-1] != '~':
- dest_file = os.path.join(dest_dir, filename)
- local_file = tmp_dir + dest_file
- with open(os.path.join(path, filename)) as src:
- with open(local_file, "w") as dest:
- text = src.read()
- for key in template_vars:
- text = text.replace("{{" + key + "}}", template_vars[key])
- dest.write(text)
- dest.close()
- # rsync the whole directory over to the master machine
- command = [
- 'rsync', '-rv',
- '-e', stringify_command(ssh_command(opts)),
- "%s/" % tmp_dir,
- "%s@%s:/" % (opts.user, active_master)
- ]
- subprocess.check_call(command)
- # Remove the temp directory we created above
- shutil.rmtree(tmp_dir)
-
-
-# Deploy a given local directory to a cluster, WITHOUT parameter substitution.
-# Note that unlike deploy_files, this works for binary files.
-# Also, it is up to the user to add (or not) the trailing slash in root_dir.
-# Files are only deployed to the first master instance in the cluster.
-#
-# root_dir should be an absolute path.
-def deploy_user_files(root_dir, opts, master_nodes):
- active_master = get_dns_name(master_nodes[0], opts.private_ips)
- command = [
- 'rsync', '-rv',
- '-e', stringify_command(ssh_command(opts)),
- "%s" % root_dir,
- "%s@%s:/" % (opts.user, active_master)
- ]
- subprocess.check_call(command)
-
-
-def stringify_command(parts):
- if isinstance(parts, str):
- return parts
- else:
- return ' '.join(map(pipes.quote, parts))
-
-
-def ssh_args(opts):
- parts = ['-o', 'StrictHostKeyChecking=no']
- parts += ['-o', 'UserKnownHostsFile=/dev/null']
- if opts.identity_file is not None:
- parts += ['-i', opts.identity_file]
- return parts
-
-
-def ssh_command(opts):
- return ['ssh'] + ssh_args(opts)
-
-
-# Run a command on a host through ssh, retrying up to five times
-# and then throwing an exception if ssh continues to fail.
-def ssh(host, opts, command):
- tries = 0
- while True:
- try:
- return subprocess.check_call(
- ssh_command(opts) + ['-t', '-t', '%s@%s' % (opts.user, host),
- stringify_command(command)])
- except subprocess.CalledProcessError as e:
- if tries > 5:
- # If this was an ssh failure, provide the user with hints.
- if e.returncode == 255:
- raise UsageError(
- "Failed to SSH to remote host {0}.\n"
- "Please check that you have provided the correct --identity-file and "
- "--key-pair parameters and try again.".format(host))
- else:
- raise e
- print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
- file=stderr)
- time.sleep(30)
- tries = tries + 1
-
-
-# Backported from Python 2.7 for compatiblity with 2.6 (See SPARK-1990)
-def _check_output(*popenargs, **kwargs):
- if 'stdout' in kwargs:
- raise ValueError('stdout argument not allowed, it will be overridden.')
- process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
- output, unused_err = process.communicate()
- retcode = process.poll()
- if retcode:
- cmd = kwargs.get("args")
- if cmd is None:
- cmd = popenargs[0]
- raise subprocess.CalledProcessError(retcode, cmd, output=output)
- return output
-
-
-def ssh_read(host, opts, command):
- return _check_output(
- ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
-
-
-def ssh_write(host, opts, command, arguments):
- tries = 0
- while True:
- proc = subprocess.Popen(
- ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
- stdin=subprocess.PIPE)
- proc.stdin.write(arguments)
- proc.stdin.close()
- status = proc.wait()
- if status == 0:
- break
- elif tries > 5:
- raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
- else:
- print("Error {0} while executing remote command, retrying after 30 seconds".
- format(status), file=stderr)
- time.sleep(30)
- tries = tries + 1
-
-
-# Gets a list of zones to launch instances in
-def get_zones(conn, opts):
- if opts.zone == 'all':
- zones = [z.name for z in conn.get_all_zones()]
- else:
- zones = [opts.zone]
- return zones
-
-
-# Gets the number of items in a partition
-def get_partition(total, num_partitions, current_partitions):
- num_slaves_this_zone = total // num_partitions
- if (total % num_partitions) - current_partitions > 0:
- num_slaves_this_zone += 1
- return num_slaves_this_zone
-
-
-# Gets the IP address, taking into account the --private-ips flag
-def get_ip_address(instance, private_ips=False):
- ip = instance.ip_address if not private_ips else \
- instance.private_ip_address
- return ip
-
-
-# Gets the DNS name, taking into account the --private-ips flag
-def get_dns_name(instance, private_ips=False):
- dns = instance.public_dns_name if not private_ips else \
- instance.private_ip_address
- if not dns:
- raise UsageError("Failed to determine hostname of {0}.\n"
- "Please check that you provided --private-ips if "
- "necessary".format(instance))
- return dns
-
-
-def real_main():
- (opts, action, cluster_name) = parse_args()
-
- # Input parameter validation
- get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
-
- if opts.wait is not None:
- # NOTE: DeprecationWarnings are silent in 2.7+ by default.
- # To show them, run Python with the -Wdefault switch.
- # See: https://docs.python.org/3.5/whatsnew/2.7.html
- warnings.warn(
- "This option is deprecated and has no effect. "
- "spark-ec2 automatically waits as long as necessary for clusters to start up.",
- DeprecationWarning
- )
-
- if opts.identity_file is not None:
- if not os.path.exists(opts.identity_file):
- print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
- file=stderr)
- sys.exit(1)
-
- file_mode = os.stat(opts.identity_file).st_mode
- if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
- print("ERROR: The identity file must be accessible only by you.", file=stderr)
- print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
- file=stderr)
- sys.exit(1)
-
- if opts.instance_type not in EC2_INSTANCE_TYPES:
- print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
- t=opts.instance_type), file=stderr)
-
- if opts.master_instance_type != "":
- if opts.master_instance_type not in EC2_INSTANCE_TYPES:
- print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
- t=opts.master_instance_type), file=stderr)
- # Since we try instance types even if we can't resolve them, we check if they resolve first
- # and, if they do, see if they resolve to the same virtualization type.
- if opts.instance_type in EC2_INSTANCE_TYPES and \
- opts.master_instance_type in EC2_INSTANCE_TYPES:
- if EC2_INSTANCE_TYPES[opts.instance_type] != \
- EC2_INSTANCE_TYPES[opts.master_instance_type]:
- print("Error: spark-ec2 currently does not support having a master and slaves "
- "with different AMI virtualization types.", file=stderr)
- print("master instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
- print("slave instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
- sys.exit(1)
-
- if opts.ebs_vol_num > 8:
- print("ebs-vol-num cannot be greater than 8", file=stderr)
- sys.exit(1)
-
- # Prevent breaking ami_prefix (/, .git and startswith checks)
- # Prevent forks with non spark-ec2 names for now.
- if opts.spark_ec2_git_repo.endswith("/") or \
- opts.spark_ec2_git_repo.endswith(".git") or \
- not opts.spark_ec2_git_repo.startswith("https://github.com") or \
- not opts.spark_ec2_git_repo.endswith("spark-ec2"):
- print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. "
- "Furthermore, we currently only support forks named spark-ec2.", file=stderr)
- sys.exit(1)
-
- if not (opts.deploy_root_dir is None or
- (os.path.isabs(opts.deploy_root_dir) and
- os.path.isdir(opts.deploy_root_dir) and
- os.path.exists(opts.deploy_root_dir))):
- print("--deploy-root-dir must be an absolute path to a directory that exists "
- "on the local file system", file=stderr)
- sys.exit(1)
-
- try:
- if opts.profile is None:
- conn = ec2.connect_to_region(opts.region)
- else:
- conn = ec2.connect_to_region(opts.region, profile_name=opts.profile)
- except Exception as e:
- print((e), file=stderr)
- sys.exit(1)
-
- # Select an AZ at random if it was not specified.
- if opts.zone == "":
- opts.zone = random.choice(conn.get_all_zones()).name
-
- if action == "launch":
- if opts.slaves <= 0:
- print("ERROR: You have to start at least 1 slave", file=sys.stderr)
- sys.exit(1)
- if opts.resume:
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- else:
- (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
- wait_for_cluster_state(
- conn=conn,
- opts=opts,
- cluster_instances=(master_nodes + slave_nodes),
- cluster_state='ssh-ready'
- )
- setup_cluster(conn, master_nodes, slave_nodes, opts, True)
-
- elif action == "destroy":
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
-
- if any(master_nodes + slave_nodes):
- print("The following instances will be terminated:")
- for inst in master_nodes + slave_nodes:
- print("> %s" % get_dns_name(inst, opts.private_ips))
- print("ALL DATA ON ALL NODES WILL BE LOST!!")
-
- msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
- response = raw_input(msg)
- if response == "y":
- print("Terminating master...")
- for inst in master_nodes:
- inst.terminate()
- print("Terminating slaves...")
- for inst in slave_nodes:
- inst.terminate()
-
- # Delete security groups as well
- if opts.delete_groups:
- group_names = [cluster_name + "-master", cluster_name + "-slaves"]
- wait_for_cluster_state(
- conn=conn,
- opts=opts,
- cluster_instances=(master_nodes + slave_nodes),
- cluster_state='terminated'
- )
- print("Deleting security groups (this will take some time)...")
- attempt = 1
- while attempt <= 3:
- print("Attempt %d" % attempt)
- groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
- success = True
- # Delete individual rules in all groups before deleting groups to
- # remove dependencies between them
- for group in groups:
- print("Deleting rules in security group " + group.name)
- for rule in group.rules:
- for grant in rule.grants:
- success &= group.revoke(ip_protocol=rule.ip_protocol,
- from_port=rule.from_port,
- to_port=rule.to_port,
- src_group=grant)
-
- # Sleep for AWS eventual-consistency to catch up, and for instances
- # to terminate
- time.sleep(30) # Yes, it does have to be this long :-(
- for group in groups:
- try:
- # It is needed to use group_id to make it work with VPC
- conn.delete_security_group(group_id=group.id)
- print("Deleted security group %s" % group.name)
- except boto.exception.EC2ResponseError:
- success = False
- print("Failed to delete security group %s" % group.name)
-
- # Unfortunately, group.revoke() returns True even if a rule was not
- # deleted, so this needs to be rerun if something fails
- if success:
- break
-
- attempt += 1
-
- if not success:
- print("Failed to delete all security groups after 3 tries.")
- print("Try re-running in a few minutes.")
-
- elif action == "login":
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- if not master_nodes[0].public_dns_name and not opts.private_ips:
- print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
- else:
- master = get_dns_name(master_nodes[0], opts.private_ips)
- print("Logging into master " + master + "...")
- proxy_opt = []
- if opts.proxy_port is not None:
- proxy_opt = ['-D', opts.proxy_port]
- subprocess.check_call(
- ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)])
-
- elif action == "reboot-slaves":
- response = raw_input(
- "Are you sure you want to reboot the cluster " +
- cluster_name + " slaves?\n" +
- "Reboot cluster slaves " + cluster_name + " (y/N): ")
- if response == "y":
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
- print("Rebooting slaves...")
- for inst in slave_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- print("Rebooting " + inst.id)
- inst.reboot()
-
- elif action == "get-master":
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- if not master_nodes[0].public_dns_name and not opts.private_ips:
- print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
- else:
- print(get_dns_name(master_nodes[0], opts.private_ips))
-
- elif action == "stop":
- response = raw_input(
- "Are you sure you want to stop the cluster " +
- cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
- "BUT THE CLUSTER WILL KEEP USING SPACE ON\n" +
- "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
- "All data on spot-instance slaves will be lost.\n" +
- "Stop cluster " + cluster_name + " (y/N): ")
- if response == "y":
- (master_nodes, slave_nodes) = get_existing_cluster(
- conn, opts, cluster_name, die_on_error=False)
- print("Stopping master...")
- for inst in master_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.stop()
- print("Stopping slaves...")
- for inst in slave_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- if inst.spot_instance_request_id:
- inst.terminate()
- else:
- inst.stop()
-
- elif action == "start":
- (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- print("Starting slaves...")
- for inst in slave_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- print("Starting master...")
- for inst in master_nodes:
- if inst.state not in ["shutting-down", "terminated"]:
- inst.start()
- wait_for_cluster_state(
- conn=conn,
- opts=opts,
- cluster_instances=(master_nodes + slave_nodes),
- cluster_state='ssh-ready'
- )
-
- # Determine types of running instances
- existing_master_type = master_nodes[0].instance_type
- existing_slave_type = slave_nodes[0].instance_type
- # Setting opts.master_instance_type to the empty string indicates we
- # have the same instance type for the master and the slaves
- if existing_master_type == existing_slave_type:
- existing_master_type = ""
- opts.master_instance_type = existing_master_type
- opts.instance_type = existing_slave_type
-
- setup_cluster(conn, master_nodes, slave_nodes, opts, False)
-
- else:
- print("Invalid action: %s" % action, file=stderr)
- sys.exit(1)
-
-
-def main():
- try:
- real_main()
- except UsageError as e:
- print("\nError:\n", e, file=stderr)
- sys.exit(1)
-
-
-if __name__ == "__main__":
- logging.basicConfig()
- main()
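
The deletion above removes the remainder of ec2/spark_ec2.py, including get_partition(), the helper that decides how many slaves each availability zone receives when a cluster is spread over several zones. As a small illustration of that rule only, here is a standalone Python sketch that reproduces the same arithmetic; the names slaves_for_zone and zone_index are illustrative and not part of the removed script:

    # Sketch of the zone-distribution rule from the removed get_partition():
    # every zone gets total // num_zones slaves, and the first
    # (total % num_zones) zones each get one extra.
    def slaves_for_zone(total, num_zones, zone_index):
        count = total // num_zones
        if (total % num_zones) - zone_index > 0:
            count += 1
        return count

    if __name__ == "__main__":
        # Example: 10 slaves spread over 3 zones -> [4, 3, 3]
        print([slaves_for_zone(10, 3, i) for i in range(3)])

For example, 10 slaves over 3 zones come out as 4, 3 and 3, which is how the removed script distributed --slaves across the zones returned by get_zones() when --zone all was requested.
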
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
index 779fac01c4be0..3d8babba04a53 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
@@ -56,6 +56,7 @@ public static void main(String[] args) {
// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
  new Function<LabeledPoint, Tuple2<Object, Object>>() {
+ @Override
public Tuple2