-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution #20980
Conversation
override def eval(input: InternalRow): Any = { | ||
val result = child.eval(input) | ||
if (result != null) { | ||
val mapValue = result.asInstanceOf[Map[Any, Any]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The external map can be java.util.Map
or scala.collection.Map
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
Test build #88924 has finished for PR 20980 at commit
|
Test build #88980 has finished for PR 20980 at commit
|
retest this please |
Test build #89005 has finished for PR 20980 at commit
|
ping |
Test build #89549 has finished for PR 20980 at commit
|
retest this please |
val result = child.eval(input) | ||
if (result != null) { | ||
val (keys, values) = mapCatalystConverter(result) | ||
new ArrayBasedMapData(ArrayData.toArrayData(keys), ArrayData.toArrayData(values)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this is probably Ok
since ArrayData.toArrayData
calls new GenericArrayData(any)
which calls GenericArrayData.anyToSeq
which converts the array into a seq (WrappedArray
) which is then converted back into an array. It might copy at the end, but I am not entirely sure.
Can we cut the red tape here and just create GenericArrayData
objects on the arrays directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ya, I also think you're right. I'll fix.
} | ||
} | ||
|
||
test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also add a test for a null
key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -357,7 +357,8 @@ object JavaTypeInference { | |||
} | |||
} | |||
|
|||
private def serializerFor(inputObject: Expression, typeToken: TypeToken[_]): Expression = { | |||
private[catalyst] def serializerFor( | |||
inputObject: Expression, typeToken: TypeToken[_]): Expression = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: put typeToken
on a new line?
Test build #89564 has finished for PR 20980 at commit
|
Test build #89606 has finished for PR 20980 at commit
|
retest this please |
Test build #89619 has finished for PR 20980 at commit
|
} | ||
} | ||
|
||
test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more thing, can you please directly add ExternalMapToCatalyst
expressions here. Using javaSerializerFor
for this is pretty confusing and might cause us to test a different code path at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@maropu one more thing, otherwise LGTM. |
Test build #89698 has finished for PR 20980 at commit
|
retest this please |
Test build #89709 has finished for PR 20980 at commit
|
private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = { | ||
import org.apache.spark.sql.catalyst.ScalaReflection._ | ||
|
||
val curId = new java.util.concurrent.atomic.AtomicInteger() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this?
val entry = iter.next() | ||
val (key, value) = (entry.getKey, entry.getValue) | ||
keys(i) = if (key != null) { | ||
keyConverter.eval(InternalRow.fromSeq(key :: Nil)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reuse the InternalRow
.
var i = 0 | ||
for ((key, value) <- data) { | ||
keys(i) = if (key != null) { | ||
keyConverter.eval(InternalRow.fromSeq(key :: Nil)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - merging to master. Can you address the nits in a follow-up? Thanks!
ok, Thanks! |
…yst eval ## What changes were proposed in this pull request? This pr is a follow-up of #20980 and fixes code to reuse `InternalRow` for converting input keys/values in `ExternalMapToCatalyst` eval. ## How was this patch tested? Existing tests. Author: Takeshi Yamamuro <[email protected]> Closes #21137 from maropu/SPARK-23589-FOLLOWUP.
What changes were proposed in this pull request?
This pr supported interpreted mode for
ExternalMapToCatalyst
.How was this patch tested?
Added tests in
ObjectExpressionsSuite
.