-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23908][SQL] Add transform function. #21954
Conversation
@ueshin, thanks! I am a bot who has found some folks who might be able to help with the review:@rxin, @cloud-fan and @hvanhovell |
cc @hvanhovell |
@transient lazy val functionsForEval: Seq[Expression] = functions.map { | ||
case LambdaFunction(function, arguments, hidden) => | ||
val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap | ||
function.transformUp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need to transform NamedLambdaVariable
in function
by arguments
here? Aren't arguments
also NamedLambdaVariable
and we already resolved expressions in function
at ResolveLambdaVariables
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm worried whether the NamedLambdaVariable
is instantiated separately during serialization or something. In that case, we might not be able to refer the same instance and set the argument values correctly.
ee450c5
to
c3bf6a0
Compare
Test build #93934 has finished for PR 21954 at commit
|
Test build #93950 has finished for PR 21954 at commit
|
Jenkins, retest this please. |
val (elementType, containsNull) = input.dataType match { | ||
case ArrayType(elementType, containsNull) => (elementType, containsNull) | ||
case _ => | ||
val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When does this happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It happens when the first argument is not an array (e.g., https://github.com/apache/spark/pull/21954/files#diff-8e1a34391fdefa4a3a0349d7d454d86fR1798).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then shall we fail the analysis before going into bind
?
name: String, | ||
dataType: DataType, | ||
nullable: Boolean, | ||
value: AtomicReference[Any] = new AtomicReference(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are only using the AtomicReference
as an container right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, also when creating functionsForEval
. I needed it for transformUp
work properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You did? Could you elaborate? There shouldn't be any current access here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I tried to make copies of NamedLambdaVariable
s, the transformUp
doesn't replace the variables, and generated wrong results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, maybe I should override fastEquals
instead of using AtomicReference
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, seems like just overriding fastEquals
is not enough..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that makes sense. Let's leave it for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. One request, can you add a little bit of documentation on how execution currently works.
Test build #93973 has finished for PR 21954 at commit
|
Jenkins, retest this please. |
Test build #94002 has finished for PR 21954 at commit
|
retest this please |
Test build #94019 has finished for PR 21954 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks! Merged to master
|
||
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType) | ||
|
||
@transient lazy val functionForEval: Expression = functionsForEval.head |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be a lazy val
? Seq#head
is very cheap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, makes sense. Currently we have some prs for other higher-order functions, so I'll see them and submit a follow-up if needed.
} | ||
|
||
override def eval(input: InternalRow): Any = { | ||
val arr = this.input.eval(input).asInstanceOf[ArrayData] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we should do some renaming to avoid the conflict, e.g. rename ArrayBasedHigherOrderFunction#input
to inputArray
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll see the other prs and submit a follow-up as well.
*/ | ||
private def createLambda( | ||
e: Expression, | ||
partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why call it "partial"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are partial because we only pass the dataType and nullable flag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about argInfo
?
private def resolve(e: Expression, parentLambdaMap: LambdaVariableMap): Expression = e match { | ||
case _ if e.resolved => e | ||
|
||
case h: HigherOrderFunction if h.inputResolved => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add some basic type check here? Then we can fail fast if the ArrayTransform#input
is not array type, and we don't need the hacky workaround in ArrayTransform#bind
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me think about it later.
select transform(ys, 0) as v from nested; | ||
|
||
-- Transform a null array | ||
select transform(cast(null as array<int>), x -> x + 1) as v; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we add a test for nested lambda?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
override def eval(input: InternalRow): Any = { | ||
val arr = this.input.eval(input).asInstanceOf[ArrayData] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have ideas about this problem?
https://issues.apache.org/jira/browse/SPARK-27052
What changes were proposed in this pull request?
This pr adds
transform
function which transforms elements in an array using the function.Optionally we can take the index of each element as the second argument.
How was this patch tested?
Added tests.