Skip to content

Commit

Permalink
Cleaned up Java UDT Suite, and added warning about element ordering w…
Browse files Browse the repository at this point in the history
…hen creating schema from Java Bean
  • Loading branch information
jkbradley committed Nov 2, 2014
1 parent a571bb6 commit d063380
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {

/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: The ordering of elements in the schema may differ from Scala.
* If you create a [[org.apache.spark.sql.SchemaRDD]] using [[SQLContext]]
* with the same Java Bean, row elements may be in a different order.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
val attributeSeq = getSchema(beanClass)
Expand Down Expand Up @@ -193,11 +197,16 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
}

/** Returns a Catalyst Schema for the given java bean class. */
/**
* Returns a Catalyst Schema for the given java bean class.
*/
protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
val beanInfo = Introspector.getBeanInfo(beanClass)

// Note: The ordering of elements may differ from when the schema is inferred in Scala.
// This is because beanInfo.getPropertyDescriptors gives no guarantees about
// element ordering.
val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
fields.map { property =>
val (dataType, nullable) = property.getPropertyType match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,42 +75,14 @@ public void useScalaUDT() {
Assert.assertTrue(actualFeatures.contains(lp.features()));
}

List<Row> actual = javaSqlCtx.sql("SELECT * FROM points").collect();
List<Row> actual = javaSqlCtx.sql("SELECT label, features FROM points").collect();
List<MyLabeledPoint> actualPoints =
new LinkedList<MyLabeledPoint>();
for (Row r : actual) {
// Note: JavaSQLContext.getSchema switches the ordering of the Row elements
// in the MyLabeledPoint case class.
actualPoints.add(new MyLabeledPoint(
r.getDouble(1), (MyDenseVector)r.get(0)));
actualPoints.add(new MyLabeledPoint(r.getDouble(0), (MyDenseVector)r.get(1)));
}
for (MyLabeledPoint lp : points) {
Assert.assertTrue(actualPoints.contains(lp));
}
/*
// THIS FAILS BECAUSE JavaSQLContext.getSchema switches the ordering of the Row elements
// in the MyLabeledPoint case class.
List<Row> expected = new LinkedList<Row>();
expected.add(Row.create(new MyLabeledPoint(1.0,
new MyDenseVector(new double[]{0.1, 1.0}))));
expected.add(Row.create(new MyLabeledPoint(0.0,
new MyDenseVector(new double[]{0.2, 2.0}))));
System.out.println("Expected:");
for (Row r : expected) {
System.out.println("r: " + r.toString());
for (int i = 0; i < r.length(); ++i) {
System.out.println(" r[i]: " + r.get(i).toString());
}
}
System.out.println("Actual:");
for (Row r : actual) {
System.out.println("r: " + r.toString());
for (int i = 0; i < r.length(); ++i) {
System.out.println(" r[i]: " + r.get(i).toString());
}
Assert.assertTrue(expected.contains(r));
}
*/
}
}

0 comments on commit d063380

Please sign in to comment.