[SPARK-7691] [SQL] Refactor CatalystTypeConverter to use type-specific row accessors #6222

Closed

Conversation

JoshRosen
Contributor

This patch significantly refactors CatalystTypeConverters to both clean up the code and enable these conversions to work with future Project Tungsten features.

At a high level, I've reorganized the code so that all functions dealing with the same type are grouped together into type-specific subclasses of CatalystTypeConverter. In addition, I've added new methods that allow the Catalyst Row -> Scala Row conversions to access the Catalyst row's fields through type-specific getTYPE() methods rather than the generic get() / Row.apply methods. This refactoring is a blocker to being able to unit test new operators that I'm developing as part of Project Tungsten, since those operators may output UnsafeRow instances which don't support the generic get().
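
To make the shape of the refactoring concrete, here is a rough sketch of what a type-specific converter can look like; the class and method names below are simplified illustrations, not the exact Spark source:

```scala
import org.apache.spark.sql.Row

// Illustrative only: a heavily simplified version of the converter hierarchy.
abstract class CatalystTypeConverter[T] {
  // Shared null handling lives in one place; subclasses implement only the typed read.
  final def toScala(row: Row, column: Int): Any =
    if (row.isNullAt(column)) null else toScalaImpl(row, column)

  protected def toScalaImpl(row: Row, column: Int): T
}

object DoubleConverter extends CatalystTypeConverter[Double] {
  // Uses the typed accessor instead of the generic get(), so it also works for
  // row implementations (e.g. UnsafeRow) that don't support the generic get().
  override protected def toScalaImpl(row: Row, column: Int): Double =
    row.getDouble(column)
}

object StringConverter extends CatalystTypeConverter[String] {
  override protected def toScalaImpl(row: Row, column: Int): String =
    row.getString(column)
}
```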

The stricter usage of types here has uncovered some bugs in other parts of Spark SQL:

- #6217: DescribeCommand is assigned wrong output attributes in SparkStrategies
- #6218: DataFrame.describe() should cast all aggregates to String
- #6400: Use output schema, not relation schema, for data source input conversion

Spark SQL currently has undefined behavior for what happens when you try to create a DataFrame from user-specified rows whose values don't match the declared schema. According to the createDataFrame() Scaladoc:

It is important to make sure that the structure of every [[Row]] of the provided RDD matches the provided schema. Otherwise, there will be runtime exception.

Given this, it sounds like it's technically not a breach of our API contract to fail fast when the data types don't match. However, there appear to be many cases where we don't fail even though the types don't match. For example, JavaHashingTFSuite.hashingTF passes a column of integer values for a "label" column which is supposed to contain floats. This column isn't actually read or modified as part of query processing, so its actual concrete type doesn't seem to matter. In other cases, there may be generic numeric aggregates that tolerate being called with numeric types different from those the schema specifies, which can be acceptable because of implicit numeric conversions.

In the long run, we will probably want to come up with precise semantics for implicit type conversions / widening when converting Java / Scala rows to Catalyst rows. Until then, though, I think that failing fast with a ClassCastException is a reasonable behavior; this is the approach taken in this patch. Note that certain optimizations in the inbound conversion functions for primitive types mean that we'll probably preserve the old undefined behavior in a majority of cases.
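
To illustrate the kind of mismatch described above, here is a hypothetical snippet (it assumes an existing SparkContext named sc and SQLContext named sqlContext from the 1.x API):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// The schema declares a double-valued "label" column...
val schema = StructType(Seq(StructField("label", DoubleType, nullable = false)))
// ...but the rows actually contain Ints.
val rows = sc.parallelize(Seq(Row(0), Row(1)))

val df = sqlContext.createDataFrame(rows, schema)

// Previously this often slipped through as long as the column was never read;
// with the stricter converters, materializing the column surfaces the mismatch
// as a ClassCastException (Integer cannot be cast to Double), as in the
// JavaHashingTFSuite failure shown later in this thread.
df.collect()
```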

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 17, 2015

Test build #32952 has started for PR 6222 at commit 93314c5.

@JoshRosen
Contributor Author

I noticed that CatalystTypeConverters and many of its methods are public; is this intentional or should they be private[sql]?

@JoshRosen force-pushed the catalyst-converters-refactoring branch from 93314c5 to 7d3222c on May 17, 2015 at 23:44
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 17, 2015

Test build #32953 has started for PR 6222 at commit 7d3222c.

@JoshRosen
Contributor Author

I should probably re-run the original CatalystTypeConverter benchmarks to check that this hasn't regressed performance.

@rayortigas

@JoshRosen, thanks for the heads-up.

This refactoring looks good. Grouping the conversions for each type makes it easier to find them. It also sets expectations for any other conversions that might be added.

I'll comment specifically on #5713 over there.

@rayortigas

As for the public methods that were in CatalystTypeConverters, I also think they could be private[sql]. You might want to ask @vlyubin, who originally extracted the class (#5279).

@SparkQA

SparkQA commented May 18, 2015

Test build #32953 has finished for PR 6222 at commit 7d3222c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32953/
Test FAILed.

@JoshRosen
Contributor Author

Based on the last set of test failures, it looks like there's another corner-case where the actual data types don't match the declared ones.

JavaHashingTFSuite.hashingTF:

sbt.ForkMain$ForkError: java.lang.Integer cannot be cast to java.lang.Double
    at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getDouble(rows.scala:93)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$DoubleConverter$.toScalaImpl(CatalystTypeConverters.scala:303)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$DoubleConverter$.toScalaImpl(CatalystTypeConverters.scala:302)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toScala(CatalystTypeConverters.scala:87)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:235)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:202)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$1.apply(CatalystTypeConverters.scala:360)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$2.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$2.apply(SparkPlan.scala:150)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1157)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1091)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1150)
    at org.apache.spark.ml.feature.JavaHashingTFSuite.hashingTF(JavaHashingTFSuite.java:76)

It also looks like there are some cases in HadoopFSRelationTest where we expect to receive a string but receive an Integer instead. I'll take a closer look at these cases to see whether this indicates a bug or whether we should just call toString() on any non-string type that we receive.

@JoshRosen changed the title from "[SPARK-7691] [WIP] Refactor CatalystTypeConverter to use type-specific row accessors" to "[SPARK-7691] [SQL] [WIP] Refactor CatalystTypeConverter to use type-specific row accessors" on May 18, 2015
@JoshRosen
Contributor Author

I had put this patch on hold while finishing up some 1.4.0 stuff, but I plan to return to this soon. The fact that this is causing lots of test failures suggests that this could break things in bad ways for user code if we're not careful. I think that the main issue is that the old code would implicitly pass through primitives that were of the wrong type and allow implicit numeric conversions to take place. For instance, if I had a table that expected double-valued columns, it would be fine to pass integers since we never did any Scala -> Catalyst conversion for integers and the internal code implicitly handles these conversions somewhere.

I'll see if I can come up with a clean way of allowing these implicit numeric conversions to still take place but guarantee that they're performed as part of the inbound Row conversion rather than implicitly in lower-level code.
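
One hypothetical way to do that, shown purely as a sketch rather than the approach that ultimately landed, is to widen any incoming java.lang.Number to the declared numeric type inside the inbound converter itself:

```scala
// Hypothetical sketch: numeric widening happens during the inbound
// Scala/Java -> Catalyst conversion instead of in lower-level code.
def toCatalystDouble(value: Any): Any = value match {
  case null                => null
  case n: java.lang.Number => n.doubleValue() // Int, Long, Float, ... widened here
  case other =>
    throw new ClassCastException(
      s"${other.getClass.getName} cannot be cast to java.lang.Double")
}
```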

@JoshRosen
Contributor Author

It looks like we have some undefined behavior that may be very difficult to preserve. If you declare a column to have some primitive type, like float, then proceed to pass in some value of an arbitrary different type, this won't cause any errors as long as you don't perform any operations on that column which rely on it being a particular type. In a nutshell, the schema's type for a column seems to be irrelevant if that column is just passed through unmodified / unaccessed as part of query processing.

@JoshRosen force-pushed the catalyst-converters-refactoring branch from 7d3222c to 300723f on May 25, 2015 at 00:58
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 25, 2015

Test build #33457 has started for PR 6222 at commit 300723f.

@JoshRosen
Contributor Author

I've pushed a change which will cause ClassCastExceptions to be thrown as early as possible in the Scala Row -> Catalyst Row conversion process. I've realized that this is going to fail a different set of tests due to some null-handling corner-cases that I've missed, so I'm going to create a new CatalystTypeConvertersSuite and will try to write some test cases to serve as a spec for the converter behavior.
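
As a rough sketch, the spec-style tests could look something like the following; the structure is illustrative, and createToCatalystConverter is assumed to be the existing factory in CatalystTypeConverters that returns an Any => Any conversion function for a given DataType:

```scala
import org.scalatest.FunSuite

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{IntegerType, StringType}

// Illustrative sketch only; the real CatalystTypeConvertersSuite may differ.
class ConverterBehaviorSketch extends FunSuite {

  test("null values are passed through unchanged") {
    val convert = CatalystTypeConverters.createToCatalystConverter(StringType)
    assert(convert(null) === null)
  }

  test("values already of the declared type are preserved") {
    val convert = CatalystTypeConverters.createToCatalystConverter(IntegerType)
    assert(convert(42) === 42)
  }
}
```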

Technically, it looks like our Scaladoc API docs give us the leeway to throw exceptions if the input data doesn't match the expected schema, and there's no mention of implicit casting / widening, so I think it's okay to change this undefined behavior, although it could be frustrating for users.

   * :: DeveloperApi ::
   * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s using the given schema.
   * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
   * the provided schema. Otherwise, there will be runtime exception.

@SparkQA

SparkQA commented May 25, 2015

Test build #33457 has finished for PR 6222 at commit 300723f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/33457/
Test FAILed.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 25, 2015

Test build #33458 has started for PR 6222 at commit ce357c1.

@JoshRosen changed the title from "[SPARK-7691] [SQL] [WIP] Refactor CatalystTypeConverter to use type-specific row accessors" to "[SPARK-7691] [SQL] Refactor CatalystTypeConverter to use type-specific row accessors" on Jun 1, 2015
@JoshRosen force-pushed the catalyst-converters-refactoring branch from ee25e8d to 740341b on June 1, 2015 at 00:11
@JoshRosen
Contributor Author

Alright, pushed a commit to document the Option-handling semantics as well as to address the method dispatch issues for primitive types. This should be ready for review now. /cc @davies, you might want to look at this also.
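
For reference, the Option-handling semantics being documented are roughly the following; this is a sketch under the assumption that createToCatalystConverter is the conversion entry point, and the committed tests remain the authoritative spec:

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.IntegerType

// Sketch: Options are unwrapped on the way into Catalyst.
val convert = CatalystTypeConverters.createToCatalystConverter(IntegerType)
convert(Some(1)) // expected to behave like convert(1)
convert(None)    // expected to behave like convert(null), i.e. a null field
```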

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jun 1, 2015

Test build #33868 has started for PR 6222 at commit 740341b.

@SparkQA

SparkQA commented Jun 1, 2015

Test build #33868 has finished for PR 6222 at commit 740341b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test FAILed.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented Jun 1, 2015

Test build #33902 has started for PR 6222 at commit 740341b.

@SparkQA

SparkQA commented Jun 1, 2015

Test build #33902 has finished for PR 6222 at commit 740341b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@davies
Contributor

davies commented Jun 1, 2015

LGTM, thanks!

@asfgit closed this in cafd505 on Jun 3, 2015
@JoshRosen deleted the catalyst-converters-refactoring branch on June 3, 2015 at 17:59
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
[SPARK-7691] [SQL] Refactor CatalystTypeConverter to use type-specific row accessors

This patch significantly refactors CatalystTypeConverters to both clean up the code and enable these conversions to work with future Project Tungsten features.

At a high level, I've reorganized the code so that all functions dealing with the same type are grouped together into type-specific subclasses of `CatalystTypeConverter`. In addition, I've added new methods that allow the Catalyst Row -> Scala Row conversions to access the Catalyst row's fields through type-specific `getTYPE()` methods rather than the generic `get()` / `Row.apply` methods. This refactoring is a blocker to being able to unit test new operators that I'm developing as part of Project Tungsten, since those operators may output `UnsafeRow` instances which don't support the generic `get()`.

The stricter usage of types here has uncovered some bugs in other parts of Spark SQL:

- apache#6217: DescribeCommand is assigned wrong output attributes in SparkStrategies
- apache#6218: DataFrame.describe() should cast all aggregates to String
- apache#6400: Use output schema, not relation schema, for data source input conversion

Spark SQL currently has undefined behavior for what happens when you try to create a DataFrame from user-specified rows whose values don't match the declared schema. According to the `createDataFrame()` Scaladoc:

>  It is important to make sure that the structure of every [[Row]] of the provided RDD matches the provided schema. Otherwise, there will be runtime exception.

Given this, it sounds like it's technically not a breach of our API contract to fail fast when the data types don't match. However, there appear to be many cases where we don't fail even though the types don't match. For example, `JavaHashingTFSuite.hashingTF` passes a column of integer values for a "label" column which is supposed to contain floats. This column isn't actually read or modified as part of query processing, so its actual concrete type doesn't seem to matter. In other cases, there may be generic numeric aggregates that tolerate being called with numeric types different from those the schema specifies, which can be acceptable because of implicit numeric conversions.

In the long run, we will probably want to come up with precise semantics for implicit type conversions / widening when converting Java / Scala rows to Catalyst rows.  Until then, though, I think that failing fast with a ClassCastException is a reasonable behavior; this is the approach taken in this patch.  Note that certain optimizations in the inbound conversion functions for primitive types mean that we'll probably preserve the old undefined behavior in a majority of cases.

Author: Josh Rosen <[email protected]>

Closes apache#6222 from JoshRosen/catalyst-converters-refactoring and squashes the following commits:

740341b [Josh Rosen] Optimize method dispatch for primitive type conversions
befc613 [Josh Rosen] Add tests to document Option-handling behavior.
5989593 [Josh Rosen] Use new SparkFunSuite base in CatalystTypeConvertersSuite
6edf7f8 [Josh Rosen] Re-add convertToScala(), since a Hive test still needs it
3f7b2d8 [Josh Rosen] Initialize converters lazily so that the attributes are resolved first
6ad0ebb [Josh Rosen] Fix JavaHashingTFSuite ClassCastException
677ff27 [Josh Rosen] Fix null handling bug; add tests.
8033d4c [Josh Rosen] Fix serialization error in UserDefinedGenerator.
85bba9d [Josh Rosen] Fix wrong input data in InMemoryColumnarQuerySuite
9c0e4e1 [Josh Rosen] Remove last use of convertToScala().
ae3278d [Josh Rosen] Throw ClassCastException errors during inbound conversions.
7ca7fcb [Josh Rosen] Comments and cleanup
1e87a45 [Josh Rosen] WIP refactoring of CatalystTypeConverters
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
[SPARK-7691] [SQL] Refactor CatalystTypeConverter to use type-specific row accessors