[SPARK-5097][SQL] DataFrame
This pull request redesigns the existing Spark SQL DSL, which already provides data-frame-like functionality.
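
A rough sketch of the resulting user-facing API, using only calls that appear in the diffs below (`jsonFile`, `registerTempTable`, `sql`, and string-based `select`); the variable names and file path are illustrative, and this is not code taken from the PR itself:

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

// Assumes an existing SparkContext named `sc`.
val sqlContext = new SQLContext(sc)

// Load a DataFrame (formerly SchemaRDD) from JSON and register it as a temp table
// (registerAsTable is renamed to registerTempTable in this PR).
val people: DataFrame = sqlContext.jsonFile("examples/src/main/resources/people.json")
people.registerTempTable("people")

// SQL queries now return DataFrames; columns are selected by name (strings)
// rather than by Scala symbols such as 'name.
val teenagers: DataFrame =
  sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.select("name").collect().foreach { case Row(name: String) => println(name) }
```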

TODOs:
With the exception of Python support, the remaining tasks can be done in separate follow-up PRs.
- [ ] Audit of the API
- [ ] Documentation
- [ ] More test cases to cover the new API
- [x] Python support
- [ ] Type alias SchemaRDD (a possible shape is sketched below)
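
For the "type alias SchemaRDD" item, one possible shape is a deprecated alias in the `sql` package object. This is a hypothetical sketch for source compatibility, not necessarily what was ultimately merged; the deprecation message and version string are assumptions:

```scala
package org.apache.spark

package object sql {
  // Hypothetical compatibility alias so existing code that refers to SchemaRDD
  // keeps compiling against the new DataFrame class.
  @deprecated("Use DataFrame instead.", "1.3.0")
  type SchemaRDD = DataFrame
}
```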

Author: Reynold Xin <[email protected]>
Author: Davies Liu <[email protected]>

Closes apache#4173 from rxin/df1 and squashes the following commits:

0a1a73b [Reynold Xin] Merge branch 'df1' of github.com:rxin/spark into df1
23b4427 [Reynold Xin] Mima.
828f70d [Reynold Xin] Merge pull request #7 from davies/df
257b9e6 [Davies Liu] add repartition
6bf2b73 [Davies Liu] fix collect with UDT and tests
e971078 [Reynold Xin] Missing quotes.
b9306b4 [Reynold Xin] Remove removeColumn/updateColumn for now.
a728bf2 [Reynold Xin] Example rename.
e8aa3d3 [Reynold Xin] groupby -> groupBy.
9662c9e [Davies Liu] improve DataFrame Python API
4ae51ea [Davies Liu] python API for dataframe
1e5e454 [Reynold Xin] Fixed a bug with symbol conversion.
2ca74db [Reynold Xin] Couple minor fixes.
ea98ea1 [Reynold Xin] Documentation & literal expressions.
2b22684 [Reynold Xin] Got rid of IntelliJ problems.
02bbfbc [Reynold Xin] Tightening imports.
ffbce66 [Reynold Xin] Fixed compilation error.
59b6d8b [Reynold Xin] Style violation.
b85edfb [Reynold Xin] ALS.
8c37f0a [Reynold Xin] Made MLlib and examples compile
6d53134 [Reynold Xin] Hive module.
d35efd5 [Reynold Xin] Fixed compilation error.
ce4a5d2 [Reynold Xin] Fixed test cases in SQL except ParquetIOSuite.
66d5ef1 [Reynold Xin] SQLContext minor patch.
c9bcdc0 [Reynold Xin] Checkpoint: SQL module compiles!
rxin committed Jan 28, 2015
1 parent b1b35ca commit 119f45d
Showing 82 changed files with 3,444 additions and 1,572 deletions.
@@ -33,7 +33,7 @@
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -71,7 +71,7 @@ public static void main(String[] args) {
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -112,11 +112,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test).registerAsTable("prediction");
SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
cvModel.transform(test).registerTempTable("prediction");
DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
@@ -28,7 +28,7 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -48,13 +48,13 @@ public static void main(String[] args) {

// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
// into SchemaRDDs, where it uses the bean metadata to infer the schema.
// into DataFrames, where it uses the bean metadata to infer the schema.
List<LabeledPoint> localTraining = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public static void main(String[] args) {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test).registerAsTable("results");
SchemaRDD results =
model2.transform(test).registerTempTable("results");
DataFrame results =
jsql.sql("SELECT features, label, probability, prediction FROM results");
for (Row r: results.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
@@ -29,7 +29,7 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -79,11 +79,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents.
model.transform(test).registerAsTable("prediction");
SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
model.transform(test).registerTempTable("prediction");
DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
@@ -26,9 +26,9 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class JavaSparkSQL {
public static class Person implements Serializable {
@@ -74,13 +74,13 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
DataFrame schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -93,17 +93,17 @@ public String call(Row row) {
}

System.out.println("=== Data source: Parquet File ===");
// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
// DataFrames can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
// The result of loading a parquet file is also a DataFrame.
DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
SchemaRDD teenagers2 =
DataFrame teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -119,8 +119,8 @@ public String call(Row row) {
// A JSON dataset is pointed by path.
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed by path
SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
// Create a DataFrame from the file(s) pointed by path
DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -130,13 +130,13 @@ public String call(Row row) {
// |-- age: IntegerType
// |-- name: StringType

// Register this JavaSchemaRDD as a table.
// Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -146,14 +146,14 @@ public String call(Row row) {
System.out.println(name);
}

// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());

// Take a look at the schema of this new JavaSchemaRDD.
// Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();
// The schema of anotherPeople is ...
// root
@@ -164,7 +164,7 @@ public String call(Row row) {

peopleFromJsonRDD.registerTempTable("people2");

SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
2 changes: 1 addition & 1 deletion examples/src/main/python/mllib/dataset_example.py
@@ -16,7 +16,7 @@
#

"""
An example of how to use SchemaRDD as a dataset for ML. Run with::
An example of how to use DataFrame as a dataset for ML. Run with::
bin/spark-submit examples/src/main/python/mllib/dataset_example.py
"""

16 changes: 8 additions & 8 deletions examples/src/main/python/sql.py
@@ -30,26 +30,26 @@
some_rdd = sc.parallelize([Row(name="John", age=19),
Row(name="Smith", age=23),
Row(name="Sarah", age=18)])
# Infer schema from the first row, create a SchemaRDD and print the schema
some_schemardd = sqlContext.inferSchema(some_rdd)
some_schemardd.printSchema()
# Infer schema from the first row, create a DataFrame and print the schema
some_df = sqlContext.inferSchema(some_rdd)
some_df.printSchema()

# Another RDD is created from a list of tuples
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
StructField("person_age", IntegerType(), False)])
# Create a SchemaRDD by applying the schema to the RDD and print the schema
another_schemardd = sqlContext.applySchema(another_rdd, schema)
another_schemardd.printSchema()
# Create a DataFrame by applying the schema to the RDD and print the schema
another_df = sqlContext.applySchema(another_rdd, schema)
another_df.printSchema()
# root
# |-- age: integer (nullable = true)
# |-- name: string (nullable = true)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path = os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
# Create a SchemaRDD from the file(s) pointed to by path
# Create a DataFrame from the file(s) pointed to by path
people = sqlContext.jsonFile(path)
# root
# |-- person_name: string (nullable = false)
@@ -61,7 +61,7 @@
# |-- age: IntegerType
# |-- name: StringType

# Register this SchemaRDD as a table.
# Register this DataFrame as a table.
people.registerAsTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext
@@ -18,7 +18,6 @@
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
@@ -101,7 +100,7 @@ object CrossValidatorExample {

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
.select('id, 'text, 'score, 'prediction)
.select("id", "text", "score", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
@@ -143,7 +143,7 @@ object MovieLensALS {

// Evaluate the model.
// TODO: Create an evaluator to compute RMSE.
val mse = predictions.select('rating, 'prediction)
val mse = predictions.select("rating", "prediction").rdd
.flatMap { case Row(rating: Float, prediction: Float) =>
val err = rating.toDouble - prediction
val err2 = err * err
@@ -18,7 +18,6 @@
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -42,7 +41,7 @@ object SimpleParamsExample {

// Prepare training data.
// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of Java Beans
// into SchemaRDDs, where it uses the bean metadata to infer the schema.
// into DataFrames, where it uses the bean metadata to infer the schema.
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
@@ -92,7 +91,7 @@ object SimpleParamsExample {
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test)
.select('features, 'label, 'probability, 'prediction)
.select("features", "label", "probability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)
@@ -20,7 +20,6 @@ package org.apache.spark.examples.ml
import scala.beans.BeanInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
@@ -80,7 +79,7 @@ object SimpleTextClassificationPipeline {

// Make predictions on test documents.
model.transform(test)
.select('id, 'text, 'score, 'prediction)
.select("id", "text", "score", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
(Diff truncated: the remaining changed files are not shown.)
