-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23587][SQL] Add interpreted execution for MapObjects expression #20771
Conversation
Test build #88079 has finished for PR 20771 at commit
|
// The data with PythonUserDefinedType are actually stored with the data type of its sqlType. | ||
// When we want to apply MapObjects on it, we have to use it. | ||
lazy private val inputDataType = inputData.dataType match { | ||
case p: PythonUserDefinedType => p.sqlType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the UserDefinedType
super class here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I just noticed that this wasn't introduced by you, but please change it anyway)
override def eval(input: InternalRow): Any = { | ||
assert(input.numFields == 1, | ||
"The input row of interpreted LambdaVariable should have only 1 field.") | ||
input.get(0, dataType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a change for this PR. Maybe we should use accessors here? This uses a matching under the hood and is slower than virtual function dispatch. Implementing this would also be useful for BoundReference for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean something like this?
lazy val accessor: InternalRow => Any = dataType match {
case IntegerType => (inputRow) => inputRow.getInt(0)
case LongType => (inputRow) => inputRow.getLong(0)
...
}
override def eval(input: InternalRow): Any = accessor(input)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's spin that off into a different ticket if we want to work on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. After this is merged, I will create another PR for it.
Test build #88087 has finished for PR 20771 at commit
|
return inputCollection | ||
} | ||
|
||
val results = inputDataType match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be doing this during eval. Please move this into a function val.
executeFuncOnCollection(inputCollection.asInstanceOf[ArrayData].array) | ||
} | ||
|
||
customCollectionCls match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be doing this during eval. Please move this into a function val.
val inputCollection = inputData.eval(input) | ||
|
||
if (inputCollection == null) { | ||
return inputCollection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: It is slightly cleared to return null here.
Test build #88114 has finished for PR 20771 at commit
|
retest this please. |
private lazy val getResults: Seq[_] => Any = customCollectionCls match { | ||
case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) => | ||
// Scala sequence | ||
_.toSeq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This identity right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yap.
_.toSet | ||
case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => | ||
// Java list | ||
if (cls == classOf[java.util.List[_]] || cls == classOf[java.util.AbstractList[_]] || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC you are matching against non concrete implementations of java.util.List
? Maybe add this as documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
_.asJava | ||
} else { | ||
(results) => { | ||
val builder = Try(cls.getConstructor(Integer.TYPE)).map { constructor => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you try to do the constructor lookup only once? The duplication that that will cause is ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I understand correctly. Please check update again.
x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala) | ||
case ObjectType(cls) if cls == classOf[Object] => | ||
(inputCollection) => { | ||
if (inputCollection.getClass.isArray) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I am sorry for sounding like a broken record) But can we move this check out of the the function closure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry...
Test build #88119 has finished for PR 20771 at commit
|
Test build #88146 has finished for PR 20771 at commit
|
ping @hvanhovell |
x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq) | ||
case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => | ||
x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala) | ||
case ObjectType(cls) if cls == classOf[Object] => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugghh... I know understand why this needed. RowEncoder
does not pass the needed type information down: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala#L146
This obviously needs to be done during evaluation. You got it right in the previous commit. I am sorry for misunderstanding this, and making you move it. Next time please call me out on this!
} else { | ||
// Specifying concrete implementations of `java.util.List` | ||
(results) => { | ||
val constructors = cls.getConstructors() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way we can move the constructor resolution out of the closure? I am fine with some code duplication here :)...
x => executeFuncOnCollection(x.asInstanceOf[Seq[_]]) | ||
} | ||
case ArrayType(et, _) => | ||
x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will blow up with UnsafeArrayData
:(... It would be nice if we can avoid copying the entire array. We could implement an ArrayData
wrapper that implements Seq
or Iterable
(I slightly prefer the latter).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we implement this wrapper here, or a follow-up?
|
||
private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = { | ||
inputCollection.map { element => | ||
val row = InternalRow.fromSeq(Seq(element)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT reuse the row object.
} | ||
|
||
// Executes lambda function on input collection. | ||
private lazy val executeFunc: Any => Seq[_] = inputDataType match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we shouldn't just return an Iterator
instead of a Seq
? This seems a bit more flexible, allows us to avoid materializing an intermediate sequence. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea!
} | ||
|
||
// Converts the processed collection to custom collection class if any. | ||
private lazy val getResults: Seq[_] => Any = customCollectionCls match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a catch all clause that throws a nice exception to this match statement?
Test build #88696 has finished for PR 20771 at commit
|
Test build #88708 has finished for PR 20771 at commit
|
Test build #88709 has finished for PR 20771 at commit
|
ping @hvanhovell Do you have any more comments? Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Merging tot master. Thanks!
Thanks @hvanhovell, I will open another ticket & PR for the accessors, based on #20771 (comment). |
@viirya can you also file a ticket for the |
@hvanhovell Sure, I will do it too. |
## What changes were proposed in this pull request? Add interpreted execution for `MapObjects` expression. ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <[email protected]> Closes apache#20771 from viirya/SPARK-23587.
@viirya to be clear: let's do this into two separate JIRA's/PRs. |
@hvanhovell Yes. I thought "do it together" will be confusing, so I changed it to "do it too" later. :) |
## What changes were proposed in this pull request? Add interpreted execution for `MapObjects` expression. ## How was this patch tested? Added unit test. Author: Liang-Chi Hsieh <[email protected]> Closes apache#20771 from viirya/SPARK-23587.
What changes were proposed in this pull request?
Add interpreted execution for
MapObjects
expression.How was this patch tested?
Added unit test.