[SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays #26910

Closed
wants to merge 9 commits

Conversation

@WeichenXu123
Contributor

WeichenXu123 commented Dec 16, 2019

What changes were proposed in this pull request?

PySpark UDF to convert MLlib vectors to dense arrays.
Example:

from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

df.select(vector_to_array(col("features")))
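
For context, a fuller self-contained sketch (the session setup, vectors, and column names here are illustrative, not from the PR):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.getOrCreate()

# One dense and one sparse MLlib vector; both become plain arrays of doubles.
df = spark.createDataFrame(
    [(Vectors.dense([1.0, 2.0, 3.0]),),
     (Vectors.sparse(3, [0, 2], [1.0, 3.0]),)],
    ["features"])

df.select(vector_to_array(col("features")).alias("features_arr")).show()
# Expected output (sparse entries materialize as zeros):
# +---------------+
# |   features_arr|
# +---------------+
# |[1.0, 2.0, 3.0]|
# |[1.0, 0.0, 3.0]|
# +---------------+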

Why are the changes needed?

If a PySpark user wants to convert MLlib sparse/dense vectors in a DataFrame into dense arrays, the efficient approach is to do the conversion in the JVM. However, that requires the PySpark user to write Scala code and register it as a UDF, which is often infeasible for a pure-Python project.
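
For comparison, the pure-Python workaround available today looks roughly like this (a sketch; the column names are illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Pure-Python fallback: each Vector is serialized into the Python worker and
# converted row by row, which is much slower than doing it in the JVM.
to_array = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))
df = df.withColumn("features_arr", to_array("features"))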

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests.

@SparkQA

SparkQA commented Dec 16, 2019

Test build #115396 has finished for PR 26910 at commit 794a10b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 16, 2019

Test build #115401 has finished for PR 26910 at commit afc71af.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 17, 2019

Test build #115424 has finished for PR 26910 at commit e2bb6c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Contributor

mengxr left a comment

@WeichenXu123 Made another pass. I'm a little concerned about the data conversion from JVM to Python, in particular what happens if I use a Pandas UDF to wrap the vector_to_array function. Say:

df = spark.read...

@pandas_udf("int")
def predict(batch):
  # expect batch to be a pd.Series of numpy arrays here
  ...
  return preds

predictions = df.select(predict(vector_to_array(col("features"))))

predictions.write....

Does the array data get boxed somewhere in the conversion path? I assume not, but we should double-check. Could you either profile the JVM or verify the code path?

cc: @HyukjinKwon
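
One cheap way to verify from the Python side what each batch element actually is, reusing the df and features column from the example above (a sketch; this checks Python-side types only, not boxing inside the JVM):

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.ml.functions import vector_to_array

@pandas_udf("string")
def inspect(batch: pd.Series) -> pd.Series:
    # With Arrow enabled, each element should arrive as a numpy.ndarray of
    # float64 rather than a boxed object list.
    return batch.map(lambda arr: type(arr).__name__)

df.select(inspect(vector_to_array(col("features")))).show()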

case v: OldVector => v.toArray
case _ => throw new IllegalArgumentException(
  "function vector_to_array require an argument of type " +
  "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`.")
Contributor

Mention input type (or null) in the error message.

Contributor

I mean including null or vec.getClass.getName in the error message to help debugging.

Contributor

Please also add a test for the error message.
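
A hypothetical sketch of such a test from the PySpark side (the actual test in the PR may live on the Scala side and differ; the pytest fixture and names here are assumptions):

import pytest
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

def test_vector_to_array_rejects_non_vector(spark):
    # `spark` is assumed to be a fixture providing a SparkSession.
    df = spark.createDataFrame([(1,)], ["not_a_vector"])
    with pytest.raises(Exception) as exc_info:
        df.select(vector_to_array(col("not_a_vector"))).collect()
    # Per the review comments, the message should name the offending input.
    assert "vector_to_array" in str(exc_info.value)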

@HyukjinKwon
Member

@cloud-fan, I know UDT became private and we plan to redesign it later.
However, what about allowing a UDT to be cast to its own sqlType? Or do you know why we don't allow this case?

scala> val df = Seq(Tuple1(org.apache.spark.ml.linalg.Vectors.dense(1.0, 2.0, 3.0))).toDF("vec")
df: org.apache.spark.sql.DataFrame = [vec: vector]

scala> df.selectExpr("cast(vec as string)").show()
+-------------+
|          vec|
+-------------+
|[1.0,2.0,3.0]|
+-------------+

scala> df.selectExpr("cast(vec as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`vec`' due to data type mismatch: cannot cast struct<type:tinyint,size:int,indices:array<int>,values:array<double>> to struct<type:tinyint,size:int,indices:array<int>,values:array<double>>; line 1 pos 0;
'Project [unresolvedalias(cast(vec#74 as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>), None)]
+- Project [_1#71 AS vec#74]
   +- LocalRelation [_1#71]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:146)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:137)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:310)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)

Currently, a UDT can be cast to string but not to its own sqlType (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala#L88-L99).

It's internally an InternalRow, so I think it's fine to allow this case for now.

@WeichenXu123
Contributor Author

Quote from @mengxr

what happens if I use a Pandas UDF to wrap the vector_to_array function? Does the array data get boxed somewhere in the conversion path?

If this is true, I think it would be an array-type issue, not something specific to my vector_to_array function. Isn't it? @HyukjinKwon

@HyukjinKwon
Member

HyukjinKwon commented Dec 18, 2019

I don't think such things happen anyway. The problem is the UDF itself, as it needs to convert the Catalyst type to the Scala type and back, which is pretty slow.

@SparkQA

SparkQA commented Dec 18, 2019

Test build #115495 has finished for PR 26910 at commit 5aacfbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123
Contributor Author

@HyukjinKwon @mengxr @srowen Any more comments? Thanks :)

@HyukjinKwon
Member

HyukjinKwon commented Dec 19, 2019

Once we allow #26910 (comment), I think we won't need this function. WDYT @cloud-fan?

@mengxr
Contributor

mengxr commented Dec 19, 2019

@HyukjinKwon I don't think the UDT change should block this PR. Even if we could cast to the sqlType, it would still be tedious for a PySpark user to do this simple conversion. And Pandas UDFs don't support nested columns, so users would need to move the nested columns to the top level.
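
To illustrate the nested-column point, a hypothetical sketch (assuming a cast-to-sqlType route existed and produced a struct column named vec_struct; the names are invented):

from pyspark.sql.functions import col

# Per the comment above, Pandas UDFs don't take nested struct columns, so the
# struct fields would have to be promoted to top-level columns first:
flat = df.select(col("vec_struct.values").alias("values"))
predictions = flat.select(predict(col("values")))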

@mengxr
Contributor

mengxr commented Dec 19, 2019

@WeichenXu123 You haven't addressed the comment about running the doctests yet.

@HyukjinKwon
Member

HyukjinKwon commented Dec 19, 2019

@mengxr, sure. Once we allow cast and extraction from a UDT directly (e.g., vector.values), we can deprecate and remove this API later. I don't mind adding this first, because supporting cast and extraction against UDTs would probably be a big job.

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115554 has finished for PR 26910 at commit a41d01a.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115565 has finished for PR 26910 at commit 22865e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 21, 2019

Test build #115639 has finished for PR 26910 at commit d257dce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit closed this in 88542bc on Jan 7, 2020
@mengxr
Contributor

mengxr commented Jan 7, 2020

LGTM. Merged into master. Thanks!
