[SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays #26910
Conversation
Test build #115396 has finished for PR 26910 at commit
Test build #115401 has finished for PR 26910 at commit
Test build #115424 has finished for PR 26910 at commit
@WeichenXu123 Made another pass. I'm a little concerned about the data conversion from JVM to Python, in particular what happens if I use a Pandas UDF to wrap the vector_to_array function. Say:

df = spark.read...

@pandas_udf("int")
def predict(batch):
    # expect batch to be a pd.Series of numpy arrays here
    ...
    return preds

predictions = df.select(predict(vector_to_array(col("features"))))
predictions.write....

Does the array data get boxed somewhere in the conversion path? I assume not, but we should double-check. Could you either profile the JVM or verify the code path?
cc: @HyukjinKwon
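For reference, a self-contained sketch of that pattern against Spark 3.0 (the toy DataFrame, the column name features, and the argmax-based predict are illustrative assumptions, not code from this PR):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Toy input: a DataFrame with an ML vector column named "features" (assumed).
df = spark.createDataFrame(
    [(Vectors.dense(1.0, 2.0, 3.0),), (Vectors.sparse(3, [0], [4.0]),)],
    ["features"])

@pandas_udf("int")
def predict(batch: pd.Series) -> pd.Series:
    # Each element of `batch` arrives as a numpy array, one per input row.
    return batch.apply(lambda arr: int(arr.argmax()))

predictions = df.select(predict(vector_to_array(col("features"))).alias("prediction"))
predictions.show()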
case v: OldVector => v.toArray
case _ => throw new IllegalArgumentException(
  "function vector_to_array require an argument of type " +
  "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`.")
Mention input type (or null) in the error message.
I mean including null or vec.getClass.getName in the error msg to help debugging.
Please also add a test for the error message.
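A rough sketch of such a test from the PySpark side, assuming the JVM-side IllegalArgumentException surfaces through the error raised on collect and that a SparkSession named spark is available:

from pyspark.sql.functions import lit
from pyspark.ml.functions import vector_to_array

df = spark.range(1)
try:
    # A string literal is neither an ml nor an mllib Vector, so this should fail.
    df.select(vector_to_array(lit("not a vector"))).collect()
    failed = False
except Exception as e:
    failed = True
    # The error message should mention the function and, ideally, the input type.
    assert "vector_to_array" in str(e)
assert failed, "expected vector_to_array to reject a non-vector input"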
@cloud-fan, I know UDT became private and we plan to redesign it later.

scala> val df = Seq(Tuple1(org.apache.spark.ml.linalg.Vectors.dense(1.0, 2.0, 3.0))).toDF("vec")
df: org.apache.spark.sql.DataFrame = [vec: vector]

scala> df.selectExpr("cast(vec as string)").show()
+-------------+
|          vec|
+-------------+
|[1.0,2.0,3.0]|
+-------------+

scala> df.selectExpr("cast(vec as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`vec`' due to data type mismatch: cannot cast struct<type:tinyint,size:int,indices:array<int>,values:array<double>> to struct<type:tinyint,size:int,indices:array<int>,values:array<double>>; line 1 pos 0;
'Project [unresolvedalias(cast(vec#74 as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>), None)]
+- Project [_1#71 AS vec#74]
   +- LocalRelation [_1#71]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:146)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:137)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:310)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)

Currently, a UDT can be cast to string but cannot be cast to its own sqlType, even though that is what it is internally.
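A parallel illustration from PySpark, assuming a SparkSession named spark: field extraction on the vector column is rejected as well, since the analyzer does not treat the UDT as a plain struct.

from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([(Vectors.dense(1.0, 2.0, 3.0),)], ["vec"])
# Extracting a nested field from the UDT column fails at analysis time;
# this would only work if extraction from UDTs were allowed.
df.select("vec.values")   # AnalysisException: need struct type but got struct<...>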
Quote from @mengxr

I think if this is true, it is an array type issue, not related to my PR.
I don't think such things happen anyway. The problem is the UDF itself, as it needs to convert the Catalyst type to a Scala type and then convert it back, which is pretty slow.
Test build #115495 has finished for PR 26910 at commit
@HyukjinKwon @mengxr @srowen Any more comments? Thanks :)
Once we allow #26910 (comment), I think we won't need this function. WDYT, @cloud-fan?
@HyukjinKwon I don't think the UDT change should block this PR. Even if we could cast it to the sqlType, it would still be tedious for a PySpark user to do this simple conversion. And Pandas UDFs don't support nested columns, so users would need to move the nested columns to the top level.
@WeichenXu123 You haven't addressed the comment on running doctest yet.
@mengxr, sure. Once we allow cast and extraction from UDT directly (e.g.,
Test build #115554 has finished for PR 26910 at commit
Test build #115565 has finished for PR 26910 at commit
Test build #115639 has finished for PR 26910 at commit
LGTM. Merged into master. Thanks!
What changes were proposed in this pull request?
PySpark UDF to convert MLlib vectors to dense arrays.
Example:
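A minimal sketch of the intended usage, assuming a SparkSession named spark (the column names and toy data are illustrative):

from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors
from pyspark.mllib.linalg import Vectors as OldVectors

df = spark.createDataFrame([
    (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
    (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]), OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))],
    ["vec", "oldVec"])

df.select(vector_to_array("vec").alias("vec"),
          vector_to_array("oldVec").alias("oldVec")).collect()
# [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
#  Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]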
Why are the changes needed?
If a PySpark user wants to convert MLlib sparse/dense vectors in a DataFrame into dense arrays, an efficient approach is to do the conversion in the JVM. However, that requires the user to write Scala code and register it as a UDF, which is often infeasible for a pure Python project.
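For comparison, a sketch of the row-at-a-time workaround that is available without this change; it round-trips every vector through Python instead of converting in the JVM (a DataFrame df with a vector column named features is assumed):

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Plain Python UDF: deserializes each vector into Python, calls toArray(),
# and serializes the result back, which is much slower than a JVM-side conversion.
to_array = udf(lambda v: v.toArray().tolist() if v is not None else None,
               ArrayType(DoubleType()))
df_with_arrays = df.withColumn("features_arr", to_array("features"))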
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT.