diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 88fed833f922d..bf4b24e98b134 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -62,17 +62,22 @@ private[spark] object JettyUtils extends Logging {
securityMgr: SecurityManager): HttpServlet = {
new HttpServlet {
override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
- if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
- response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
- response.setStatus(HttpServletResponse.SC_OK)
- val result = servletParams.responder(request)
- response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
- response.getWriter.println(servletParams.extractFn(result))
- } else {
- response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
- response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
- response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
- "User is not authorized to access this page.")
+ try {
+ if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
+ response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
+ response.setStatus(HttpServletResponse.SC_OK)
+ val result = servletParams.responder(request)
+ response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+ response.getWriter.println(servletParams.extractFn(result))
+ } else {
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+ response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+ response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
+ "User is not authorized to access this page.")
+ }
+ } catch {
+ case e: IllegalArgumentException =>
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
}
}
}
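
End to end, the change above means that any `IllegalArgumentException` thrown while rendering a UI page (for example from the `require(...)` checks added further down in this patch) is reported to the client as an HTTP 400 carrying the validation message, instead of bubbling up as a 500 with a stack trace. A minimal sketch of that control flow, using a hypothetical responder rather than the real Spark classes:

```scala
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

// Sketch only: a stripped-down servlet with the same error-translation pattern.
class ExamplePageServlet(render: HttpServletRequest => String) extends HttpServlet {
  override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
    try {
      // render() may call require(...) on query parameters and throw
      // IllegalArgumentException when a parameter is missing or malformed.
      val body = render(request)
      response.setStatus(HttpServletResponse.SC_OK)
      response.getWriter.println(body)
    } catch {
      // Validation failures become a client error (400), not a server error (500).
      case e: IllegalArgumentException =>
        response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
    }
  }
}
```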
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
index c82730f524eb7..f0ae95bb8c812 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -43,7 +43,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
}
id
}.getOrElse {
- return Text(s"Missing executorId parameter")
+ throw new IllegalArgumentException(s"Missing executorId parameter")
}
val time = System.currentTimeMillis()
val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 77d36209c6048..7541d3e9c72e7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -32,7 +32,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
- val jobId = request.getParameter("id").toInt
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val jobId = parameterId.toInt
val jobDataOption = listener.jobIdToData.get(jobId)
if (jobDataOption.isEmpty) {
val content =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 5fc6cc7533150..f47cdc935e539 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -32,6 +32,8 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
+ require(poolName != null && poolName.nonEmpty, "Missing poolname parameter")
+
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match {
case Some(s) => s.values.toSeq
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 02a3cc3e43c25..05ffd5bc58fbb 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -36,8 +36,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
- val stageId = request.getParameter("id").toInt
- val stageAttemptId = request.getParameter("attempt").toInt
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val parameterAttempt = request.getParameter("attempt")
+ require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
+
+ val stageId = parameterId.toInt
+ val stageAttemptId = parameterAttempt.toInt
val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))
if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 12d23a92878cf..199f731b92bcc 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -30,7 +30,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val rddId = request.getParameter("id").toInt
+ val parameterId = request.getParameter("id")
+ require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val rddId = parameterId.toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
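
The same null/empty `require` now appears in JobPage, PoolPage, StagePage, and RDDPage. If more pages grow these checks, they could be folded into a small helper; a hypothetical sketch (the `UIRequestParams` helper below is not part of this patch):

```scala
import javax.servlet.http.HttpServletRequest

// Hypothetical helper consolidating the repeated parameter checks.
// require(...) throws IllegalArgumentException, and a malformed integer throws
// NumberFormatException (a subclass of IllegalArgumentException), so either way
// the servlet wrapper in JettyUtils answers with HTTP 400.
object UIRequestParams {
  def requiredParam(request: HttpServletRequest, name: String): String = {
    val value = request.getParameter(name)
    require(value != null && value.nonEmpty, s"Missing $name parameter")
    value
  }

  def requiredIntParam(request: HttpServletRequest, name: String): Int =
    requiredParam(request, name).toInt

  // Example usage inside a page's render():
  //   val rddId = UIRequestParams.requiredIntParam(request, "id")
}
```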
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 7779fbc9c49e4..3d32d03e35c62 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -56,25 +56,32 @@ See the **[spark.ml programming guide](ml-guide.html)** for more information on
# Dependencies
-MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/),
-which depends on [netlib-java](https://github.com/fommil/netlib-java),
-and [jblas](https://github.com/mikiobraun/jblas).
-`netlib-java` and `jblas` depend on native Fortran routines.
-You need to install the
+MLlib uses the linear algebra package
+[Breeze](http://www.scalanlp.org/), which depends on
+[netlib-java](https://github.com/fommil/netlib-java) for optimised
+numerical processing. If natives are not available at runtime, you
+will see a warning message and a pure JVM implementation will be used
+instead.
+
+To learn more about the benefits and background of system optimised
+natives, you may wish to watch Sam Halliday's ScalaX talk on
+[High Performance Linear Algebra in Scala](http://fommil.github.io/scalax14/#/).
+
+Due to licensing issues with runtime proprietary binaries, we do not
+include `netlib-java`'s native proxies by default. To configure
+`netlib-java` / Breeze to use system optimised binaries, include
+`com.github.fommil.netlib:all:1.1.2` (or build Spark with
+`-Pnetlib-lgpl`) as a dependency of your project and read the
+[netlib-java](https://github.com/fommil/netlib-java) documentation for
+your platform's additional installation instructions.
+
+MLlib also uses [jblas](https://github.com/mikiobraun/jblas), which
+will require you to install the
[gfortran runtime library](https://github.com/mikiobraun/jblas/wiki/Missing-Libraries)
if it is not already present on your nodes.
-MLlib will throw a linking error if it cannot detect these libraries automatically.
-Due to license issues, we do not include `netlib-java`'s native libraries in MLlib's
-dependency set under default settings.
-If no native library is available at runtime, you will see a warning message.
-To use native libraries from `netlib-java`, please build Spark with `-Pnetlib-lgpl` or
-include `com.github.fommil.netlib:all:1.1.2` as a dependency of your project.
-If you want to use optimized BLAS/LAPACK libraries such as
-[OpenBLAS](http://www.openblas.net/), please link its shared libraries to
-`/usr/lib/libblas.so.3` and `/usr/lib/liblapack.so.3`, respectively.
-BLAS/LAPACK libraries on worker nodes should be built without multithreading.
-
-To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.4 or newer.
+
+To use MLlib in Python, you will need [NumPy](http://www.numpy.org)
+version 1.4 or newer.
---
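
For reference, wiring the optional natives into an application build would look roughly like the sbt sketch below. The `com.github.fommil.netlib:all:1.1.2` coordinates come from the doc above; the Spark version and the `provided` scope are illustrative assumptions about a typical application build:

```scala
// build.sbt (sketch, not part of this patch)
val sparkVersion = "1.3.0"  // adjust to the Spark release you run against

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
  // LGPL native proxies that Spark leaves out of its default dependency set.
  ("com.github.fommil.netlib" % "all" % "1.1.2").pomOnly()
)
```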
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 3f7242a53d6fd..725b1e47e0cea 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -24,10 +24,12 @@
import hashlib
import logging
import os
+import os.path
import pipes
import random
import shutil
import string
+from stat import S_IRUSR
import subprocess
import sys
import tarfile
@@ -349,6 +351,7 @@ def launch_cluster(conn, opts, cluster_name):
if opts.identity_file is None:
print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections."
sys.exit(1)
+
if opts.key_pair is None:
print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
sys.exit(1)
@@ -1007,6 +1010,18 @@ def real_main():
DeprecationWarning
)
+ if opts.identity_file is not None:
+ if not os.path.exists(opts.identity_file):
+ print >> stderr,\
+ "ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file)
+ sys.exit(1)
+
+ file_mode = os.stat(opts.identity_file).st_mode
+ if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
+ print >> stderr, "ERROR: The identity file must be accessible only by you."
+ print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file)
+ sys.exit(1)
+
if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)
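
The permission test above is terse: `file_mode & S_IRUSR` checks that the owner can read the key, and `oct(file_mode)[-2:] == '00'` checks that the group and other permission digits are both zero, i.e. the file is accessible only by its owner (what `chmod 400` produces). The same condition sketched in Scala, purely for illustration (not part of the patch):

```scala
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermission._

// Sketch: "readable by me, and nobody else has any access" -- the condition the
// ec2 script enforces before handing the identity file to ssh.
def isPrivateKeyUsable(path: String): Boolean = {
  val perms = Files.getPosixFilePermissions(Paths.get(path))
  perms.contains(OWNER_READ) &&
    Seq(GROUP_READ, GROUP_WRITE, GROUP_EXECUTE,
        OTHERS_READ, OTHERS_WRITE, OTHERS_EXECUTE).forall(p => !perms.contains(p))
}
```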
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
index 3515461b52493..9d6f97528148e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -79,6 +79,9 @@ private[mllib] object EigenValueDecomposition {
// Mode 1: A*x = lambda*x, A symmetric
iparam(6) = 1
+ require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE,
+ s"k = $k and/or n = $n are too large to compute an eigendecomposition")
+
var ido = new intW(0)
var info = new intW(0)
var resid = new Array[Double](n)
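
The bound guards the ARPACK workspace allocations in this method: JVM arrays are indexed by `Int`, so the Lanczos basis (roughly `n * ncv` doubles) and the `dsaupd` work array (`ncv * (ncv + 8)` doubles) must each have a length that fits in an `Int`. A back-of-the-envelope sketch, assuming `ncv = min(2 * k, n)` as in the surrounding code:

```scala
// Sketch: reproduce the size check the require above performs.
def arpackWorkspaceFits(n: Long, k: Int): Boolean = {
  val ncv = math.min(2L * k, n)   // number of Lanczos basis vectors (assumption: min(2k, n))
  val lanczosBasis = n * ncv      // the n x ncv basis stored as one flat Array[Double]
  val workl = ncv * (ncv + 8)     // ARPACK dsaupd workspace length
  lanczosBasis <= Int.MaxValue && workl <= Int.MaxValue
}

// Example: n = 5,000,000 rows with k = 300 gives n * ncv = 3.0e9 > Int.MaxValue,
// so the decomposition is rejected with a clear message instead of failing at
// array allocation time.
arpackWorkspaceFits(5000000L, 300)  // false
```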
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 4bb28d1b1e071..caacab943030b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.recommendation
import org.apache.spark.Logging
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.ml.recommendation.{ALS => NewALS}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index ed2f8b41bcae5..9ff06ac362a31 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -17,13 +17,17 @@
package org.apache.spark.mllib.recommendation
+import java.io.IOException
import java.lang.{Integer => JavaInteger}
+import org.apache.hadoop.fs.Path
import org.jblas.DoubleMatrix
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
/**
@@ -41,7 +45,8 @@ import org.apache.spark.storage.StorageLevel
class MatrixFactorizationModel(
val rank: Int,
val userFeatures: RDD[(Int, Array[Double])],
- val productFeatures: RDD[(Int, Array[Double])]) extends Serializable with Logging {
+ val productFeatures: RDD[(Int, Array[Double])])
+ extends Saveable with Serializable with Logging {
require(rank > 0)
validateFeatures("User", userFeatures)
@@ -125,6 +130,12 @@ class MatrixFactorizationModel(
recommend(productFeatures.lookup(product).head, userFeatures, num)
.map(t => Rating(t._1, product, t._2))
+ protected override val formatVersion: String = "1.0"
+
+ override def save(sc: SparkContext, path: String): Unit = {
+ MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
+ }
+
private def recommend(
recommendToFeatures: Array[Double],
recommendableFeatures: RDD[(Int, Array[Double])],
@@ -136,3 +147,70 @@ class MatrixFactorizationModel(
scored.top(num)(Ordering.by(_._2))
}
}
+
+object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
+
+ import org.apache.spark.mllib.util.Loader._
+
+ override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+ val (loadedClassName, formatVersion, metadata) = loadMetadata(sc, path)
+ val classNameV1_0 = SaveLoadV1_0.thisClassName
+ (loadedClassName, formatVersion) match {
+ case (className, "1.0") if className == classNameV1_0 =>
+ SaveLoadV1_0.load(sc, path)
+ case _ =>
+ throw new IOException("MatrixFactorizationModel.load did not recognize model with " +
+ s"(class: $loadedClassName, version: $formatVersion). Supported:\n" +
+ s" ($classNameV1_0, 1.0)")
+ }
+ }
+
+ private[recommendation]
+ object SaveLoadV1_0 {
+
+ private val thisFormatVersion = "1.0"
+
+ private[recommendation]
+ val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"
+
+ /**
+ * Saves a [[MatrixFactorizationModel]], where user features are saved under `data/user` and
+ * product features are saved under `data/product`.
+ */
+ def save(model: MatrixFactorizationModel, path: String): Unit = {
+ val sc = model.userFeatures.sparkContext
+ val sqlContext = new SQLContext(sc)
+ import sqlContext.implicits.createDataFrame
+ val metadata = (thisClassName, thisFormatVersion, model.rank)
+ val metadataRDD = sc.parallelize(Seq(metadata), 1).toDataFrame("class", "version", "rank")
+ metadataRDD.toJSON.saveAsTextFile(metadataPath(path))
+ model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
+ model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
+ }
+
+ def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+ val sqlContext = new SQLContext(sc)
+ val (className, formatVersion, metadata) = loadMetadata(sc, path)
+ assert(className == thisClassName)
+ assert(formatVersion == thisFormatVersion)
+ val rank = metadata.select("rank").first().getInt(0)
+ val userFeatures = sqlContext.parquetFile(userPath(path))
+ .map { case Row(id: Int, features: Seq[Double]) =>
+ (id, features.toArray)
+ }
+ val productFeatures = sqlContext.parquetFile(productPath(path))
+ .map { case Row(id: Int, features: Seq[Double]) =>
+ (id, features.toArray)
+ }
+ new MatrixFactorizationModel(rank, userFeatures, productFeatures)
+ }
+
+ private def userPath(path: String): String = {
+ new Path(dataPath(path), "user").toUri.toString
+ }
+
+ private def productPath(path: String): String = {
+ new Path(dataPath(path), "product").toUri.toString
+ }
+ }
+}
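
A usage sketch of the new persistence API (assumes an existing `SparkContext` named `sc` and a writable path; the path and the tiny training set are illustrative):

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

// Train a small model, save it, and load it back through the API added above.
val ratings = sc.parallelize(Seq(
  Rating(1, 10, 4.0), Rating(1, 20, 3.0), Rating(2, 10, 5.0), Rating(2, 30, 1.0)))

val rank = 5
val iterations = 10
val model = ALS.train(ratings, rank, iterations)

val path = "/tmp/matrix-factorization-model"  // illustrative location
model.save(sc, path)                          // factors as Parquet plus JSON metadata

val restored = MatrixFactorizationModel.load(sc, path)
assert(restored.rank == model.rank)
```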
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
index b9caecc904a23..9801e87576744 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext {
@@ -53,4 +54,22 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
new MatrixFactorizationModel(rank, userFeatures, prodFeatures1)
}
}
+
+ test("save/load") {
+ val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
+ val tempDir = Utils.createTempDir()
+ val path = tempDir.toURI.toString
+ def collect(features: RDD[(Int, Array[Double])]): Set[(Int, Seq[Double])] = {
+ features.mapValues(_.toSeq).collect().toSet
+ }
+ try {
+ model.save(sc, path)
+ val newModel = MatrixFactorizationModel.load(sc, path)
+ assert(newModel.rank === rank)
+ assert(collect(newModel.userFeatures) === collect(userFeatures))
+ assert(collect(newModel.productFeatures) === collect(prodFeatures))
+ } finally {
+ Utils.deleteRecursively(tempDir)
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index e0c796b186578..f6f176d2004b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -975,6 +975,10 @@
          <groupId>com.esotericsoftware.kryo</groupId>
          <artifactId>kryo</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.apache.avro</groupId>
+         <artifactId>avro-mapred</artifactId>
+       </exclusion>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ad37b7d0e6f59..2c00659496972 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -424,6 +424,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/** Extends QueryExecution with hive specific features. */
protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
extends super.QueryExecution(logicalPlan) {
+ // Like what we do in runHive, makes sure the session represented by the
+ // `sessionState` field is activated.
+ if (SessionState.get() != sessionState) {
+ SessionState.start(sessionState)
+ }
/**
* Returns the result as a hive compatible sequence of strings. For native commands, the