Merge branch 'master' of github.com:apache/spark into sql-partition-speed-up

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
Andrew Or committed May 20, 2015
2 parents 523f042 + b631bf7 commit 17e2943
Showing 69 changed files with 2,468 additions and 609 deletions.
10 changes: 5 additions & 5 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -757,12 +757,12 @@ test_that("parquetFile works with multiple input paths", {
test_that("describe() on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
stats <- describe(df, "age")
expect_true(collect(stats)[1, "summary"] == "count")
expect_true(collect(stats)[2, "age"] == 24.5)
expect_true(collect(stats)[3, "age"] == 5.5)
expect_equal(collect(stats)[1, "summary"], "count")
expect_equal(collect(stats)[2, "age"], "24.5")
expect_equal(collect(stats)[3, "age"], "5.5")
stats <- describe(df)
expect_true(collect(stats)[4, "name"] == "Andy")
expect_true(collect(stats)[5, "age"] == 30.0)
expect_equal(collect(stats)[4, "name"], "Andy")
expect_equal(collect(stats)[5, "age"], "30")
})

unlink(parquetPath)
72 changes: 72 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -697,6 +697,78 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

/**
* Creates a new RDD[Long] containing elements from `start` to `end` (exclusive), increased by
* `step` for each element.
*
* @note if we need to cache this RDD, we should make sure each partition does not exceed the limit.
*
* @param start the start value.
* @param end the end value (exclusive).
* @param step the incremental step.
* @param numSlices the number of partitions of the new RDD.
* @return the new RDD[Long]
*/
def range(
start: Long,
end: Long,
step: Long = 1,
numSlices: Int = defaultParallelism): RDD[Long] = withScope {
assertNotStopped()
// when step is 0, range will run infinitely
require(step != 0, "step cannot be 0")
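// use BigInt so that the difference (end - start) cannot overflow a Long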
val numElements: BigInt = {
val safeStart = BigInt(start)
val safeEnd = BigInt(end)
if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
(safeEnd - safeStart) / step
} else {
// the remainder has the same sign as the range, so one more element is included
(safeEnd - safeStart) / step + 1
}
}
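// partition i holds the element indices in [i * numElements / numSlices, (i + 1) * numElements / numSlices)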
parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
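// clamp a partition boundary to the Long range; any overflow while iterating is detected in next()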
def getSafeMargin(bi: BigInt): Long =
if (bi.isValidLong) {
bi.toLong
} else if (bi > 0) {
Long.MaxValue
} else {
Long.MinValue
}
val safePartitionStart = getSafeMargin(partitionStart)
val safePartitionEnd = getSafeMargin(partitionEnd)

new Iterator[Long] {
private[this] var number: Long = safePartitionStart
private[this] var overflow: Boolean = false

override def hasNext =
if (!overflow) {
if (step > 0) {
number < safePartitionEnd
} else {
number > safePartitionEnd
}
} else false

override def next() = {
val ret = number
number += step
if (number < ret ^ step < 0) {
// we have Long.MaxValue + Long.MaxValue < Long.MaxValue
// and Long.MinValue + Long.MinValue > Long.MinValue, so if the step causes a step
// back, we can be sure that we have an overflow.
overflow = true
}
ret
}
}
})
}
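
For reference, a minimal usage sketch of the new API (illustrative only, not part of this diff; assumes an existing SparkContext named `sc`):

// 0, 2, 4, 6, 8 -- the end value is exclusive
val evens = sc.range(0L, 10L, step = 2L, numSlices = 4)
assert(evens.collect().toSeq == Seq(0L, 2L, 4L, 6L, 8L))

// a descending range with a negative step: 5, 4, 3, 2, 1
val countdown = sc.range(5L, 0L, step = -1L)
assert(countdown.collect().toSeq == Seq(5L, 4L, 3L, 2L, 1L))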

/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
@@ -50,8 +50,15 @@ private[spark] object PythonUtils {
/**
* Convert list of T into seq of T (for calling API with varargs)
*/
def toSeq[T](cols: JList[T]): Seq[T] = {
cols.toList.toSeq
def toSeq[T](vs: JList[T]): Seq[T] = {
vs.toList.toSeq
}

/**
* Convert list of T into array of T (for calling API with array)
*/
def toArray[T](vs: JList[T]): Array[T] = {
vs.toArray().asInstanceOf[Array[T]]
}

/**
@@ -252,20 +252,6 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
createWriter(false).stop(false);
}

@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(Collections.<Product2<Object, Object>>emptyIterator());
final Option<MapStatus> mapStatus = writer.stop(true);
assertTrue(mapStatus.isDefined());
assertTrue(mergedOutputFile.exists());
assertArrayEquals(new long[NUM_PARTITITONS], partitionSizesInMergedFile);
assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleRecordsWritten());
assertEquals(0, taskMetrics.shuffleWriteMetrics().get().shuffleBytesWritten());
assertEquals(0, taskMetrics.diskBytesSpilled());
assertEquals(0, taskMetrics.memoryBytesSpilled());
}

@Test
public void writeWithoutSpilling() throws Exception {
// In this example, each partition should have exactly one record:
172 changes: 172 additions & 0 deletions docs/ml-features.md
@@ -106,6 +106,95 @@ for features_label in featurized.select("features", "label").take(3):
</div>
</div>

## Word2Vec

`Word2Vec` is an `Estimator` which takes sequences of words representing documents and trains a `Word2VecModel`. The model is essentially a `Map(String, Vector)` that maps each word to a unique fixed-size vector. The `Word2VecModel` transforms each document into a vector by averaging the vectors of all the words in the document; this document vector can then be used in further computations, such as similarity calculations. Please refer to the [MLlib user guide on Word2Vec](mllib-feature-extraction.html#Word2Vec) for more details on Word2Vec.

Word2Vec is implemented in [Word2Vec](api/scala/index.html#org.apache.spark.ml.feature.Word2Vec). In the following code segment, we start with a set of documents, each of which is represented as a sequence of words. For each document, we transform it into a feature vector. This feature vector could then be passed to a learning algorithm.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.ml.feature.Word2Vec

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = sqlContext.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
import com.google.common.collect.Lists;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

JavaSparkContext jsc = ...
SQLContext sqlContext = ...

// Input data: Each row is a bag of words from a sentence or document.
JavaRDD<Row> jrdd = jsc.parallelize(Lists.newArrayList(
RowFactory.create(Lists.newArrayList("Hi I heard about Spark".split(" "))),
RowFactory.create(Lists.newArrayList("I wish Java could use case classes".split(" "))),
RowFactory.create(Lists.newArrayList("Logistic regression models are neat".split(" ")))
));
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
DataFrame documentDF = sqlContext.createDataFrame(jrdd, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
DataFrame result = model.transform(documentDF);
for (Row r: result.select("result").take(3)) {
System.out.println(r);
}
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = sqlContext.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for feature in result.select("result").take(3):
print(feature)
{% endhighlight %}
</div>
</div>

# Feature Transformers

@@ -268,5 +357,88 @@ for binarized_feature, in binarizedFeatures.collect():
</div>
</div>

## PolynomialExpansion

[Polynomial expansion](http://en.wikipedia.org/wiki/Polynomial_expansion) is the process of expanding your features into a polynomial space, which is formed by taking the n-degree combinations of the original dimensions. The [PolynomialExpansion](api/scala/index.html#org.apache.spark.ml.feature.PolynomialExpansion) class provides this functionality. The example below shows how to expand your features into a 3-degree polynomial space.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.mllib.linalg.Vectors

val data = Array(
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0),
Vectors.dense(0.6, -1.1)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
import com.google.common.collect.Lists;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaSparkContext jsc = ...
SQLContext jsql = ...
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
JavaRDD<Row> data = jsc.parallelize(Lists.newArrayList(
RowFactory.create(Vectors.dense(-2.0, 2.3)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(0.6, -1.1))
));
StructType schema = new StructType(new StructField[] {
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
DataFrame polyDF = polyExpansion.transform(df);
Row[] row = polyDF.select("polyFeatures").take(3);
for (Row r : row) {
System.out.println(r.get(0));
}
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.ml.feature import PolynomialExpansion
from pyspark.mllib.linalg import Vectors

df = sqlContext.createDataFrame(
[(Vectors.dense([-2.0, 2.3]), ),
(Vectors.dense([0.0, 0.0]), ),
(Vectors.dense([0.6, -1.1]), )],
["features"])
px = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
print(expanded)
{% endhighlight %}
</div>
</div>

# Feature Selectors

4 changes: 2 additions & 2 deletions docs/ml-guide.md
@@ -237,7 +237,7 @@ model2.transform(test.toDF)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println("($features, $label) -> prob=$prob, prediction=$prediction")
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}

sc.stop()
@@ -391,7 +391,7 @@ model.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println("($id, $text) --> prob=$prob, prediction=$prediction")
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}

sc.stop()
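
For reference, the change above adds the `s` string-interpolator prefix; without it Scala prints the placeholder text literally rather than the variable values. A tiny illustrative sketch (not part of this diff):

val label = 1.0
println("label = $label")   // prints: label = $label
println(s"label = $label")  // prints: label = 1.0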
