Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23908][SQL] Add transform function. #21954

Closed
wants to merge 1 commit into from

Conversation

ueshin
Copy link
Member

@ueshin ueshin commented Aug 2, 2018

What changes were proposed in this pull request?

This pr adds transform function which transforms elements in an array using the function.
Optionally we can take the index of each element as the second argument.

> SELECT transform(array(1, 2, 3), x -> x + 1);
 array(2, 3, 4)
> SELECT transform(array(1, 2, 3), (x, i) -> x + i);
 array(1, 3, 5)

How was this patch tested?

Added tests.

@holdensmagicalunicorn
Copy link

@ueshin, thanks! I am a bot who has found some folks who might be able to help with the review:@rxin, @cloud-fan and @hvanhovell

@gatorsmile
Copy link
Member

cc @hvanhovell

@transient lazy val functionsForEval: Seq[Expression] = functions.map {
case LambdaFunction(function, arguments, hidden) =>
val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
function.transformUp {
Copy link
Member

@viirya viirya Aug 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to transform NamedLambdaVariable in function by arguments here? Aren't arguments also NamedLambdaVariable and we already resolved expressions in function at ResolveLambdaVariables?

Copy link
Member Author

@ueshin ueshin Aug 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried whether the NamedLambdaVariable is instantiated separately during serialization or something. In that case, we might not be able to refer the same instance and set the argument values correctly.

@ueshin ueshin force-pushed the issues/SPARK-23908/transform branch from ee450c5 to c3bf6a0 Compare August 2, 2018 05:00
@SparkQA
Copy link

SparkQA commented Aug 2, 2018

Test build #93934 has finished for PR 21954 at commit ee450c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 2, 2018

Test build #93950 has finished for PR 21954 at commit c3bf6a0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan]
  • s\"its class is $
  • case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan]
  • case class NamedLambdaVariable(
  • case class LambdaFunction(
  • trait HigherOrderFunction extends Expression
  • trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes
  • case class ArrayTransform(

@ueshin
Copy link
Member Author

ueshin commented Aug 2, 2018

Jenkins, retest this please.

val (elementType, containsNull) = input.dataType match {
case ArrayType(elementType, containsNull) => (elementType, containsNull)
case _ =>
val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this happen?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then shall we fail the analysis before going into bind?

name: String,
dataType: DataType,
nullable: Boolean,
value: AtomicReference[Any] = new AtomicReference(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are only using the AtomicReference as an container right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, also when creating functionsForEval. I needed it for transformUp work properly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You did? Could you elaborate? There shouldn't be any current access here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I tried to make copies of NamedLambdaVariables, the transformUp doesn't replace the variables, and generated wrong results.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, maybe I should override fastEquals instead of using AtomicReference?

Copy link
Member Author

@ueshin ueshin Aug 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, seems like just overriding fastEquals is not enough..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that makes sense. Let's leave it for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks.

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. One request, can you add a little bit of documentation on how execution currently works.

@SparkQA
Copy link

SparkQA commented Aug 2, 2018

Test build #93973 has finished for PR 21954 at commit c3bf6a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan]
  • s\"its class is $
  • case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan]
  • case class NamedLambdaVariable(
  • case class LambdaFunction(
  • trait HigherOrderFunction extends Expression
  • trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes
  • case class ArrayTransform(

@ueshin
Copy link
Member Author

ueshin commented Aug 2, 2018

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Aug 2, 2018

Test build #94002 has finished for PR 21954 at commit c3bf6a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan]
  • s\"its class is $
  • case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan]
  • case class NamedLambdaVariable(
  • case class LambdaFunction(
  • trait HigherOrderFunction extends Expression
  • trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes
  • case class ArrayTransform(

@gatorsmile
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Aug 2, 2018

Test build #94019 has finished for PR 21954 at commit c3bf6a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan]
  • s\"its class is $
  • case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan]
  • case class NamedLambdaVariable(
  • case class LambdaFunction(
  • trait HigherOrderFunction extends Expression
  • trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes
  • case class ArrayTransform(

Copy link
Member

@gatorsmile gatorsmile left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Thanks! Merged to master

@asfgit asfgit closed this in 02f9677 Aug 2, 2018

override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, expectingFunctionType)

@transient lazy val functionForEval: Expression = functionsForEval.head
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be a lazy val? Seq#head is very cheap.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, makes sense. Currently we have some prs for other higher-order functions, so I'll see them and submit a follow-up if needed.

}

override def eval(input: InternalRow): Any = {
val arr = this.input.eval(input).asInstanceOf[ArrayData]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should do some renaming to avoid the conflict, e.g. rename ArrayBasedHigherOrderFunction#input to inputArray

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see the other prs and submit a follow-up as well.

*/
private def createLambda(
e: Expression,
partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call it "partial"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are partial because we only pass the dataType and nullable flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about argInfo?

private def resolve(e: Expression, parentLambdaMap: LambdaVariableMap): Expression = e match {
case _ if e.resolved => e

case h: HigherOrderFunction if h.inputResolved =>
Copy link
Contributor

@cloud-fan cloud-fan Aug 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some basic type check here? Then we can fail fast if the ArrayTransform#input is not array type, and we don't need the hacky workaround in ArrayTransform#bind

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think about it later.

select transform(ys, 0) as v from nested;

-- Transform a null array
select transform(cast(null as array<int>), x -> x + 1) as v;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add a test for nested lambda?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we have some at #21965 and #21982.

}

override def eval(input: InternalRow): Any = {
val arr = this.input.eval(input).asInstanceOf[ArrayData]
Copy link

@arybin93 arybin93 Mar 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have ideas about this problem?
https://issues.apache.org/jira/browse/SPARK-27052

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants