Merge remote-tracking branch 'apache/master' into copyable-python
mengxr committed May 12, 2015
2 parents 66ce18c + 640f63b commit 02abf13
Showing 36 changed files with 1,125 additions and 152 deletions.
7 changes: 4 additions & 3 deletions R/pkg/R/DataFrame.R
@@ -150,7 +150,7 @@ setMethod("isLocal",
callJMethod(x@sdf, "isLocal")
})

-#' ShowDF
+#' showDF
#'
#' Print the first numRows rows of a DataFrame
#'
@@ -170,7 +170,8 @@ setMethod("isLocal",
setMethod("showDF",
signature(x = "DataFrame"),
function(x, numRows = 20) {
-callJMethod(x@sdf, "showString", numToInt(numRows))
+s <- callJMethod(x@sdf, "showString", numToInt(numRows))
+cat(s)
})

#' show
@@ -187,7 +188,7 @@ setMethod("showDF",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
-#' show(df)
+#' df
#'}
setMethod("show", "DataFrame",
function(object) {
3 changes: 2 additions & 1 deletion R/pkg/inst/tests/test_sparkSQL.R
@@ -653,7 +653,8 @@ test_that("toJSON() returns an RDD of the correct values", {

test_that("showDF()", {
df <- jsonFile(sqlCtx, jsonPath)
-expect_output(showDF(df), "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
+s <- capture.output(showDF(df))
+expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
})

test_that("isLocal()", {
2 changes: 1 addition & 1 deletion README.md
@@ -76,7 +76,7 @@ can be run using:
./dev/run-tests

Please see the guidance on how to
-[run all automated tests](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-AutomatedTesting).
+[run tests for a module, or individual tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).

## A Note About Hadoop Versions

6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -78,6 +78,9 @@ private[spark] object JettyUtils extends Logging {
} catch {
case e: IllegalArgumentException =>
response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
case e: Exception =>
logWarning(s"GET ${request.getRequestURI} failed: $e", e)
throw e
}
}
// SPARK-5983 ensure TRACE is not supported
@@ -217,6 +220,9 @@
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
server.addBean(errorHandler)
server.setHandler(collection)
try {
server.start()
75 changes: 70 additions & 5 deletions docs/sql-programming-guide.md
@@ -367,11 +367,18 @@ val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
-// The columns of a row in the result can be accessed by ordinal.
+// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
+
+// or by field name:
+teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
+
+// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
+teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
+// Map("name" -> "Justin", "age" -> 19)
{% endhighlight %}

</div>
@@ -470,7 +477,7 @@ parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
-schemaPeople = sqlContext.inferSchema(people)
+schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
@@ -538,7 +545,7 @@ peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
-// The columns of a row in the result can be accessed by ordinal.
+// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

@@ -1594,6 +1601,64 @@ options.

# Migration Guide

## Upgrading from Spark SQL 1.3 to 1.4

Based on user feedback, we changed the default behavior of `DataFrame.groupBy().agg()` to retain the grouping columns in the resulting `DataFrame`. To keep the behavior in 1.3, set `spark.sql.retainGroupColumns` to `false`.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg("department"), func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

{% endhighlight %}
</div>

</div>


## Upgrading from Spark SQL 1.0-1.2 to 1.3

In Spark 1.3 we removed the "Alpha" label from Spark SQL and as part of this did a cleanup of the
@@ -1651,7 +1716,7 @@ moved into the udf object in `SQLContext`.

<div class="codetabs">
<div data-lang="scala" markdown="1">
-{% highlight java %}
+{% highlight scala %}

sqlContext.udf.register("strLen", (s: String) => s.length())

2 changes: 1 addition & 1 deletion docs/submitting-applications.md
@@ -59,7 +59,7 @@ for applications that involve the REPL (e.g. Spark shell).
Alternatively, if your application is submitted from a machine far from the worker machines (e.g.
locally on your laptop), it is common to use `cluster` mode to minimize network latency between
the drivers and the executors. Note that `cluster` mode is currently not supported for
-Mesos clusters or Python applications.
+Mesos clusters. Currently only YARN supports cluster mode for Python applications.

For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
131 changes: 131 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util.SchemaUtils
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

/**
* :: AlphaComponent ::
* `Bucketizer` maps a column of continuous features to a column of feature buckets.
*/
@AlphaComponent
final class Bucketizer private[ml] (override val parent: Estimator[Bucketizer])
extends Model[Bucketizer] with HasInputCol with HasOutputCol {

def this() = this(null)

/**
* Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets.
* A bucket defined by splits x,y holds values in the range [x,y). Splits should be strictly
* increasing. Values at -inf, inf must be explicitly provided to cover all Double values;
* otherwise, values outside the splits specified will be treated as errors.
* @group param
*/
val splits: Param[Array[Double]] = new Param[Array[Double]](this, "splits",
"Split points for mapping continuous features into buckets. With n splits, there are n+1 " +
"buckets. A bucket defined by splits x,y holds values in the range [x,y). The splits " +
"should be strictly increasing. Values at -inf, inf must be explicitly provided to cover" +
" all Double values; otherwise, values outside the splits specified will be treated as" +
" errors.",
Bucketizer.checkSplits)

/** @group getParam */
def getSplits: Array[Double] = $(splits)

/** @group setParam */
def setSplits(value: Array[Double]): this.type = set(splits, value)

/** @group setParam */
def setInputCol(value: String): this.type = set(inputCol, value)

/** @group setParam */
def setOutputCol(value: String): this.type = set(outputCol, value)

override def transform(dataset: DataFrame): DataFrame = {
transformSchema(dataset.schema)
val bucketizer = udf { feature: Double =>
Bucketizer.binarySearchForBuckets($(splits), feature)
}
val newCol = bucketizer(dataset($(inputCol)))
val newField = prepOutputField(dataset.schema)
dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata))
}

private def prepOutputField(schema: StructType): StructField = {
val buckets = $(splits).sliding(2).map(bucket => bucket.mkString(", ")).toArray
val attr = new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true),
values = Some(buckets))
attr.toStructField()
}

override def transformSchema(schema: StructType): StructType = {
SchemaUtils.checkColumnType(schema, $(inputCol), DoubleType)
SchemaUtils.appendColumn(schema, prepOutputField(schema))
}
}

private[feature] object Bucketizer {
/** We require splits to be of length >= 3 and to be in strictly increasing order. */
def checkSplits(splits: Array[Double]): Boolean = {
if (splits.length < 3) {
false
} else {
var i = 0
while (i < splits.length - 1) {
if (splits(i) >= splits(i + 1)) return false
i += 1
}
true
}
}

/**
* Binary searching in several buckets to place each data point.
* @throws RuntimeException if a feature is < splits.head or >= splits.last
*/
def binarySearchForBuckets(
splits: Array[Double],
feature: Double): Double = {
// Check bounds. We make an exception for +inf so that it can exist in some bin.
if ((feature < splits.head) || (feature >= splits.last && feature != Double.PositiveInfinity)) {
throw new RuntimeException(s"Feature value $feature out of Bucketizer bounds" +
s" [${splits.head}, ${splits.last}). Check your features, or loosen " +
s"the lower/upper bound constraints.")
}
var left = 0
var right = splits.length - 2
while (left < right) {
val mid = (left + right) / 2
val split = splits(mid + 1)
if (feature < split) {
right = mid
} else {
left = mid + 1
}
}
left
}
}
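
As an illustrative aside (not part of the commit): a minimal Scala sketch of how the new Bucketizer could be used, assuming a DataFrame `df` with a Double column named "features" (the column and variable names here are hypothetical):

import org.apache.spark.ml.feature.Bucketizer

// Four split points define three buckets: [-inf, -0.5), [-0.5, 0.5), [0.5, +inf).
val bucketizer = new Bucketizer()
  .setSplits(Array(Double.NegativeInfinity, -0.5, 0.5, Double.PositiveInfinity))
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")

// transform() appends a "bucketedFeatures" column holding, for each row,
// the (Double) index of the bucket its "features" value falls into.
val bucketed = bucketizer.transform(df)

Per binarySearchForBuckets above, a feature below splits.head, or at or above splits.last (other than +inf), raises a RuntimeException.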
11 changes: 11 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
@@ -58,4 +58,15 @@ object SchemaUtils {
val outputFields = schema.fields :+ StructField(colName, dataType, nullable = false)
StructType(outputFields)
}

/**
* Appends a new column to the input schema. This fails if the given output column already exists.
* @param schema input schema
* @param col New column schema
* @return new schema with the input column appended
*/
def appendColumn(schema: StructType, col: StructField): StructType = {
require(!schema.fieldNames.contains(col.name), s"Column ${col.name} already exists.")
StructType(schema.fields :+ col)
}
}
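
For context, a small sketch of how the new appendColumn helper might be exercised from other ml code (the field names below are illustrative, not from the commit):

import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val schema = StructType(Seq(StructField("features", DoubleType, nullable = false)))
// Appends the field; the require(...) above throws IllegalArgumentException
// if a column named "bucketedFeatures" already exists in the schema.
val augmented = SchemaUtils.appendColumn(
  schema, StructField("bucketedFeatures", DoubleType, nullable = false))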