Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23587][SQL] Add interpreted execution for MapObjects expression #20771

Closed
wants to merge 8 commits into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Mar 8, 2018

What changes were proposed in this pull request?

Add interpreted execution for MapObjects expression.

How was this patch tested?

Added unit test.

@SparkQA
Copy link

SparkQA commented Mar 8, 2018

Test build #88079 has finished for PR 20771 at commit 3627dc3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// The data with PythonUserDefinedType are actually stored with the data type of its sqlType.
// When we want to apply MapObjects on it, we have to use it.
lazy private val inputDataType = inputData.dataType match {
case p: PythonUserDefinedType => p.sqlType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the UserDefinedType super class here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I just noticed that this wasn't introduced by you, but please change it anyway)

override def eval(input: InternalRow): Any = {
assert(input.numFields == 1,
"The input row of interpreted LambdaVariable should have only 1 field.")
input.get(0, dataType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a change for this PR. Maybe we should use accessors here? This uses a matching under the hood and is slower than virtual function dispatch. Implementing this would also be useful for BoundReference for example.

Copy link
Member Author

@viirya viirya Mar 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean something like this?

lazy val accessor:  InternalRow => Any = dataType match {
  case IntegerType => (inputRow) => inputRow.getInt(0)
  case LongType => (inputRow) => inputRow.getLong(0)
  ...
}

override def eval(input: InternalRow): Any = accessor(input)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's spin that off into a different ticket if we want to work on it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. After this is merged, I will create another PR for it.

@SparkQA
Copy link

SparkQA commented Mar 8, 2018

Test build #88087 has finished for PR 20771 at commit 07f8143.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

return inputCollection
}

val results = inputDataType match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be doing this during eval. Please move this into a function val.

executeFuncOnCollection(inputCollection.asInstanceOf[ArrayData].array)
}

customCollectionCls match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be doing this during eval. Please move this into a function val.

val inputCollection = inputData.eval(input)

if (inputCollection == null) {
return inputCollection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: It is slightly cleared to return null here.

@SparkQA
Copy link

SparkQA commented Mar 9, 2018

Test build #88114 has finished for PR 20771 at commit 9144287.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StackTrace(elems: Seq[String])

@viirya
Copy link
Member Author

viirya commented Mar 9, 2018

retest this please.

private lazy val getResults: Seq[_] => Any = customCollectionCls match {
case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
// Scala sequence
_.toSeq
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This identity right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yap.

_.toSet
case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
// Java list
if (cls == classOf[java.util.List[_]] || cls == classOf[java.util.AbstractList[_]] ||
Copy link
Contributor

@hvanhovell hvanhovell Mar 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC you are matching against non concrete implementations of java.util.List? Maybe add this as documentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

_.asJava
} else {
(results) => {
val builder = Try(cls.getConstructor(Integer.TYPE)).map { constructor =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you try to do the constructor lookup only once? The duplication that that will cause is ok.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understand correctly. Please check update again.

x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
case ObjectType(cls) if cls == classOf[Object] =>
(inputCollection) => {
if (inputCollection.getClass.isArray) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I am sorry for sounding like a broken record) But can we move this check out of the the function closure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry...

@SparkQA
Copy link

SparkQA commented Mar 9, 2018

Test build #88119 has finished for PR 20771 at commit 9144287.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class StackTrace(elems: Seq[String])

@SparkQA
Copy link

SparkQA commented Mar 10, 2018

Test build #88146 has finished for PR 20771 at commit e725608.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Mar 23, 2018

ping @hvanhovell

x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
case ObjectType(cls) if cls == classOf[Object] =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugghh... I know understand why this needed. RowEncoder does not pass the needed type information down: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala#L146

This obviously needs to be done during evaluation. You got it right in the previous commit. I am sorry for misunderstanding this, and making you move it. Next time please call me out on this!

} else {
// Specifying concrete implementations of `java.util.List`
(results) => {
val constructors = cls.getConstructors()
Copy link
Contributor

@hvanhovell hvanhovell Mar 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can move the constructor resolution out of the closure? I am fine with some code duplication here :)...

x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
}
case ArrayType(et, _) =>
x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will blow up with UnsafeArrayData :(... It would be nice if we can avoid copying the entire array. We could implement an ArrayData wrapper that implements Seq or Iterable (I slightly prefer the latter).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we implement this wrapper here, or a follow-up?


private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
inputCollection.map { element =>
val row = InternalRow.fromSeq(Seq(element))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT reuse the row object.

}

// Executes lambda function on input collection.
private lazy val executeFunc: Any => Seq[_] = inputDataType match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we shouldn't just return an Iterator instead of a Seq? This seems a bit more flexible, allows us to avoid materializing an intermediate sequence. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

}

// Converts the processed collection to custom collection class if any.
private lazy val getResults: Seq[_] => Any = customCollectionCls match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a catch all clause that throws a nice exception to this match statement?

@SparkQA
Copy link

SparkQA commented Mar 29, 2018

Test build #88696 has finished for PR 20771 at commit f0ba614.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new RuntimeException(s\"class$clsis not supported byMapObjects as \" +

@SparkQA
Copy link

SparkQA commented Mar 29, 2018

Test build #88708 has finished for PR 20771 at commit face72c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 29, 2018

Test build #88709 has finished for PR 20771 at commit d4f0ecb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • throw new RuntimeException(s\"class$`

@viirya
Copy link
Member Author

viirya commented Apr 3, 2018

ping @hvanhovell Do you have any more comments? Thanks.

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Merging tot master. Thanks!

@viirya
Copy link
Member Author

viirya commented Apr 3, 2018

Thanks @hvanhovell, I will open another ticket & PR for the accessors, based on #20771 (comment).

@asfgit asfgit closed this in 1035aaa Apr 3, 2018
@hvanhovell
Copy link
Contributor

@viirya can you also file a ticket for the UnsafeArrayData.array issue? We should just provide an IndexedSeq for ArrayData.

@viirya
Copy link
Member Author

viirya commented Apr 4, 2018

@hvanhovell Sure, I will do it too.

robert3005 pushed a commit to palantir/spark that referenced this pull request Apr 4, 2018
## What changes were proposed in this pull request?

Add interpreted execution for `MapObjects` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#20771 from viirya/SPARK-23587.
@hvanhovell
Copy link
Contributor

@viirya to be clear: let's do this into two separate JIRA's/PRs.

@viirya
Copy link
Member Author

viirya commented Apr 4, 2018

@hvanhovell Yes. I thought "do it together" will be confusing, so I changed it to "do it too" later. :)

mshtelma pushed a commit to mshtelma/spark that referenced this pull request Apr 5, 2018
## What changes were proposed in this pull request?

Add interpreted execution for `MapObjects` expression.

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#20771 from viirya/SPARK-23587.
@viirya viirya deleted the SPARK-23587 branch December 27, 2023 18:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants