Commit d1bd8f2: cleanup applySchema

Davies Liu committed Feb 10, 2015
1 parent 9526e97
Showing 23 changed files with 117 additions and 101 deletions.
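The change is mechanical: the older `SQLContext.applySchema` and `SQLContext.inferSchema` entry points (and their Java/Scala counterparts) are folded into the single `createDataFrame` method. A minimal PySpark sketch of the before/after, assuming Spark 1.3-era APIs; the app name, data, and variable names here are illustrative, not taken from this diff:

{% highlight python %}
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = SparkContext("local", "createDataFrameDemo")  # hypothetical app name
sqlContext = SQLContext(sc)

# Case 1: schema inferred from an RDD of Rows (formerly inferSchema).
rows = sc.parallelize([Row(name="Alice", age=30), Row(name="Bob", age=25)])
df = sqlContext.createDataFrame(rows)   # was: sqlContext.inferSchema(rows)
df.printSchema()

# Case 2: explicit schema applied to an RDD of tuples (formerly applySchema).
tuples = sc.parallelize([("Alice", 30), ("Bob", 25)])
schema = StructType([StructField("name", StringType(), False),
                     StructField("age", IntegerType(), False)])
df2 = sqlContext.createDataFrame(tuples, schema)  # was: sqlContext.applySchema(tuples, schema)
df2.registerTempTable("people")
{% endhighlight %}

One method now covers both the inferred-schema and explicit-schema paths, which is why the docs, examples, and tests below all converge on `createDataFrame`.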
12 changes: 6 additions & 6 deletions docs/ml-guide.md
@@ -260,7 +260,7 @@ List<LabeledPoint> localTraining = Lists.newArrayList(
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-JavaSchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+JavaSchemaRDD training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -300,7 +300,7 @@ List<LabeledPoint> localTest = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+JavaSchemaRDD test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
@@ -443,7 +443,7 @@ List<LabeledDocument> localTraining = Lists.newArrayList(
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
JavaSchemaRDD training =
-  jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+  jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -469,7 +469,7 @@ List<Document> localTest = Lists.newArrayList(
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
JavaSchemaRDD test =
-  jsql.applySchema(jsc.parallelize(localTest), Document.class);
+  jsql.createDataFrame(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents.
model.transform(test).registerAsTable("prediction");
@@ -626,7 +626,7 @@ List<LabeledDocument> localTraining = Lists.newArrayList(
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
JavaSchemaRDD training =
-  jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+  jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -669,7 +669,7 @@ List<Document> localTest = Lists.newArrayList(
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+JavaSchemaRDD test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test).registerAsTable("prediction");
16 changes: 8 additions & 8 deletions docs/sql-programming-guide.md
@@ -225,7 +225,7 @@ public static class Person implements Serializable {
{% endhighlight %}


-A schema can be applied to an existing RDD by calling `applySchema` and providing the Class object
+A schema can be applied to an existing RDD by calling `createDataFrame` and providing the Class object
for the JavaBean.

{% highlight java %}
@@ -247,7 +247,7 @@ JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").m
});

// Apply a schema to an RDD of JavaBeans and register it as a table.
-JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
+JavaSchemaRDD schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
@@ -315,7 +315,7 @@ a `SchemaRDD` can be created programmatically with three steps.
1. Create an RDD of `Row`s from the original RDD;
2. Create the schema represented by a `StructType` matching the structure of
`Row`s in the RDD created in Step 1.
-3. Apply the schema to the RDD of `Row`s via `applySchema` method provided
+3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided
by `SQLContext`.

For example:
@@ -341,7 +341,7 @@ val schema =
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
-val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
+val peopleSchemaRDD = sqlContext.createDataFrame(rowRDD, schema)

// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
@@ -367,7 +367,7 @@ a `SchemaRDD` can be created programmatically with three steps.
1. Create an RDD of `Row`s from the original RDD;
2. Create the schema represented by a `StructType` matching the structure of
`Row`s in the RDD created in Step 1.
-3. Apply the schema to the RDD of `Row`s via `applySchema` method provided
+3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided
by `JavaSQLContext`.

For example:
@@ -406,7 +406,7 @@ JavaRDD<Row> rowRDD = people.map(
});

// Apply the schema to the RDD.
-JavaSchemaRDD peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema);
+JavaSchemaRDD peopleSchemaRDD = sqlContext.createDataFrame(rowRDD, schema);

// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people");
@@ -436,7 +436,7 @@ a `SchemaRDD` can be created programmatically with three steps.
1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a `StructType` matching the structure of
tuples or lists in the RDD created in step 1.
-3. Apply the schema to the RDD via `applySchema` method provided by `SQLContext`.
+3. Apply the schema to the RDD via `createDataFrame` method provided by `SQLContext`.

For example:
{% highlight python %}
@@ -458,7 +458,7 @@ fields = [StructField(field_name, StringType(), True) for field_name in schemaSt
schema = StructType(fields)

# Apply the schema to the RDD.
-schemaPeople = sqlContext.applySchema(people, schema)
+schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the SchemaRDD as a table.
schemaPeople.registerTempTable("people")
@@ -71,7 +71,7 @@ public static void main(String[] args) {
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
-DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -112,7 +112,7 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test).registerTempTable("prediction");
@@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception {
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
MyJavaLogisticRegression lr = new MyJavaLogisticRegression();
@@ -80,7 +80,7 @@ public static void main(String[] args) throws Exception {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
DataFrame results = model.transform(test);
@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -94,7 +94,7 @@ public static void main(String[] args) {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -79,7 +79,7 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
-DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents.
model.transform(test).registerTempTable("prediction");
@@ -74,7 +74,7 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
-DataFrame schemaPeople = sqlCtx.applySchema(people, Person.class);
+DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
4 changes: 2 additions & 2 deletions examples/src/main/python/sql.py
@@ -31,7 +31,7 @@
Row(name="Smith", age=23),
Row(name="Sarah", age=18)])
# Infer schema from the first row, create a DataFrame and print the schema
-some_df = sqlContext.inferSchema(some_rdd)
+some_df = sqlContext.createDataFrame(some_rdd)
some_df.printSchema()

# Another RDD is created from a list of tuples
@@ -40,7 +40,7 @@
schema = StructType([StructField("person_name", StringType(), False),
StructField("person_age", IntegerType(), False)])
# Create a DataFrame by applying the schema to the RDD and print the schema
-another_df = sqlContext.applySchema(another_rdd, schema)
+another_df = sqlContext.createDataFrame(another_rdd, schema)
another_df.printSchema()
# root
# |-- age: integer (nullable = true)
@@ -76,8 +76,8 @@ class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorP
val metrics = new Array[Double](epm.size)
val splits = MLUtils.kFold(dataset.rdd, map(numFolds), 0)
splits.zipWithIndex.foreach { case ((training, validation), splitIndex) =>
-    val trainingDataset = sqlCtx.applySchema(training, schema).cache()
-    val validationDataset = sqlCtx.applySchema(validation, schema).cache()
+    val trainingDataset = sqlCtx.createDataFrame(training, schema).cache()
+    val validationDataset = sqlCtx.createDataFrame(validation, schema).cache()
// multi-model training
logDebug(s"Train split $splitIndex with multiple sets of parameters.")
val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
@@ -45,7 +45,7 @@ public void setUp() {
jsql = new SQLContext(jsc);
JavaRDD<LabeledPoint> points =
jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2);
-dataset = jsql.applySchema(points, LabeledPoint.class);
+dataset = jsql.createDataFrame(points, LabeledPoint.class);
}

@After
@@ -50,7 +50,7 @@ public void setUp() {
jsql = new SQLContext(jsc);
List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
datasetRDD = jsc.parallelize(points, 2);
-dataset = jsql.applySchema(datasetRDD, LabeledPoint.class);
+dataset = jsql.createDataFrame(datasetRDD, LabeledPoint.class);
dataset.registerTempTable("dataset");
}

@@ -46,7 +46,7 @@ public void setUp() {
jsql = new SQLContext(jsc);
List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
datasetRDD = jsc.parallelize(points, 2);
-dataset = jsql.applySchema(datasetRDD, LabeledPoint.class);
+dataset = jsql.createDataFrame(datasetRDD, LabeledPoint.class);
dataset.registerTempTable("dataset");
}

@@ -45,7 +45,7 @@ public void setUp() {
jsc = new JavaSparkContext("local", "JavaCrossValidatorSuite");
jsql = new SQLContext(jsc);
List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
-dataset = jsql.applySchema(jsc.parallelize(points, 2), LabeledPoint.class);
+dataset = jsql.createDataFrame(jsc.parallelize(points, 2), LabeledPoint.class);
}

@After
21 changes: 3 additions & 18 deletions python/pyspark/sql/context.py
@@ -46,23 +46,11 @@ def __init__(self, sparkContext, sqlContext=None):
:param sqlContext: An optional JVM Scala SQLContext. If set, we do not instantiate a new
SQLContext in the JVM, instead we make all calls to this object.
->>> df = sqlCtx.inferSchema(rdd)
->>> sqlCtx.inferSchema(df)  # doctest: +IGNORE_EXCEPTION_DETAIL
-Traceback (most recent call last):
-    ...
-TypeError:...
->>> bad_rdd = sc.parallelize([1,2,3])
->>> sqlCtx.inferSchema(bad_rdd)  # doctest: +IGNORE_EXCEPTION_DETAIL
-Traceback (most recent call last):
-    ...
-ValueError:...
>>> from datetime import datetime
>>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L,
... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
... time=datetime(2014, 8, 1, 14, 1, 5))])
->>> df = sqlCtx.inferSchema(allTypes)
+>>> df = sqlCtx.createDataFrame(allTypes)
>>> df.registerTempTable("allTypes")
>>> sqlCtx.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
... 'from allTypes where b and i > 0').collect()
@@ -355,7 +343,6 @@ def registerRDDAsTable(self, rdd, tableName):
Temporary tables exist only during the lifetime of this instance of
SQLContext.
->>> df = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(df, "table1")
"""
if (rdd.__class__ is DataFrame):
@@ -370,7 +357,6 @@ def parquetFile(self, *paths):
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
->>> df = sqlCtx.inferSchema(rdd)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(df.collect()) == sorted(df2.collect())
@@ -520,7 +506,6 @@ def func(iterator):
def sql(self, sqlQuery):
"""Return a L{DataFrame} representing the result of the given query.
->>> df = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
@@ -531,7 +516,6 @@ def sql(self, sqlQuery):
def table(self, tableName):
"""Returns the specified table as a L{DataFrame}.
->>> df = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
@@ -679,11 +663,12 @@ def _test():
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlCtx'] = sqlCtx = SQLContext(sc)
-globs['rdd'] = sc.parallelize(
+globs['rdd'] = rdd = sc.parallelize(
[Row(field1=1, field2="row1"),
Row(field1=2, field2="row2"),
Row(field1=3, field2="row3")]
)
+globs['df'] = sqlCtx.createDataFrame(rdd)
jsonStrings = [
'{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
[Diff truncated: remaining changed files not shown.]