[SPARK-5097][SQL] DataFrame #4173

Closed
wants to merge 25 commits into from
Changes from 23 commits
@@ -33,7 +33,7 @@
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -71,7 +71,7 @@ public static void main(String[] args) {
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
-SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -112,11 +112,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test).registerAsTable("prediction");
-SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+cvModel.transform(test).registerTempTable("prediction");
+DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
@@ -28,7 +28,7 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -48,13 +48,13 @@ public static void main(String[] args) {

// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
-// into SchemaRDDs, where it uses the bean metadata to infer the schema.
+// into DataFrames, where it uses the bean metadata to infer the schema.
List<LabeledPoint> localTraining = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public static void main(String[] args) {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
-model2.transform(test).registerAsTable("results");
-SchemaRDD results =
+model2.transform(test).registerTempTable("results");
+DataFrame results =
jsql.sql("SELECT features, label, probability, prediction FROM results");
for (Row r: results.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
@@ -29,7 +29,7 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -79,11 +79,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents.
-model.transform(test).registerAsTable("prediction");
-SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+model.transform(test).registerTempTable("prediction");
+DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
@@ -26,9 +26,9 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;

public class JavaSparkSQL {
public static class Person implements Serializable {
@@ -74,13 +74,13 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
-SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+DataFrame schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
-SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

-// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
+// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -93,17 +93,17 @@ public String call(Row row) {
}

System.out.println("=== Data source: Parquet File ===");
-// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
+// DataFrames can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
-// The result of loading a parquet file is also a JavaSchemaRDD.
-SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+// The result of loading a parquet file is also a DataFrame.
+DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
-SchemaRDD teenagers2 =
+DataFrame teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -119,8 +119,8 @@ public String call(Row row) {
// A JSON dataset is pointed by path.
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
-// Create a JavaSchemaRDD from the file(s) pointed by path
-SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+// Create a DataFrame from the file(s) pointed by path
+DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -130,13 +130,13 @@ public String call(Row row) {
// |-- age: IntegerType
// |-- name: StringType

-// Register this JavaSchemaRDD as a table.
+// Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
-SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

-// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
+// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
@Override
@@ -146,14 +146,14 @@ public String call(Row row) {
System.out.println(name);
}

-// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
+DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());

-// Take a look at the schema of this new JavaSchemaRDD.
+// Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();
// The schema of anotherPeople is ...
// root
@@ -164,7 +164,7 @@ public String call(Row row) {

peopleFromJsonRDD.registerTempTable("people2");

-SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
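The hunks in this file are pure renames, but together they document the DataFrame I/O surface the example exercises: saveAsParquetFile/parquetFile for Parquet and jsonFile/printSchema for JSON. A condensed, hedged sketch of those round-trips using only methods that appear above; the paths and table name are illustrative:

```java
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataFrameIOSketch {
  public static void roundTrip(SQLContext sqlCtx, DataFrame schemaPeople) {
    // Parquet keeps the schema, so the reloaded DataFrame needs no extra metadata.
    schemaPeople.saveAsParquetFile("people.parquet");
    DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");
    parquetFile.registerTempTable("parquetFile");
    DataFrame teens = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
    System.out.println(teens.collect().length + " teenage rows");

    // JSON schemas are inferred on load; printSchema shows what was inferred.
    DataFrame peopleFromJson = sqlCtx.jsonFile("examples/src/main/resources/people.json");
    peopleFromJson.printSchema();
  }
}
```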
2 changes: 1 addition & 1 deletion examples/src/main/python/mllib/dataset_example.py
@@ -16,7 +16,7 @@
#

"""
-An example of how to use SchemaRDD as a dataset for ML. Run with::
+An example of how to use DataFrame as a dataset for ML. Run with::
bin/spark-submit examples/src/main/python/mllib/dataset_example.py
"""

16 changes: 8 additions & 8 deletions examples/src/main/python/sql.py
@@ -30,26 +30,26 @@
some_rdd = sc.parallelize([Row(name="John", age=19),
Row(name="Smith", age=23),
Row(name="Sarah", age=18)])
-# Infer schema from the first row, create a SchemaRDD and print the schema
-some_schemardd = sqlContext.inferSchema(some_rdd)
-some_schemardd.printSchema()
+# Infer schema from the first row, create a DataFrame and print the schema
+some_df = sqlContext.inferSchema(some_rdd)
+some_df.printSchema()

# Another RDD is created from a list of tuples
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
StructField("person_age", IntegerType(), False)])
-# Create a SchemaRDD by applying the schema to the RDD and print the schema
-another_schemardd = sqlContext.applySchema(another_rdd, schema)
-another_schemardd.printSchema()
+# Create a DataFrame by applying the schema to the RDD and print the schema
+another_df = sqlContext.applySchema(another_rdd, schema)
+another_df.printSchema()
# root
# |-- age: integer (nullable = true)
# |-- name: string (nullable = true)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path = os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
-# Create a SchemaRDD from the file(s) pointed to by path
+# Create a DataFrame from the file(s) pointed to by path
people = sqlContext.jsonFile(path)
# root
# |-- person_name: string (nullable = false)
@@ -61,7 +61,7 @@
# |-- age: IntegerType
# |-- name: StringType

-# Register this SchemaRDD as a table.
+# Register this DataFrame as a table.
people.registerAsTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext
@@ -18,7 +18,6 @@
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
@@ -101,7 +100,7 @@ object CrossValidatorExample {

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
-.select('id, 'text, 'score, 'prediction)
+.select("id", "text", "score", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
@@ -143,7 +143,7 @@ object MovieLensALS {

// Evaluate the model.
// TODO: Create an evaluator to compute RMSE.
-val mse = predictions.select('rating, 'prediction)
+val mse = predictions.select("rating", "prediction").rdd
.flatMap { case Row(rating: Float, prediction: Float) =>
val err = rating.toDouble - prediction
val err2 = err * err
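The change above reflects two shifts: select now takes plain column-name strings, and its result is a DataFrame rather than an RDD, so the Scala code drops to .rdd before flatMap. A hedged Java analogue of the same MSE computation, using toJavaRDD() (shown elsewhere in this diff) as the bridge; the class and method names are illustrative:

```java
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

public class SelectThenRddSketch {
  // Column names follow the hunk above; the rating/prediction columns are Float
  // in this example, so the typed getFloat accessor is used by ordinal.
  public static double meanSquaredError(DataFrame predictions) {
    JavaDoubleRDD squaredErrors = predictions.select("rating", "prediction").toJavaRDD()
        .mapToDouble(new DoubleFunction<Row>() {
          @Override
          public double call(Row row) {
            double err = row.getFloat(0) - row.getFloat(1);
            return err * err;
          }
        });
    // mean() of the squared errors gives the MSE computed above.
    return squaredErrors.mean();
  }
}
```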
@@ -18,7 +18,6 @@
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
@@ -42,7 +41,7 @@ object SimpleParamsExample {

// Prepare training data.
// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of Java Beans
-// into SchemaRDDs, where it uses the bean metadata to infer the schema.
+// into DataFrames, where it uses the bean metadata to infer the schema.
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
@@ -92,7 +91,7 @@ object SimpleParamsExample {
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test)
-.select('features, 'label, 'probability, 'prediction)
+.select("features", "label", "probability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)
@@ -20,7 +20,6 @@ package org.apache.spark.examples.ml
import scala.beans.BeanInfo

import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
@@ -80,7 +79,7 @@ object SimpleTextClassificationPipeline {

// Make predictions on test documents.
model.transform(test)
-.select('id, 'text, 'score, 'prediction)
+.select("id", "text", "score", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)